Skip to content

Commit

Permalink
comment out changes to all-tests-passing for sanity
Browse files Browse the repository at this point in the history
  • Loading branch information
ankona committed Jul 30, 2024
1 parent 03269e7 commit 3a80b84
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 83 deletions.
195 changes: 129 additions & 66 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,26 @@ def hosts(self) -> list[str]:
with self._queue_lock:
return self._hosts

# todo: remove
@property
def assigned_steps(self) -> dict[str, str]:
return self._assigned_steps
def free_hosts(self) -> t.Deque[str]: # todo: swap to dict[str, str]:
with self._queue_lock:
return self._free_hosts

# todo: remove
@property
def free_hosts(self) -> t.List[str]:
def allocated_hosts(self) -> dict[str, str]:
with self._queue_lock:
return [str(item[1]) for item in self._prioritizer.unassigned()]
return self._allocated_hosts

@property
def assigned_steps(self) -> dict[str, str]:
return self._assigned_steps

# @property
# def free_hosts(self) -> t.List[str]:
# with self._queue_lock:
# return [str(item[1]) for item in self._prioritizer.unassigned()]

@property
def group_infos(self) -> dict[str, ProcessGroupInfo]:
Expand All @@ -227,8 +239,12 @@ def _initialize_hosts(self) -> None:
self._gpus = [node.num_gpus for node in self._nodes]

# """List of hosts available in allocation"""
# self._free_hosts: t.Deque[str] = collections.deque(self._hosts)
self._free_hosts: t.Deque[str] = collections.deque(
self._hosts
) # todo: remove
"""List of hosts on which steps can be launched"""
self._allocated_hosts: t.Dict[str, str] = {}
# todo: replace self._allocated_hosts w/self._assigned_steps
self._assigned_steps: t.Dict[str, str] = {}
"""Mapping of hosts on which a step is already running to step ID"""

Expand Down Expand Up @@ -304,49 +320,62 @@ def current_time(self) -> float:
"""Current time for DragonBackend object, in seconds since the Epoch"""
return time.time()

def _can_honor_hosts(
self, request: DragonRunRequest
def _can_honor_policy(self, request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the current state of the backend process inhibits executing
the request.
:param request: the DragonRunRequest to verify
:returns: Tuple indicating if the request can be honored and
"""Check if the policy can be honored with resources available
in the allocation.
:param request: DragonRunRequest containing policy information
:returns: Tuple indicating if the policy can be honored and
an optional error message"""
# ensure the policy can be honored
if request.policy:
if request.policy.cpu_affinity:
# make sure some node has enough CPUs
available = max(self._cpus)
requested = max(request.policy.cpu_affinity)
if requested >= available:
return False, "Cannot satisfy request, not enough CPUs available"
if request.policy.gpu_affinity:
# make sure some node has enough GPUs
available = max(self._gpus)
requested = max(request.policy.gpu_affinity)
if requested >= available:
return False, "Cannot satisfy request, not enough GPUs available"
return True, None

def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]:
"""Check if request can be honored with resources available in the allocation.
Currently only checks for total number of nodes,
in the future it will also look at other constraints
such as memory, accelerators, and so on.
"""
if request.nodes > len(self._hosts):
message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
message += f"but only {len(self._hosts)} nodes are available."
return False, message
if self._shutdown_requested:
message = "Cannot satisfy request, server is shutting down."
return False, message

requested_hosts: t.Set[str] = set()
if request.hostlist:
requested_hosts = set(request.hostlist)
if not requested_hosts:
return True, None
honorable, err = self._can_honor_policy(request)
if not honorable:
return False, err

all_hosts = set(self._hosts)
valid_hosts = all_hosts.intersection(requested_hosts)
return True, None
# honorable, err = self._can_honor_hosts(request)
# if not honorable:
# return False, err

# don't worry about count when only hostnames are supplied (req.nodes == 0)
if request.nodes > len(valid_hosts):
message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
message += f"but only {len(valid_hosts)} named hosts are available."
return False, message
# honorable, err = self._can_honor_state(request)
# if not honorable:
# return False, err

return True, None
# honorable, err = self._can_honor_affinities(request)
# if not honorable:
# return False, err

def _can_honor_state(
self, _request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the current state of the backend process inhibits executing
the request.
:param _request: the DragonRunRequest to verify
:returns: Tuple indicating if the request can be honored and
an optional error message"""
if self._shutdown_requested:
message = "Cannot satisfy request, server is shutting down."
return False, message
# return True, None

return True, None

def _can_honor_affinities(
self, request: DragonRunRequest
Expand Down Expand Up @@ -375,24 +404,47 @@ def _can_honor_affinities(

return True, None

def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]:
"""Check if request can be honored with resources available in the allocation.
def _can_honor_hosts(
self, request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the current state of the backend process inhibits executing
the request.
:param request: the DragonRunRequest to verify
:returns: Tuple indicating if the request can be honored and
an optional error message"""
if request.nodes > len(self._hosts):
message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
message += f"but only {len(self._hosts)} nodes are available."
return False, message

Currently only checks for total number of nodes,
in the future it will also look at other constraints
such as memory, accelerators, and so on.
"""
honorable, err = self._can_honor_hosts(request)
if not honorable:
return False, err
requested_hosts: t.Set[str] = set()
if request.hostlist:
requested_hosts = set(request.hostlist)
if not requested_hosts:
return True, None

honorable, err = self._can_honor_state(request)
if not honorable:
return False, err
all_hosts = set(self._hosts)
valid_hosts = all_hosts.intersection(requested_hosts)

honorable, err = self._can_honor_affinities(request)
if not honorable:
return False, err
# don't worry about count when only hostnames are supplied (req.nodes == 0)
if request.nodes > len(valid_hosts):
message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
message += f"but only {len(valid_hosts)} named hosts are available."
return False, message

return True, None

def _can_honor_state(
self, _request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the current state of the backend process inhibits executing
the request.
:param _request: the DragonRunRequest to verify
:returns: Tuple indicating if the request can be honored and
an optional error message"""
if self._shutdown_requested:
message = "Cannot satisfy request, server is shutting down."
return False, message

return True, None

Expand All @@ -404,25 +456,35 @@ def _allocate_step(
:param request: The request to be executed
:returns: A list of selected hostnames"""
# ensure at least one host is selected
num_hosts: int = request.nodes or 1
num_hosts: int = request.nodes # or 1 # todo - make at least 1 again

with self._queue_lock:
# ensure positive integers in valid range are accepted
if num_hosts <= 0 or num_hosts > len(self._hosts):
return None
# # ensure positive integers in valid range are accepted
# if num_hosts <= 0 or num_hosts > len(self._hosts):
# return None

if request.hostlist:
tracking_info = self._prioritizer.next_n_from(
num_hosts, [host for host in request.hostlist.split(",") if host]
)
else:
tracking_info = self._prioritizer.next_n(num_hosts)
# if request.hostlist:
# tracking_info = self._prioritizer.next_n_from(
# num_hosts, [host for host in request.hostlist.split(",") if host]
# )
# else:
# tracking_info = self._prioritizer.next_n(num_hosts)

# give back the list of node names
to_allocate = [str(info[1]) for info in tracking_info]
# # give back the list of node names
# to_allocate = [str(info[1]) for info in tracking_info]

# for host in to_allocate:
# self._assigned_steps[host] = step_id

# return to_allocate
if num_hosts <= 0 or num_hosts > len(self._free_hosts):
return None

for host in to_allocate:
self._assigned_steps[host] = step_id
to_allocate = []
for _ in range(num_hosts):
host = self._free_hosts.popleft()
self._allocated_hosts[host] = step_id
to_allocate.append(host)

return to_allocate

Expand Down Expand Up @@ -712,11 +774,12 @@ def _refresh_statuses(self) -> None:
for host in group_info.hosts:
logger.debug(f"Releasing host {host}")
try:
self._allocated_hosts.pop(host) # todo: remove
self._assigned_steps.pop(host)
self._prioritizer.decrement(host)
except KeyError:
logger.error(f"Tried to free a non-allocated host: {host}")
# self._free_hosts.append(host)
self._free_hosts.append(host) # todo: remove
group_info.process_group = None
group_info.redir_workers = None

Expand Down
10 changes: 5 additions & 5 deletions smartsim/_core/schemas/dragonRequests.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,16 @@ def from_run_args(
gpu_affinity = [int(x.strip()) for x in gpu_args.split(",") if x]
cpu_affinity = [int(x.strip()) for x in cpu_args.split(",") if x]

# list[str] converted to comma-separated str must split into a list[str]
hosts: t.List[str] = []
if hostlist_value := str(run_args.get("host-list", "")):
hosts = [x for x in hostlist_value.split(",") if x]
# # list[str] converted to comma-separated str must split into a list[str]
# hosts: t.List[str] = []
# if hostlist_value := str(run_args.get("host-list", "")):
# hosts = [x for x in hostlist_value.split(",") if x]

try:
return DragonRunPolicy(
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
hostlist=hosts,
# hostlist=hosts,
)
except ValidationError as ex:
raise SmartSimError("Unable to build DragonRunPolicy") from ex
Expand Down
47 changes: 35 additions & 12 deletions tests/test_dragon_run_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import pytest
from pydantic import ValidationError

from smartsim._core.launcher.dragon.pqueue import NodePrioritizer

# The tests in this file belong to the group_b group
pytestmark = pytest.mark.group_b

Expand Down Expand Up @@ -165,6 +167,11 @@ def get_mock_backend(
monkeypatch.setattr(
dragon_backend, "_free_hosts", collections.deque(dragon_backend._hosts)
)
# monkeypatch.setattr(
# dragon_backend._prioritizer,
# NodePrioritizer(dragon_backend._hosts, dragon_backend._queue_lock),
# collections.deque(dragon_backend._hosts),
# )

return dragon_backend

Expand Down Expand Up @@ -270,8 +277,10 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend._free_hosts) == 1
assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
# assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
# assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id

monkeypatch.setattr(
dragon_backend._group_infos[step_id].process_group, "status", "Running"
Expand All @@ -282,8 +291,10 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend._free_hosts) == 1
assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
# assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
# assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id

dragon_backend._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED

Expand Down Expand Up @@ -367,8 +378,10 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend._free_hosts) == 1
assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
# assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
# assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id

monkeypatch.setattr(
dragon_backend._group_infos[step_id].process_group, "status", "Running"
Expand All @@ -379,8 +392,10 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend._free_hosts) == 1
assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
# assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
# assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id

dragon_backend._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED

Expand Down Expand Up @@ -669,23 +684,31 @@ def test_view(monkeypatch: pytest.MonkeyPatch) -> None:
set_mock_group_infos(monkeypatch, dragon_backend)
hosts = dragon_backend.hosts

expected_message = textwrap.dedent(f"""\
expected_message = textwrap.dedent(
f"""\
Dragon server backend update
| Host | Status |
|--------|----------|
|---------|----------|
| {hosts[0]} | Busy |
| {hosts[1]} | Free |
| {hosts[2]} | Free |
| Step | Status | Hosts | Return codes | Num procs |
|----------|--------------|-------------|----------------|-------------|
|----------|--------------|-----------------|----------------|-------------|
| abc123-1 | Running | {hosts[0]} | | 1 |
| del999-2 | Cancelled | {hosts[1]} | -9 | 1 |
| c101vz-3 | Completed | {hosts[1]},{hosts[2]} | 0 | 2 |
| 0ghjk1-4 | Failed | {hosts[2]} | -1 | 1 |
| ljace0-5 | NeverStarted | | | 0 |""")
| ljace0-5 | NeverStarted | | | 0 |"""
)

# get rid of white space to make the comparison easier
actual_msg = dragon_backend.status_message.replace(" ", "")
expected_message = expected_message.replace(" ", "")

print("*************************")
print(actual_msg)
print("*************************")
print(expected_message)
print("*************************")

assert actual_msg == expected_message

0 comments on commit 3a80b84

Please sign in to comment.