Commit

resolve check-all issues

ankona committed Jul 29, 2024
1 parent 76bf007 commit 953bcd0

Showing 6 changed files with 80 additions and 69 deletions.
43 changes: 20 additions & 23 deletions smartsim/_core/launcher/dragon/dragonBackend.py
@@ -25,7 +25,6 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 import collections
 import functools
-import heapq
 import itertools
 import time
 import typing as t
@@ -47,7 +46,6 @@
 import dragon.native.machine as dragon_machine

 from smartsim._core.launcher.dragon.pqueue import NodePrioritizer
-from smartsim.error.errors import SmartSimError

 # pylint: enable=import-error
 # isort: on
@@ -74,7 +72,7 @@


 # tracking structure for [num_refs, node_name, is_dirty]
-_NodeRefCount = t.List[int, str, bool]
+_NodeRefCount = t.List[t.Union[int, str, bool]]


 class DragonStatus(str, Enum):
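
Note: the original annotation `t.List[int, str, bool]` is invalid — `t.List` takes a single type parameter — so the mixed-content tracking list is typed with a union, which in turn forces `int(...)` narrowing wherever arithmetic is done on the ref count. A minimal sketch (values are illustrative):

import typing as t

_NodeRefCount = t.List[t.Union[int, str, bool]]

entry: _NodeRefCount = [0, "node-01", False]  # [num_refs, node_name, is_dirty]
entry[0] = int(entry[0]) + 1  # narrow the union before arithmetic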
@@ -206,14 +204,13 @@ def hosts(self) -> list[str]:
         return self._hosts

     @property
-    def allocated_hosts(self) -> dict[str, str]:
-        with self._queue_lock:
-            return self._allocated_hosts
+    def assigned_steps(self) -> dict[str, str]:
+        return self._assigned_steps

-    # @property
-    # def free_hosts(self) -> t.Deque[str]:
-    #     with self._queue_lock:
-    #         return self._free_hosts
+    @property
+    def free_hosts(self) -> t.List[str]:
+        with self._queue_lock:
+            return [str(item[1]) for item in self._prioritizer.unassigned()]

     @property
     def group_infos(self) -> dict[str, ProcessGroupInfo]:
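
A hypothetical look at the reworked surface, assuming `backend` is a constructed DragonBackend:

taken = backend.assigned_steps  # e.g. {"node-01": "step-123"}: host -> step ID
idle = backend.free_hosts       # hostnames the prioritizer reports as unassigned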
@@ -232,7 +229,7 @@ def _initialize_hosts(self) -> None:
         # """List of hosts available in allocation"""
         # self._free_hosts: t.Deque[str] = collections.deque(self._hosts)
         """List of hosts on which steps can be launched"""
-        self._allocated_hosts: t.Dict[str, str] = {}
+        self._assigned_steps: t.Dict[str, str] = {}
         """Mapping of hosts on which a step is already running to step ID"""

         self._ref_map: t.Dict[str, _NodeRefCount] = {}
@@ -242,8 +239,6 @@ def _initialize_hosts(self) -> None:
         self._gpu_refs: t.List[_NodeRefCount] = []
         """Track reference counts to GPU nodes"""

-        self._initialize_reference_counters()
-
     def __str__(self) -> str:
         return self.status_message

@@ -322,9 +317,11 @@ def _can_honor_hosts(
             message += f"but only {len(self._hosts)} nodes are available."
             return False, message

-        requested_hosts: t.Set[str] = set(request.policy.hostlist)
-        if not requested_hosts:
-            return True, None
+        requested_hosts: t.Set[str] = set()
+        if request.hostlist:
+            requested_hosts = set(request.hostlist)
+        if not requested_hosts:
+            return True, None

         all_hosts = set(self._hosts)
         valid_hosts = all_hosts.intersection(requested_hosts)
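
The validation above reduces to a set intersection; a minimal sketch with made-up hostnames (presumably the request fails when it names hosts outside the allocation):

all_hosts = {"node-01", "node-02", "node-03"}
requested_hosts = {"node-02", "node-99"}
valid_hosts = all_hosts.intersection(requested_hosts)  # {"node-02"}; "node-99" is unknown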
@@ -414,18 +411,18 @@ def _allocate_step(
         if num_hosts <= 0 or num_hosts > len(self._hosts):
             return None

-        if request.policy.hostlist:
+        if request.hostlist:
             tracking_info = self._prioritizer.next_n_from(
-                num_hosts, request.policy.hostlist
+                num_hosts, [host for host in request.hostlist.split(",") if host]
             )
         else:
             tracking_info = self._prioritizer.next_n(num_hosts)

         # give back the list of node names
-        to_allocate = [info[1] for info in tracking_info]
+        to_allocate = [str(info[1]) for info in tracking_info]

         for host in to_allocate:
-            self._allocated_hosts[host] = step_id
+            self._assigned_steps[host] = step_id

         return to_allocate

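Since the request now carries the host list as a comma-separated string, the split-and-filter above drops empty fragments; a minimal sketch:

hostlist = "node-01,,node-02"  # illustrative value for request.hostlist
hosts = [host for host in hostlist.split(",") if host]
assert hosts == ["node-01", "node-02"]
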
@@ -711,7 +708,7 @@ def _refresh_statuses(self) -> None:
                 for host in group_info.hosts:
                     logger.debug(f"Releasing host {host}")
                     try:
-                        self._allocated_hosts.pop(host)
+                        self._assigned_steps.pop(host)
                         self._prioritizer.decrement(host)
                     except KeyError:
                         logger.error(f"Tried to free a non-allocated host: {host}")
77 changes: 44 additions & 33 deletions smartsim/_core/launcher/dragon/pqueue.py
@@ -30,14 +30,13 @@

 from smartsim.error.errors import SmartSimError

-
 # tracking structure for [num_refs, node_name, is_dirty]
-_NodeRefCount = t.List[t.Union[int, str, bool]]
+_NodeRefCount = t.List[t.Union[int, str]]


 class PrioritizerFilter(str, enum.Enum):
-    CPU: str = enum.auto()
-    GPU: str = enum.auto()
+    CPU = enum.auto()
+    GPU = enum.auto()


 class Node(t.Protocol):
@@ -54,7 +53,6 @@ def hostname(self) -> str: ...


 class NodePrioritizer:
-
     def __init__(self, nodes: t.List[Node], lock: threading.RLock) -> None:
         if not nodes:
             raise SmartSimError("Missing nodes to prioritize")
@@ -76,7 +74,11 @@ def _initialize_reference_counters(self, nodes: t.List[Node]) -> None:
         """Perform initialization of reference counters for nodes in the allocation."""
         for node in nodes:
             # initialize all node counts to 0 and mark the entries "is_dirty=False"
-            tracking_info = [0, node.hostname, False]  # use list for mutability
+            tracking_info: _NodeRefCount = [
+                0,
+                node.hostname,
+                0,
+            ]  # use list for mutability

             self._ref_map[node.hostname] = tracking_info

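Storing the dirty flag as 0/1 keeps every element within the narrowed `t.Union[int, str]` annotation. Because the tracking entries are plain lists, `heapq` orders them lexicographically — ref count first, hostname as tie-breaker — so the least-referenced node surfaces first. A minimal sketch:

import heapq

heap = [[2, "node-03", 0], [0, "node-01", 0], [1, "node-02", 0]]
heapq.heapify(heap)
assert heapq.heappop(heap) == [0, "node-01", 0]  # fewest references wins
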
@@ -94,8 +96,9 @@ def increment(self, host: str) -> None:
        ref counter is marked as dirty to trigger a reordering on retrieval"""
         with self._lock:
             tracking_info = self._ref_map[host]
-            tracking_info[0] += 1
-            tracking_info[2] = True
+            ref_count, *_ = tracking_info
+            tracking_info[0] = int(ref_count) + 1
+            tracking_info[2] = 1

     def get_tracking_info(self, host: str) -> _NodeRefCount:
         return self._ref_map[host]
@@ -105,8 +108,8 @@ def decrement(self, host: str) -> None:
        ref counter is marked as dirty to trigger a reordering"""
         with self._lock:
             tracking_info = self._ref_map[host]
-            tracking_info[0] = max(tracking_info[0] - 1, 0)
-            tracking_info[2] = True
+            tracking_info[0] = max(int(tracking_info[0]) - 1, 0)
+            tracking_info[2] = 1

     def _create_sub_heap(self, hosts: t.List[str]) -> t.List[_NodeRefCount]:
         nodes_tracking_info: t.List[_NodeRefCount] = []
@@ -121,17 +124,17 @@ def _create_sub_heap(self, hosts: t.List[str]) -> t.List[_NodeRefCount]:

         return nodes_tracking_info

-    def next_from(self, hosts: t.List[str]) -> _NodeRefCount:
+    def next_from(self, hosts: t.List[str]) -> t.Optional[_NodeRefCount]:
         """Return the next node available given a set of desired hosts"""
         sub_heap = self._create_sub_heap(hosts)
         return self._get_next_available_node(sub_heap)

-    def next_n_from(self, n: int, hosts: t.List[str]) -> t.List[_NodeRefCount]:
+    def next_n_from(self, num_items: int, hosts: t.List[str]) -> t.List[_NodeRefCount]:
         """Return the next N available nodes given a set of desired hosts"""
         sub_heap = self._create_sub_heap(hosts)
-        return self._get_next_n_available_nodes(n, sub_heap)
+        return self._get_next_n_available_nodes(num_items, sub_heap)

-    def open(
+    def unassigned(
         self, heap: t.Optional[t.List[_NodeRefCount]] = None
     ) -> t.List[_NodeRefCount]:
         """Helper method to identify the nodes that are currently not assigned"""
@@ -140,7 +143,7 @@ def open(

         return [node for node in heap if node[0] == 0]

-    def closed(
+    def assigned(
         self, heap: t.Optional[t.List[_NodeRefCount]] = None
     ) -> t.List[_NodeRefCount]:
         """Helper method to identify the nodes that are currently assigned"""
@@ -150,25 +153,28 @@ def closed(
         return [node for node in heap if node[0] == 1]

     def _check_satisfiable_n(
-        self, n: int, heap: t.Optional[t.List[_NodeRefCount]] = None
+        self, num_items: int, heap: t.Optional[t.List[_NodeRefCount]] = None
     ) -> bool:
         """Validates that a request for some number of nodes `n` can be
        satisfied by the prioritizer given the set of nodes available"""
         num_nodes = len(self._ref_map.keys())

-        if n < 1:
+        if num_items < 1:
             # msg = f"Unable to satisfy request for less than 1 node"
             # raise ValueError(msg)
             return False

-        if num_nodes < n:
+        if num_nodes < num_items:
             # msg = f"Unable to satisfy request for {n} nodes from pool of {num_nodes}"
             # raise ValueError(msg)
             return False

-        num_open = len(self.open()) if heap is None else len(self.open(heap))
-        if num_open < n:
-            # msg = f"Unable to satisfy request for {n} nodes from {num_nodes} available"
+        num_open = (
+            len(self.unassigned()) if heap is None else len(self.unassigned(heap))
+        )
+        if num_open < num_items:
+            # msg = "Unable to satisfy request for "
+            # f"{n} nodes from {num_nodes} available"
             # raise ValueError(msg)
             return False

@@ -189,13 +195,13 @@ def _get_next_available_node(
         while is_dirty:
             if is_dirty:
                 # mark dirty items clean and place back into heap to be sorted
-                tracking_info[2] = False
+                tracking_info[2] = 0
                 heapq.heappush(heap, tracking_info)

             tracking_info = heapq.heappop(heap)
             is_dirty = tracking_info[2]

-        original_ref_count = tracking_info[0]
+        original_ref_count = int(tracking_info[0])
         if original_ref_count == 0:
             # increment the ref count before putting back onto heap
             tracking_info[0] = original_ref_count + 1
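
The while loop above is lazy invalidation: `increment`/`decrement` only flag an entry as dirty, and the heap is repaired when a stale entry reaches the top. A standalone sketch of the pattern, assuming `[num_refs, node_name, is_dirty]` entries:

import heapq

def pop_clean(heap):
    """Pop until an entry with a clean dirty flag surfaces."""
    tracking_info = heapq.heappop(heap)
    while tracking_info[2]:  # is_dirty
        tracking_info[2] = 0  # mark clean
        heapq.heappush(heap, tracking_info)  # re-insert at its true priority
        tracking_info = heapq.heappop(heap)
    return tracking_info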
@@ -210,46 +216,51 @@ def _get_next_available_node(

     def _get_next_n_available_nodes(
         self,
-        n: int,
-        heap: t.Optional[t.List[_NodeRefCount]],
+        num_items: int,
+        heap: t.List[_NodeRefCount],
     ) -> t.List[_NodeRefCount]:
         """Find the next N available nodes w/least amount of references using
        the supplied filter to target a specific node capability
        :param n: number of nodes to reserve
        :returns: Collection of reserved nodes"""
         next_nodes: t.List[_NodeRefCount] = []

-        if not self._check_satisfiable_n(n, heap):
+        if not self._check_satisfiable_n(num_items, heap):
             return next_nodes

         # TODO: convert return type to an actual node or the hostname
         next_node: t.Optional[_NodeRefCount] = self._get_next_available_node(heap)
-        while len(next_nodes) < n and next_node is not None:
+        while len(next_nodes) < num_items and next_node is not None:
             next_nodes.append(next_node)
             next_node = self._get_next_available_node(heap)

         return next_nodes

-    def next(self, filter: PrioritizerFilter = PrioritizerFilter.CPU) -> _NodeRefCount:
+    def next(
+        self, filter_on: PrioritizerFilter = PrioritizerFilter.CPU
+    ) -> t.Optional[_NodeRefCount]:
         """Find the next node available w/least amount of references using
        the supplied filter to target a specific node capability"""
         heap = self._cpu_refs

-        if filter == PrioritizerFilter.GPU:
+        if filter_on == PrioritizerFilter.GPU:
             heap = self._gpu_refs

         # TODO: convert return type to an actual node or the hostname
-        return self._get_next_available_node(heap)
+        if node := self._get_next_available_node(heap):
+            return node
+
+        return None

     def next_n(
-        self, n: int = 1, filter: PrioritizerFilter = PrioritizerFilter.CPU
+        self, num_items: int = 1, filter_on: PrioritizerFilter = PrioritizerFilter.CPU
     ) -> t.List[_NodeRefCount]:
         """Find the next N available nodes w/least amount of references using
        the supplied filter to target a specific node capability
        :param n: number of nodes to reserve
        :returns: Collection of reserved nodes"""
         heap = self._cpu_refs
-        if filter == PrioritizerFilter.GPU:
+        if filter_on == PrioritizerFilter.GPU:
             heap = self._gpu_refs

-        return self._get_next_n_available_nodes(n, heap)
+        return self._get_next_n_available_nodes(num_items, heap)
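
A hypothetical round trip through the API introduced above; `nodes` (a list of objects satisfying the `Node` protocol) and the lock setup are stand-ins, not code from this commit:

import threading

prioritizer = NodePrioritizer(nodes, threading.RLock())

cpu_node = prioritizer.next()                        # least-referenced CPU node, or None
gpu_node = prioritizer.next(PrioritizerFilter.GPU)
batch = prioritizer.next_n(num_items=2)              # [] when the request cannot be satisfied
subset = prioritizer.next_n_from(1, ["node-01", "node-02"])
free_entries = prioritizer.unassigned()              # entries with num_refs == 0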
6 changes: 4 additions & 2 deletions smartsim/_core/launcher/step/dragonStep.py
@@ -167,9 +167,11 @@ def _write_request_file(self) -> str:
             run_settings = t.cast(DragonRunSettings, step.step_settings)
             run_args = run_settings.run_args
             env = run_settings.env_vars
-            nodes = int(run_args.get("nodes", None) or 1)
+            nodes = int(
+                run_args.get("nodes", None) or 1
+            )  # todo: is this the same use as my new host-list?
             tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)
-            hosts = str(run_args.get("host-list", ""))
+            # hosts = str(run_args.get("host-list", ""))

             policy = DragonRunPolicy.from_run_args(run_args)

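The `or 1` fallback above covers both a missing key and an explicit None before the int() coercion; a minimal sketch with illustrative run_args:

run_args = {"nodes": None, "tasks-per-node": "4"}
nodes = int(run_args.get("nodes", None) or 1)  # None -> 1
tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)  # "4" -> 4
assert (nodes, tasks_per_node) == (1, 4)
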
2 changes: 1 addition & 1 deletion smartsim/_core/schemas/dragonRequests.py
@@ -72,7 +72,7 @@ def from_run_args(

         # list[str] converted to comma-separated str must split into a list[str]
         hosts: t.List[str] = []
-        if hostlist_value := run_args.get("host-list", None):
+        if hostlist_value := str(run_args.get("host-list", "")):
             hosts = [x for x in hostlist_value.split(",") if x]

         try:
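
Wrapping the lookup in str() keeps the walrus check well-typed when run_args holds non-string values; a minimal sketch of the round trip (values are illustrative):

run_args = {"host-list": "node-01,node-02", "nodes": 2}

hosts: list[str] = []
if hostlist_value := str(run_args.get("host-list", "")):
    hosts = [x for x in hostlist_value.split(",") if x]
assert hosts == ["node-01", "node-02"]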
18 changes: 9 additions & 9 deletions tests/test_dragon_run_request.py
@@ -270,8 +270,8 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
     assert dragon_backend._running_steps == [step_id]
     assert len(dragon_backend._queued_steps) == 0
     assert len(dragon_backend._free_hosts) == 1
-    assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
-    assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
+    assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
+    assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id

     monkeypatch.setattr(
         dragon_backend._group_infos[step_id].process_group, "status", "Running"
@@ -282,8 +282,8 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
     assert dragon_backend._running_steps == [step_id]
     assert len(dragon_backend._queued_steps) == 0
     assert len(dragon_backend._free_hosts) == 1
-    assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
-    assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
+    assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
+    assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id

     dragon_backend._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED

@@ -367,8 +367,8 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None:
     assert dragon_backend._running_steps == [step_id]
     assert len(dragon_backend._queued_steps) == 0
     assert len(dragon_backend._free_hosts) == 1
-    assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
-    assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
+    assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
+    assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id

     monkeypatch.setattr(
         dragon_backend._group_infos[step_id].process_group, "status", "Running"
@@ -379,8 +379,8 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None:
     assert dragon_backend._running_steps == [step_id]
     assert len(dragon_backend._queued_steps) == 0
     assert len(dragon_backend._free_hosts) == 1
-    assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
-    assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
+    assert dragon_backend._assigned_steps[dragon_backend.hosts[0]] == step_id
+    assert dragon_backend._assigned_steps[dragon_backend.hosts[1]] == step_id

     dragon_backend._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED

@@ -433,7 +433,7 @@ def test_stop_request(monkeypatch: pytest.MonkeyPatch) -> None:
         == SmartSimStatus.STATUS_CANCELLED
     )

-    assert len(dragon_backend._allocated_hosts) == 0
+    assert len(dragon_backend._assigned_steps) == 0
     assert len(dragon_backend._free_hosts) == 3