[Issue 918] [Refactor] Remove the clearMessageQueuesCh in partitionConsumer.dispatcher() #921
Conversation
}
close(doneCh)
clearQueueCb(nextMessageInQueue)
Did you forget to reset the available permits?
Hi @nodece, I don't think it's necessary to reset the available permits. On the user side, if the permits exceed the threshold, internalFlow will be invoked.
pulsar-client-go/pulsar/consumer_partition.go
Lines 196 to 208 in 1d3499a
if ap >= flowThreshold {
	availablePermits := ap
	requestedPermits := ap
	// check if permits changed
	if !atomic.CompareAndSwapInt32(&p.permits, ap, 0) {
		return
	}
	p.pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
	if err := p.pc.internalFlow(uint32(requestedPermits)); err != nil {
		p.pc.log.WithError(err).Error("unable to send permits")
	}
}
On the broker side, I checked the relevant code and didn't find any need to reset it. The Java client does not reset it either. So I think there is no need to reset the available permits. I don't understand why the legacy code reset it; I guess it was just a measure taken for safety.
@@ -138,15 +138,14 @@ type partitionConsumer struct {
 	// the size of the queue channel for buffering messages
 	queueSize int32
 	queueCh   chan []*message
-	startMessageID trackingMessageID
+	startMessageID atomicMessageID
Could we use atomic.Value{} instead of this?
I think atomicMessageID is better than atomic.Value{} because it's simpler and clearer. For example, if we used atomic.Value as the startMessageID type, the original usage of startMessageID would look like this:
// original L985
return pc.startMessageID.greater(msgID.messageID)
// atomicMessageID
return pc.startMessageID.get().greater(msgID.messageID)
// atomic.Value{}
return pc.startMessageID.Load().(trackingMessageID).greater(msgID.messageID)
atomic.Value{} needs one more type assertion at every call site. But atomicMessageID may need a better name and declaration position. Do you have any ideas? Thanks.
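For illustration, a thin typed wrapper over atomic.Value confines the assertion to a single method, which is the trade-off being discussed. This is only a sketch under assumed names (messageID stands in for the client's trackingMessageID; the wrapper is hypothetical, not the PR's actual code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// messageID is a stand-in for the client's trackingMessageID.
type messageID struct{ ledgerID, entryID int64 }

// greater compares two IDs lexicographically by (ledgerID, entryID).
func (m messageID) greater(other messageID) bool {
	if m.ledgerID != other.ledgerID {
		return m.ledgerID > other.ledgerID
	}
	return m.entryID > other.entryID
}

// atomicMessageID wraps atomic.Value so the type assertion lives in
// exactly one place; callers always get a typed messageID back.
type atomicMessageID struct{ v atomic.Value }

func (a *atomicMessageID) set(id messageID) { a.v.Store(id) }
func (a *atomicMessageID) get() messageID   { return a.v.Load().(messageID) }

func main() {
	var start atomicMessageID
	start.set(messageID{ledgerID: 1, entryID: 5})
	// Call sites stay as readable as with a plain field.
	fmt.Println(start.get().greater(messageID{ledgerID: 1, entryID: 3})) // true
}
```

Either way the assertion exists; the wrapper just keeps it out of the dozens of call sites that read startMessageID.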
LGTM
pulsar/consumer_test.go
Outdated
@@ -3177,6 +3177,7 @@ func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {
 	// should be able to consume all messages once again
 	for i := 0; i < N; i++ {
 		msg, err := consumer.Receive(ctx)
+		fmt.Println(string(msg.Payload()) + "-" + strconv.Itoa(i))
fmt.Println(string(msg.Payload()) + "-" + strconv.Itoa(i))
Got it. Thanks!
LGTM
Force-pushed …nsumer.dispatcher() from 91996c4 to 3afd2d4
Master Issue: #918
Motivation
Of the two mechanisms, clearMessageQueuesCh and clearQueueCb, we only need to keep one. For more details please check #918.
This PR does not only aim to clean up; it also fixes a potential bug in clearMessageQueuesCh. For example, clearMessageQueuesCh does the job of clearing messageCh, but that can cause a problem when SeekByTime is invoked on a partitioned topic.
pulsar-client-go/pulsar/consumer_impl.go
Lines 614 to 626 in 1d3499a
pulsar-client-go/pulsar/consumer_partition.go
Lines 1168 to 1175 in 1d3499a
When consuming a partitioned topic, all the partitionConsumers share the same messageCh. After SeekByTime on a partitioned topic, messageCh may be cleared more than once, which can cause message loss. Suppose partitionConsumer-1 has already cleared its messageCh and queueCh; when partitionConsumer-2 does its clear job, it can still execute this logic:
pulsar-client-go/pulsar/consumer_partition.go
Lines 1172 to 1174 in 1d3499a
But messageCh is a shared channel, so partitionConsumer-1 may have received new messages and put them into messageCh at that very moment. There is a possibility that partitionConsumer-2 clears those new messages from messageCh.
Modifications
- Remove clearMessageQueuesCh in partitionConsumer.dispatcher()
- Keep clearQueueCb in partitionConsumer.dispatcher()
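The message-loss scenario described in the motivation can be reproduced with a plain shared channel: after one consumer drains it, a second redundant drain swallows any message another producer has pushed in between. A minimal, deterministic sketch (hypothetical names; no real client types involved):

```go
package main

import "fmt"

// drain empties a shared channel without blocking and returns how many
// messages it discarded; this mirrors the clearing loop run per
// partition consumer in the legacy clearMessageQueuesCh handling.
func drain(ch chan string) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			return n
		}
	}
}

func main() {
	messageCh := make(chan string, 10) // shared by all partition consumers

	messageCh <- "old-1"
	messageCh <- "old-2"
	fmt.Println("first drain discards:", drain(messageCh)) // 2

	// partitionConsumer-1 has already finished seeking and delivers a
	// fresh message into the shared channel...
	messageCh <- "fresh-after-seek"

	// ...which partitionConsumer-2's redundant drain now destroys.
	fmt.Println("second drain discards:", drain(messageCh)) // 1
	fmt.Println("messages left for the user:", len(messageCh)) // 0
}
```

Clearing only the per-partition queueCh (via clearQueueCb) and draining the shared messageCh exactly once avoids this window.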
Verifying this change