Return status and actual timestamp from all environments (#671)
This is the first step toward making trials asynchronous. In this PR we just
make sure that every `Environment` method that returns a status also returns the
timestamp of that status. In the next PR we will get these timestamps
and status values from the remote environment.
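
For illustration only (not part of the diff): a minimal caller-side sketch of the new 3-tuple return contract. It assumes `env` is an `Environment` that has already been set up inside its context; the `report` helper is hypothetical.

```python
# Hypothetical helper, for illustration only: unpack the new 3-tuple returns.
from datetime import datetime

from mlos_bench.environments.base_environment import Environment


def report(env: Environment) -> None:
    # Previously: (status, telemetry) = env.status()
    (status, timestamp, telemetry) = env.status()
    assert isinstance(timestamp, datetime)  # UTC timestamp of the reported status
    print(f"{timestamp.isoformat()} :: {status} :: {len(telemetry)} telemetry rows")

    # Previously: (status, output) = env.run()
    (status, timestamp, output) = env.run()
    if status.is_succeeded() and output is not None:
        print(f"{timestamp.isoformat()} :: score = {output.get('score')}")
```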

---------

Co-authored-by: Brian Kroth <[email protected]>
motus and bpkroth authored Feb 13, 2024
1 parent 5cb5b05 commit 08dc436
Showing 15 changed files with 103 additions and 87 deletions.
22 changes: 12 additions & 10 deletions mlos_bench/mlos_bench/environments/base_environment.py
@@ -378,7 +378,7 @@ def teardown(self) -> None:
assert self._in_context
self._is_ready = False

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Execute the run script for this environment.
@@ -387,30 +387,32 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
If run script is a benchmark, then the score is usually expected to
be in the `score` field.
"""
# Make sure we create a context before invoking setup/run/status/teardown
assert self._in_context
(status, _) = self.status()
return (status, None)
(status, timestamp, _) = self.status()
return (status, timestamp, None)

def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]:
def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
"""
Check the status of the benchmark environment.
Returns
-------
(benchmark_status, telemetry) : (Status, list)
A pair of (benchmark status, telemetry) values.
(benchmark_status, timestamp, telemetry) : (Status, datetime, list)
3-tuple of (benchmark status, timestamp, telemetry) values.
`timestamp` is UTC time stamp of the status; it's current time by default.
`telemetry` is a list (maybe empty) of (timestamp, metric, value) triplets.
"""
# Make sure we create a context before invoking setup/run/status/teardown
assert self._in_context
timestamp = datetime.utcnow()
if self._is_ready:
return (Status.READY, [])
return (Status.READY, timestamp, [])
_LOG.warning("Environment not ready: %s", self)
return (Status.PENDING, [])
return (Status.PENDING, timestamp, [])
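
As a side note (not part of this commit): a subclass following the updated base-class contract would now thread the timestamp through as well. The `NoOpEnv` class below is a hypothetical sketch, not code from the repository.

```python
# Hypothetical subclass sketch (not in the repo): propagate the timestamp
# alongside the final status, mirroring the updated base-class contract.
from datetime import datetime
from typing import Dict, Optional, Tuple

from mlos_bench.environments.base_environment import Environment
from mlos_bench.environments.status import Status
from mlos_bench.tunables.tunable import TunableValue


class NoOpEnv(Environment):
    """Hypothetical environment that 'runs' instantly and reports a dummy score."""

    def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
        (status, timestamp, _) = result = super().run()
        if not status.is_ready():
            return result  # e.g. (PENDING, timestamp, None) if setup has not run
        # Report the wall-clock time of the final status along with the results.
        return (Status.SUCCEEDED, datetime.utcnow(), {"score": 0.0})
```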
33 changes: 18 additions & 15 deletions mlos_bench/mlos_bench/environments/composite_env.py
@@ -179,61 +179,64 @@ def teardown(self) -> None:
env_context.teardown()
super().teardown()

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Submit a new experiment to the environment.
Return the result of the *last* child environment if successful,
or the status of the last failed environment otherwise.
Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
If run script is a benchmark, then the score is usually expected to
be in the `score` field.
"""
_LOG.info("Run: %s", self._children)
(status, metrics) = super().run()
(status, timestamp, metrics) = super().run()
if not status.is_ready():
return (status, metrics)
return (status, timestamp, metrics)

joint_metrics = {}
for env_context in self._child_contexts:
_LOG.debug("Child env. run: %s", env_context)
(status, metrics) = env_context.run()
(status, timestamp, metrics) = env_context.run()
_LOG.debug("Child env. run results: %s :: %s %s", env_context, status, metrics)
if not status.is_good():
_LOG.info("Run failed: %s :: %s", self, status)
return (status, None)
return (status, timestamp, None)
joint_metrics.update(metrics or {})

_LOG.info("Run completed: %s :: %s %s", self, status, joint_metrics)
return (status, joint_metrics)
# Return the status and the timestamp of the last child environment.
return (status, timestamp, joint_metrics)

def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]:
def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
"""
Check the status of the benchmark environment.
Returns
-------
(benchmark_status, telemetry) : (Status, list)
A pair of (benchmark status, telemetry) values.
(benchmark_status, timestamp, telemetry) : (Status, datetime, list)
3-tuple of (benchmark status, timestamp, telemetry) values.
`timestamp` is UTC time stamp of the status; it's current time by default.
`telemetry` is a list (maybe empty) of (timestamp, metric, value) triplets.
"""
(status, telemetry) = super().status()
(status, timestamp, telemetry) = super().status()
if not status.is_ready():
return (status, telemetry)
return (status, timestamp, telemetry)

joint_telemetry = []
final_status = None
for env_context in self._child_contexts:
(status, telemetry) = env_context.status()
(status, timestamp, telemetry) = env_context.status()
_LOG.debug("Child env. status: %s :: %s", env_context, status)
joint_telemetry.extend(telemetry)
if not status.is_good() and final_status is None:
final_status = status

final_status = final_status or status
_LOG.info("Final status: %s :: %s", self, final_status)
return (final_status, joint_telemetry)
# Return the status and the timestamp of the last child environment or the first failed child environment.
return (final_status, timestamp, joint_telemetry)
26 changes: 14 additions & 12 deletions mlos_bench/mlos_bench/environments/local/local_env.py
@@ -152,19 +152,19 @@ def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) -

return self._is_ready

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Run a script in the local scheduler environment.
Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
If run script is a benchmark, then the score is usually expected to
be in the `score` field.
"""
(status, _) = result = super().run()
(status, timestamp, _) = result = super().run()
if not status.is_ready():
return result

@@ -174,13 +174,13 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
if self._script_run:
(return_code, output) = self._local_exec(self._script_run, self._temp_dir)
if return_code != 0:
return (Status.FAILED, None)
return (Status.FAILED, timestamp, None)
stdout_data = self._extract_stdout_results(output.get("stdout", ""))

# FIXME: We should not be assuming that the only output file type is a CSV.
if not self._read_results_file:
_LOG.debug("Not reading the data at: %s", self)
return (Status.SUCCEEDED, stdout_data)
return (Status.SUCCEEDED, timestamp, stdout_data)

data = self._normalize_columns(pandas.read_csv(
self._config_loader_service.resolve_path(
@@ -201,7 +201,7 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:

stdout_data.update(data.iloc[-1].to_dict())
_LOG.info("Local run complete: %s ::\n%s", self, stdout_data)
return (Status.SUCCEEDED, stdout_data)
return (Status.SUCCEEDED, timestamp, stdout_data)

@staticmethod
def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame:
@@ -215,17 +215,19 @@ def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame:
data.rename(str.rstrip, axis='columns', inplace=True)
return data

def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]:
def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:

(status, _) = super().status()
(status, timestamp, _) = super().status()
if not (self._is_ready and self._read_telemetry_file):
return (status, [])
return (status, timestamp, [])

assert self._temp_dir is not None
try:
fname = self._config_loader_service.resolve_path(
self._read_telemetry_file, extra_paths=[self._temp_dir])

# TODO: Use the timestamp of the CSV file as our status timestamp?

# FIXME: We should not be assuming that the only output file type is a CSV.
data = self._normalize_columns(
pandas.read_csv(fname, index_col=False, parse_dates=[0]))
@@ -241,11 +243,11 @@ def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]:

except FileNotFoundError as ex:
_LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex)
return (status, [])
return (status, timestamp, [])

_LOG.debug("Read telemetry data:\n%s", data)
col_dtypes: Mapping[int, Type] = {0: datetime}
return (status, [
return (status, timestamp, [
(pandas.Timestamp(ts).to_pydatetime(), metric, value)
for (ts, metric, value) in data.to_records(index=False, column_dtypes=col_dtypes)
])
@@ -151,22 +151,22 @@ def _download_files(self, ignore_missing: bool = False) -> None:
_LOG.exception("Cannot download %s to %s", path_from, path_to)
raise ex

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Download benchmark results from the shared storage
and run post-processing scripts locally.
Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
If run script is a benchmark, then the score is usually expected to
be in the `score` field.
"""
self._download_files()
return super().run()

def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]:
def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
self._download_files(ignore_missing=True)
return super().status()
11 changes: 6 additions & 5 deletions mlos_bench/mlos_bench/environments/mock_env.py
@@ -8,6 +8,7 @@

import random
import logging
from datetime import datetime
from typing import Dict, Optional, Tuple

import numpy
@@ -61,20 +62,20 @@ def __init__(self,
self._metrics = self.config.get("metrics", ["score"])
self._is_ready = True

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Produce mock benchmark data for one experiment.
Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
The keys of the `output` dict are the names of the metrics
specified in the config; by default it's just one metric
named "score". All output metrics have the same value.
"""
(status, _) = result = super().run()
(status, timestamp, _) = result = super().run()
if not status.is_ready():
return result

@@ -89,7 +90,7 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
if self._range:
score = self._range[0] + score * (self._range[1] - self._range[0])

return (Status.SUCCEEDED, {metric: score for metric in self._metrics})
return (Status.SUCCEEDED, timestamp, {metric: score for metric in self._metrics})

@staticmethod
def _normalized(tunable: Tunable) -> float:
27 changes: 15 additions & 12 deletions mlos_bench/mlos_bench/environments/remote/remote_env.py
@@ -9,6 +9,7 @@
"""

import logging
from datetime import datetime
from typing import Dict, Iterable, Optional, Tuple

from mlos_bench.environments.status import Status
@@ -104,15 +105,15 @@ def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) -

if self._script_setup:
_LOG.info("Set up the remote environment: %s", self)
(status, _) = self._remote_exec(self._script_setup)
(status, _timestamp, _output) = self._remote_exec(self._script_setup)
_LOG.info("Remote set up complete: %s :: %s", self, status)
self._is_ready = status.is_succeeded()
else:
self._is_ready = True

return self._is_ready

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Runs the run script on the remote environment.
@@ -122,34 +123,34 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
If run script is a benchmark, then the score is usually expected to
be in the `score` field.
"""
_LOG.info("Run script remotely on: %s", self)
(status, _) = result = super().run()
(status, timestamp, _) = result = super().run()
if not (status.is_ready() and self._script_run):
return result

(status, output) = self._remote_exec(self._script_run)
(status, timestamp, output) = self._remote_exec(self._script_run)
if status.is_succeeded() and output is not None:
output = self._extract_stdout_results(output.get("stdout", ""))
_LOG.info("Remote run complete: %s :: %s = %s", self, status, output)
return (status, output)
return (status, timestamp, output)

def teardown(self) -> None:
"""
Clean up and shut down the remote environment.
"""
if self._script_teardown:
_LOG.info("Remote teardown: %s", self)
(status, _) = self._remote_exec(self._script_teardown)
(status, _timestamp, _output) = self._remote_exec(self._script_teardown)
_LOG.info("Remote teardown complete: %s :: %s", self, status)
super().teardown()

def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, Optional[dict]]:
def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, datetime, Optional[dict]]:
"""
Run a script on the remote host.
@@ -160,8 +161,8 @@ def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, Optional[dict]]:
Returns
-------
result : (Status, dict)
A pair of Status and dict with the benchmark/script results.
result : (Status, datetime, dict)
3-tuple of Status, timestamp, and dict with the benchmark/script results.
Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
"""
env_params = self._get_env_params()
@@ -172,4 +173,6 @@ def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, Optional[dict]]:
if status in {Status.PENDING, Status.SUCCEEDED}:
(status, output) = self._remote_exec_service.get_remote_exec_results(output)
_LOG.debug("Status: %s :: %s", status, output)
return (status, output)
# FIXME: get the timestamp from the remote environment!
timestamp = datetime.utcnow()
return (status, timestamp, output)
12 changes: 6 additions & 6 deletions mlos_bench/mlos_bench/run.py
@@ -178,18 +178,18 @@ def _run(env: Environment, opt: Optimizer, trial: Storage.Trial, global_config:
opt.register(trial.tunables, Status.FAILED)
return

(status, results) = env.run() # Block and wait for the final result.
(status, timestamp, results) = env.run() # Block and wait for the final result.
_LOG.info("Results: %s :: %s\n%s", trial.tunables, status, results)

# In async mode (TODO), poll the environment for status and telemetry
# and update the storage with the intermediate results.
(_, telemetry) = env.status()
# Use the status from `.run()` as it is the final status of the experiment.
(_status, _timestamp, telemetry) = env.status()

# Use the status and timestamp from `.run()` as it is the final status of the experiment.
# TODO: Use the `.status()` output in async mode.
trial.update_telemetry(status, telemetry)
trial.update_telemetry(status, timestamp, telemetry)

# FIXME: Use the actual timestamp from the benchmark.
trial.update(status, datetime.utcnow(), results)
trial.update(status, timestamp, results)
# Filter out non-numeric scores from the optimizer.
scores = results if not isinstance(results, dict) \
else {k: float(v) for (k, v) in results.items() if isinstance(v, (int, float))}