Skip to content

Commit

Permalink
Cherry-picks for 2.10.25 (#6401)
Browse files Browse the repository at this point in the history
Includes the following:

- #6399

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander authored Jan 23, 2025
2 parents fb122e0 + b88f999 commit c21dfde
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
6 changes: 4 additions & 2 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3613,8 +3613,10 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
}
// Message was scheduled for redelivery but was removed in the meantime.
if err == ErrStoreMsgNotFound || err == errDeletedMsg {
-delete(o.pending, seq)
-delete(o.rdc, seq)
+// This is a race condition where the message is still in o.pending and
+// scheduled for redelivery, but it has been removed from the stream.
+// o.processTerm is called in a goroutine so could run after we get here.
+// That will correct the pending state and delivery/ack floors, so just skip here.
continue
}
return pmsg, dc, err
Expand Down
33 changes: 29 additions & 4 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1665,20 +1665,45 @@ func TestJetStreamConsumerMessageDeletedDuringRedelivery(t *testing.T) {
err = js.DeleteMsg("TEST", 2)
require_NoError(t, err)

// Wait for mset.storeUpdates to call into o.decStreamPending which runs
// the o.processTerm goroutine, removing one message from pending.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
o.mu.RLock()
defer o.mu.RUnlock()
if len(o.pending) != 2 {
return fmt.Errorf("expected 2 pending, but got %d", len(o.pending))
}
return nil
})

// Now empty the redelivery queue and reset the pending state.
o.mu.Lock()
defer o.mu.Unlock()
for seq := range o.rdc {
for _, seq := range o.rdq {
o.removeFromRedeliverQueue(seq)
}

o.pending = make(map[uint64]*Pending)
o.pending[2] = &Pending{}
o.addToRedeliverQueue(2)

// Also reset delivery/ack floors to confirm they get corrected.
o.adflr, o.asflr = 0, 0
o.dseq, o.sseq = 11, 11

// Getting the next message should skip seq 2, as that's deleted, but must not touch state.
_, _, err = o.getNextMsg()
o.mu.Unlock()
require_Error(t, err, ErrStoreEOF)
require_Len(t, len(o.pending), 1)

// Simulate the o.processTerm goroutine running after a call to o.getNextMsg.
// Pending state and delivery/ack floors should be corrected.
o.processTerm(2, 2, 1, ackTermUnackedLimitsReason, _EMPTY_)

o.mu.RLock()
defer o.mu.RUnlock()
require_Len(t, len(o.pending), 0)
require_Len(t, len(o.rdc), 0)
require_Equal(t, o.adflr, 10)
require_Equal(t, o.asflr, 10)
})
}
}

0 comments on commit c21dfde

Please sign in to comment.