
Pub/Sub: Flow Control does not work as expected. #7677

Closed
kamalaboulhosn opened this issue Apr 8, 2019 · 22 comments · Fixed by #7948
kamalaboulhosn commented Apr 8, 2019

The flow control settings for the Cloud Pub/Sub subscriber should bound the number of unacknowledged messages outstanding to the callback at a time. However, they seem to have no effect on the client: the number of outstanding messages always ends up being 10.

OS type and version: Mac OS X 10.14.3, Debian Linux
Python version: Python 2.7.10 on Mac, Python 3.6.5 on Linux
google-cloud-pubsub version: 0.40.0

Steps to reproduce

  1. Create a Cloud Pub/Sub topic and subscription
  2. Publish many messages to the topic
  3. Update the project name and subscription name in the code below.
  4. Run the code.

Expected behavior: Since the flow control settings have max_messages set to 2, only two "Received Message" lines should print at a time, with two "Acking Message" lines printing out 20 seconds later. Instead, 10 messages come to the callback at a time. Changing the value of max_messages has no impact on the number of messages sent to the callback.

Code example

import time

from google.cloud import pubsub_v1

project_name = "<insert project name here>"
subscription_name = "<insert subscription name here>"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_name, subscription_name)

def callback(message):
    print("Received Message")
    time.sleep(20)  # simulate slow processing before acking
    print("Acking Message")
    message.ack()

flow_control = pubsub_v1.types.FlowControl(max_messages=2)
subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control)

print('Subscriber started for {}'.format(subscription_path))
while True:
    time.sleep(60)
anguillanneuf commented Apr 26, 2019

@kamalaboulhosn I reproduced this issue using an exact copy of your code. However, a simple change in the code (the try..except block) corrected the unexpected behavior. It looks like our code samples actually introduce this unexpected behavior :(

My code:

import time

from google.cloud import pubsub_v1

subscriber_client = pubsub_v1.SubscriberClient()
flow_control = pubsub_v1.types.FlowControl(max_messages=1)

def callback(m):
    print('{}: {} received'.format(int(time.time()), m.data))
    time.sleep(5)
    m.ack()
    print('{}: {} acked'.format(int(time.time()), m.data))

future = subscriber_client.subscribe(
    'projects/{}/subscriptions/{}'.format('your_project', 'your_subscription'),
    flow_control=flow_control,
    callback=callback
)

print('Listening...')
try:
    future.result()
except:
    future.cancel()

Output:

Listening...
1556318231: b'm0' received
1556318236: b'm0' acked
1556318237: b'm2' received
1556318242: b'm2' acked
1556318242: b'm1' received
1556318247: b'm1' acked

@kamalaboulhosn
Author

I tried the code you provided and I still see the exact same result with flow control not working.

anguillanneuf commented Apr 26, 2019

Very strange... I tried 1, 2, and 3 messages; they all worked in both Python 2 and Python 3, on macOS and on Linux machines, with the more than occasional +1 message (for example, max_messages set to 2, but 3 received). With Python 2, I just couldn't kill the program with Ctrl + C or Ctrl + D. Let me look into it further. Thanks for your patience.

[Screenshot: terminal output from the test runs, 2019-04-29]

plamut commented May 7, 2019

@kamalaboulhosn @anguillanneuf How did you manage to reproduce "I get 10 messages at once", what was the pattern of publishing messages? And does that happen under Python 2.7 only?

I was able to track down the cause of the n+1 messages received and have a proof-of-concept fix: the background consumer thread is properly paused when the leaser load reaches 1.0, and does not pull an extra message.

However, I was not able to reproduce the "received 10 messages" issue. Is that somehow related to the server side sometimes sending 10 messages in a single response? I was told that this is a possibility; can either of you provide more info? Thanks!

@kamalaboulhosn
Author

I published 1000 messages at the beginning, before I started the subscriber (but after I created the subscription). That way, there are a lot of messages immediately waiting for the subscriber. I see the issue with both Python 2.7 and Python 3.6.

Responses sent on the stream from the server can contain anywhere between one message and the number of messages batched on the publish side. The client library is responsible for restricting the number of outstanding user messages based on the flow control settings, even if more messages have been delivered.

If I create a simple subscriber in Java and set the flow control settings to restrict the number of outstanding user messages, everything works as expected.
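
For reference, a backlog with server-side batching can be created with a publisher sketch like the one below (project and topic names are placeholders; batch settings chosen only for illustration):

import time

from google.cloud import pubsub_v1

# Batch settings make the publisher bundle many messages per request,
# so the server may later deliver several messages in a single response.
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=100,  # bundle up to 100 messages per publish request
    max_latency=1.0,   # or wait at most 1 second before sending a batch
)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
topic_path = publisher.topic_path('<your-project>', '<your-topic>')

# Publish 1000 messages before the subscriber starts, creating a backlog.
futures = [
    publisher.publish(topic_path, data='m{}'.format(i).encode())
    for i in range(1000)
]
for f in futures:
    f.result()  # block until each publish has completed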

plamut commented May 7, 2019

@kamalaboulhosn Thanks, will try with batch-publishing messages as well (update: indeed, that was it).

Another question about other clients, so that the Python client will behave consistently - say that I currently have 7 messages leased (out of a maximum of 10), and the server delivers another 10 messages in a single response.

The streaming pull manager can immediately lease 3 of those messages and invoke the user's callback for them, but what about the remaining "excess" 7 messages? Should the subscriber client cache them internally until some of the currently leased messages get ACK-ed, freeing up leasing capacity? And then only resume the streaming pull once it clears the internal cache AND the lease management load drops below the FlowControl.resume_threshold setting?

This makes the most sense to me, but want to make sure that this is indeed how other clients handle it.
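
To make the buffering idea concrete, a rough sketch (hypothetical names, not the actual client internals):

import collections
import threading

class MessageBuffer(object):
    """Holds messages delivered beyond the flow control capacity."""

    def __init__(self, max_leased, resume_threshold):
        self._pending = collections.deque()  # messages waiting for capacity
        self._leased = 0
        self._max_leased = max_leased  # e.g. FlowControl.max_messages
        self._resume_threshold = resume_threshold
        self._lock = threading.Lock()

    def on_response(self, messages, dispatch):
        # Lease and dispatch as many messages as capacity allows,
        # and buffer the rest instead of dispatching them early.
        with self._lock:
            for msg in messages:
                if self._leased < self._max_leased:
                    self._leased += 1
                    dispatch(msg)
                else:
                    self._pending.append(msg)

    def on_ack(self, dispatch):
        # An ACK frees one lease slot; drain the buffer first.
        with self._lock:
            self._leased -= 1
            if self._pending:
                self._leased += 1
                dispatch(self._pending.popleft())

    def may_resume_pull(self):
        # Resume the stream only once the buffer is empty AND the
        # leaser load has dropped below the resume threshold.
        with self._lock:
            load = self._leased / float(self._max_leased)
            return not self._pending and load < self._resume_threshold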

@kamalaboulhosn
Author

@plamut The subscriber should hold on to the extra 7 messages and wait until there is more capacity to send them to the user's callback, yes. You can see this in the Java implementation. Resuming the streaming pull after these excess messages are cleared and we are below the resume_threshold sounds reasonable.

plamut commented May 7, 2019

@kamalaboulhosn Thanks for confirming this, and I got the same answer for the node.js client as well. I will implement an internal message buffer then.

@jtressle

FYI,

I'm experiencing the same issue where my subscriber is exceeding 'max_messages'. I'm on Ubuntu 16.04, and running Python 3.6.8 in a VM. My task takes about 2-5 minutes, and max_messages is set to 1.

I've tried @anguillanneuf's code and the issue persists.

My subscriber pulls the correct number of messages about 90% of the time.

Are there any other potential fixes?

Thanks.

plamut commented May 20, 2019

Hi @jtressle, there is a PR linked just above your comment that addresses this very issue. 😉 It has not yet been merged, though.

Until the fix is merged and released, there might be a workaround, albeit an ugly one. If your message callback blocks the thread before acknowledging the received message, it might work to limit the scheduler's max_workers to 1.

import concurrent.futures

from google.cloud import pubsub_v1

subscriber_client = pubsub_v1.SubscriberClient()
flow_control = pubsub_v1.types.FlowControl(max_messages=1)

# A pool with a single worker thread means at most one callback runs at a time.
custom_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
custom_scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(custom_executor)

future = subscriber_client.subscribe(
    'projects/<your-project>/subscriptions/<your-subscription>',
    flow_control=flow_control,
    callback=my_callback,  # your message-handling callback
    scheduler=custom_scheduler
)

When a message is received, a callback is invoked asynchronously using the underlying scheduler, and if the latter's thread pool only has 1 available thread, only a single callback will be executed at a time - unless, of course, that callback dispatches its work to some other thread.
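
The serializing effect of a single-worker pool can be demonstrated with plain concurrent.futures, independently of Pub/Sub (a standalone sketch):

import concurrent.futures
import time

def task(i):
    print('{:.0f}: task {} started'.format(time.time(), i))
    time.sleep(2)  # stand-in for message processing
    print('{:.0f}: task {} finished'.format(time.time(), i))

# With max_workers=1, the ten submitted tasks run strictly one after
# another, just as the subscriber callbacks would in the workaround above.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    for i in range(10):
        executor.submit(task, i)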

@jtressle

@plamut this appears to have worked! Thanks for the tip.

plamut commented May 23, 2019

@jtressle Excellent, great to hear that!

@jtressle

@plamut when will the merge happen, you think?

Unfortunately, the fix you recommended is locking up some of my resources. I’ll continue to troubleshoot.

Thanks.

plamut commented May 24, 2019

@jtressle All review comments on the PR have been addressed (nothing major), currently awaiting another review. I would say it will most likely be merged some time next week.

plamut commented May 24, 2019

@jtressle I am happy to inform you that the PR has just been approved and merged.

@jtressle

@plamut what's the best way of getting this fix quickly? I'm using pip to install pubsub, but it's not updated yet.

Thanks!

plamut commented May 26, 2019

@jtressle I don't know the next release date, but until then, would installing a development version of the pubsub package work for you?

$ pip install -e git+https://github.com/googleapis/google-cloud-python.git#egg=google-cloud-pubsub\&subdirectory=pubsub

@jtressle

@plamut perfect! It's installed and I'm testing now. I'll let you know if I see any issues with the max_messages.

Thanks again.

@jtressle

@plamut I wanted to give you an update. I've been running the development pubsub version for about 8 hours. So far, it looks like the 'max_messages' issue is fixed.

However, I have experienced ~4 lost messages. I'm not sure if this can be attributed to the patch, but I'll keep on monitoring. I'll wait for version 0.42 before submitting an issue.

Thanks again!

plamut commented May 27, 2019

@jtressle Thanks for the feedback, appreciated!

What do you mean by "lost" messages? Not being delivered at all, or just not being delivered in the expected order, thus appearing to have been lost? The server is designed for availability and throughput, thus the message delivery order is not guaranteed (longer explanation).

However, the server will do its best to deliver each message at least once, with two exceptions:

  • If a message cannot be delivered within 7 days, it is deleted.
  • Messages published before a given subscription is created will usually not be delivered.

Could any of that explain the observed behavior? Have the "lost" messages perhaps been delivered in the meantime?

Generally, if there is an error / timeout when acknowledging a message, say, because the processing took too long, or if the ACK request got lost, the server will eventually try to re-deliver the message.
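
If processing is known to take long, the deadline for a message can also be extended from within the callback. A minimal sketch (do_long_running_work is a placeholder; the streaming client already extends leases in the background, so an explicit extension like this is rarely needed):

def callback(message):
    # Ask the server for more time before it re-delivers the message;
    # 600 seconds is the maximum the server allows per extension.
    message.modify_ack_deadline(600)
    do_long_running_work(message.data)  # placeholder for the real work
    message.ack()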

@jtressle

@plamut Thanks for the info.

I continued to run pubsub over the weekend and didn't experience any lost messages. The earlier lost messages could have been caused by an issue on my side. Also, there is still no issue with max_messages, so your fix definitely works.

Thanks again for your help!

plamut commented May 28, 2019

@jtressle It was my pleasure. And thank you for confirming that the fix indeed works. 👍
