From 71890187f51828694f51f6838abf1a99ae9af76b Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 21 Jan 2025 14:29:30 +0100 Subject: [PATCH] [FIXED] Don't timeout for retried AckAll Signed-off-by: Maurice van Veen --- server/consumer.go | 3 ++- server/jetstream_consumer_test.go | 44 +++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/server/consumer.go b/server/consumer.go index f267718361c..77cec0b7cd1 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2860,7 +2860,8 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b // no-op if dseq <= o.adflr || sseq <= o.asflr { o.mu.Unlock() - return ackInPlace + // Return true to let caller respond back to the client. + return true } if o.maxp > 0 && len(o.pending) >= o.maxp { needSignal = true diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 8e62dbb12a0..193798c3d67 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1526,3 +1526,47 @@ func TestJetStreamConsumerBackoffWhenBackoffLengthIsEqualToMaxDeliverConfig(t *t require_NoError(t, err) require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(3)) } + +func TestJetStreamConsumerRetryAckAfterTimeout(t *testing.T) { + for _, ack := range []struct { + title string + policy nats.SubOpt + }{ + {title: "AckExplicit", policy: nats.AckExplicit()}, + {title: "AckAll", policy: nats.AckAll()}, + } { + t.Run(ack.title, func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + sub, err := js.PullSubscribe("foo", "CONSUMER", ack.policy) + require_NoError(t, err) + + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + + msg := msgs[0] + // Send core request so the client is unaware of the ack being sent. + _, err = nc.Request(msg.Reply, nil, time.Second) + require_NoError(t, err) + + // It could be we have already acked this specific message, but we haven't received the success response. + // Retrying the ack should not time out and still signal success. + err = msg.AckSync() + require_NoError(t, err) + }) + } +}