Skip to content

Commit

Permalink
fix: don't assert on unordered publishes after publish error (#49)
Browse files Browse the repository at this point in the history
* fix: allow unordered publishes after publish error

* Clarify comment / retrigger checks
  • Loading branch information
pradn authored Mar 23, 2020
1 parent ba5a9dd commit ea19ce6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 0 deletions.
3 changes: 3 additions & 0 deletions google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,6 @@ def publish(self, message):
self.commit()

return future

def _set_status(self, status):
self._status = status
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ def commit(self):
if self._current_batch:
self._current_batch.commit()

# At this point, we lose track of the old batch, but we don't
# care since we just committed it.
# Setting this to None guarantees the next publish() creates a new
# batch.
self._current_batch = None

def unpause(self):
""" Not relevant for this class. """
raise NotImplementedError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from google.auth import credentials
from google.cloud.pubsub_v1 import publisher
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher._batch import base
from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer


Expand Down Expand Up @@ -102,3 +103,24 @@ def test_publish_batch_full():
future = sequencer.publish(message)
batch.publish.assert_called_once_with(message)
assert future is not None


def test_publish_after_batch_error():
client = create_client()
message = create_message()
batch = mock.Mock(spec=client._batch_class)

sequencer = unordered_sequencer.UnorderedSequencer(client, "topic_name")
sequencer._set_batch(batch)

sequencer.commit()
batch.commit.assert_called_once()

# Simulate publish RPC failing.
batch._set_status(base.BatchStatus.ERROR)

# Will create a new batch since the old one has been committed. The fact
# that the old batch errored should not matter in the publish of the next
# message.
future = sequencer.publish(message)
assert future is not None

0 comments on commit ea19ce6

Please sign in to comment.