From 08dc436e477e8c99b1bf590fb49d74a4acd75d18 Mon Sep 17 00:00:00 2001 From: Sergiy Matusevych Date: Tue, 13 Feb 2024 12:02:31 -0800 Subject: [PATCH] Return status and actual timestamp from all environments (#671) This is the first step to making trials asynchronous. In this PR we just make sure every Environment method that returns status also returns the timestamp of that status. In the next PR we will get these timestamps and status values from the remote environment. --------- Co-authored-by: Brian Kroth --- .../environments/base_environment.py | 22 +++++++------ .../mlos_bench/environments/composite_env.py | 33 ++++++++++--------- .../environments/local/local_env.py | 26 ++++++++------- .../environments/local/local_fileshare_env.py | 8 ++--- .../mlos_bench/environments/mock_env.py | 11 ++++--- .../environments/remote/remote_env.py | 27 ++++++++------- mlos_bench/mlos_bench/run.py | 12 +++---- mlos_bench/mlos_bench/storage/base_storage.py | 4 ++- mlos_bench/mlos_bench/storage/sql/trial.py | 11 ++++--- .../mlos_bench/tests/environments/__init__.py | 6 ++-- .../local/local_fileshare_env_test.py | 2 +- .../tests/environments/mock_env_test.py | 10 +++--- .../optimizers/toy_optimization_loop_test.py | 2 +- .../mlos_bench/tests/storage/sql/fixtures.py | 7 ++-- .../tests/storage/trial_telemetry_test.py | 9 ++--- 15 files changed, 103 insertions(+), 87 deletions(-) diff --git a/mlos_bench/mlos_bench/environments/base_environment.py b/mlos_bench/mlos_bench/environments/base_environment.py index b987def1aa6..38b7bd11423 100644 --- a/mlos_bench/mlos_bench/environments/base_environment.py +++ b/mlos_bench/mlos_bench/environments/base_environment.py @@ -378,7 +378,7 @@ def teardown(self) -> None: assert self._in_context self._is_ready = False - def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: + def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]: """ Execute the run script for this environment. @@ -387,30 +387,32 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: Returns ------- - (status, output) : (Status, dict) - A pair of (Status, output) values, where `output` is a dict + (status, timestamp, output) : (Status, datetime, dict) + 3-tuple of (Status, timestamp, output) values, where `output` is a dict with the results or None if the status is not COMPLETED. If run script is a benchmark, then the score is usually expected to be in the `score` field. """ # Make sure we create a context before invoking setup/run/status/teardown assert self._in_context - (status, _) = self.status() - return (status, None) + (status, timestamp, _) = self.status() + return (status, timestamp, None) - def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]: + def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]: """ Check the status of the benchmark environment. Returns ------- - (benchmark_status, telemetry) : (Status, list) - A pair of (benchmark status, telemetry) values. + (benchmark_status, timestamp, telemetry) : (Status, datetime, list) + 3-tuple of (benchmark status, timestamp, telemetry) values. + `timestamp` is UTC time stamp of the status; it's current time by default. `telemetry` is a list (maybe empty) of (timestamp, metric, value) triplets. """ # Make sure we create a context before invoking setup/run/status/teardown assert self._in_context + timestamp = datetime.utcnow() if self._is_ready: - return (Status.READY, []) + return (Status.READY, timestamp, []) _LOG.warning("Environment not ready: %s", self) - return (Status.PENDING, []) + return (Status.PENDING, timestamp, []) diff --git a/mlos_bench/mlos_bench/environments/composite_env.py b/mlos_bench/mlos_bench/environments/composite_env.py index 720d1c69ac8..06b4f431bee 100644 --- a/mlos_bench/mlos_bench/environments/composite_env.py +++ b/mlos_bench/mlos_bench/environments/composite_env.py @@ -179,7 +179,7 @@ def teardown(self) -> None: env_context.teardown() super().teardown() - def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: + def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]: """ Submit a new experiment to the environment. Return the result of the *last* child environment if successful, @@ -187,48 +187,50 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: Returns ------- - (status, output) : (Status, dict) - A pair of (Status, output) values, where `output` is a dict + (status, timestamp, output) : (Status, datetime, dict) + 3-tuple of (Status, timestamp, output) values, where `output` is a dict with the results or None if the status is not COMPLETED. If run script is a benchmark, then the score is usually expected to be in the `score` field. """ _LOG.info("Run: %s", self._children) - (status, metrics) = super().run() + (status, timestamp, metrics) = super().run() if not status.is_ready(): - return (status, metrics) + return (status, timestamp, metrics) joint_metrics = {} for env_context in self._child_contexts: _LOG.debug("Child env. run: %s", env_context) - (status, metrics) = env_context.run() + (status, timestamp, metrics) = env_context.run() _LOG.debug("Child env. run results: %s :: %s %s", env_context, status, metrics) if not status.is_good(): _LOG.info("Run failed: %s :: %s", self, status) - return (status, None) + return (status, timestamp, None) joint_metrics.update(metrics or {}) _LOG.info("Run completed: %s :: %s %s", self, status, joint_metrics) - return (status, joint_metrics) + # Return the status and the timestamp of the last child environment. + return (status, timestamp, joint_metrics) - def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]: + def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]: """ Check the status of the benchmark environment. Returns ------- - (benchmark_status, telemetry) : (Status, list) - A pair of (benchmark status, telemetry) values. + (benchmark_status, timestamp, telemetry) : (Status, datetime, list) + 3-tuple of (benchmark status, timestamp, telemetry) values. + `timestamp` is UTC time stamp of the status; it's current time by default. `telemetry` is a list (maybe empty) of (timestamp, metric, value) triplets. """ - (status, telemetry) = super().status() + (status, timestamp, telemetry) = super().status() if not status.is_ready(): - return (status, telemetry) + return (status, timestamp, telemetry) joint_telemetry = [] final_status = None for env_context in self._child_contexts: - (status, telemetry) = env_context.status() + (status, timestamp, telemetry) = env_context.status() _LOG.debug("Child env. status: %s :: %s", env_context, status) joint_telemetry.extend(telemetry) if not status.is_good() and final_status is None: @@ -236,4 +238,5 @@ def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]: final_status = final_status or status _LOG.info("Final status: %s :: %s", self, final_status) - return (final_status, joint_telemetry) + # Return the status and the timestamp of the last child environment or the first failed child environment. + return (final_status, timestamp, joint_telemetry) diff --git a/mlos_bench/mlos_bench/environments/local/local_env.py b/mlos_bench/mlos_bench/environments/local/local_env.py index 7274da99ae6..41a003f0bf9 100644 --- a/mlos_bench/mlos_bench/environments/local/local_env.py +++ b/mlos_bench/mlos_bench/environments/local/local_env.py @@ -152,19 +152,19 @@ def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) - return self._is_ready - def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: + def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]: """ Run a script in the local scheduler environment. Returns ------- - (status, output) : (Status, dict) - A pair of (Status, output) values, where `output` is a dict + (status, timestamp, output) : (Status, datetime, dict) + 3-tuple of (Status, timestamp, output) values, where `output` is a dict with the results or None if the status is not COMPLETED. If run script is a benchmark, then the score is usually expected to be in the `score` field. """ - (status, _) = result = super().run() + (status, timestamp, _) = result = super().run() if not status.is_ready(): return result @@ -174,13 +174,13 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: if self._script_run: (return_code, output) = self._local_exec(self._script_run, self._temp_dir) if return_code != 0: - return (Status.FAILED, None) + return (Status.FAILED, timestamp, None) stdout_data = self._extract_stdout_results(output.get("stdout", "")) # FIXME: We should not be assuming that the only output file type is a CSV. if not self._read_results_file: _LOG.debug("Not reading the data at: %s", self) - return (Status.SUCCEEDED, stdout_data) + return (Status.SUCCEEDED, timestamp, stdout_data) data = self._normalize_columns(pandas.read_csv( self._config_loader_service.resolve_path( @@ -201,7 +201,7 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: stdout_data.update(data.iloc[-1].to_dict()) _LOG.info("Local run complete: %s ::\n%s", self, stdout_data) - return (Status.SUCCEEDED, stdout_data) + return (Status.SUCCEEDED, timestamp, stdout_data) @staticmethod def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame: @@ -215,17 +215,19 @@ def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame: data.rename(str.rstrip, axis='columns', inplace=True) return data - def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]: + def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]: - (status, _) = super().status() + (status, timestamp, _) = super().status() if not (self._is_ready and self._read_telemetry_file): - return (status, []) + return (status, timestamp, []) assert self._temp_dir is not None try: fname = self._config_loader_service.resolve_path( self._read_telemetry_file, extra_paths=[self._temp_dir]) + # TODO: Use the timestamp of the CSV file as our status timestamp? + # FIXME: We should not be assuming that the only output file type is a CSV. data = self._normalize_columns( pandas.read_csv(fname, index_col=False, parse_dates=[0])) @@ -241,11 +243,11 @@ def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]: except FileNotFoundError as ex: _LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex) - return (status, []) + return (status, timestamp, []) _LOG.debug("Read telemetry data:\n%s", data) col_dtypes: Mapping[int, Type] = {0: datetime} - return (status, [ + return (status, timestamp, [ (pandas.Timestamp(ts).to_pydatetime(), metric, value) for (ts, metric, value) in data.to_records(index=False, column_dtypes=col_dtypes) ]) diff --git a/mlos_bench/mlos_bench/environments/local/local_fileshare_env.py b/mlos_bench/mlos_bench/environments/local/local_fileshare_env.py index 4bf7b418c2e..6aea7acfc44 100644 --- a/mlos_bench/mlos_bench/environments/local/local_fileshare_env.py +++ b/mlos_bench/mlos_bench/environments/local/local_fileshare_env.py @@ -151,15 +151,15 @@ def _download_files(self, ignore_missing: bool = False) -> None: _LOG.exception("Cannot download %s to %s", path_from, path_to) raise ex - def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: + def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]: """ Download benchmark results from the shared storage and run post-processing scripts locally. Returns ------- - (status, output) : (Status, dict) - A pair of (Status, output) values, where `output` is a dict + (status, timestamp, output) : (Status, datetime, dict) + 3-tuple of (Status, timestamp, output) values, where `output` is a dict with the results or None if the status is not COMPLETED. If run script is a benchmark, then the score is usually expected to be in the `score` field. @@ -167,6 +167,6 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: self._download_files() return super().run() - def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]: + def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]: self._download_files(ignore_missing=True) return super().status() diff --git a/mlos_bench/mlos_bench/environments/mock_env.py b/mlos_bench/mlos_bench/environments/mock_env.py index c90f1f200a9..c30c1916264 100644 --- a/mlos_bench/mlos_bench/environments/mock_env.py +++ b/mlos_bench/mlos_bench/environments/mock_env.py @@ -8,6 +8,7 @@ import random import logging +from datetime import datetime from typing import Dict, Optional, Tuple import numpy @@ -61,20 +62,20 @@ def __init__(self, self._metrics = self.config.get("metrics", ["score"]) self._is_ready = True - def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: + def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]: """ Produce mock benchmark data for one experiment. Returns ------- - (status, output) : (Status, dict) - A pair of (Status, output) values, where `output` is a dict + (status, timestamp, output) : (Status, datetime, dict) + 3-tuple of (Status, timestamp, output) values, where `output` is a dict with the results or None if the status is not COMPLETED. The keys of the `output` dict are the names of the metrics specified in the config; by default it's just one metric named "score". All output metrics have the same value. """ - (status, _) = result = super().run() + (status, timestamp, _) = result = super().run() if not status.is_ready(): return result @@ -89,7 +90,7 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: if self._range: score = self._range[0] + score * (self._range[1] - self._range[0]) - return (Status.SUCCEEDED, {metric: score for metric in self._metrics}) + return (Status.SUCCEEDED, timestamp, {metric: score for metric in self._metrics}) @staticmethod def _normalized(tunable: Tunable) -> float: diff --git a/mlos_bench/mlos_bench/environments/remote/remote_env.py b/mlos_bench/mlos_bench/environments/remote/remote_env.py index 5840e03d13a..a3675d4e7a8 100644 --- a/mlos_bench/mlos_bench/environments/remote/remote_env.py +++ b/mlos_bench/mlos_bench/environments/remote/remote_env.py @@ -9,6 +9,7 @@ """ import logging +from datetime import datetime from typing import Dict, Iterable, Optional, Tuple from mlos_bench.environments.status import Status @@ -104,7 +105,7 @@ def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) - if self._script_setup: _LOG.info("Set up the remote environment: %s", self) - (status, _) = self._remote_exec(self._script_setup) + (status, _timestamp, _output) = self._remote_exec(self._script_setup) _LOG.info("Remote set up complete: %s :: %s", self, status) self._is_ready = status.is_succeeded() else: @@ -112,7 +113,7 @@ def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) - return self._is_ready - def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: + def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]: """ Runs the run script on the remote environment. @@ -122,22 +123,22 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]: Returns ------- - (status, output) : (Status, dict) - A pair of (Status, output) values, where `output` is a dict + (status, timestamp, output) : (Status, datetime, dict) + 3-tuple of (Status, timestamp, output) values, where `output` is a dict with the results or None if the status is not COMPLETED. If run script is a benchmark, then the score is usually expected to be in the `score` field. """ _LOG.info("Run script remotely on: %s", self) - (status, _) = result = super().run() + (status, timestamp, _) = result = super().run() if not (status.is_ready() and self._script_run): return result - (status, output) = self._remote_exec(self._script_run) + (status, timestamp, output) = self._remote_exec(self._script_run) if status.is_succeeded() and output is not None: output = self._extract_stdout_results(output.get("stdout", "")) _LOG.info("Remote run complete: %s :: %s = %s", self, status, output) - return (status, output) + return (status, timestamp, output) def teardown(self) -> None: """ @@ -145,11 +146,11 @@ def teardown(self) -> None: """ if self._script_teardown: _LOG.info("Remote teardown: %s", self) - (status, _) = self._remote_exec(self._script_teardown) + (status, _timestamp, _output) = self._remote_exec(self._script_teardown) _LOG.info("Remote teardown complete: %s :: %s", self, status) super().teardown() - def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, Optional[dict]]: + def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, datetime, Optional[dict]]: """ Run a script on the remote host. @@ -160,8 +161,8 @@ def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, Optional[dict]]: Returns ------- - result : (Status, dict) - A pair of Status and dict with the benchmark/script results. + result : (Status, datetime, dict) + 3-tuple of Status, timestamp, and dict with the benchmark/script results. Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT} """ env_params = self._get_env_params() @@ -172,4 +173,6 @@ def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, Optional[dict]]: if status in {Status.PENDING, Status.SUCCEEDED}: (status, output) = self._remote_exec_service.get_remote_exec_results(output) _LOG.debug("Status: %s :: %s", status, output) - return (status, output) + # FIXME: get the timestamp from the remote environment! + timestamp = datetime.utcnow() + return (status, timestamp, output) diff --git a/mlos_bench/mlos_bench/run.py b/mlos_bench/mlos_bench/run.py index a5142e980fa..dfaf0e5d217 100755 --- a/mlos_bench/mlos_bench/run.py +++ b/mlos_bench/mlos_bench/run.py @@ -178,18 +178,18 @@ def _run(env: Environment, opt: Optimizer, trial: Storage.Trial, global_config: opt.register(trial.tunables, Status.FAILED) return - (status, results) = env.run() # Block and wait for the final result. + (status, timestamp, results) = env.run() # Block and wait for the final result. _LOG.info("Results: %s :: %s\n%s", trial.tunables, status, results) # In async mode (TODO), poll the environment for status and telemetry # and update the storage with the intermediate results. - (_, telemetry) = env.status() - # Use the status from `.run()` as it is the final status of the experiment. + (_status, _timestamp, telemetry) = env.status() + + # Use the status and timestamp from `.run()` as it is the final status of the experiment. # TODO: Use the `.status()` output in async mode. - trial.update_telemetry(status, telemetry) + trial.update_telemetry(status, timestamp, telemetry) - # FIXME: Use the actual timestamp from the benchmark. - trial.update(status, datetime.utcnow(), results) + trial.update(status, timestamp, results) # Filter out non-numeric scores from the optimizer. scores = results if not isinstance(results, dict) \ else {k: float(v) for (k, v) in results.items() if isinstance(v, (int, float))} diff --git a/mlos_bench/mlos_bench/storage/base_storage.py b/mlos_bench/mlos_bench/storage/base_storage.py index 15e4fce6816..6bbf805ed67 100644 --- a/mlos_bench/mlos_bench/storage/base_storage.py +++ b/mlos_bench/mlos_bench/storage/base_storage.py @@ -395,7 +395,7 @@ def update(self, status: Status, timestamp: datetime, return {self._opt_target: metrics} if isinstance(metrics, (float, int)) else metrics @abstractmethod - def update_telemetry(self, status: Status, + def update_telemetry(self, status: Status, timestamp: datetime, metrics: List[Tuple[datetime, str, Any]]) -> None: """ Save the experiment's telemetry data and intermediate status. @@ -404,6 +404,8 @@ def update_telemetry(self, status: Status, ---------- status : Status Current status of the trial. + timestamp: datetime + Timestamp of the status (but not the metrics). metrics : List[Tuple[datetime, str, Any]] Telemetry data. """ diff --git a/mlos_bench/mlos_bench/storage/sql/trial.py b/mlos_bench/mlos_bench/storage/sql/trial.py index 3c261120a50..e933780160c 100644 --- a/mlos_bench/mlos_bench/storage/sql/trial.py +++ b/mlos_bench/mlos_bench/storage/sql/trial.py @@ -80,21 +80,22 @@ def update(self, status: Status, timestamp: datetime, raise return metrics - def update_telemetry(self, status: Status, metrics: List[Tuple[datetime, str, Any]]) -> None: - super().update_telemetry(status, metrics) + def update_telemetry(self, status: Status, timestamp: datetime, + metrics: List[Tuple[datetime, str, Any]]) -> None: + super().update_telemetry(status, timestamp, metrics) # NOTE: Not every SQLAlchemy dialect supports `Insert.on_conflict_do_nothing()` # and we need to keep `.update_telemetry()` idempotent; hence a loop instead of # a bulk upsert. # See Also: comments in - for (timestamp, key, val) in metrics: + for (metric_ts, key, val) in metrics: with self._engine.begin() as conn: try: conn.execute(self._schema.trial_telemetry.insert().values( exp_id=self._experiment_id, trial_id=self._trial_id, - ts=timestamp, + ts=metric_ts, metric_id=key, metric_value=None if val is None else str(val), )) except IntegrityError as ex: - _LOG.warning("Record already exists: %s :: %s", (timestamp, key, val), ex) + _LOG.warning("Record already exists: %s :: %s", (metric_ts, key, val), ex) diff --git a/mlos_bench/mlos_bench/tests/environments/__init__.py b/mlos_bench/mlos_bench/tests/environments/__init__.py index 5e71d49b762..e33188a9e36 100644 --- a/mlos_bench/mlos_bench/tests/environments/__init__.py +++ b/mlos_bench/mlos_bench/tests/environments/__init__.py @@ -42,11 +42,11 @@ def check_env_success(env: Environment, assert env_context.setup(tunable_groups, global_config) - (status, data) = env_context.run() + (status, _ts, data) = env_context.run() assert status.is_succeeded() assert data == pytest.approx(expected_results, nan_ok=True) - (status, telemetry) = env_context.status() + (status, _ts, telemetry) = env_context.status() assert status.is_good() assert telemetry == pytest.approx(expected_telemetry, nan_ok=True) @@ -69,7 +69,7 @@ def check_env_fail_telemetry(env: Environment, tunable_groups: TunableGroups) -> with env as env_context: assert env_context.setup(tunable_groups) - (status, _data) = env_context.run() + (status, _ts, _data) = env_context.run() assert status.is_succeeded() with pytest.raises(ValueError): diff --git a/mlos_bench/mlos_bench/tests/environments/local/local_fileshare_env_test.py b/mlos_bench/mlos_bench/tests/environments/local/local_fileshare_env_test.py index 4870d98ece0..bb455b8b766 100644 --- a/mlos_bench/mlos_bench/tests/environments/local/local_fileshare_env_test.py +++ b/mlos_bench/mlos_bench/tests/environments/local/local_fileshare_env_test.py @@ -81,7 +81,7 @@ def test_local_fileshare_env(tunable_groups: TunableGroups, """ with local_fileshare_env as env_context: assert env_context.setup(tunable_groups) - (status, _) = env_context.run() + (status, _ts, _output) = env_context.run() assert status.is_succeeded() assert mock_fileshare_service.get_upload() == [ ("grub.cfg", "EXP_ID/222/input/grub.cfg"), diff --git a/mlos_bench/mlos_bench/tests/environments/mock_env_test.py b/mlos_bench/mlos_bench/tests/environments/mock_env_test.py index 4551372f708..608edbf9efc 100644 --- a/mlos_bench/mlos_bench/tests/environments/mock_env_test.py +++ b/mlos_bench/mlos_bench/tests/environments/mock_env_test.py @@ -17,12 +17,12 @@ def test_mock_env_default(mock_env: MockEnv, tunable_groups: TunableGroups) -> N """ with mock_env as env_context: assert env_context.setup(tunable_groups) - (status, data) = env_context.run() + (status, _ts, data) = env_context.run() assert status.is_succeeded() assert data is not None assert data["score"] == pytest.approx(73.97, 0.01) # Second time, results should differ because of the noise. - (status, data) = env_context.run() + (status, _ts, data) = env_context.run() assert status.is_succeeded() assert data is not None assert data["score"] == pytest.approx(72.92, 0.01) @@ -36,7 +36,7 @@ def test_mock_env_no_noise(mock_env_no_noise: MockEnv, tunable_groups: TunableGr assert env_context.setup(tunable_groups) for _ in range(10): # Noise-free results should be the same every time. - (status, data) = env_context.run() + (status, _ts, data) = env_context.run() assert status.is_succeeded() assert data is not None assert data["score"] == pytest.approx(75.0, 0.01) @@ -62,7 +62,7 @@ def test_mock_env_assign(mock_env: MockEnv, tunable_groups: TunableGroups, with mock_env as env_context: tunable_groups.assign(tunable_values) assert env_context.setup(tunable_groups) - (status, data) = env_context.run() + (status, _ts, data) = env_context.run() assert status.is_succeeded() assert data is not None assert data["score"] == pytest.approx(expected_score, 0.01) @@ -91,7 +91,7 @@ def test_mock_env_no_noise_assign(mock_env_no_noise: MockEnv, assert env_context.setup(tunable_groups) for _ in range(10): # Noise-free environment should produce the same results every time. - (status, data) = env_context.run() + (status, _ts, data) = env_context.run() assert status.is_succeeded() assert data is not None assert data["score"] == pytest.approx(expected_score, 0.01) diff --git a/mlos_bench/mlos_bench/tests/optimizers/toy_optimization_loop_test.py b/mlos_bench/mlos_bench/tests/optimizers/toy_optimization_loop_test.py index 54e860a8553..3d2a000d27c 100644 --- a/mlos_bench/mlos_bench/tests/optimizers/toy_optimization_loop_test.py +++ b/mlos_bench/mlos_bench/tests/optimizers/toy_optimization_loop_test.py @@ -56,7 +56,7 @@ def _optimize(env: Environment, opt: Optimizer) -> Tuple[float, TunableGroups]: assert env_context.setup(tunables) - (status, output) = env_context.run() + (status, _ts, output) = env_context.run() assert status.is_succeeded() assert output is not None score = output['score'] diff --git a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py index 39984c864a5..78d3528c175 100644 --- a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py +++ b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py @@ -120,10 +120,11 @@ def _dummy_run_exp(exp: SqlStorage.Experiment, tunable_name: str) -> SqlStorage. assert trial.tunable_config_id == config_i + 1 tunable_value = float(tunables.get_tunable(tunable_name)[0].numerical_value) tunable_value_norm = base_score * (tunable_value - tunable_min) / tunable_range - trial.update_telemetry(status=Status.RUNNING, metrics=[ - (datetime.utcnow(), "some-metric", tunable_value_norm + random() / 100), + timestamp = datetime.utcnow() + trial.update_telemetry(status=Status.RUNNING, timestamp=timestamp, metrics=[ + (timestamp, "some-metric", tunable_value_norm + random() / 100), ]) - trial.update(Status.SUCCEEDED, datetime.utcnow(), metrics={ + trial.update(Status.SUCCEEDED, timestamp, metrics={ # Give some variance on the score. # And some influence from the tunable value. "score": tunable_value_norm + random() / 100 diff --git a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py index 9320a14da8b..41ad5c28ea0 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py @@ -57,7 +57,7 @@ def test_update_telemetry(storage: Storage, trial = exp_storage.new_trial(tunable_groups) assert exp_storage.load_telemetry(trial.trial_id) == [] - trial.update_telemetry(Status.RUNNING, telemetry_data) + trial.update_telemetry(Status.RUNNING, datetime.utcnow(), telemetry_data) assert exp_storage.load_telemetry(trial.trial_id) == _telemetry_str(telemetry_data) # Also check that the TrialData telemetry looks right. @@ -72,7 +72,8 @@ def test_update_telemetry_twice(exp_storage: Storage.Experiment, Make sure update_telemetry() call is idempotent. """ trial = exp_storage.new_trial(tunable_groups) - trial.update_telemetry(Status.RUNNING, telemetry_data) - trial.update_telemetry(Status.RUNNING, telemetry_data) - trial.update_telemetry(Status.RUNNING, telemetry_data) + timestamp = datetime.utcnow() + trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) + trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) + trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) assert exp_storage.load_telemetry(trial.trial_id) == _telemetry_str(telemetry_data)