Skip to content

Commit

Permalink
Add a method for waiting for unpublished events (#42)
Browse files Browse the repository at this point in the history
Also update the README to reflect this update.
  • Loading branch information
t-persson authored Jan 20, 2022
1 parent b4fe63e commit 4fc0560
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Publishing an event
ACTIVITY_TRIGGERED = EiffelActivityTriggeredEvent()
ACTIVITY_TRIGGERED.data.add("name", "Test activity")
PUBLISHER.send_event(ACTIVITY_TRIGGERED)
PUBLISHER.wait_for_unpublished_events()
Deprecation of routing key
--------------------------
Expand Down
6 changes: 2 additions & 4 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ This code snippet will subscribe to an ActivityStarted in order to publish an Ac

.. code-block:: python
import time
from eiffellib.subscribers import RabbitMQSubscriber
from eiffellib.publishers import RabbitMQPublisher
from eiffellib.events import (EiffelActivityTriggeredEvent,
Expand Down Expand Up @@ -68,7 +67,7 @@ This code snippet will subscribe to an ActivityStarted in order to publish an Ac
PUBLISHER.send_event(ACTIVITY_STARTED)
# Wait for event to be received by 'callback'.
time.sleep(1)
PUBLISHER.wait_for_unpublished_events()
Activity
--------
Expand All @@ -80,7 +79,6 @@ An activity is just a callable which will send ActivityTriggered, Started and Fi
.. code-block:: python
import os
import time
from eiffellib.subscribers import RabbitMQSubscriber
from eiffellib.publishers import RabbitMQPublisher
from eiffellib.events import EiffelAnnouncementPublishedEvent
Expand Down Expand Up @@ -117,4 +115,4 @@ An activity is just a callable which will send ActivityTriggered, Started and Fi
PUBLISHER.send_event(ANNOUNCEMENT)
# Wait for event to be received by 'callback'.
time.sleep(1)
PUBLISHER.wait_for_unpublished_events()
11 changes: 10 additions & 1 deletion eiffellib/publishers/eiffel_publisher.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Axis Communications AB.
# Copyright 2020-2022 Axis Communications AB.
#
# For a full list of individual contributors, please see the commit history.
#
Expand Down Expand Up @@ -46,6 +46,15 @@ def send(self, msg):
"""
raise NotImplementedError

def wait_for_unpublished_events(timeout=60):
"""Wait for all unpublished events to become published.
:raises TimeoutError: If the timeout is reached, this will be raised.
:param timeout: A timeout, in seconds, to wait before exiting.
:type timeout: int
"""
raise NotImplementedError

def close(self):
"""Close down publisher. Override if special actions are required."""
self.running = False
21 changes: 21 additions & 0 deletions eiffellib/publishers/rabbitmq_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,27 @@ def _confirm_delivery(self, method_frame):
len(self._deliveries), self._acks, self._nacks)
_LOG.debug(f"[{current_thread().getName()}] '_confirm_delivery' Lock released")

def wait_for_unpublished_events(self, timeout=60):
"""Wait for all unpublished events to become published.
For the RabbitMQ publisher an event becomes published if the
broker (not the consumer) responds with an ACK.
:raises TimeoutError: If the timeout is reached, this will be raised.
:param timeout: A timeout, in seconds, to wait before exiting.
:type timeout: int
"""
end = time.time() + timeout
deliveries = 0
while time.time() < end:
time.sleep(0.1)
deliveries = len(self._deliveries) + len(self._nacked_deliveries)
if deliveries == 0:
break
else:
raise TimeoutError("Timeout (%0.2fs) while waiting for events to publish"
" (%d still unpublished)" % (timeout, deliveries))

def send_event(self, event, block=True):
"""Validate and send an eiffel event to the rabbitmq server.
Expand Down

0 comments on commit 4fc0560

Please sign in to comment.