From 7470975907e1267a6e091f333a5c21e655b31c8f Mon Sep 17 00:00:00 2001 From: Luis Gustavo Date: Wed, 20 Nov 2024 12:36:24 -0300 Subject: [PATCH] fix check of the backoff and max delivery configs Match the check of the backoff and maxDeliver configs with the official documentation https://docs.nats.io/nats-concepts/jetstream/consumers#:~:text=The%20sequence%20length,MaxDelivery --- server/consumer.go | 4 +-- server/jetstream_consumer_test.go | 57 +++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index b1d316a6d90..f4e7bfbd0ab 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -623,7 +623,7 @@ func checkConsumerCfg( } // Check if we have a BackOff defined that MaxDeliver is within range etc. - if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver != -1 && config.MaxDeliver <= lbo { + if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver != -1 && lbo > config.MaxDeliver { return NewJSConsumerMaxDeliverBackoffError() } @@ -2091,7 +2091,7 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error { } // Check if BackOff is defined, MaxDeliver is within range. - if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && ncfg.MaxDeliver <= lbo { + if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && lbo > ncfg.MaxDeliver { return NewJSConsumerMaxDeliverBackoffError() } diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index a566908ad8b..2fd1aa45572 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -959,6 +959,22 @@ func TestJetStreamConsumerBackOff(t *testing.T) { }, shouldErr: false, }, + { + name: "backoff_with_max_deliver_equal", + config: nats.ConsumerConfig{ + MaxDeliver: 3, + BackOff: []time.Duration{time.Second, time.Minute, time.Hour}, + }, + shouldErr: false, + }, + { + name: "backoff_with_max_deliver_equal_to_zero", + config: nats.ConsumerConfig{ + MaxDeliver: 0, + BackOff: []time.Duration{}, + }, + shouldErr: false, + }, { name: "backoff_with_max_deliver_smaller", config: nats.ConsumerConfig{ @@ -2476,3 +2492,44 @@ func TestJetStreamConsumerBackoffNotRespectedWithMultipleInflightRedeliveries(t } } } + +func TestJetStreamConsumerBackoffWhenBackoffLengthIsEqualToMaxDeliverConfig(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"events.>"}, + }) + require_NoError(t, err) + + maxDeliver := 3 + backoff := []time.Duration{time.Second, 2 * time.Second, 3 * time.Second} + sub, err := js.SubscribeSync( + "events.>", + nats.MaxDeliver(maxDeliver), + nats.BackOff(backoff), + nats.AckExplicit(), + ) + require_NoError(t, err) + + calculateExpectedBackoff := func(numDelivered int) time.Duration { + return backoff[numDelivered-1] + 50*time.Millisecond // 50ms of margin to system overhead + } + + // message to be redelivered using backoff duration. + firstMsgSent := time.Now() + sendStreamMsg(t, nc, "events.first", "msg-1") + _, err = sub.NextMsg(time.Second) + require_NoError(t, err) + require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(1)) + _, err = sub.NextMsg(2 * time.Second) + require_NoError(t, err) + require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(2)) + _, err = sub.NextMsg(3 * time.Second) + require_NoError(t, err) + require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(3)) +}