diff --git a/smartsim/_core/launcher/dragon/dragonBackend.py b/smartsim/_core/launcher/dragon/dragonBackend.py index 577b951196..434e1a8ed0 100644 --- a/smartsim/_core/launcher/dragon/dragonBackend.py +++ b/smartsim/_core/launcher/dragon/dragonBackend.py @@ -27,6 +27,7 @@ import functools import itertools import multiprocessing as mp +import os import time import typing as t from dataclasses import dataclass, field @@ -48,13 +49,17 @@ import dragon.native.machine as dragon_machine from smartsim._core.launcher.dragon.pqueue import NodePrioritizer, PrioritizerFilter -from smartsim._core.mli.comm.channel.dragon_channel import DragonCommChannel +from smartsim._core.mli.comm.channel.dragon_channel import ( + DragonCommChannel, + create_local, +) from smartsim._core.mli.infrastructure.storage.backbone_feature_store import ( BackboneFeatureStore, EventBase, # EventBroadcaster, EventCategory, EventConsumer, + OnCreateConsumer, ) # pylint: enable=import-error @@ -572,12 +577,21 @@ def _create_backbone(self) -> BackboneFeatureStore: self._backbone = BackboneFeatureStore( backbone_storage, allow_reserved_writes=True ) + + # put the backbone descriptor in the env vars + os.environ.update(self._backbone.get_env()) logger.info(self._backbone.creation_date) return self._backbone def _on_consumer_created(self, event: EventBase) -> None: """Event handler for""" + if isinstance(event, OnCreateConsumer) and self._backbone is not None: + notify_list = set(self._backbone.notification_channels) + notify_list.add(event.descriptor) + self._backbone.notification_channels = list(notify_list) + return + logger.warning(f"Unhandled event received: {event}") def _bootstrap_event_listeners( @@ -591,7 +605,7 @@ def _bootstrap_event_listeners( # Update directly to avoid SEND/ACK pattern notify_descriptors.append(consumer.descriptor) - # consumer.register() # this will loop infinitely waiting for itself + notify_descriptors = list(set(notify_descriptors)) backbone.notification_channels = notify_descriptors @@ -605,16 +619,11 @@ def _create_eventing(self, backbone: BackboneFeatureStore) -> EventConsumer: attempting to connect any eventing clients. :returns: The newly created EventConsumer instance """ - # if self._event_producer is None: - # logger.info("Creating event publisher") - # # todo: ensure DCC.from_descriptor and not DCC.from_local - # self._event_producer = - # EventBroadcaster(backbone, DragonCommChannel.from_descriptor) - # logger.info("Created event publisher") if self._event_consumer is None: logger.info("Creating event consumer") - event_channel = DragonCommChannel.from_local() + dragon_channel = create_local(500) + event_channel = DragonCommChannel(dragon_channel) consumer = EventConsumer( event_channel, backbone, @@ -622,24 +631,19 @@ def _create_eventing(self, backbone: BackboneFeatureStore) -> EventConsumer: name="BackendConsumerRegistrar", event_handler=self._on_consumer_created, ) - consumer.register() - logger.info(f"Consumer `{consumer.name}` registration completed.") - # self._backbone.backend_channel = - # consumer.descriptor # i want to get rid of this extra channel - # self._bootstrap_event_listeners(backbone, consumer) self._event_consumer = consumer - - logger.info("Created event consumer") + backbone[BackboneFeatureStore.MLI_BACKEND_CONSUMER] = consumer.descriptor + logger.info(f"Backend consumer `{consumer.name}` created.") return self._event_consumer + def listen_to_registrations(self, timeout: float = 0.001) -> None: + if self._event_consumer is not None: + self._event_consumer.listen_once(timeout) + def _start_eventing_listeners(self) -> None: - if self._event_consumer: - self._event_consumer_process = mp.Process( - target=self._event_consumer.listen - ) - self._event_consumer_process.start() + # todo: start external listener entrypoint @staticmethod def create_run_policy( diff --git a/smartsim/_core/mli/comm/channel/channel.py b/smartsim/_core/mli/comm/channel/channel.py index bfa1c50fbc..a581e8e2a2 100644 --- a/smartsim/_core/mli/comm/channel/channel.py +++ b/smartsim/_core/mli/comm/channel/channel.py @@ -52,7 +52,7 @@ def __init__( """A user-friendly identifier for channel-related logging""" @abstractmethod - def send(self, value: bytes, timeout: float = 0) -> None: + def send(self, value: bytes, timeout: float = 0.001) -> None: """Send a message through the underlying communication channel. :param value: The value to send @@ -61,7 +61,7 @@ def send(self, value: bytes, timeout: float = 0) -> None: """ @abstractmethod - def recv(self, timeout: float = 0) -> t.List[bytes]: + def recv(self, timeout: float = 0.001) -> t.List[bytes]: """Receives message(s) through the underlying communication channel. :param timeout: Maximum time to wait (in seconds) for messages to arrive diff --git a/smartsim/_core/mli/comm/channel/dragon_channel.py b/smartsim/_core/mli/comm/channel/dragon_channel.py index 0b73080d6f..9c0ac3423a 100644 --- a/smartsim/_core/mli/comm/channel/dragon_channel.py +++ b/smartsim/_core/mli/comm/channel/dragon_channel.py @@ -147,7 +147,7 @@ def send(self, value: bytes, timeout: float = 0.001) -> None: """ try: with self._channel.sendh(timeout=timeout) as sendh: - sendh.send_bytes(value) + sendh.send_bytes(value, blocking=False) logger.debug(f"DragonCommChannel {self.descriptor} sent message") except Exception as e: raise SmartSimError( diff --git a/smartsim/_core/mli/infrastructure/storage/backbone_feature_store.py b/smartsim/_core/mli/infrastructure/storage/backbone_feature_store.py index 1110dc8125..83c255fe78 100644 --- a/smartsim/_core/mli/infrastructure/storage/backbone_feature_store.py +++ b/smartsim/_core/mli/infrastructure/storage/backbone_feature_store.py @@ -122,8 +122,8 @@ def backend_channel(self) -> t.Optional[str]: """Retrieve the channel descriptor exposed by the MLI backend for events :returns: a stringified channel descriptor""" - if self.MLI_NOTIFY_CONSUMERS in self: - return str(self[self.MLI_NOTIFY_CONSUMERS]) + if self.MLI_BACKEND_CONSUMER in self: + return str(self[self.MLI_BACKEND_CONSUMER]) return None @backend_channel.setter @@ -131,7 +131,7 @@ def backend_channel(self, value: str) -> None: """Set the channel exposed by the MLI backend for events :param value: a stringified channel descriptor""" - self[self.MLI_NOTIFY_CONSUMERS] = value + self[self.MLI_BACKEND_CONSUMER] = value @property def worker_queue(self) -> t.Optional[str]: @@ -165,8 +165,7 @@ def _record_creation_data(self) -> None: ) self[self._CREATED_ON] = str(time.time()) - if os.environ.get(BackboneFeatureStore.MLI_BACKBONE, None) is None: - os.environ.update(self.get_env()) + os.environ[self.MLI_BACKBONE] = self.descriptor @classmethod def from_writable_descriptor( @@ -479,7 +478,7 @@ def _get_comm_channel(self, descriptor: str) -> CommChannelBase: logger.error(msg, exc_info=True) raise SmartSimError(msg) from ex - def _get_next_event_event(self) -> t.Optional[EventBase]: + def _get_next_event(self) -> t.Optional[EventBase]: """Pop the next event to be sent from the queue. :returns: The next event to send if any events are enqueued, otherwise `None`. @@ -512,7 +511,7 @@ def _broadcast(self, timeout: float = 0.001) -> int: num_listeners = len(self._descriptors) # send each event to every consumer - while event := self._get_next_event_event(): + while event := self._get_next_event(): logger.debug(f"Broadcasting {event=} to {num_listeners} listeners") event_bytes = bytes(event) @@ -524,7 +523,7 @@ def _broadcast(self, timeout: float = 0.001) -> int: num_sent += 1 except Exception as ex: raise SmartSimError( - f"Broadcast {i}/{num_listeners} for event {event.uid} to " + f"Broadcast {i+1}/{num_listeners} for event {event.uid} to " f"channel {descriptor} from {self._uid} failed." ) from ex @@ -547,6 +546,7 @@ def send(self, event: EventBase, timeout: float = 0.001) -> int: except (KeyError, ValueError, SmartSimError): raise except Exception as ex: + logger.exception("An unexpected exception occurred while sending") raise SmartSimError("An unexpected failure occurred while sending") from ex @@ -600,8 +600,8 @@ def name(self) -> str: self._name = str(uuid.uuid4()) return self._name - def receive( - self, filters: t.Optional[t.List[EventCategory]] = None, timeout: float = 0 + def recv( + self, filters: t.Optional[t.List[EventCategory]] = None, timeout: float = 0.001 ) -> t.List[EventBase]: """Receives available published event(s). @@ -648,44 +648,35 @@ def receive( def register(self) -> None: """Send an event to register this consumer as a listener""" - awaiting_confirmation = True descriptor = self._comm_channel.descriptor - backoffs = itertools.cycle((0.1, 0.2, 0.4, 0.8)) event = OnCreateConsumer(descriptor, self._global_filters) - # create a temporary publisher to broadcast my own existence. - publisher = EventBroadcaster(self._backbone, DragonCommChannel.from_local) - - # we're going to sit in this loop to wait for the backbone to get - # updated with the registration (to avoid SEND/ACK) - while awaiting_confirmation: - registered_channels = self._backbone.notification_channels - # todo: this should probably be descriptor_string? maybe i need to - # get rid of descriptor as bytes or just make desc_string required in ABC - if descriptor in registered_channels: - awaiting_confirmation = False + registrar_key = BackboneFeatureStore.MLI_BACKEND_CONSUMER + config = self._backbone.wait_for([registrar_key], 2.0) - time.sleep(next(backoffs)) + registrar_descriptor = str(config.get(registrar_key, None)) - # if backend_descriptor := self._backbone.backend_channel: - # backend_channel = DragonCommChannel. - # from_descriptor(backend_descriptor) - # backend = EventSender(self._backbone, backend_channel) - # backend.send(event) + if registrar_descriptor: + logger.debug(f"Sending registration for {self.name}") - # broadcast that this consumer is now ready to mingle - publisher = EventBroadcaster(self._backbone, DragonCommChannel.from_local) - publisher.send(event, timeout=0.01) + registrar_channel = DragonCommChannel.from_descriptor(registrar_descriptor) + registrar_channel.send(bytes(event), timeout=1.0) - # def register_callback(self, callback: t.Callable[[EventBase], None]) -> None: ... + logger.debug(f"Registration for {self.name} sent") + else: + logger.warning("Unable to register. No registrar channel found.") - def listen(self) -> None: + def listen_once(self, timeout: float = 0.001) -> None: """Function to handle incoming events""" - print("starting listener...") - - while True: - print("awaiting new message") - incoming_messages = self.receive() - for message in incoming_messages: - if self._event_handler: - self._event_handler(message) + logger.debug(f"Starting event listener with {timeout} second timeout") + logger.debug("Awaiting new messages") + + incoming_messages = self.recv(timeout=timeout) + + if not incoming_messages: + logger.debug("Consumer received empty message list.") + + for message in incoming_messages: + logger.debug(f"Sending event {message=} to handler.") + if self._event_handler: + self._event_handler(message) diff --git a/tests/dragon/test_dragon_backend.py b/tests/dragon/test_dragon_backend.py index a4e61d4307..0631e11e6c 100644 --- a/tests/dragon/test_dragon_backend.py +++ b/tests/dragon/test_dragon_backend.py @@ -24,151 +24,197 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import os +import typing as t import unittest.mock as mock import pytest -# from smartsim._core.launcher.dragon.dragonBackend import DragonBackend, NodePrioritizer -# from smartsim._core.mli.infrastructure.storage.backbone_feature_store import EventSender, OnCreateConsumer - -# dragon = pytest.importorskip("dragon") - -# import dragon.utils as du -# from dragon.channels import Channel -# from dragon.data.ddict.ddict import DDict -# from dragon.fli import DragonFLIError, FLInterface - -# from smartsim._core.mli.comm.channel.dragon_channel import DragonCommChannel -# from smartsim._core.mli.comm.channel.dragon_fli import DragonFLIChannel -# from smartsim._core.mli.infrastructure.environment_loader import EnvironmentConfigLoader -# from smartsim._core.mli.infrastructure.storage.dragon_feature_store import ( -# DragonFeatureStore, -# ) +from smartsim._core.launcher.dragon.dragonBackend import DragonBackend, NodePrioritizer +from smartsim._core.mli.infrastructure.storage.backbone_feature_store import ( + BackboneFeatureStore, + EventBase, + EventBroadcaster, + EventConsumer, + EventSender, + OnCreateConsumer, +) +from smartsim.log import get_logger + +dragon = pytest.importorskip("dragon") + +import dragon.utils as du +from dragon.channels import Channel +from dragon.data.ddict.ddict import DDict +from dragon.fli import DragonFLIError, FLInterface + +from smartsim._core.mli.comm.channel.dragon_channel import DragonCommChannel +from smartsim._core.mli.comm.channel.dragon_fli import DragonFLIChannel +from smartsim._core.mli.infrastructure.environment_loader import EnvironmentConfigLoader +from smartsim._core.mli.infrastructure.storage.dragon_feature_store import ( + DragonFeatureStore, +) # The tests in this file belong to the dragon group pytestmark = pytest.mark.dragon +logger = get_logger(__name__) def test_dragonbackend_listener_boostrapping(monkeypatch: pytest.MonkeyPatch): - """Verify that an event listener is started""" - # backend_channel = DragonCommChannel.from_local() - assert True - - # with monkeypatch.context() as patcher: - # # patcher.setattr("smartsim._core.launcher.dragon.dragonBackend", "NodePrioritizer", mock.MagicMock()) - # patcher.setattr(NodePrioritizer, "__init__", lambda self, nodes, lock: None) - # patcher.setattr(DragonBackend, "_initialize_hosts", lambda self: None) - - # backend = DragonBackend(pid=9999) - # backend._create_backbone() - - # # create the consumer and start a listener process - # backend_consumer = backend._create_eventing(backend._backbone) - - # # ensure the consumer that was created is retained - # assert backend._event_consumer is not None - # assert backend._event_consumer == backend_consumer - - # assert backend._backbone.notification_channels == [backend_consumer.descriptor] - - # # create components to publish events - # # sender_channel = DragonCommChannel.from_local() - # sender = EventSender(backend._backbone, backend_channel) - - # # simulate a new consumer registration - # new_consumer_channel = DragonCommChannel.from_local() - # registration = OnCreateConsumer(new_consumer_channel.descriptor) - # new_consumer_channel.send(bytes(registration), 0.1) - - # events = backend_consumer.receive() - # assert len(events) == 1 - - -# @pytest.mark.parametrize( -# "content", -# [ -# pytest.param(b"a"), -# pytest.param(b"new byte string"), -# ], -# ) -# def test_environment_loader_attach_fli(content: bytes, monkeypatch: pytest.MonkeyPatch): -# """A descriptor can be stored, loaded, and reattached""" -# chan = Channel.make_process_local() -# queue = FLInterface(main_ch=chan) -# monkeypatch.setenv( -# "_SMARTSIM_REQUEST_QUEUE", du.B64.bytes_to_str(queue.serialize()) -# ) - -# config = EnvironmentConfigLoader( -# featurestore_factory=DragonFeatureStore.from_descriptor, -# callback_factory=DragonCommChannel.from_descriptor, -# queue_factory=DragonFLIChannel.from_sender_supplied_descriptor, -# ) -# config_queue = config.get_queue() - -# _ = config_queue.send(content) - -# old_recv = queue.recvh() -# result, _ = old_recv.recv_bytes() -# assert result == content - - -# def test_environment_loader_serialize_fli(monkeypatch: pytest.MonkeyPatch): -# """The serialized descriptors of a loaded and unloaded -# queue are the same""" -# chan = Channel.make_process_local() -# queue = FLInterface(main_ch=chan) -# monkeypatch.setenv( -# "_SMARTSIM_REQUEST_QUEUE", du.B64.bytes_to_str(queue.serialize()) -# ) - -# config = EnvironmentConfigLoader( -# featurestore_factory=DragonFeatureStore.from_descriptor, -# callback_factory=DragonCommChannel.from_descriptor, -# queue_factory=DragonFLIChannel.from_descriptor, -# ) -# config_queue = config.get_queue() -# assert config_queue._fli.serialize() == queue.serialize() - - -# def test_environment_loader_flifails(monkeypatch: pytest.MonkeyPatch): -# """An incorrect serialized descriptor will fails to attach""" -# monkeypatch.setenv("_SMARTSIM_REQUEST_QUEUE", "randomstring") -# config = EnvironmentConfigLoader( -# featurestore_factory=DragonFeatureStore.from_descriptor, -# callback_factory=None, -# queue_factory=DragonFLIChannel.from_descriptor, -# ) - -# with pytest.raises(DragonFLIError): -# config.get_queue() - - -# def test_environment_loader_backbone_load_dfs(monkeypatch: pytest.MonkeyPatch): -# """Verify the dragon feature store is loaded correctly by the -# EnvironmentConfigLoader to demonstrate featurestore_factory correctness""" -# feature_store = DragonFeatureStore(DDict()) -# monkeypatch.setenv("_SMARTSIM_INFRA_BACKBONE", feature_store.descriptor) - -# config = EnvironmentConfigLoader( -# featurestore_factory=DragonFeatureStore.from_descriptor, -# callback_factory=None, -# queue_factory=None, -# ) - -# print(f"calling config.get_backbone: `{feature_store.descriptor}`") - -# backbone = config.get_backbone() -# assert backbone is not None - - -# def test_environment_variables_not_set(): -# """EnvironmentConfigLoader getters return None when environment -# variables are not set""" -# config = EnvironmentConfigLoader( -# featurestore_factory=DragonFeatureStore.from_descriptor, -# callback_factory=DragonCommChannel.from_descriptor, -# queue_factory=DragonCommChannel.from_descriptor, -# ) -# assert config.get_backbone() is None -# assert config.get_queue() is None + """Verify that the dragon backend registration channel correctly + registers new consumers in the backbone and begins sending events + to the new consumers""" + + backend = DragonBackend(pid=9999) + + backend._create_backbone() + backbone = backend._backbone + + def mock_event_handler(event: EventBase) -> None: + logger.debug(f"Handling event in mock handler: {event}") + + bb_descriptor = os.environ.get(BackboneFeatureStore.MLI_BACKBONE, None) + assert bb_descriptor + + fs = BackboneFeatureStore.from_descriptor(bb_descriptor) + fs[event.uid] = "received" + + # create the consumer and start a listener process + backend_consumer = backend._create_eventing(backbone) + registrar_descriptor = backend._event_consumer.descriptor + + # ensure the consumer is stored to backend & published to backbone + assert backend._event_consumer == backend_consumer + assert backbone.backend_channel == registrar_descriptor + assert os.environ.get(BackboneFeatureStore.MLI_BACKBONE, None) + + # simulate a new consumer registration + new_consumer_ch = DragonCommChannel.from_local() + new_consumer = EventConsumer( + new_consumer_ch, + backbone, + [], + name="test-consumer-a", + event_handler=mock_event_handler, + ) + assert new_consumer, "new_consumer construction failed" + + # send registration to registrar channel + new_consumer.register() + + # the backend consumer should handle updating the notify list and the new + # consumer that just broadcast its registration should be registered... + # backend_consumer.listen_once(timeout=2.0) + backend.listen_to_registrations(timeout=0.1) + + # # confirm the backend registrar consumer registerd the new listener + assert new_consumer_ch.descriptor in backbone.notification_channels + + broadcaster = EventBroadcaster(backbone, DragonCommChannel.from_descriptor) + + # re-send the same thing because i'm too lazy to create a new consumer + broadcast_event = OnCreateConsumer(registrar_descriptor, []) + broadcaster.send(broadcast_event, timeout=0.1) + + new_consumer.listen_once(timeout=0.1) + + values = backbone.wait_for( + [broadcast_event.uid, BackboneFeatureStore.MLI_NOTIFY_CONSUMERS], 1.0 + ) + stored = values[broadcast_event.uid] + assert stored == "received", "The handler didn't update the backbone" + + # confirm that directly retrieving the value isn't different from + # using backbone.notification_channels helper method + notify_list = str(values[BackboneFeatureStore.MLI_NOTIFY_CONSUMERS]).split(",") + assert new_consumer.descriptor in set(notify_list) + + +@pytest.mark.parametrize( + "content", + [ + pytest.param(b"a"), + pytest.param(b"new byte string"), + ], +) +def test_environment_loader_attach_fli(content: bytes, monkeypatch: pytest.MonkeyPatch): + """A descriptor can be stored, loaded, and reattached""" + chan = Channel.make_process_local() + queue = FLInterface(main_ch=chan) + monkeypatch.setenv( + "_SMARTSIM_REQUEST_QUEUE", du.B64.bytes_to_str(queue.serialize()) + ) + + config = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=DragonCommChannel.from_descriptor, + queue_factory=DragonFLIChannel.from_sender_supplied_descriptor, + ) + config_queue = config.get_queue() + + _ = config_queue.send(content) + + old_recv = queue.recvh() + result, _ = old_recv.recv_bytes() + assert result == content + + +def test_environment_loader_serialize_fli(monkeypatch: pytest.MonkeyPatch): + """The serialized descriptors of a loaded and unloaded + queue are the same""" + chan = Channel.make_process_local() + queue = FLInterface(main_ch=chan) + monkeypatch.setenv( + "_SMARTSIM_REQUEST_QUEUE", du.B64.bytes_to_str(queue.serialize()) + ) + + config = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=DragonCommChannel.from_descriptor, + queue_factory=DragonFLIChannel.from_descriptor, + ) + config_queue = config.get_queue() + assert config_queue._fli.serialize() == queue.serialize() + + +def test_environment_loader_flifails(monkeypatch: pytest.MonkeyPatch): + """An incorrect serialized descriptor will fails to attach""" + monkeypatch.setenv("_SMARTSIM_REQUEST_QUEUE", "randomstring") + config = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=None, + queue_factory=DragonFLIChannel.from_descriptor, + ) + + with pytest.raises(DragonFLIError): + config.get_queue() + + +def test_environment_loader_backbone_load_dfs(monkeypatch: pytest.MonkeyPatch): + """Verify the dragon feature store is loaded correctly by the + EnvironmentConfigLoader to demonstrate featurestore_factory correctness""" + feature_store = DragonFeatureStore(DDict()) + monkeypatch.setenv("_SMARTSIM_INFRA_BACKBONE", feature_store.descriptor) + + config = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=None, + queue_factory=None, + ) + + print(f"calling config.get_backbone: `{feature_store.descriptor}`") + + backbone = config.get_backbone() + assert backbone is not None + + +def test_environment_variables_not_set(): + """EnvironmentConfigLoader getters return None when environment + variables are not set""" + config = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=DragonCommChannel.from_descriptor, + queue_factory=DragonCommChannel.from_descriptor, + ) + assert config.get_backbone() is None + assert config.get_queue() is None diff --git a/tests/dragon/test_featurestore.py b/tests/dragon/test_featurestore.py index 7f16497418..434bc5eabb 100644 --- a/tests/dragon/test_featurestore.py +++ b/tests/dragon/test_featurestore.py @@ -181,15 +181,15 @@ def test_eventconsumer_eventpublisher_integration( mock_client_app.send(event_4) # worker manager should only get updates about feature update - wmgr_messages = wmgr_consumer.receive() + wmgr_messages = wmgr_consumer.recv() assert len(wmgr_messages) == 3 # the backend should only receive messages about consumer creation - back_messages = back_consumer.receive() + back_messages = back_consumer.recv() assert len(back_messages) == 1 # hypothetical app has no filters and will get all events - app_messages = capp_consumer.receive() + app_messages = capp_consumer.recv() assert len(app_messages) == 4 diff --git a/tests/dragon/test_featurestore_base.py b/tests/dragon/test_featurestore_base.py index 87536c5ba5..59a30a3e85 100644 --- a/tests/dragon/test_featurestore_base.py +++ b/tests/dragon/test_featurestore_base.py @@ -561,7 +561,7 @@ def test_eventconsumer_receive(test_dir: str) -> None: consumer = EventConsumer(comm_channel, backbone) - all_received: t.List[OnCreateConsumer] = consumer.receive() + all_received: t.List[OnCreateConsumer] = consumer.recv() assert len(all_received) == 1 # verify we received the same event that was raised @@ -595,7 +595,7 @@ def test_eventconsumer_receive_multi(test_dir: str, num_sent: int) -> None: consumer = EventConsumer(comm_channel, backbone) - all_received: t.List[OnCreateConsumer] = consumer.receive() + all_received: t.List[OnCreateConsumer] = consumer.recv() assert len(all_received) == num_sent @@ -621,7 +621,7 @@ def test_eventconsumer_receive_empty(test_dir: str) -> None: consumer = EventConsumer(comm_channel, backbone) - messages = consumer.receive() + messages = consumer.recv() # the messages array should be empty assert not messages @@ -696,15 +696,15 @@ def test_eventconsumer_eventpublisher_integration(test_dir: str) -> None: mock_client_app.send(event_4) # worker manager should only get updates about feature update - wmgr_messages = wmgr_consumer.receive() + wmgr_messages = wmgr_consumer.recv() assert len(wmgr_messages) == 3 # the backend should only receive messages about consumer creation - back_messages = back_consumer.receive() + back_messages = back_consumer.recv() assert len(back_messages) == 1 # hypothetical app has no filters and will get all events - app_messages = capp_consumer.receive() + app_messages = capp_consumer.recv() assert len(app_messages) == 4 diff --git a/tests/dragon/test_featurestore_integration.py b/tests/dragon/test_featurestore_integration.py index b088df5b4d..ccc63def73 100644 --- a/tests/dragon/test_featurestore_integration.py +++ b/tests/dragon/test_featurestore_integration.py @@ -138,15 +138,15 @@ def test_eventconsumer_eventpublisher_integration( mock_client_app.send(event, timeout=0.1) # worker manager should only get updates about feature update - wmgr_messages = wmgr_consumer.receive() + wmgr_messages = wmgr_consumer.recv() assert len(wmgr_messages) == 3 # the backend should only receive messages about consumer creation - back_messages = back_consumer.receive() + back_messages = back_consumer.recv() assert len(back_messages) == 1 # hypothetical app has no filters and will get all events - app_messages = capp_consumer.receive() + app_messages = capp_consumer.recv() assert len(app_messages) == 4 @@ -204,7 +204,7 @@ def test_eventconsumer_max_dequeue( num_dequeued = 0 - while wmgr_messages := wmgr_consumer.receive(timeout=0.01): + while wmgr_messages := wmgr_consumer.recv(timeout=0.01): # worker manager should not get more than `max_num_msgs` events num_dequeued += len(wmgr_messages)