Skip to content

Commit

Permalink
test eventing end-to-end in single process
Browse files Browse the repository at this point in the history
  • Loading branch information
ankona committed Sep 25, 2024
1 parent 44be6a8 commit 29d1fe0
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 217 deletions.
46 changes: 25 additions & 21 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import functools
import itertools
import multiprocessing as mp
import os
import time
import typing as t
from dataclasses import dataclass, field
Expand All @@ -48,13 +49,17 @@
import dragon.native.machine as dragon_machine

from smartsim._core.launcher.dragon.pqueue import NodePrioritizer, PrioritizerFilter
from smartsim._core.mli.comm.channel.dragon_channel import DragonCommChannel
from smartsim._core.mli.comm.channel.dragon_channel import (
DragonCommChannel,
create_local,
)
from smartsim._core.mli.infrastructure.storage.backbone_feature_store import (
BackboneFeatureStore,
EventBase,
# EventBroadcaster,
EventCategory,
EventConsumer,
OnCreateConsumer,
)

# pylint: enable=import-error
Expand Down Expand Up @@ -572,12 +577,21 @@ def _create_backbone(self) -> BackboneFeatureStore:
self._backbone = BackboneFeatureStore(
backbone_storage, allow_reserved_writes=True
)

# put the backbone descriptor in the env vars
os.environ.update(self._backbone.get_env())
logger.info(self._backbone.creation_date)

return self._backbone

def _on_consumer_created(self, event: EventBase) -> None:
    """Event handler that adds a newly created event consumer to the
    backbone's list of notification channels.

    Any event that is not an `OnCreateConsumer`, and any event received
    while no backbone is attached, is logged as unhandled and ignored.

    :param event: The event that was received
    """
    if isinstance(event, OnCreateConsumer) and self._backbone is not None:
        # De-duplicate while keeping the existing registration order; a
        # round trip through `set` would reorder the channel list
        # arbitrarily from one run to the next.
        known_channels = list(self._backbone.notification_channels)
        self._backbone.notification_channels = list(
            dict.fromkeys([*known_channels, event.descriptor])
        )
        return

    logger.warning(f"Unhandled event received: {event}")

def _bootstrap_event_listeners(
Expand All @@ -591,7 +605,7 @@ def _bootstrap_event_listeners(

# Update directly to avoid SEND/ACK pattern
notify_descriptors.append(consumer.descriptor)
# consumer.register() # this will loop infinitely waiting for itself
notify_descriptors = list(set(notify_descriptors))

backbone.notification_channels = notify_descriptors

Expand All @@ -605,41 +619,31 @@ def _create_eventing(self, backbone: BackboneFeatureStore) -> EventConsumer:
attempting to connect any eventing clients.
:returns: The newly created EventConsumer instance
"""
# if self._event_producer is None:
# logger.info("Creating event publisher")
# # todo: ensure DCC.from_descriptor and not DCC.from_local
# self._event_producer =
# EventBroadcaster(backbone, DragonCommChannel.from_descriptor)
# logger.info("Created event publisher")

if self._event_consumer is None:
logger.info("Creating event consumer")
event_channel = DragonCommChannel.from_local()
dragon_channel = create_local(500)
event_channel = DragonCommChannel(dragon_channel)
consumer = EventConsumer(
event_channel,
backbone,
[EventCategory.CONSUMER_CREATED],
name="BackendConsumerRegistrar",
event_handler=self._on_consumer_created,
)
consumer.register()
logger.info(f"Consumer `{consumer.name}` registration completed.")

# self._backbone.backend_channel =
# consumer.descriptor # i want to get rid of this extra channel
# self._bootstrap_event_listeners(backbone, consumer)
self._event_consumer = consumer

logger.info("Created event consumer")
backbone[BackboneFeatureStore.MLI_BACKEND_CONSUMER] = consumer.descriptor
logger.info(f"Backend consumer `{consumer.name}` created.")

return self._event_consumer

def listen_to_registrations(self, timeout: float = 0.001) -> None:
    """Poll the backend's event consumer a single time, if one exists.

    Does nothing when no event consumer has been created.

    :param timeout: Value forwarded to the consumer's `listen_once` call
    """
    consumer = self._event_consumer
    if consumer is None:
        return

    consumer.listen_once(timeout)

def _start_eventing_listeners(self) -> None:
if self._event_consumer:
self._event_consumer_process = mp.Process(
target=self._event_consumer.listen
)
self._event_consumer_process.start()
# todo: start external listener entrypoint

@staticmethod
def create_run_policy(
Expand Down
4 changes: 2 additions & 2 deletions smartsim/_core/mli/comm/channel/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__(
"""A user-friendly identifier for channel-related logging"""

@abstractmethod
def send(self, value: bytes, timeout: float = 0) -> None:
def send(self, value: bytes, timeout: float = 0.001) -> None:
"""Send a message through the underlying communication channel.
:param value: The value to send
Expand All @@ -61,7 +61,7 @@ def send(self, value: bytes, timeout: float = 0) -> None:
"""

@abstractmethod
def recv(self, timeout: float = 0) -> t.List[bytes]:
def recv(self, timeout: float = 0.001) -> t.List[bytes]:
"""Receives message(s) through the underlying communication channel.
:param timeout: Maximum time to wait (in seconds) for messages to arrive
Expand Down
2 changes: 1 addition & 1 deletion smartsim/_core/mli/comm/channel/dragon_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def send(self, value: bytes, timeout: float = 0.001) -> None:
"""
try:
with self._channel.sendh(timeout=timeout) as sendh:
sendh.send_bytes(value)
sendh.send_bytes(value, blocking=False)
logger.debug(f"DragonCommChannel {self.descriptor} sent message")
except Exception as e:
raise SmartSimError(
Expand Down
75 changes: 33 additions & 42 deletions smartsim/_core/mli/infrastructure/storage/backbone_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,16 @@ def backend_channel(self) -> t.Optional[str]:
"""Retrieve the channel descriptor exposed by the MLI backend for events
:returns: a stringified channel descriptor"""
if self.MLI_NOTIFY_CONSUMERS in self:
return str(self[self.MLI_NOTIFY_CONSUMERS])
if self.MLI_BACKEND_CONSUMER in self:
return str(self[self.MLI_BACKEND_CONSUMER])
return None

@backend_channel.setter
def backend_channel(self, value: str) -> None:
"""Set the channel exposed by the MLI backend for events
:param value: a stringified channel descriptor"""
self[self.MLI_NOTIFY_CONSUMERS] = value
self[self.MLI_BACKEND_CONSUMER] = value

@property
def worker_queue(self) -> t.Optional[str]:
Expand Down Expand Up @@ -165,8 +165,7 @@ def _record_creation_data(self) -> None:
)
self[self._CREATED_ON] = str(time.time())

if os.environ.get(BackboneFeatureStore.MLI_BACKBONE, None) is None:
os.environ.update(self.get_env())
os.environ[self.MLI_BACKBONE] = self.descriptor

@classmethod
def from_writable_descriptor(
Expand Down Expand Up @@ -479,7 +478,7 @@ def _get_comm_channel(self, descriptor: str) -> CommChannelBase:
logger.error(msg, exc_info=True)
raise SmartSimError(msg) from ex

def _get_next_event_event(self) -> t.Optional[EventBase]:
def _get_next_event(self) -> t.Optional[EventBase]:
"""Pop the next event to be sent from the queue.
:returns: The next event to send if any events are enqueued, otherwise `None`.
Expand Down Expand Up @@ -512,7 +511,7 @@ def _broadcast(self, timeout: float = 0.001) -> int:
num_listeners = len(self._descriptors)

# send each event to every consumer
while event := self._get_next_event_event():
while event := self._get_next_event():
logger.debug(f"Broadcasting {event=} to {num_listeners} listeners")
event_bytes = bytes(event)

Expand All @@ -524,7 +523,7 @@ def _broadcast(self, timeout: float = 0.001) -> int:
num_sent += 1
except Exception as ex:
raise SmartSimError(
f"Broadcast {i}/{num_listeners} for event {event.uid} to "
f"Broadcast {i+1}/{num_listeners} for event {event.uid} to "
f"channel {descriptor} from {self._uid} failed."
) from ex

Expand All @@ -547,6 +546,7 @@ def send(self, event: EventBase, timeout: float = 0.001) -> int:
except (KeyError, ValueError, SmartSimError):
raise
except Exception as ex:
logger.exception("An unexpected exception occurred while sending")
raise SmartSimError("An unexpected failure occurred while sending") from ex


Expand Down Expand Up @@ -600,8 +600,8 @@ def name(self) -> str:
self._name = str(uuid.uuid4())
return self._name

def receive(
self, filters: t.Optional[t.List[EventCategory]] = None, timeout: float = 0
def recv(
self, filters: t.Optional[t.List[EventCategory]] = None, timeout: float = 0.001
) -> t.List[EventBase]:
"""Receives available published event(s).
Expand Down Expand Up @@ -648,44 +648,35 @@ def receive(

def register(self) -> None:
    """Send an event to register this consumer as a listener.

    The registrar's channel descriptor is looked up in the backbone under
    `BackboneFeatureStore.MLI_BACKEND_CONSUMER` and the registration event
    is sent directly to that channel. If the backbone returns no registrar
    descriptor, a warning is logged and nothing is sent.
    """
    descriptor = self._comm_channel.descriptor
    event = OnCreateConsumer(descriptor, self._global_filters)

    registrar_key = BackboneFeatureStore.MLI_BACKEND_CONSUMER
    config = self._backbone.wait_for([registrar_key], 2.0)

    # Check the raw value before stringifying it: `str(None)` is the truthy
    # string "None" and would be mistaken for a real channel descriptor.
    raw_descriptor = config.get(registrar_key, None)
    if not raw_descriptor:
        logger.warning("Unable to register. No registrar channel found.")
        return

    logger.debug(f"Sending registration for {self.name}")

    registrar_channel = DragonCommChannel.from_descriptor(
        str(raw_descriptor)
    )
    registrar_channel.send(bytes(event), timeout=1.0)

    logger.debug(f"Registration for {self.name} sent")

def listen_once(self, timeout: float = 0.001) -> None:
    """Receive any pending events one time and pass each to the handler.

    Each received event is given to the registered event handler when one
    is set; without a handler the events are only logged.

    :param timeout: Value forwarded to `recv` as its `timeout` argument
    """
    logger.debug(f"Starting event listener with {timeout} second timeout")
    logger.debug("Awaiting new messages")

    pending = self.recv(timeout=timeout)
    if not pending:
        logger.debug("Consumer received empty message list.")
        return

    for message in pending:
        logger.debug(f"Sending event {message=} to handler.")
        # Re-read the handler for every event, as the original loop did.
        handler = self._event_handler
        if handler:
            handler(message)
Loading

0 comments on commit 29d1fe0

Please sign in to comment.