Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Launch Arg Builders Format Input for Launchers #620

Merged
merged 38 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
42f7ff1
Builders are generic, have a format method
MattToast Jun 17, 2024
f76ee4f
Impl slurm
MattToast Jun 18, 2024
3027721
Impl local
MattToast Jun 18, 2024
6266b75
Impl jsrun
MattToast Jun 18, 2024
bbb5152
Impl mpi{run,exec}, orterun
MattToast Jun 18, 2024
b96a160
Impl aprun
MattToast Jun 18, 2024
dc1cf0b
Impl dragon
MattToast Jun 20, 2024
e555342
Type errors supressed for now
MattToast Jun 20, 2024
9161125
Add a dispatcher class to send built settings to a launcher
MattToast Jun 22, 2024
d765718
Isort/Black
MattToast Jun 24, 2024
01393c7
Env: dict -> Mapping
MattToast Jun 26, 2024
e504ccf
Organize dispatch file, call out work to do
MattToast Jun 26, 2024
910b2d9
Wire up dispatch dragon builder to dragon launcher
MattToast Jun 26, 2024
f4ebada
Import sort
MattToast Jul 2, 2024
9d2901f
Remove old TODOs
MattToast Jul 2, 2024
c413b8f
textwrap.dedent fix
MattToast Jul 3, 2024
35e686c
Add doc strs
MattToast Jul 4, 2024
9b2c2b7
Make dispatching settings more concise
MattToast Jul 4, 2024
a1a7638
Merge remote-tracking branch 'upstream/smartsim-refactor' into builde…
MattToast Jul 9, 2024
219d481
Address reviewer feedback
MattToast Jul 9, 2024
44ba1eb
Merge remote-tracking branch 'upstream/smartsim-refactor' into builde…
MattToast Jul 10, 2024
9797442
Remove stale FIXME comment
MattToast Jul 16, 2024
4e9f5b2
Dispatcher takes a format function with launcher like
MattToast Jul 18, 2024
0b9ec1a
Re-wrire up default dispatcher
MattToast Jul 18, 2024
6d5a3c4
Add tests for new dispatch API, make old tests pass
MattToast Jul 19, 2024
6b7943a
Upper pin typing extensions (thanks TF)
MattToast Jul 20, 2024
82ee19f
Make 3.9 compatable, lint, better names
MattToast Jul 20, 2024
36bcfbb
Address reviewer comments
MattToast Jul 17, 2024
0a1ebba
Add tests for `Experiment.start_jobs`
MattToast Jul 21, 2024
b729e91
Rename `Builder` -> `Arguments`
MattToast Jul 22, 2024
f0bf80f
Remove `Experiment._control`
MattToast Jul 24, 2024
7a0b893
Address some reviewer feedback
MattToast Jul 24, 2024
9984d18
Moar reviewer feedback
MattToast Jul 24, 2024
dbe581d
Moar reviewer requested docs
MattToast Jul 24, 2024
7327375
Merge remote-tracking branch 'upstream/smartsim-refactor' into builde…
MattToast Jul 24, 2024
0b4e65d
typo
MattToast Jul 24, 2024
50f15c0
Merge remote-tracking branch 'upstream/smartsim-refactor' into builde…
MattToast Jul 25, 2024
7bcc8fd
Address (online and offline) reviewer comments
MattToast Jul 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,17 @@ module = [
"smartsim._core.control.controller",
"smartsim._core.control.manifest",
"smartsim._core.entrypoints.dragon_client",
"smartsim._core.launcher.*",
"smartsim._core.launcher.colocated",
"smartsim._core.launcher.launcher",
"smartsim._core.launcher.local.*",
"smartsim._core.launcher.lsf.*",
"smartsim._core.launcher.pbs.*",
"smartsim._core.launcher.sge.*",
"smartsim._core.launcher.slurm.*",
"smartsim._core.launcher.step.*",
"smartsim._core.launcher.stepInfo",
"smartsim._core.launcher.stepMapping",
"smartsim._core.launcher.taskManager",
"smartsim._core.utils.serialize",
"smartsim._core.utils.telemetry.*",
"smartsim.database.*",
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ def has_ext_modules(_placeholder):
"types-tqdm",
"types-tensorflow==2.12.0.9",
"types-setuptools",
"typing_extensions>=4.1.0,<4.6",
],
"docs": [
"Sphinx==6.2.1",
Expand Down Expand Up @@ -226,6 +225,7 @@ def has_ext_modules(_placeholder):
"pygithub>=2.3.0",
"numpy<2",
"smartredis>=0.5,<0.6",
"typing_extensions>=4.1.0,<4.6",
],
cmdclass={
"build_py": SmartSimBuild,
Expand Down
5 changes: 1 addition & 4 deletions smartsim/_core/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,7 @@ def dragon_dotenv(self) -> Path:

@property
def dragon_server_path(self) -> t.Optional[str]:
return os.getenv(
"SMARTSIM_DRAGON_SERVER_PATH",
os.getenv("SMARTSIM_DRAGON_SERVER_PATH_EXP", None),
)
return os.getenv("SMARTSIM_DRAGON_SERVER_PATH", None)

@property
def dragon_server_timeout(self) -> int:
Expand Down
47 changes: 33 additions & 14 deletions smartsim/_core/launcher/dragon/dragonConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@
)
from ...utils.network import find_free_port, get_best_interface_and_address

if t.TYPE_CHECKING:
from typing_extensions import Self

from smartsim.experiment import Experiment

logger = get_logger(__name__)

_SchemaT = t.TypeVar("_SchemaT", bound=t.Union[DragonRequest, DragonResponse])
Expand All @@ -69,29 +74,27 @@ class DragonConnector:
to start a Dragon server and communicate with it.
"""

def __init__(self) -> None:
def __init__(self, path: str | os.PathLike[str]) -> None:
self._context: zmq.Context[t.Any] = zmq.Context.instance()
self._context.setsockopt(zmq.REQ_CORRELATE, 1)
self._context.setsockopt(zmq.REQ_RELAXED, 1)
self._authenticator: t.Optional[zmq.auth.thread.ThreadAuthenticator] = None
config = get_config()
self._reset_timeout(config.dragon_server_timeout)

# TODO: We should be able to make these "non-optional"
# by simply moving the impl of
# `DragonConnectior.connect_to_dragon` to this method. This is
# fine as we expect the that method should only be called once
# without hitting a guard clause.
self._dragon_head_socket: t.Optional[zmq.Socket[t.Any]] = None
self._dragon_head_process: t.Optional[subprocess.Popen[bytes]] = None
# Returned by dragon head, useful if shutdown is to be requested
# but process was started by another connector
self._dragon_head_pid: t.Optional[int] = None
self._dragon_server_path = config.dragon_server_path
self._dragon_server_path = _resolve_dragon_path(path)
logger.debug(f"Dragon Server path was set to {self._dragon_server_path}")
self._env_vars: t.Dict[str, str] = {}
if self._dragon_server_path is None:
raise SmartSimError(
"DragonConnector could not find the dragon server path. "
"This should not happen if the Connector was started by an "
"experiment.\nIf the DragonConnector was started manually, "
"then the environment variable SMARTSIM_DRAGON_SERVER_PATH "
"should be set to an existing directory."
)

@property
def is_connected(self) -> bool:
Expand Down Expand Up @@ -293,8 +296,7 @@ def connect_to_dragon(self) -> None:
"Establishing connection with Dragon server or starting a new one..."
)

path = _resolve_dragon_path(self._dragon_server_path)

path = self._dragon_server_path
self._connect_to_existing_server(path)
if self.is_connected:
return
Expand Down Expand Up @@ -520,8 +522,25 @@ def _dragon_cleanup(


def _resolve_dragon_path(fallback: t.Union[str, "os.PathLike[str]"]) -> Path:
dragon_server_path = get_config().dragon_server_path or os.path.join(
fallback, ".smartsim", "dragon"
"""Return the path at which a user should set up a dragon server.

The order of path resolution is:
1) If the the user has set a global dragon path via
`Config.dragon_server_path` use that without alteration.
2) Use the `fallback` path which should be the path to an existing
directory. Append the default dragon server subdirectory defined by
`Config.dragon_default_subdir`

Currently this function will raise if a user attempts to specify multiple
dragon server paths via `:` seperation.

:param fallback: The path to an existing directory on the file system to
use if the global dragon directory is not set.
:returns: The path to directory in which the dragon server should run.
"""
config = get_config()
dragon_server_path = config.dragon_server_path or os.path.join(
fallback, config.dragon_default_subdir
)
dragon_server_paths = dragon_server_path.split(":")
if len(dragon_server_paths) > 1:
Expand Down
95 changes: 81 additions & 14 deletions smartsim/_core/launcher/dragon/dragonLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import typing as t

from smartsim._core.schemas.dragonRequests import DragonRunPolicy
from smartsim.types import LaunchedJobID

from ...._core.launcher.stepMapping import StepMap
from ....error import LauncherError, SmartSimError
Expand All @@ -44,6 +45,7 @@
from ....status import SmartSimStatus
from ...schemas import (
DragonRunRequest,
DragonRunRequestView,
DragonRunResponse,
DragonStopRequest,
DragonStopResponse,
Expand All @@ -57,6 +59,11 @@
from ..stepInfo import StepInfo
from .dragonConnector import DragonConnector, _SchemaT

if t.TYPE_CHECKING:
from typing_extensions import Self

from smartsim.experiment import Experiment

logger = get_logger(__name__)


Expand All @@ -74,9 +81,9 @@ class DragonLauncher(WLMLauncher):
the Job Manager to interact with it.
"""

def __init__(self) -> None:
def __init__(self, server_path: str | os.PathLike[str]) -> None:
super().__init__()
self._connector = DragonConnector()
self._connector = DragonConnector(server_path)
"""Connector used to start and interact with the Dragon server"""
self._slurm_launcher = SlurmLauncher()
"""Slurm sub-launcher, used only for batch jobs"""
Expand Down Expand Up @@ -121,6 +128,22 @@ def add_step_to_mapping_table(self, name: str, step_map: StepMap) -> None:
)
sublauncher.add_step_to_mapping_table(name, sublauncher_step_map)

@classmethod
def create(cls, exp: Experiment) -> Self:
self = cls(exp.exp_path)
self._connector.connect_to_dragon() # pylint: disable=protected-access
return self

def start(
self, args_and_policy: tuple[DragonRunRequestView, DragonRunPolicy]
) -> LaunchedJobID:
req_args, policy = args_and_policy
self._connector.load_persisted_env()
merged_env = self._connector.merge_persisted_env(os.environ.copy())
req = DragonRunRequest(**dict(req_args), current_env=merged_env, policy=policy)
res = _assert_schema_type(self._connector.send_request(req), DragonRunResponse)
return LaunchedJobID(res.step_id)

def run(self, step: Step) -> t.Optional[str]:
"""Run a job step through Slurm

Expand Down Expand Up @@ -167,31 +190,25 @@ def run(self, step: Step) -> t.Optional[str]:
run_args = step.run_settings.run_args
req_env = step.run_settings.env_vars
self._connector.load_persisted_env()
merged_env = self._connector.merge_persisted_env(os.environ.copy())
nodes = int(run_args.get("nodes", None) or 1)
tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)

policy = DragonRunPolicy.from_run_args(run_args)

response = _assert_schema_type(
self._connector.send_request(
DragonRunRequest(
step_id = self.start(
(
DragonRunRequestView(
exe=cmd[0],
exe_args=cmd[1:],
path=step.cwd,
name=step.name,
nodes=nodes,
tasks_per_node=tasks_per_node,
env=req_env,
current_env=merged_env,
output_file=out,
error_file=err,
policy=policy,
)
),
DragonRunResponse,
),
policy,
)
)
step_id = str(response.step_id)
else:
# pylint: disable-next=consider-using-with
out_strm = open(out, "w+", encoding="utf-8")
Expand Down Expand Up @@ -325,3 +342,53 @@ def _assert_schema_type(obj: object, typ: t.Type[_SchemaT], /) -> _SchemaT:
if not isinstance(obj, typ):
raise TypeError(f"Expected schema of type `{typ}`, but got {type(obj)}")
return obj


# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# TODO: Remove this registry and move back to builder file after fixing
# circular import caused by `DragonLauncher.supported_rs`
# -----------------------------------------------------------------------------
from smartsim.settings.arguments.launch.dragon import DragonLaunchArguments
from smartsim.settings.dispatch import ExecutableProtocol, dispatch


def _as_run_request_args_and_policy(
run_req_args: DragonLaunchArguments,
exe: ExecutableProtocol,
env: t.Mapping[str, str | None],
) -> tuple[DragonRunRequestView, DragonRunPolicy]:
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# FIXME: This type is 100% unacceptable, but I don't want to spend too much
# time on fixing the dragon launcher API. Something that we need to
# revisit in the future though.
exe_, *args = exe.as_program_arguments()
run_args = dict[str, "int | str | float | None"](run_req_args._launch_args)
policy = DragonRunPolicy.from_run_args(run_args)
return (
DragonRunRequestView(
exe=exe_,
exe_args=args,
# FIXME: Currently this is hard coded because the schema requires
# it, but in future, it is almost certainly necessary that
# this will need to be injected by the user or by us to have
# the command execute next to any generated files. A similar
# problem exists for the other settings.
# TODO: Find a way to inject this path
path=os.getcwd(),
env=env,
# TODO: Not sure how this info is injected
name=None,
output_file=None,
error_file=None,
**run_args,
),
policy,
)


dispatch(
DragonLaunchArguments,
with_format=_as_run_request_args_and_policy,
to_launcher=DragonLauncher,
)
# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
25 changes: 25 additions & 0 deletions smartsim/_core/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
"""
A file of helper functions for SmartSim
"""
from __future__ import annotations

import base64
import collections.abc
import os
Expand All @@ -45,6 +47,7 @@
from types import FrameType


_T = t.TypeVar("_T")
_TSignalHandlerFn = t.Callable[[int, t.Optional["FrameType"]], object]


Expand Down Expand Up @@ -411,6 +414,28 @@ def is_crayex_platform() -> bool:
return result.is_cray


def first(predicate: t.Callable[[_T], bool], iterable: t.Iterable[_T]) -> _T | None:
"""Return the first instance of an iterable that meets some precondition.
Any elements of the iterable that do not meet the precondition will be
forgotten. If no item in the iterable is found that meets the predicate,
`None` is returned. This is roughly equivalent to

.. highlight:: python
.. code-block:: python

next(filter(predicate, iterable), None)

but does not require the predicate to be a type guard to type check.

:param predicate: A function that returns `True` or `False` given a element
of the iterable
:param iterable: An iterable that yields elements to evealuate
:returns: The first element of the iterable to make the the `predicate`
return `True`
"""
return next((item for item in iterable if predicate(item)), None)


@t.final
class SignalInterceptionStack(collections.abc.Collection[_TSignalHandlerFn]):
"""Registers a stack of callables to be called when a signal is
Expand Down
4 changes: 4 additions & 0 deletions smartsim/error/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ class LauncherUnsupportedFeature(LauncherError):
"""Raised when the launcher does not support a given method"""


class LauncherNotFoundError(LauncherError):
"""A requested launcher could not be found"""


class AllocationError(LauncherError):
"""Raised when there is a problem with the user WLM allocation"""

Expand Down
Loading
Loading