Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ankona committed Aug 7, 2024
1 parent fc21e17 commit 0f57c1e
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 50 deletions.
68 changes: 43 additions & 25 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import dragon.native.process_group as dragon_process_group
import dragon.native.machine as dragon_machine

from smartsim._core.launcher.dragon.pqueue import NodePrioritizer
from smartsim._core.launcher.dragon.pqueue import NodePrioritizer, PrioritizerFilter

# pylint: enable=import-error
# isort: on
Expand Down Expand Up @@ -211,13 +211,15 @@ def allocated_hosts(self) -> dict[str, str]:

@property
def assigned_steps(self) -> dict[str, str]:
    """Mapping of host name to the ID of the step assigned to that host.

    NOTE(review): backed by ``_allocated_hosts`` while ``_assigned_steps``
    is being phased out — the diff left an unreachable duplicate return
    of ``self._assigned_steps`` here, removed now.
    """
    return self._allocated_hosts

# todo: remove?
@property
def free_hosts(self) -> t.List[str]:  # todo: swap to dict[str, str]:
    """Hosts that currently have no step assigned, per the prioritizer.

    NOTE(review): the annotation says ``t.List[str]`` but
    ``_prioritizer.unassigned()`` appears to return node tracking records,
    not bare host names — confirm and align the annotation. The diff also
    left a stale duplicate ``def`` line and a dead ``return
    self._free_hosts`` here, removed now.
    """
    with self._queue_lock:
        return self._prioritizer.unassigned()

# @property
# def free_hosts(self) -> t.List[str]:
Expand Down Expand Up @@ -245,7 +247,7 @@ def _initialize_hosts(self) -> None:
"""List of hosts on which steps can be launched"""
self._allocated_hosts: t.Dict[str, str] = {}
# todo: replace self._allocated_hosts w/self._assigned_steps
self._assigned_steps: t.Dict[str, str] = {}
# self._assigned_steps: t.Dict[str, str] = {}
"""Mapping of hosts on which a step is already running to step ID"""

self._ref_map: t.Dict[str, _NodeRefCount] = {}
Expand Down Expand Up @@ -460,34 +462,50 @@ def _allocate_step(
num_hosts: int = request.nodes # or 1 # todo - make at least 1 again

with self._queue_lock:
# if num_hosts <= 0 or num_hosts > len(self._hosts):
# return None

# if request.hostlist:
# tracking_info = self._prioritizer.next_n_from(
# num_hosts, [host for host in request.hostlist.split(",") if host]
# )
# else:
# tracking_info = self._prioritizer.next_n(num_hosts)
if num_hosts <= 0 or num_hosts > len(self._hosts):
logger.debug(
f"The number of requested hosts ({num_hosts}) is invalid or"
f" cannot be satisfied with {len(self._hosts)} available nodes"
)
return None

# # give back the list of node names
# to_allocate = [str(info[1]) for info in tracking_info]
hosts = []
if request.hostlist:
# convert the comma-separated argument into a real list
hosts = [host for host in request.hostlist.split(",") if host]

# for host in to_allocate:
# self._assigned_steps[host] = step_id
if hosts:
reference_counts = self._prioritizer.next_n_from(num_hosts, hosts)
else:
filter_on: t.Optional[PrioritizerFilter] = None
if request.policy and request.policy.gpu_affinity:
filter_on = PrioritizerFilter.GPU
reference_counts = self._prioritizer.next_n(num_hosts, filter_on)

# return to_allocate
if num_hosts <= 0 or num_hosts > len(self._free_hosts):
if len(reference_counts) < num_hosts:
# exit if the prioritizer can't identify enough nodes
return None

to_allocate = []
for _ in range(num_hosts):
host = self._free_hosts.popleft()
to_allocate = [ref_counter[1] for ref_counter in reference_counts]

# track assigning this step to each node
for host in to_allocate:
# self._assigned_steps[host] = step_id
self._allocated_hosts[host] = step_id
to_allocate.append(host)

return to_allocate

# if num_hosts <= 0 or num_hosts > len(self._free_hosts):
# return None

# to_allocate = []
# for _ in range(num_hosts):
# # host = self._free_hosts.popleft()
# self._allocated_hosts[host] = step_id
# to_allocate.append(host)

# return to_allocate

@staticmethod
def _create_redirect_workers(
global_policy: dragon_policy.Policy,
Expand Down Expand Up @@ -769,7 +787,7 @@ def _refresh_statuses(self) -> None:
logger.debug(f"Releasing host {host}")
try:
self._allocated_hosts.pop(host) # todo: remove
self._assigned_steps.pop(host)
# self._assigned_steps.pop(host)
self._prioritizer.decrement(host)
except KeyError:
logger.error(f"Tried to free a non-allocated host: {host}")
Expand Down
59 changes: 44 additions & 15 deletions smartsim/_core/launcher/dragon/pqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
import typing as t

from smartsim.error.errors import SmartSimError
from smartsim.log import get_logger


logger = get_logger(__name__)


# tracking structure for [num_refs, node_name, is_dirty]
_NodeRefCount = t.List[t.Union[int, str]]
Expand Down Expand Up @@ -101,6 +106,12 @@ def increment(self, host: str) -> None:
tracking_info[0] = int(ref_count) + 1
tracking_info[2] = 1

def _all_refs(self) -> t.List[_NodeRefCount]:
"""Combine the CPU and GPU nodes into a single heap"""
refs = [*self._cpu_refs, *self._gpu_refs]
heapq.heapify(refs)
return refs

def get_tracking_info(self, host: str) -> _NodeRefCount:
    """Return the mutable ``[num_refs, node_name, is_dirty]`` record for ``host``.

    Raises ``KeyError`` if the host is not tracked.
    """
    entry = self._ref_map[host]
    return entry

Expand All @@ -127,15 +138,27 @@ def _create_sub_heap(self, hosts: t.List[str]) -> t.List[_NodeRefCount]:

def next_from(self, hosts: t.List[str]) -> t.Optional[_NodeRefCount]:
    """Return the next available node drawn only from the desired hosts.

    :param hosts: the host names to filter on
    :returns: tracking info for the chosen node, or None when none is available
    :raises ValueError: if ``hosts`` is empty or None"""
    # `not hosts` already covers None and the empty list;
    # the extra `len(hosts) == 0` was redundant
    if not hosts:
        raise ValueError("No host names provided")

    sub_heap = self._create_sub_heap(hosts)
    return self._get_next_available_node(sub_heap)

def next_n_from(self, num_items: int, hosts: t.List[str]) -> t.List[_NodeRefCount]:
    """Return the next N available nodes given a set of desired hosts

    :param num_items: the number of items to retrieve
    :param hosts: the host names to filter on
    :returns: a list of reference counts
    :raises ValueError: if hosts list is empty or fewer than 1 item is requested"""
    # the diff left a duplicate docstring line here; also, `not hosts`
    # already covers None and the empty list, so `len(hosts) == 0` was redundant
    if not hosts:
        raise ValueError("No host names provided")

    if num_items < 1:
        raise ValueError(f"Number of items requested {num_items} is invalid")

    sub_heap = self._create_sub_heap(hosts)
    return self._get_next_n_available_nodes(num_items, sub_heap)

@property
def unassigned(
self, heap: t.Optional[t.List[_NodeRefCount]] = None
) -> t.List[_NodeRefCount]:
Expand All @@ -145,7 +168,6 @@ def unassigned(

return [node for node in heap if node[0] == 0]

@property
def assigned(
self, heap: t.Optional[t.List[_NodeRefCount]] = None
) -> t.List[_NodeRefCount]:
Expand All @@ -163,22 +185,19 @@ def _check_satisfiable_n(
num_nodes = len(self._ref_map.keys())

if num_items < 1:
# msg = f"Unable to satisfy request for less than 1 node"
# msg = f"Cannot satisfy request for less than 1 node"
# raise ValueError(msg)
return False

if num_nodes < num_items:
# msg = f"Unable to satisfy request for {n} nodes from pool of {num_nodes}"
# msg = f"Cannot satisfy request for {n} nodes from pool of {num_nodes}"
# raise ValueError(msg)
return False

num_open = (
len(self.unassigned()) if heap is None else len(self.unassigned(heap))
)
num_open = len(self.unassigned(heap))
if num_open < num_items:
# msg = "Unable to satisfy request for "
# f"{n} nodes from {num_nodes} available"
# raise ValueError(msg)
msg = f"Cannot satisfy request for {num_items} nodes from {num_open} available"
logger.warning(msg)
return False

return True
Expand Down Expand Up @@ -228,14 +247,20 @@ def _get_next_n_available_nodes(
:returns: Collection of reserved nodes"""
next_nodes: t.List[_NodeRefCount] = []

if num_items < 1:
raise ValueError(f"Number of items requested {num_items} is invalid")

if not self._check_satisfiable_n(num_items, heap):
return next_nodes

# TODO: convert return type to an actual node or the hostname
next_node: t.Optional[_NodeRefCount] = self._get_next_available_node(heap)
while len(next_nodes) < num_items and next_node is not None:
next_nodes.append(next_node)

while len(next_nodes) < num_items:
next_node = self._get_next_available_node(heap)
if next_node:
next_nodes.append(next_node)
else:
break

return next_nodes

Expand All @@ -256,7 +281,7 @@ def next(
return None

def next_n(
    self, num_items: int = 1, filter_on: t.Optional[PrioritizerFilter] = None
) -> t.List[_NodeRefCount]:
    """Find the next N available nodes w/least amount of references using
    the supplied filter to target a specific node capability

    :param num_items: the number of nodes to retrieve
    :param filter_on: optional filter selecting only GPU or only CPU nodes;
        when None, nodes from both heaps are considered
    :returns: a list of up to ``num_items`` reference counts"""
    # NOTE(review): the diff left a stale old-signature line and a dead
    # initial `heap = self._cpu_refs` store ahead of this exhaustive
    # if/elif/else — both removed; behavior is unchanged.
    if filter_on == PrioritizerFilter.GPU:
        heap = self._gpu_refs
    elif filter_on == PrioritizerFilter.CPU:
        heap = self._cpu_refs
    else:
        heap = self._all_refs()

    return self._get_next_n_available_nodes(num_items, heap)
6 changes: 3 additions & 3 deletions tests/test_dragon_run_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:

assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend._free_hosts) == 1
assert len(dragon_backend.free_hosts) == 1
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
# assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
Expand All @@ -290,7 +290,7 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:

assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend._free_hosts) == 1
assert len(dragon_backend.free_hosts) == 1
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
# assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
Expand Down Expand Up @@ -448,7 +448,7 @@ def test_stop_request(monkeypatch: pytest.MonkeyPatch) -> None:
== SmartSimStatus.STATUS_CANCELLED
)

assert len(dragon_backend._assigned_steps) == 0
assert len(dragon_backend._allocated_hosts) == 0
assert len(dragon_backend._free_hosts) == 3


Expand Down
Loading

0 comments on commit 0f57c1e

Please sign in to comment.