Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add integration of dragon-based event broadcasting #710

Merged
merged 40 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
c6a145a
Squash event integration
ankona Sep 18, 2024
49e0da4
Revert featurestorekey test changes
ankona Sep 18, 2024
fd0a5ec
capture dragon start method ex
ankona Sep 20, 2024
00a4496
Split message pump into separate module from dispatcher test
ankona Sep 20, 2024
a87aba7
extract msg_pump_factory for reuse in test_worker_manager.py
ankona Sep 23, 2024
4879122
add test verifying protoclient event raises
ankona Sep 24, 2024
6f1cba7
reduce timeouts & backoffs, share backbone across protoclient tests
ankona Sep 24, 2024
c0a1bca
Test eventing end-to-end in single process
ankona Sep 25, 2024
d81334d
docstrings & miscellaneous minor fixes (reuse descriptor code, add dr…
ankona Sep 25, 2024
f5b7b7d
fix infinite loop bug in consumer batch receive
ankona Sep 25, 2024
bd03711
Sort imports to solve dragon import issue in non-dragon tests
ankona Sep 25, 2024
98260f3
swap session scopes to module to destroy dragon resources
ankona Sep 26, 2024
5ba2a42
use make process local to avoid MPI issue, fix some test regressions,…
ankona Sep 26, 2024
1f4e6e3
more docstrings standard fixes
ankona Sep 26, 2024
9c8d127
reduce default worker connect timeout, fix test timeout issue due to …
ankona Sep 26, 2024
7442eb1
use constants in tests for env var strings, docstrings, remove commen…
ankona Sep 26, 2024
da20b5f
docstring formatting in tests
ankona Sep 27, 2024
f5ba5a6
docstrings
ankona Sep 27, 2024
d689754
parameterize ddict creation, add single ddict touchpoint util module,…
ankona Sep 27, 2024
7ddcd7c
remove completed todos, fix docstrings, remove obsolete/commented code
ankona Sep 27, 2024
762937c
extract notify listener from dragon backend, fix dragon import order,…
ankona Oct 1, 2024
51baf61
stringification bug fix
ankona Oct 1, 2024
3f4af8e
remove use of deprecated class
ankona Oct 1, 2024
979373c
review changes part 1, improve dragon errors handling, add unregister…
ankona Oct 1, 2024
5898005
import order follow-up
ankona Oct 1, 2024
af870f9
pr review updates
ankona Oct 4, 2024
27ab4f9
fix extra arg copypasta
ankona Oct 4, 2024
fac30bd
test bugs
ankona Oct 4, 2024
a1cf7ff
Remove from_sender_supplied_descriptor factory on FLI channel
ankona Oct 7, 2024
124a195
re-home protoclient into mli subpackage
ankona Oct 7, 2024
b2c4cb7
additional missed docstrings
ankona Oct 7, 2024
0c495f6
improve service param docstrings, avoid separate var/param descriptions
ankona Oct 8, 2024
1fc59e4
docstring fixes
ankona Oct 8, 2024
b316128
increase default memory per backbone node
ankona Oct 8, 2024
2ed47a4
fix fixture usage bug (worker queue preloaded into backbone)
ankona Oct 8, 2024
8ccebb5
extract dragon fixtures into dragon conftest.py
ankona Oct 8, 2024
608d6bd
remove unused import
ankona Oct 8, 2024
68d0d0c
fix send-multi with FLI after sender-supplied channel removal
ankona Oct 9, 2024
c28870f
Update dispatch tests to use dragon processes
ankona Oct 9, 2024
78d5598
Use cached FLI channel on single-send
ankona Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
test_hostlist = None
has_aprun = shutil.which("aprun") is not None


def get_account() -> str:
return test_account

Expand Down Expand Up @@ -459,15 +460,10 @@ def environment_cleanup(monkeypatch: pytest.MonkeyPatch) -> None:

@pytest.fixture(scope="function", autouse=True)
def check_output_dir() -> None:
try:
global test_output_dirs
assert os.path.isdir(test_output_root)
assert len(os.listdir(test_output_root)) >= test_output_dirs
test_output_dirs = len(os.listdir(test_output_root))
except Exception:
# swallow error when the tests can't clean up test dirs
# and let the next run do the job.
...
global test_output_dirs
assert os.path.isdir(test_output_root)
assert len(os.listdir(test_output_root)) >= test_output_dirs
test_output_dirs = len(os.listdir(test_output_root))


@pytest.fixture
Expand Down
2 changes: 1 addition & 1 deletion ex/high_throughput_inference/mock_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from smartsim.log import get_logger, log_to_file
from smartsim.protoclient import ProtoClient

logger = get_logger("App", "DEBUG")
logger = get_logger("App")


CHECK_RESULTS_AND_MAKE_ALL_SLOWER = False
Expand Down
2 changes: 1 addition & 1 deletion ex/high_throughput_inference/standalone_worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def service_as_dragon_proc(
args = parser.parse_args()

connect_to_infrastructure()
ddict_str = os.environ["_SMARTSIM_INFRA_BACKBONE"]
ddict_str = os.environ[BackboneFeatureStore.MLI_BACKBONE]

backbone = BackboneFeatureStore.from_descriptor(ddict_str)

Expand Down
3 changes: 1 addition & 2 deletions smartsim/_core/_cli/scripts/dragon_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,13 @@ def get_auth_token(request: DragonInstallRequest) -> t.Optional[Token]:
def create_dotenv(dragon_root_dir: pathlib.Path, dragon_version: str) -> None:
"""Create a .env file with required environment variables for the Dragon runtime"""
dragon_root = str(dragon_root_dir)
dragon_rut_dir = dragon_root
dragon_inc_dir = dragon_root + "/include"
dragon_lib_dir = dragon_root + "/lib"
dragon_bin_dir = dragon_root + "/bin"

dragon_vars = {
"DRAGON_BASE_DIR": dragon_root,
"DRAGON_ROOT_DIR": dragon_rut_dir,
"DRAGON_ROOT_DIR": dragon_root,
"DRAGON_INCLUDE_DIR": dragon_inc_dir,
"DRAGON_LIB_DIR": dragon_lib_dir,
"DRAGON_VERSION": dragon_version,
Expand Down
26 changes: 14 additions & 12 deletions smartsim/_core/entrypoints/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,21 @@ class Service(ABC):
def __init__(
self,
as_service: bool = False,
cooldown: int = 0,
loop_delay: int = 0,
cooldown: float = 0,
loop_delay: float = 0,
health_check_frequency: float = 0,
) -> None:
"""Initialize the ServiceHost

:param as_service: Determines if the host will run until shutdown criteria
are met or as a run-once instance
:param cooldown: Period of time to allow service to run before automatic
shutdown, in seconds. A non-zero, positive integer.
:param loop_delay: Delay between iterations of the event loop (in seconds)
:param health_check_frequency: Delay between calls to a
health check handler (in seconds)
:param as_service: Determines if the host runs continuously until
shutdown criteria are met, or executes the service lifecycle once and exits
:param cooldown: Period of time (in seconds) to allow the service to run
after a shutdown is permitted. Enables the service to avoid restarting if
new work is discovered. A value of 0 disables the cooldown.
:param loop_delay: Time (in seconds) between iterations of the event loop
:param health_check_frequency: Time (in seconds) between calls to a
health check handler. A value of 0 triggers the health check on every
iteration.
"""
self._as_service = as_service
"""If the service should run until shutdown function returns True"""
Expand All @@ -64,8 +66,8 @@ def __init__(
self._loop_delay = abs(loop_delay)
"""Forced delay between iterations of the event loop"""
self._health_check_frequency = health_check_frequency
"""The time (in seconds) between desired health checks. A health check
frequency of zero will never trigger the health check."""
"""The time (in seconds) between desired health checks. Frequency of 0
will trigger the health check on every event loop iteration."""
self._last_health_check = time.time()
"""The timestamp of the latest health check"""

Expand Down Expand Up @@ -135,7 +137,7 @@ def execute(self) -> None:
"Failure in event loop resulted in service termination"
)

if self._health_check_frequency > 0:
if self._health_check_frequency >= 0:
hc_elapsed = time.time() - self._last_health_check
if hc_elapsed >= self._health_check_frequency:
self._on_health_check()
Expand Down
23 changes: 20 additions & 3 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@
import dragon.native.machine as dragon_machine

from smartsim._core.launcher.dragon.pqueue import NodePrioritizer, PrioritizerFilter
from smartsim._core.mli.infrastructure.control.event_listener import (
from smartsim._core.mli.infrastructure.control.listener import (

Check warning on line 51 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L51

Added line #L51 was not covered by tests
ConsumerRegistrationListener,
)
from smartsim._core.mli.infrastructure.storage.backbone_feature_store import (

Check warning on line 54 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L54

Added line #L54 was not covered by tests
BackboneFeatureStore,
)
from smartsim._core.mli.infrastructure.storage.dragon_util import create_ddict
from smartsim.error.errors import SmartSimError

Check warning on line 58 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L57-L58

Added lines #L57 - L58 were not covered by tests

# pylint: enable=import-error
# isort: on
Expand All @@ -82,8 +82,8 @@


class DragonStatus(str, Enum):
ERROR = "Error"
RUNNING = "Running"

Check warning on line 86 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L85-L86

Added lines #L85 - L86 were not covered by tests

def __str__(self) -> str:
return self.value
Expand Down Expand Up @@ -157,9 +157,10 @@
by threads spawned by it.
"""

_DEFAULT_NUM_MGR_PER_NODE = 2
"""The default number of manager processes for each feature store node"""
_DEFAULT_MEM_PER_NODE = 256 * 1024**2
"""The default memory capacity to allocate for a feaure store node (in megabytes)"""

Check warning on line 163 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L160-L163

Added lines #L160 - L163 were not covered by tests

def __init__(self, pid: int) -> None:
self._pid = pid
Expand Down Expand Up @@ -194,12 +195,12 @@
"""Whether the server frontend should shut down when the backend does"""
self._shutdown_initiation_time: t.Optional[float] = None
"""The time at which the server initiated shutdown"""
self._cooldown_period = self._initialize_cooldown()
"""Time in seconds needed by the server to complete shutdown"""
self._backbone: t.Optional[BackboneFeatureStore] = None
"""The backbone feature store"""
self._listener: t.Optional[dragon_process.Process] = None
"""The standalone process executing the event consumer"""

Check warning on line 203 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L198-L203

Added lines #L198 - L203 were not covered by tests

self._nodes: t.List["dragon_machine.Node"] = []
"""Node capability information for hosts in the allocation"""
Expand Down Expand Up @@ -264,8 +265,8 @@

:returns: a status message
"""
view = DragonBackendView(self)
return "Dragon server backend update\n" f"{view.host_table}\n{view.step_table}"

Check warning on line 269 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L268-L269

Added lines #L268 - L269 were not covered by tests

def _heartbeat(self) -> None:
"""Update the value of the last heartbeat to the current time."""
Expand Down Expand Up @@ -548,74 +549,83 @@
self._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED
self._group_infos[step_id].return_codes = [-9]

def _create_backbone(self) -> BackboneFeatureStore:

Check warning on line 552 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L552

Added line #L552 was not covered by tests
"""
Create a BackboneFeatureStore if one does not exist.
Creates a BackboneFeatureStore if one does not exist. Updates
environment variables of this process to include the backbone
descriptor.

:returns: The descriptor of the backbone feature store
"""
if self._backbone is None:
backbone_storage = create_ddict(

Check warning on line 561 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L560-L561

Added lines #L560 - L561 were not covered by tests
len(self._hosts),
self._DEFAULT_NUM_MGR_PER_NODE,
self._DEFAULT_MEM_PER_NODE,
)

self._backbone = BackboneFeatureStore(

Check warning on line 567 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L567

Added line #L567 was not covered by tests
backbone_storage, allow_reserved_writes=True
)

# put the backbone descriptor in the env vars
os.environ.update(self._backbone.get_env())

Check warning on line 572 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L572

Added line #L572 was not covered by tests

return self._backbone

Check warning on line 574 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L574

Added line #L574 was not covered by tests

@staticmethod
def _initialize_cooldown() -> int:

Check warning on line 577 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L576-L577

Added lines #L576 - L577 were not covered by tests
"""Load environment configuration and determine the correct cooldown
period to apply to the backend process.

:returns: The calculated cooldown (in seconds)
"""
smartsim_config = get_config()
return (

Check warning on line 584 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L583-L584

Added lines #L583 - L584 were not covered by tests
smartsim_config.telemetry_frequency * 2 + 5
if smartsim_config.telemetry_enabled
else 5
)

def start_event_listener(

Check warning on line 590 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L590

Added line #L590 was not covered by tests
self, cpu_affinity: list[int], gpu_affinity: list[int]
) -> dragon_process.Process:
"""Start a standalone event listener.

:param cpu_affinity: The CPU affinity for the process
:param gpu_affinity: The CPU affinity for the process
:returns: The dragon Process managing the process
:raises SmartSimError: If the backbone is not provided
"""
if self._backbone is None:
raise SmartSimError("Backbone feature store is not available")

Check warning on line 601 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L600-L601

Added lines #L600 - L601 were not covered by tests

service = ConsumerRegistrationListener(

Check warning on line 603 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L603

Added line #L603 was not covered by tests
self._backbone, 1.0, 2.0, as_service=True, health_check_frequency=90
)

options = dragon_process_desc.ProcessOptions(make_inf_channels=True)
local_policy = dragon_policy.Policy(

Check warning on line 608 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L607-L608

Added lines #L607 - L608 were not covered by tests
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=socket.gethostname(),
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)
process = dragon_process.Process(

Check warning on line 614 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L614

Added line #L614 was not covered by tests
target=service.execute,
args=[],
cwd=os.getcwd(),
env={
**os.environ,
**(self._backbone.get_env() if self._backbone is not None else {}),
**self._backbone.get_env(),
},
policy=local_policy,
options=options,
stderr=dragon_process.Popen.STDOUT,
stdout=dragon_process.Popen.STDOUT,
)
process.start()
return process

Check warning on line 628 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L627-L628

Added lines #L627 - L628 were not covered by tests

@staticmethod
def create_run_policy(
Expand Down Expand Up @@ -657,6 +667,7 @@
)

def _start_steps(self) -> None:
"""Start all new steps created since the last update."""
self._heartbeat()

with self._queue_lock:
Expand Down Expand Up @@ -821,6 +832,9 @@
group_info.redir_workers = None

def _update_shutdown_status(self) -> None:
"""Query the status of running tasks and update the status
of any that have completed.
"""
self._heartbeat()
with self._queue_lock:
self._can_shutdown |= (
Expand All @@ -834,6 +848,9 @@
)

def _should_print_status(self) -> bool:
"""Determine if status messages should be printed based off the last
update. Returns `True` to trigger prints, `False` otherwise.
"""
if self.current_time - self._last_update_time > 10:
self._last_update_time = self.current_time
return True
Expand All @@ -841,7 +858,7 @@

def _update(self) -> None:
"""Trigger all update queries and update local state database"""
self._create_backbone()

Check warning on line 861 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L861

Added line #L861 was not covered by tests

self._stop_steps()
self._start_steps()
Expand All @@ -850,8 +867,8 @@

def _kill_all_running_jobs(self) -> None:
with self._queue_lock:
if self._listener and self._listener.is_alive:
self._listener.kill()

Check warning on line 871 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L870-L871

Added lines #L870 - L871 were not covered by tests

for step_id, group_info in self._group_infos.items():
if group_info.status not in TERMINAL_STATUSES:
Expand Down Expand Up @@ -940,7 +957,7 @@
self._backend = backend
"""A dragon backend used to produce the view"""

logger.debug(self.host_desc)

Check warning on line 960 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L960

Added line #L960 was not covered by tests

@property
def host_desc(self) -> str:
Expand Down
3 changes: 3 additions & 0 deletions smartsim/_core/launcher/dragon/dragonConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@

with open(config.dragon_dotenv, encoding="utf-8") as dot_env:
for kvp in dot_env.readlines():
if not kvp:
continue

Check warning on line 249 in smartsim/_core/launcher/dragon/dragonConnector.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonConnector.py#L249

Added line #L249 was not covered by tests

# skip any commented lines
if not kvp.startswith("#"):
split = kvp.strip().split("=", maxsplit=1)
Expand Down
8 changes: 0 additions & 8 deletions smartsim/_core/mli/comm/channel/dragon_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,6 @@

logger = get_logger(__name__)

DEFAULT_CHANNEL_BUFFER_SIZE = 500
"""Maximum number of messages that can be buffered. DragonCommChannel will
raise an exception if no clients consume messages before the buffer is filled."""

LAST_OFFSET = 0
"""The last offset used to create a local channel. This is used to avoid
unnecessary retries when creating a local channel."""


class DragonCommChannel(cch.CommChannelBase):
"""Passes messages by writing to a Dragon channel."""
Expand Down
8 changes: 5 additions & 3 deletions smartsim/_core/mli/comm/channel/dragon_fli.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(
) -> None:
"""Initialize the DragonFLIChannel instance.

:param fli_desc: The descriptor of the FLI channel to attach
:param fli_: The FLIInterface to use as the underlying communications channel
:param sender_supplied: Flag indicating if the FLI uses sender-supplied streams
:param buffer_size: Maximum number of sent messages that can be buffered
"""
Expand Down Expand Up @@ -79,7 +79,7 @@ def send(self, value: bytes, timeout: float = 0.001) -> None:
logger.debug(f"DragonFLIChannel {self.descriptor} sent message")
except Exception as e:
raise SmartSimError(
f"Error sending message: DragonFLIChannel {self.descriptor}"
f"Error sending via DragonFLIChannel {self.descriptor}"
) from e

def recv(self, timeout: float = 0.001) -> t.List[bytes]:
Expand All @@ -99,6 +99,7 @@ def recv(self, timeout: float = 0.001) -> t.List[bytes]:
logger.debug(f"DragonFLIChannel {self.descriptor} received message")
except fli.FLIEOT:
eot = True
logger.debug(f"DragonFLIChannel exhausted: {self.descriptor}")
except Exception as e:
raise SmartSimError(
f"Error receiving messages: DragonFLIChannel {self.descriptor}"
Expand Down Expand Up @@ -134,7 +135,8 @@ def from_descriptor(

:param descriptor: The descriptor that uniquely identifies the resource
:returns: An attached DragonFLIChannel
:raises SmartSimError: If creation of DragonFLIChanenel fails
:raises SmartSimError: If creation of DragonFLIChannel fails
:raises ValueError: If the descriptor is invalid
"""
if not descriptor:
raise ValueError("Invalid descriptor provided")
Expand Down
53 changes: 10 additions & 43 deletions smartsim/_core/mli/comm/channel/dragon_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@

import dragon.channels as dch
import dragon.fli as fli
import dragon.infrastructure.facts as df
import dragon.infrastructure.parameters as dp
import dragon.managed_memory as dm
import dragon.utils as du

from smartsim.error.errors import SmartSimError
from smartsim.log import get_logger
Expand All @@ -54,10 +51,10 @@ def channel_to_descriptor(channel: t.Union[dch.Channel, fli.FLInterface]) -> str

:param channel: The dragon channel to convert
:returns: The descriptor string
:raises SmartSimError: If a dragon channel is not provided
:raises ValueError: If a dragon channel is not provided
"""
if channel is None:
raise SmartSimError("Channel is not available to create a descriptor")
raise ValueError("Channel is not available to create a descriptor")

serialized_ch = channel.serialize()
return base64.b64encode(serialized_ch).decode("utf-8")
Expand All @@ -67,9 +64,11 @@ def pool_to_descriptor(pool: dm.MemoryPool) -> str:
"""Convert a dragon memory pool to a descriptor string.

:param pool: The memory pool to convert
:returns: The descriptor string"""
:returns: The descriptor string
:raises ValueError: If a memory pool is not provided
"""
if pool is None:
raise SmartSimError("Memory pool is not available to create a descriptor")
raise ValueError("Memory pool is not available to create a descriptor")

serialized_pool = pool.serialize()
return base64.b64encode(serialized_pool).decode("utf-8")
Expand All @@ -82,6 +81,7 @@ def descriptor_to_fli(descriptor: str) -> "fli.FLInterface":
:param descriptor: The descriptor of an FLI to attach to
:returns: The attached dragon FLI
:raises ValueError: If the descriptor is empty or incorrectly formatted
:raises SmartSimError: If attachment using the descriptor fails
"""
if len(descriptor) < 1:
raise ValueError("Descriptors may not be empty")
Expand All @@ -103,7 +103,8 @@ def descriptor_to_channel(descriptor: str) -> dch.Channel:
:param descriptor: The descriptor of a channel to attach to
:returns: The attached dragon Channel
:raises ValueError: If the descriptor is empty or incorrectly formatted
:raises SmartSimError: If the descriptor does not attach to a channel"""
:raises SmartSimError: If attachment using the descriptor fails
"""
if len(descriptor) < 1:
raise ValueError("Descriptors may not be empty")

Expand All @@ -122,43 +123,9 @@ def create_local(_capacity: int = 0) -> dch.Channel:
direct calls to `dch.Channel.make_process_local()` to enable
supplying a channel capacity.

:param capacity: The number of events the channel can buffer; uses the default
:param _capacity: The number of events the channel can buffer; uses the default
buffer size `DEFAULT_CHANNEL_BUFFER_SIZE` when not supplied
:returns: The instantiated channel
:raises SmartSimError: If unable to attach local channel
"""
# current implementation has a bug wrt MPI that must be fixed.
# falling back to `make_process_local` and disabling buffer size tests

# pool = dm.MemoryPool.attach(du.B64.str_to_bytes(dp.this_process.default_pd))
# pool_descriptor = pool_to_descriptor(pool)
# channel: t.Optional[dch.Channel] = None
# offset = 0

# global LAST_OFFSET
# if LAST_OFFSET:
# offset = LAST_OFFSET

# capacity = capacity if capacity > 0 else DEFAULT_CHANNEL_BUFFER_SIZE

# while not channel:
# # search for an open channel ID
# offset += 1
# channel_id = df.BASE_USER_MANAGED_CUID + offset
# try:
# channel = dch.Channel(mem_pool=pool, c_uid=channel_id, capacity=capacity)
# LAST_OFFSET = offset
# descriptor = channel_to_descriptor(channel)
# logger.debug(
# "Local channel created: "
# f"{channel_id=}, {pool_descriptor=}, {capacity=}, {descriptor=}"
# )
# except dch.ChannelError as e:
# if offset < 100:
# logger.warning(f"Channnel id `{channel_id}` is not open. Retrying...")
# else:
# LAST_OFFSET = 0
# logger.error(f"All attempts to attach local channel have failed")
# raise SmartSimError("Failed to attach local channel") from e
channel = dch.Channel.make_process_local()
return channel
Empty file.
Loading
Loading