Skip to content

Commit

Permalink
[FIXED] Don't timeout for retried AckAll
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen authored and neilalexander committed Jan 21, 2025
1 parent 68607a7 commit 7189018
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
3 changes: 2 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2860,7 +2860,8 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
// no-op
if dseq <= o.adflr || sseq <= o.asflr {
o.mu.Unlock()
return ackInPlace
// Return true to let caller respond back to the client.
return true
}
if o.maxp > 0 && len(o.pending) >= o.maxp {
needSignal = true
Expand Down
44 changes: 44 additions & 0 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,3 +1526,47 @@ func TestJetStreamConsumerBackoffWhenBackoffLengthIsEqualToMaxDeliverConfig(t *t
require_NoError(t, err)
require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(3))
}

func TestJetStreamConsumerRetryAckAfterTimeout(t *testing.T) {
for _, ack := range []struct {
title string
policy nats.SubOpt
}{
{title: "AckExplicit", policy: nats.AckExplicit()},
{title: "AckAll", policy: nats.AckAll()},
} {
t.Run(ack.title, func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

_, err = js.Publish("foo", nil)
require_NoError(t, err)

sub, err := js.PullSubscribe("foo", "CONSUMER", ack.policy)
require_NoError(t, err)

msgs, err := sub.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)

msg := msgs[0]
// Send core request so the client is unaware of the ack being sent.
_, err = nc.Request(msg.Reply, nil, time.Second)
require_NoError(t, err)

// It could be we have already acked this specific message, but we haven't received the success response.
// Retrying the ack should not time out and still signal success.
err = msg.AckSync()
require_NoError(t, err)
})
}
}

0 comments on commit 7189018

Please sign in to comment.