diff --git a/server/consumer.go b/server/consumer.go index 552f7f457b..db1743cb17 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -5835,10 +5835,6 @@ func (o *consumer) requestNextMsgSubject() string { func (o *consumer) decStreamPending(sseq uint64, subj string) { o.mu.Lock() - // Update our cached num pending only if we think deliverMsg has not done so. - if sseq >= o.sseq && o.isFilteredMatch(subj) { - o.npc-- - } // Check if this message was pending. p, wasPending := o.pending[sseq] @@ -5846,6 +5842,15 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) { if o.rdc != nil { rdc = o.rdc[sseq] } + + // Update our cached num pending only if we think deliverMsg has not done so. + // Either we have not reached the message yet, or we've hit the race condition + // when there is contention at the beginning of the stream. In which case we can + // only decrement if the ack floor is still low enough to be able to detect it. + if o.isFilteredMatch(subj) && sseq > o.asflr && (sseq >= o.sseq || !wasPending) { + o.npc-- + } + o.mu.Unlock() // If it was pending process it like an ack. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index d5d20ae89c..f578b12307 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -24737,3 +24737,68 @@ func TestJetStreamWouldExceedLimits(t *testing.T) { require_True(t, js.wouldExceedLimits(MemoryStorage, int(js.config.MaxMemory)+1)) require_True(t, js.wouldExceedLimits(FileStorage, int(js.config.MaxStore)+1)) } + +func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + + requireExpected := func(expected int64) { + t.Helper() + o.mu.RLock() + defer o.mu.RUnlock() + require_Equal(t, o.npc, expected) + } + + // Should initially report no messages available. + requireExpected(0) + + // A new message is available, should report in pending. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + requireExpected(1) + + // Pending count should decrease when the message is deleted. + err = js.DeleteMsg("TEST", 1) + require_NoError(t, err) + requireExpected(0) + + // Make more messages available, should report in pending. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + _, err = js.Publish("foo", nil) + require_NoError(t, err) + requireExpected(2) + + // Simulate getNextMsg being called and the starting sequence to skip over a deleted message. + // Also simulate one pending message. + o.mu.Lock() + o.sseq = 100 + o.npc-- + o.pending = make(map[uint64]*Pending) + o.pending[2] = &Pending{} + o.mu.Unlock() + + // Since this message is pending we should not decrement pending count as we've done so already. + o.decStreamPending(2, "foo") + requireExpected(1) + + // This is the deleted message that was skipped, and we can decrement the pending count + // because it's not pending and only as long as the ack floor hasn't moved up yet. + o.decStreamPending(3, "foo") + requireExpected(0) +}