failTimeoutMessages() cannot delete outdated messages #1236
Comments
There seems to be a concurrency issue here. I will work on this.
@geniusjoe Could you review #1247?
@nodece Sure. As I mentioned in my #1247 review, this bugfix can definitely remove outdated messages, but can this PR solve the #551 concurrency issue?
Don't worry about
Okay. I think this PR is what I need.
Expected behavior
The `failTimeoutMessages()` timer in `pulsar/producer_partition.go` should periodically delete outdated messages in `p.pendingQueue` that are older than `p.options.SendTimeout`.
Actual behavior
Messages in `p.pendingQueue` that are older than `p.options.SendTimeout` are never deleted. If a message keeps failing to send, it remains in the `pendingQueue` forever and reconnection keeps failing indefinitely.

Helpful information
I think this bug may be related to bugfix #551.
Bugfix #551 aims to solve a race condition between the `grabCnx()` and `failTimeoutMessages()` functions. The producer may hit this race when `failTimeoutMessages()` first deletes outdated messages and then `grabCnx()` reconnects successfully and resends those already-deleted pending messages. The #551 solution is to refresh the `sendAt` field of every message in `p.pendingQueue` whenever `grabCnx()` reconnects, so that `failTimeoutMessages()` no longer applies to these messages. Code reference below:

Compared with the Go client, the Java client may not encounter this race condition. Java's `failPendingMessages()` first checks whether the current channel connection is closed; if it is not, it triggers `cnx.ctx().channel().eventLoop()` to avoid the race condition. Code reference below:

I think we can add `Lock()` and `Unlock()` as member methods of `p.pendingQueue` and remove the locking from all of its implementation methods, such as `Put()` and `Poll()`. We should treat "iterate over every member and delete some of them" or "iterate over every member and change some member values" as an atomic operation, and call `p.pendingQueue.Lock()` and `p.pendingQueue.Unlock()` whenever we perform any action on `p.pendingQueue`.
Steps to reproduce
In the `handleSend()` function, always return a transient error when a specific message is encountered.

The log looks something like below; I use an `UnknownError` as the transient error.

System configuration
Client version: 0.12.1
Broker version: 2.9