Renaming Orchestrator and Database to Feature Store #597

Merged
merged 19 commits into from Jun 4, 2024
Changes from 5 commits
162 changes: 81 additions & 81 deletions conftest.py
@@ -55,7 +55,7 @@
from smartsim._core.config import CONFIG
from smartsim._core.config.config import Config
from smartsim._core.utils.telemetry.telemetry import JobEntity
from smartsim.database import Orchestrator
from smartsim.database import FeatureStore
from smartsim.entity import Model
from smartsim.error import SSConfigError, SSInternalError
from smartsim.log import get_logger
@@ -467,13 +467,13 @@ def check_output_dir() -> None:


@pytest.fixture
def dbutils() -> t.Type[DBUtils]:
return DBUtils
def fsutils() -> t.Type[FSUtils]:
return FSUtils


class DBUtils:
class FSUtils:
@staticmethod
def get_db_configs() -> t.Dict[str, t.Any]:
def get_fs_configs() -> t.Dict[str, t.Any]:
config_settings = {
"enable_checkpoints": 1,
"set_max_memory": "3gb",
@@ -487,7 +487,7 @@ def get_db_configs() -> t.Dict[str, t.Any]:
return config_settings

@staticmethod
def get_smartsim_error_db_configs() -> t.Dict[str, t.Any]:
def get_smartsim_error_fs_configs() -> t.Dict[str, t.Any]:
bad_configs = {
"save": [
"-1", # frequency must be positive
@@ -514,7 +514,7 @@ def get_smartsim_error_db_configs() -> t.Dict[str, t.Any]:
return bad_configs

@staticmethod
def get_type_error_db_configs() -> t.Dict[t.Union[int, str], t.Any]:
def get_type_error_fs_configs() -> t.Dict[t.Union[int, str], t.Any]:
bad_configs: t.Dict[t.Union[int, str], t.Any] = {
"save": [2, True, ["2"]], # frequency must be specified as a string
"maxmemory": [99, True, ["99"]], # memory form must be a string
@@ -535,15 +535,15 @@ def get_type_error_db_configs() -> t.Dict[t.Union[int, str], t.Any]:

@staticmethod
def get_config_edit_method(
db: Orchestrator, config_setting: str
fs: FeatureStore, config_setting: str
) -> t.Optional[t.Callable[..., None]]:
"""Get a db configuration file edit method from a str"""
"""Get a fs configuration file edit method from a str"""
config_edit_methods: t.Dict[str, t.Callable[..., None]] = {
"enable_checkpoints": db.enable_checkpoints,
"set_max_memory": db.set_max_memory,
"set_eviction_strategy": db.set_eviction_strategy,
"set_max_clients": db.set_max_clients,
"set_max_message_size": db.set_max_message_size,
"enable_checkpoints": fs.enable_checkpoints,
"set_max_memory": fs.set_max_memory,
"set_eviction_strategy": fs.set_eviction_strategy,
"set_max_clients": fs.set_max_clients,
"set_max_message_size": fs.set_max_message_size,
}
return config_edit_methods.get(config_setting, None)

@@ -649,21 +649,21 @@ class ColoUtils:
@staticmethod
def setup_test_colo(
fileutils: t.Type[FileUtils],
db_type: str,
fs_type: str,
exp: Experiment,
application_file: str,
db_args: t.Dict[str, t.Any],
fs_args: t.Dict[str, t.Any],
colo_settings: t.Optional[RunSettings] = None,
colo_model_name: str = "colocated_model",
port: t.Optional[int] = None,
on_wlm: bool = False,
) -> Model:
"""Setup database needed for the colo pinning tests"""
"""Setup feature store needed for the colo pinning tests"""

# get test setup
sr_test_script = fileutils.get_test_conf_path(application_file)

# Create an app with a colo_db which uses 1 db_cpu
# Create an app with a colo_fs which uses 1 fs_cpu
if colo_settings is None:
colo_settings = exp.create_run_settings(
exe=sys.executable, exe_args=[sr_test_script]
@@ -674,28 +674,28 @@ def setup_test_colo(

colo_model = exp.create_model(colo_model_name, colo_settings)

if db_type in ["tcp", "deprecated"]:
db_args["port"] = port if port is not None else _find_free_port(test_ports)
db_args["ifname"] = "lo"
if db_type == "uds" and colo_model_name is not None:
if fs_type in ["tcp", "deprecated"]:
fs_args["port"] = port if port is not None else _find_free_port(test_ports)
fs_args["ifname"] = "lo"
if fs_type == "uds" and colo_model_name is not None:
tmp_dir = tempfile.gettempdir()
socket_suffix = str(uuid.uuid4())[:7]
socket_name = f"{colo_model_name}_{socket_suffix}.socket"
db_args["unix_socket"] = os.path.join(tmp_dir, socket_name)
fs_args["unix_socket"] = os.path.join(tmp_dir, socket_name)

colocate_fun: t.Dict[str, t.Callable[..., None]] = {
"tcp": colo_model.colocate_db_tcp,
"deprecated": colo_model.colocate_db,
"uds": colo_model.colocate_db_uds,
"tcp": colo_model.colocate_fs_tcp,
"deprecated": colo_model.colocate_fs,
"uds": colo_model.colocate_fs_uds,
}
with warnings.catch_warnings():
if db_type == "deprecated":
message = "`colocate_db` has been deprecated"
if fs_type == "deprecated":
message = "`colocate_fs` has been deprecated"
warnings.filterwarnings("ignore", message=message)
colocate_fun[db_type](**db_args)
# assert model will launch with colocated db
colocate_fun[fs_type](**fs_args)
# assert model will launch with colocated fs
assert colo_model.colocated
# Check to make sure that limit_db_cpus made it into the colo settings
# Check to make sure that limit_fs_cpus made it into the colo settings
return colo_model


@@ -744,7 +744,7 @@ def mock_sink() -> t.Type[MockSink]:

@pytest.fixture
def mock_con() -> t.Callable[[int, int], t.Iterable[t.Any]]:
"""Generates mock db connection telemetry"""
"""Generates mock fs connection telemetry"""

def _mock_con(min: int = 1, max: int = 254) -> t.Iterable[t.Any]:
for i in range(min, max):
@@ -758,7 +758,7 @@ def _mock_con(min: int = 1, max: int = 254) -> t.Iterable[t.Any]:

@pytest.fixture
def mock_mem() -> t.Callable[[int, int], t.Iterable[t.Any]]:
"""Generates mock db memory usage telemetry"""
"""Generates mock fs memory usage telemetry"""

def _mock_mem(min: int = 1, max: int = 1000) -> t.Iterable[t.Any]:
for i in range(min, max):
@@ -875,9 +875,9 @@ def num_calls(self) -> int:
def details(self) -> t.List[t.Tuple[t.Tuple[t.Any, ...], t.Dict[str, t.Any]]]:
return self._details

## Reuse database across tests
## Reuse feature store across tests

database_registry: t.DefaultDict[str, t.Optional[Orchestrator]] = defaultdict(lambda: None)
feature_store_registry: t.DefaultDict[str, t.Optional[FeatureStore]] = defaultdict(lambda: None)

@pytest.fixture(scope="function")
def local_experiment(test_dir: str) -> smartsim.Experiment:
@@ -895,14 +895,14 @@ def wlm_experiment(test_dir: str, wlmutils: WLMUtils) -> smartsim.Experiment:
launcher=wlmutils.get_test_launcher()
)

def _cleanup_db(name: str) -> None:
global database_registry
db = database_registry[name]
if db and db.is_active():
def _cleanup_fs(name: str) -> None:
global feature_store_registry
fs = feature_store_registry[name]
if fs and fs.is_active():
exp = Experiment("cleanup")
try:
db = exp.reconnect_orchestrator(db.checkpoint_file)
exp.stop(db)
fs = exp.reconnect_feature_store(fs.checkpoint_file)
exp.stop(fs)
except:
pass

@@ -916,14 +916,14 @@ class DBConfiguration:
port: int

@dataclass
class PrepareDatabaseOutput:
orchestrator: t.Optional[Orchestrator] # The actual orchestrator object
new_db: bool # True if a new database was created when calling prepare_db
class PrepareFeatureStoreOutput:
featurestore: t.Optional[FeatureStore] # The actual feature store object
new_fs: bool # True if a new feature store was created when calling prepare_fs

# Reuse databases
# Reuse feature stores
@pytest.fixture(scope="session")
def local_db() -> t.Generator[DBConfiguration, None, None]:
name = "local_db_fixture"
def local_fs() -> t.Generator[DBConfiguration, None, None]:
name = "local_fs_fixture"
config = DBConfiguration(
name,
"local",
@@ -933,13 +933,13 @@ def local_db() -> t.Generator[DBConfiguration, None, None]:
_find_free_port(tuple(reversed(test_ports))),
)
yield config
_cleanup_db(name)
_cleanup_fs(name)

@pytest.fixture(scope="session")
def single_db(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]:
def single_fs(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]:
hostlist = wlmutils.get_test_hostlist()
hostlist = hostlist[-1:] if hostlist is not None else None
name = "single_db_fixture"
name = "single_fx_fixture"
config = DBConfiguration(
name,
wlmutils.get_test_launcher(),
@@ -949,14 +949,14 @@ def single_db(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]:
_find_free_port(tuple(reversed(test_ports)))
)
yield config
_cleanup_db(name)
_cleanup_fs(name)


@pytest.fixture(scope="session")
def clustered_db(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]:
def clustered_fs(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]:
hostlist = wlmutils.get_test_hostlist()
hostlist = hostlist[-4:-1] if hostlist is not None else None
name = "clustered_db_fixture"
name = "clustered_fs_fixture"
config = DBConfiguration(
name,
wlmutils.get_test_launcher(),
@@ -966,59 +966,59 @@ def clustered_db(wlmutils: WLMUtils) -> t.Generator[DBConfiguration, None, None]
_find_free_port(tuple(reversed(test_ports))),
)
yield config
_cleanup_db(name)
_cleanup_fs(name)


@pytest.fixture
def register_new_db() -> t.Callable[[DBConfiguration], Orchestrator]:
def _register_new_db(
def register_new_fs() -> t.Callable[[DBConfiguration], FeatureStore]:
def _register_new_fs(
config: DBConfiguration
) -> Orchestrator:
) -> FeatureStore:
exp_path = pathlib.Path(test_output_root, config.name)
exp_path.mkdir(exist_ok=True)
exp = Experiment(
config.name,
exp_path=str(exp_path),
launcher=config.launcher,
)
orc = exp.create_database(
feature_store = exp.create_feature_store(
port=config.port,
batch=False,
interface=config.interface,
hosts=config.hostlist,
db_nodes=config.num_nodes
fs_nodes=config.num_nodes
)
exp.generate(orc, overwrite=True)
exp.start(orc)
global database_registry
database_registry[config.name] = orc
return orc
return _register_new_db
exp.generate(feature_store, overwrite=True)
exp.start(feature_store)
global feature_store_registry
feature_store_registry[config.name] = feature_store
return feature_store
return _register_new_fs


@pytest.fixture(scope="function")
def prepare_db(
register_new_db: t.Callable[
def prepare_fs(
register_new_fs: t.Callable[
[DBConfiguration],
Orchestrator
FeatureStore
]
) -> t.Callable[
[DBConfiguration],
PrepareDatabaseOutput
PrepareFeatureStoreOutput
]:
def _prepare_db(db_config: DBConfiguration) -> PrepareDatabaseOutput:
global database_registry
db = database_registry[db_config.name]
def _prepare_fs(fs_config: DBConfiguration) -> PrepareFeatureStoreOutput:
global feature_store_registry
fs = feature_store_registry[fs_config.name]

new_db = False
db_up = False
new_fs = False
fs_up = False

if db:
db_up = db.is_active()
if fs:
fs_up = fs.is_active()

if not db_up or db is None:
db = register_new_db(db_config)
new_db = True
if not fs_up or fs is None:
fs = register_new_fs(fs_config)
new_fs = True

return PrepareDatabaseOutput(db, new_db)
return _prepare_db
return PrepareFeatureStoreOutput(fs, new_fs)
return _prepare_fs
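
For orientation, here is a minimal sketch of how a test could consume the renamed fixtures above (prepare_fs, local_fs, fsutils). The test body is illustrative only and is not part of this PR; it simply mirrors the existing prepare_db/local_db usage pattern under the new names.

# Illustrative only -- not part of this diff. Assumes the renamed fixtures above.
def test_fs_config_roundtrip(prepare_fs, local_fs, fsutils):
    # Reuse (or launch) the session-scoped local feature store
    fs = prepare_fs(local_fs).featurestore

    # Apply each valid configuration setting exposed by FSUtils
    for setting, value in fsutils.get_fs_configs().items():
        edit_method = fsutils.get_config_edit_method(fs, setting)
        if edit_method is not None:
            edit_method(value)
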
@@ -6,5 +6,5 @@

# Create NumPy array
array = np.array([1, 2, 3, 4])
# Use SmartRedis Client to place tensor in standalone Orchestrator
# Use SmartRedis Client to place tensor in standalone Feature Store
client.put_tensor("tensor", array)
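
The lines collapsed above this hunk are not shown in the diff; a minimal sketch of the setup they presumably contain follows. The Client construction here is an assumption, not something this PR shows (SmartRedis resolves the address from the SSDB environment variable when the script is launched by SmartSim).

# Presumed setup for the snippet above (assumption -- collapsed out of this hunk)
import numpy as np
from smartredis import Client

# Address comes from the SSDB environment variable set by SmartSim at launch;
# cluster=False assumes a single-shard standalone Feature Store.
client = Client(cluster=False)
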
@@ -5,8 +5,8 @@
# Initialize the Experiment
exp = Experiment("getting-started", launcher="auto")

# Initialize a standalone Orchestrator
standalone_orch = exp.create_database(db_nodes=1)
# Initialize a standalone Feature Store
standalone_feature_store = exp.create_feature_store(fs_nodes=1)

# Initialize a RunSettings object for Ensemble
ensemble_settings = exp.create_run_settings(exe="/path/to/executable_producer_simulation")
@@ -23,10 +23,10 @@
consumer_model = exp.create_model("consumer", model_settings)

# Generate SmartSim entity folder tree
exp.generate(standalone_orch, producer_ensemble, consumer_model, overwrite=True)
exp.generate(standalone_feature_store, producer_ensemble, consumer_model, overwrite=True)

# Launch Orchestrator
exp.start(standalone_orch, summary=True)
# Launch Feature Store
exp.start(standalone_feature_store, summary=True)

# Launch Ensemble
exp.start(producer_ensemble, block=True, summary=True)
@@ -38,5 +38,5 @@
# Launch consumer Model
exp.start(consumer_model, block=True, summary=True)

# Clobber Orchestrator
exp.stop(standalone_orch)
# Clobber Feature Store
exp.stop(standalone_feature_store)
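
To round out the driver script above, a sketch of what a consumer application might do against the standalone Feature Store. The key name, Client construction, and use of get_tensor are assumptions for illustration; none of this code is part of the PR.

# Hypothetical consumer application (not part of this PR)
from smartredis import Client

client = Client(cluster=False)  # address taken from SSDB when launched by SmartSim

# Retrieve a tensor that a producer placed in the standalone Feature Store;
# "tensor" is a hypothetical key name used only for illustration
array = client.get_tensor("tensor")
print(array)

In a real Experiment, reads across entities may additionally involve SmartSim's key prefixing, which this sketch ignores.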