Skip to content

Commit

Permalink
fix init reordering bug
Browse files Browse the repository at this point in the history
  • Loading branch information
ankona committed Aug 26, 2024
1 parent ef034d5 commit 53800a0
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 32 deletions.
10 changes: 5 additions & 5 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ def __init__(self, pid: int) -> None:
self._step_ids = (f"{create_short_id_str()}-{id}" for id in itertools.count())
"""Incremental ID to assign to new steps prior to execution"""

self._initialize_hosts()
self._queued_steps: "collections.OrderedDict[str, DragonRunRequest]" = (
collections.OrderedDict()
)
Expand Down Expand Up @@ -188,11 +187,7 @@ def __init__(self, pid: int) -> None:
else 5
)
"""Time in seconds needed to server to complete shutdown"""

self._view = DragonBackendView(self)
logger.debug(self._view.host_desc)
self._infra_ddict: t.Optional[dragon_ddict.DDict] = None
self._prioritizer = NodePrioritizer(self._nodes, self._queue_lock)

self._nodes: t.List["dragon_machine.Node"] = []
"""Node capability information for hosts in the allocation"""
Expand All @@ -205,6 +200,11 @@ def __init__(self, pid: int) -> None:
self._allocated_hosts: t.Dict[str, t.Set[str]] = {}
"""Mapping with hostnames as keys and a set of running step IDs as the value"""

self._initialize_hosts()
self._view = DragonBackendView(self)
logger.debug(self._view.host_desc)
self._prioritizer = NodePrioritizer(self._nodes, self._queue_lock)

@property
def hosts(self) -> list[str]:
with self._queue_lock:
Expand Down
8 changes: 2 additions & 6 deletions smartsim/_core/launcher/dragon/pqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,6 @@ def remove(
:param tracking_id: a unique task identifier executing on the node
to remove
:raises ValueError: if tracking_id is already assigned to this node"""
if tracking_id and tracking_id not in self.assigned_tasks:
raise ValueError("Attempted removal of untracked item")

self._num_refs = max(self._num_refs - 1, 0)
if tracking_id:
self._assigned_tasks = self._assigned_tasks - {tracking_id}
Expand Down Expand Up @@ -460,8 +457,7 @@ def next_n(
:param hosts: a list of hostnames used to filter the available nodes
:returns: Collection of reserved nodes
:raises ValueError: if the hosts parameter is an empty list"""
if hosts is not None and not hosts:
raise ValueError("No hostnames provided")

# if hosts is not None and not hosts:
# raise ValueError("No hostnames provided")
heap = self._create_sub_heap(hosts, filter_on)
return self._get_next_n_available_nodes(num_items, heap, tracking_id)
57 changes: 46 additions & 11 deletions tests/test_dragon_run_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def set_mock_group_infos(
}

monkeypatch.setattr(dragon_backend, "_group_infos", group_infos)
monkeypatch.setattr(dragon_backend, "_allocated_hosts", {hosts[0]: "abc123-1"})
monkeypatch.setattr(dragon_backend, "_allocated_hosts", {hosts[0]: {"abc123-1"}})
monkeypatch.setattr(dragon_backend, "_running_steps", ["abc123-1"])

return group_infos
Expand Down Expand Up @@ -221,8 +221,8 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend.free_hosts) == 1
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[0]]
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[1]]

monkeypatch.setattr(
dragon_backend._group_infos[step_id].process_group, "status", "Running"
Expand All @@ -233,8 +233,8 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend.free_hosts) == 1
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[0]]
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[1]]

dragon_backend._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED

Expand Down Expand Up @@ -316,8 +316,8 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend._prioritizer.unassigned()) == 1
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[0]]
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[1]]

monkeypatch.setattr(
dragon_backend._group_infos[step_id].process_group, "status", "Running"
Expand All @@ -328,8 +328,8 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend._prioritizer.unassigned()) == 1
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[0]]
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[1]]

dragon_backend._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED

Expand Down Expand Up @@ -635,7 +635,8 @@ def test_view(monkeypatch: pytest.MonkeyPatch) -> None:
hosts = dragon_backend.hosts
dragon_backend._prioritizer.increment(hosts[0])

expected_msg = textwrap.dedent(f"""\
expected_msg = textwrap.dedent(
f"""\
Dragon server backend update
| Host | Status |
|--------|----------|
Expand All @@ -648,7 +649,8 @@ def test_view(monkeypatch: pytest.MonkeyPatch) -> None:
| del999-2 | Cancelled | {hosts[1]} | -9 | 1 |
| c101vz-3 | Completed | {hosts[1]},{hosts[2]} | 0 | 2 |
| 0ghjk1-4 | Failed | {hosts[2]} | -1 | 1 |
| ljace0-5 | NeverStarted | | | 0 |""")
| ljace0-5 | NeverStarted | | | 0 |"""
)

# get rid of white space to make the comparison easier
actual_msg = dragon_backend.status_message.replace(" ", "")
Expand Down Expand Up @@ -728,3 +730,36 @@ def test_can_honor_hosts_unavailable_hosts_ok(monkeypatch: pytest.MonkeyPatch) -
assert can_honor, error_msg
# confirm failure message indicates number of nodes requested as cause
assert error_msg is None, error_msg


def test_can_honor_hosts_1_hosts_requested(monkeypatch: pytest.MonkeyPatch) -> None:
    """Verify that a request whose hostlist contains an invalid hostname is
    still honored when the remaining, validly named hosts are sufficient to
    satisfy the number of nodes requested."""
    dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0)

    # supply 2 valid and 1 invalid hostname; prefixing "x" makes the first
    # name unknown to the backend
    actual_hosts = list(dragon_backend._hosts)
    actual_hosts[0] = f"x{actual_hosts[0]}"

    host_list = ",".join(actual_hosts)

    run_req = DragonRunRequest(
        exe="sleep",
        exe_args=["5"],
        path="/a/fake/path",
        nodes=1, # <----- requesting 1 node; two valid hosts remain available
        hostlist=host_list, # <--- one invalid and two valid names
        tasks=1,
        tasks_per_node=1,
        env={},
        current_env={},
        pmi_enabled=False,
        policy=DragonRunPolicy(),
    )

    can_honor, error_msg = dragon_backend._can_honor(run_req)

    # confirm the request is honored despite the single invalid hostname
    assert can_honor, error_msg
18 changes: 8 additions & 10 deletions tests/test_node_prioritizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,9 @@ def test_node_prioritizer_multi_increment_subheap_assigned() -> None:
assert len(all_tracking_info) == 0


def test_node_prioritizer_empty_subheap_next_w_hosts() -> None:
def test_node_prioritizer_empty_subheap_next_w_no_hosts() -> None:
"""Verify that retrieving multiple nodes via `next_n` API does
not allow an empty host list"""
with an empty host list uses the entire available host list"""

num_cpu_nodes, num_gpu_nodes = 8, 0
cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes)
Expand All @@ -476,15 +476,15 @@ def test_node_prioritizer_empty_subheap_next_w_hosts() -> None:

# request n == {num_requested} nodes from set of 3 available
num_requested = 1
with pytest.raises(ValueError) as ex:
p.next(hosts=hostnames)
node = p.next(hosts=hostnames)
assert node

assert "No hostnames provided" == ex.value.args[0]
# assert "No hostnames provided" == ex.value.args[0]


def test_node_prioritizer_empty_subheap_next_n_w_hosts() -> None:
"""Verify that retrieving multiple nodes via `next_n` API does
not allow an empty host list"""
not blow up with an empty host list"""

num_cpu_nodes, num_gpu_nodes = 8, 0
cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes)
Expand All @@ -501,10 +501,8 @@ def test_node_prioritizer_empty_subheap_next_n_w_hosts() -> None:

# request n == {num_requested} nodes from set of 3 available
num_requested = 1
with pytest.raises(ValueError) as ex:
p.next_n(num_requested, hosts=hostnames)

assert "No hostnames provided" == ex.value.args[0]
node = p.next_n(num_requested, hosts=hostnames)
assert node is not None


@pytest.mark.parametrize("num_requested", [-100, -1, 0])
Expand Down

0 comments on commit 53800a0

Please sign in to comment.