Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: threads can skip the line in publisher flow controller #422

Merged
merged 5 commits into from
Jun 17, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add publisher flow controller test for FIFO order
  • Loading branch information
plamut committed Jun 2, 2021
commit 7add5059d5e4df1af899018e148081b23daa245c
35 changes: 35 additions & 0 deletions tests/unit/pubsub_v1/publisher/test_flow_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,41 @@ def test_threads_posting_large_messages_do_not_starve():
pytest.fail("Adding messages blocked or errored.") # pragma: NO COVER


def test_blocked_messages_are_accepted_in_fifo_order():
settings = types.PublishFlowControl(
message_limit=1,
byte_limit=1_000_000, # Unlimited for practical purposes in the test.
limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK,
)
flow_controller = FlowController(settings)

# It's OK if the message instance is shared, as flow controlelr is only concerned
# with byte sizes and counts, and not with particular message instances.
message = grpc_types.PubsubMessage(data=b"x")

adding_done_events = [threading.Event() for _ in range(10)]
releasing_done_events = [threading.Event() for _ in adding_done_events]

# Add messages. The first one will be accepted, and the rest should queue behind.
for adding_done in adding_done_events:
_run_in_daemon(flow_controller, "add", [message], adding_done)
time.sleep(0.1)

if not adding_done_events[0].wait(timeout=0.1): # pragma: NO COVER
pytest.fail("The first message unexpectedly got blocked on adding.")

# For each message, check that it has indeed been added to the flow controller.
# Then release it to make room for the next message in line, and repeat the check.
enumeration = enumerate(zip(adding_done_events, releasing_done_events))
for i, (adding_done, releasing_done) in enumeration:
if not adding_done.wait(timeout=0.1): # pragma: NO COVER
pytest.fail(f"Queued message still blocked on adding (i={i}).")

_run_in_daemon(flow_controller, "release", [message], releasing_done)
if not releasing_done.wait(timeout=0.1): # pragma: NO COVER
pytest.fail(f"Queued message was not released in time (i={i}).")


def test_warning_on_internal_reservation_stats_error_when_unblocking():
settings = types.PublishFlowControl(
message_limit=1,
Expand Down