Skip to content

Commit

Permalink
separate status checks for unmanaged entities
Browse files Browse the repository at this point in the history
  • Loading branch information
ankona committed Mar 7, 2024
1 parent 0a38e66 commit 57ede39
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
18 changes: 18 additions & 0 deletions smartsim/_core/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import json
import pathlib
import time
import typing as t
from dataclasses import dataclass
Expand Down Expand Up @@ -55,6 +57,7 @@ def __init__(self) -> None:
self.collectors: t.Dict[str, str] = {}
self.config: t.Dict[str, str] = {}
self._is_complete: bool = False
self._state: t.Dict[str, str] = {}

@property
def is_db(self) -> bool:
Expand All @@ -75,6 +78,21 @@ def is_complete(self) -> bool:
def set_complete(self) -> None:
self._is_complete = True

def poll_state(self) -> None:
# do not poll if already known to be done
if self._is_complete or not self.is_db:
# we currently only need to poll for unmanaged db updates
return

p = pathlib.Path(self.status_dir) / "stop.json"
if p.exists():
self._is_complete = True
self._state = json.loads(p.read_text())

@property
def state(self) -> t.Dict[str, str]:
return self._state


class Job:
"""Keep track of various information for the controller.
Expand Down
19 changes: 14 additions & 5 deletions smartsim/_core/utils/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ def process_manifest(self, manifest_path: str) -> None:
filter_fn=lambda e: e.key not in self._tracked_jobs
):
entity.path = str(exp_dir)
self._tracked_jobs[entity.key] = entity

if entity.telemetry_on:
collectors = find_collectors(entity)
Expand All @@ -409,9 +410,7 @@ def process_manifest(self, manifest_path: str) -> None:
pathlib.Path(entity.status_dir),
)

if entity.is_managed or entity.is_db:
self._tracked_jobs[entity.key] = entity

if entity.is_managed:
# Tell JobManager the task is unmanaged when adding so it will
# monitor it but not try to start it
self.job_manager.add_job(
Expand All @@ -420,8 +419,9 @@ def process_manifest(self, manifest_path: str) -> None:
entity,
False,
)
# and tell the launcher it's managed so it doesn't lookup the PID
self._launcher.step_mapping.add(
entity.name, entity.step_id, entity.task_id, entity.is_managed
entity.name, entity.step_id, entity.task_id, True
)
self._tracked_runs[run.timestamp] = run

Expand Down Expand Up @@ -503,8 +503,17 @@ async def on_timestep(self, timestamp: int) -> None:

await self._collector.collect()

# ensure unmanaged jobs move out of tracked jobs list
u_jobs = [job for job in self._tracked_jobs.values() if not job.is_managed]
for job in u_jobs:
job.poll_state()
if job.is_complete:
completed_entity = self._tracked_jobs.pop(job.key)
self._completed_jobs[job.key] = completed_entity

# consider not using name to avoid collisions
if names := {entity.name: entity for entity in self._tracked_jobs.values()}:
m_jobs = [job for job in self._tracked_jobs.values() if job.is_managed]
if names := {entity.name: entity for entity in m_jobs}:
step_updates = self._launcher.get_step_update(list(names.keys()))

for step_name, step_info in step_updates:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_telemetry_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ def test_telemetry_db_only_without_generate(test_dir, wlmutils, monkeypatch, con
try:
exp.start(orc)

snooze_nonblocking(telemetry_output_path, max_delay=30, post_data_delay=2)
snooze_nonblocking(telemetry_output_path, max_delay=10, post_data_delay=2)

start_events = list(telemetry_output_path.rglob("start.json"))
stop_events = list(telemetry_output_path.rglob("stop.json"))
Expand Down

0 comments on commit 57ede39

Please sign in to comment.