Skip to content

Commit

Permalink
Renaming Orchestrator and Database to Feature Store (#597)
Browse files Browse the repository at this point in the history
 [ committed by @juliaputko ]
 [ reviewed by @amandarichardsonn ]
  • Loading branch information
juliaputko authored Jun 4, 2024
1 parent f21b6f8 commit 883824b
Show file tree
Hide file tree
Showing 122 changed files with 2,591 additions and 2,483 deletions.
170 changes: 85 additions & 85 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,34 @@
from __future__ import annotations

import asyncio
from collections import defaultdict
from dataclasses import dataclass
import json
import os
import pathlib
import shutil
import subprocess
import signal
import socket
import subprocess
import sys
import tempfile
import time
import typing as t
import uuid
import warnings
from collections import defaultdict
from dataclasses import dataclass
from subprocess import run
import time

import psutil
import pytest

import smartsim
from smartsim import Experiment
from smartsim._core.launcher.dragon.dragonConnector import DragonConnector
from smartsim._core.launcher.dragon.dragonLauncher import DragonLauncher
from smartsim._core.config import CONFIG
from smartsim._core.config.config import Config
from smartsim._core.launcher.dragon.dragonConnector import DragonConnector
from smartsim._core.launcher.dragon.dragonLauncher import DragonLauncher
from smartsim._core.utils.telemetry.telemetry import JobEntity
from smartsim.database import Orchestrator
from smartsim.database import FeatureStore
from smartsim.entity import Application
from smartsim.error import SSConfigError, SSInternalError
from smartsim.log import get_logger
Expand Down Expand Up @@ -468,13 +467,13 @@ def check_output_dir() -> None:


@pytest.fixture
def fsutils() -> t.Type[FSUtils]:
    """Hand tests the ``FSUtils`` helper class itself (not an instance).

    :return: the ``FSUtils`` class
    """
    return FSUtils


class DBUtils:
class FSUtils:
@staticmethod
def get_db_configs() -> t.Dict[str, t.Any]:
def get_fs_configs() -> t.Dict[str, t.Any]:
config_settings = {
"enable_checkpoints": 1,
"set_max_memory": "3gb",
Expand All @@ -488,7 +487,7 @@ def get_db_configs() -> t.Dict[str, t.Any]:
return config_settings

@staticmethod
def get_smartsim_error_db_configs() -> t.Dict[str, t.Any]:
def get_smartsim_error_fs_configs() -> t.Dict[str, t.Any]:
bad_configs = {
"save": [
"-1", # frequency must be positive
Expand All @@ -515,7 +514,7 @@ def get_smartsim_error_db_configs() -> t.Dict[str, t.Any]:
return bad_configs

@staticmethod
def get_type_error_db_configs() -> t.Dict[t.Union[int, str], t.Any]:
def get_type_error_fs_configs() -> t.Dict[t.Union[int, str], t.Any]:
bad_configs: t.Dict[t.Union[int, str], t.Any] = {
"save": [2, True, ["2"]], # frequency must be specified as a string
"maxmemory": [99, True, ["99"]], # memory form must be a string
Expand All @@ -536,15 +535,15 @@ def get_type_error_db_configs() -> t.Dict[t.Union[int, str], t.Any]:

@staticmethod
def get_config_edit_method(
    fs: FeatureStore, config_setting: str
) -> t.Optional[t.Callable[..., None]]:
    """Get a fs configuration file edit method from a str

    :param fs: feature store whose bound edit methods are looked up
    :param config_setting: name of the edit method to fetch
    :return: the matching bound method of ``fs``, or ``None`` when
        ``config_setting`` is not one of the names listed below
    """
    # Every attribute is read eagerly while building the table, so ``fs``
    # must expose all five methods even if only one is requested.
    config_edit_methods: t.Dict[str, t.Callable[..., None]] = {
        "enable_checkpoints": fs.enable_checkpoints,
        "set_max_memory": fs.set_max_memory,
        "set_eviction_strategy": fs.set_eviction_strategy,
        "set_max_clients": fs.set_max_clients,
        "set_max_message_size": fs.set_max_message_size,
    }
    return config_edit_methods.get(config_setting)

Expand Down Expand Up @@ -650,21 +649,21 @@ class ColoUtils:
@staticmethod
def setup_test_colo(
fileutils: t.Type[FileUtils],
db_type: str,
fs_type: str,
exp: Experiment,
application_file: str,
db_args: t.Dict[str, t.Any],
fs_args: t.Dict[str, t.Any],
colo_settings: t.Optional[RunSettings] = None,
colo_application_name: str = "colocated_application",
port: t.Optional[int] = None,
on_wlm: bool = False,
) -> Application:
"""Setup database needed for the colo pinning tests"""
"""Setup feature store needed for the colo pinning tests"""

# get test setup
sr_test_script = fileutils.get_test_conf_path(application_file)

# Create an app with a colo_db which uses 1 db_cpu
# Create an app with a colo_fs which uses 1 fs_cpu
if colo_settings is None:
colo_settings = exp.create_run_settings(
exe=sys.executable, exe_args=[sr_test_script]
Expand All @@ -675,28 +674,28 @@ def setup_test_colo(

colo_application = exp.create_application(colo_application_name, colo_settings)

if db_type in ["tcp", "deprecated"]:
db_args["port"] = port if port is not None else _find_free_port(test_ports)
db_args["ifname"] = "lo"
if db_type == "uds" and colo_application_name is not None:
if fs_type in ["tcp", "deprecated"]:
fs_args["port"] = port if port is not None else _find_free_port(test_ports)
fs_args["ifname"] = "lo"
if fs_type == "uds" and colo_application_name is not None:
tmp_dir = tempfile.gettempdir()
socket_suffix = str(uuid.uuid4())[:7]
socket_name = f"{colo_application_name}_{socket_suffix}.socket"
db_args["unix_socket"] = os.path.join(tmp_dir, socket_name)
fs_args["unix_socket"] = os.path.join(tmp_dir, socket_name)

colocate_fun: t.Dict[str, t.Callable[..., None]] = {
"tcp": colo_application.colocate_db_tcp,
"deprecated": colo_application.colocate_db,
"uds": colo_application.colocate_db_uds,
"tcp": colo_application.colocate_fs_tcp,
"deprecated": colo_application.colocate_fs,
"uds": colo_application.colocate_fs_uds,
}
with warnings.catch_warnings():
if db_type == "deprecated":
message = "`colocate_db` has been deprecated"
if fs_type == "deprecated":
message = "`colocate_fs` has been deprecated"
warnings.filterwarnings("ignore", message=message)
colocate_fun[db_type](**db_args)
# assert application will launch with colocated db
colocate_fun[fs_type](**fs_args)
# assert application will launch with colocated fs
assert colo_application.colocated
# Check to make sure that limit_db_cpus made it into the colo settings
# Check to make sure that limit_fs_cpus made it into the colo settings
return colo_application


Expand Down Expand Up @@ -747,7 +746,7 @@ def mock_sink() -> t.Type[MockSink]:

@pytest.fixture
def mock_con() -> t.Callable[[int, int], t.Iterable[t.Any]]:
"""Generates mock db connection telemetry"""
"""Generates mock fs connection telemetry"""

def _mock_con(min: int = 1, max: int = 254) -> t.Iterable[t.Any]:
for i in range(min, max):
Expand All @@ -761,7 +760,7 @@ def _mock_con(min: int = 1, max: int = 254) -> t.Iterable[t.Any]:

@pytest.fixture
def mock_mem() -> t.Callable[[int, int], t.Iterable[t.Any]]:
"""Generates mock db memory usage telemetry"""
"""Generates mock fs memory usage telemetry"""

def _mock_mem(min: int = 1, max: int = 1000) -> t.Iterable[t.Any]:
for i in range(min, max):
Expand Down Expand Up @@ -879,9 +878,9 @@ def details(self) -> t.List[t.Tuple[t.Tuple[t.Any, ...], t.Dict[str, t.Any]]]:
return self._details


## Reuse feature store across tests

# Maps a fixture name to the feature store started for it. Looking up a name
# that has not been registered yields ``None`` (and, because this is a
# defaultdict, records that name with a ``None`` value).
feature_store_registry: t.DefaultDict[str, t.Optional[FeatureStore]] = defaultdict(
    lambda: None
)

Expand All @@ -902,14 +901,14 @@ def wlm_experiment(test_dir: str, wlmutils: WLMUtils) -> smartsim.Experiment:
)


def _cleanup_fs(name: str) -> None:
    """Stop the feature store registered under ``name`` if it is still active.

    Teardown is best-effort: a failure to reconnect to or stop the feature
    store is ignored so that it cannot fail the fixture that calls this.

    :param name: key of the feature store in ``feature_store_registry``
    """
    # ``.get`` rather than indexing: indexing a defaultdict would insert a
    # ``None`` entry for every name that was never registered.
    fs = feature_store_registry.get(name)
    if not fs or not fs.is_active():
        return

    exp = Experiment("cleanup")
    try:
        fs = exp.reconnect_feature_store(fs.checkpoint_file)
        exp.stop(fs)
    except Exception:
        # Deliberately swallowed (best-effort cleanup). Narrowed from a bare
        # ``except:`` so KeyboardInterrupt / SystemExit still propagate.
        pass

Expand All @@ -925,15 +924,15 @@ class DBConfiguration:


@dataclass
class PrepareFeatureStoreOutput:
    """Pairs a feature store with a flag saying whether it was newly created."""

    featurestore: t.Optional[FeatureStore]  # The actual feature store object
    new_fs: bool  # True if a new feature store was created when calling prepare_fs


# Reuse databases
# Reuse feature stores
@pytest.fixture(scope="session")
def local_db() -> t.Generator[DBConfiguration, None, None]:
name = "local_db_fixture"
def local_fs() -> t.Generator[DBConfiguration, None, None]:
name = "local_fs_fixture"
config = DBConfiguration(
name,
"local",
Expand All @@ -943,14 +942,15 @@ def local_db() -> t.Generator[DBConfiguration, None, None]:
_find_free_port(tuple(reversed(test_ports))),
)
yield config
_cleanup_db(name)
_cleanup_fs(name)



@pytest.fixture(scope="session")
def single_db(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]:
def single_fs(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]:
hostlist = wlmutils.get_test_hostlist()
hostlist = hostlist[-1:] if hostlist is not None else None
name = "single_db_fixture"
name = "single_fx_fixture"
config = DBConfiguration(
name,
wlmutils.get_test_launcher(),
Expand All @@ -960,14 +960,14 @@ def single_db(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]:
_find_free_port(tuple(reversed(test_ports))),
)
yield config
_cleanup_db(name)
_cleanup_fs(name)


@pytest.fixture(scope="session")
def clustered_db(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]:
def clustered_fs(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]:
hostlist = wlmutils.get_test_hostlist()
hostlist = hostlist[-4:-1] if hostlist is not None else None
name = "clustered_db_fixture"
name = "clustered_fs_fixture"
config = DBConfiguration(
name,
wlmutils.get_test_launcher(),
Expand All @@ -977,53 +977,53 @@ def clustered_db(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]
_find_free_port(tuple(reversed(test_ports))),
)
yield config
_cleanup_db(name)
_cleanup_fs(name)


@pytest.fixture
def register_new_fs() -> t.Callable[[DBConfiguration], FeatureStore]:
    """Provide a callable that launches a feature store and registers it.

    :return: function taking a ``DBConfiguration`` and returning the started
        feature store
    """

    def _register_new_fs(config: DBConfiguration) -> FeatureStore:
        """Create, generate and start a feature store described by ``config``.

        The started feature store is stored in ``feature_store_registry``
        under ``config.name`` before being returned.
        """
        exp_path = pathlib.Path(test_output_root, config.name)
        # ``parents=True`` so a not-yet-created output root does not raise.
        exp_path.mkdir(parents=True, exist_ok=True)
        exp = Experiment(
            config.name,
            exp_path=str(exp_path),
            launcher=config.launcher,
        )
        feature_store = exp.create_feature_store(
            port=config.port,
            batch=False,
            interface=config.interface,
            hosts=config.hostlist,
            fs_nodes=config.num_nodes,
        )
        exp.generate(feature_store, overwrite=True)
        exp.start(feature_store)
        # Item assignment mutates the module-level registry in place; no
        # ``global`` declaration is needed for that.
        feature_store_registry[config.name] = feature_store
        return feature_store

    return _register_new_fs


@pytest.fixture(scope="function")
def prepare_fs(
    register_new_fs: t.Callable[[DBConfiguration], FeatureStore]
) -> t.Callable[[DBConfiguration], PrepareFeatureStoreOutput]:
    """Provide a callable that reuses an active feature store or starts one.

    :param register_new_fs: callable used to launch and register a feature
        store when none is usable
    :return: function taking a ``DBConfiguration`` and returning a
        ``PrepareFeatureStoreOutput``
    """

    def _prepare_fs(fs_config: DBConfiguration) -> PrepareFeatureStoreOutput:
        """Return the registered feature store for ``fs_config`` if it is
        active; otherwise register a new one and flag it as new."""
        fs = feature_store_registry[fs_config.name]

        # Reuse only a registered feature store that reports itself active.
        if fs and fs.is_active():
            return PrepareFeatureStoreOutput(fs, False)

        # Nothing registered, or it is no longer running: start a new one.
        return PrepareFeatureStoreOutput(register_new_fs(fs_config), True)

    return _prepare_fs
8 changes: 4 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,13 @@ def finalize_options(self):

class SmartSimBuild(build_py):
def run(self):
database_builder = builder.DatabaseBuilder(
feature_store_builder = builder.FeatureStoreBuilder(
build_env(), build_env.MALLOC, build_env.JOBS
)
if not database_builder.is_built:
database_builder.build_from_git(versions.REDIS_URL, versions.REDIS)
if not feature_store_builder.is_built:
feature_store_builder.build_from_git(versions.REDIS_URL, versions.REDIS)

database_builder.cleanup()
feature_store_builder.cleanup()

# run original build_py command
super().run()
Expand Down
Loading

0 comments on commit 883824b

Please sign in to comment.