From 90bb9e9c80b29487b1755d99724d5abc494e3ef0 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Mon, 11 Nov 2024 11:57:33 -0800
Subject: [PATCH 1/3] Detect if we receive an ack past our last stream sequence.

We also no longer register pre-acks when we detect this from a consumer
snapshot, since we now handle this properly and registering them could
lead to excessive memory usage.

Signed-off-by: Derek Collison
---
 server/consumer.go                 | 24 +++++++++----
 server/jetstream_cluster.go        | 16 +++------
 server/jetstream_cluster_1_test.go | 57 ++++++++++++++++++++++++++++++
 3 files changed, 80 insertions(+), 17 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index 9080f48a88f..4cb704c56a3 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -3062,18 +3062,30 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
 		return false
 	}
 
-	// Check if this ack is above the current pointer to our next to deliver.
-	// This could happen on a cooperative takeover with high speed deliveries.
-	if sseq >= o.sseq {
-		o.sseq = sseq + 1
-	}
-
 	mset := o.mset
 	if mset == nil || mset.closed.Load() {
 		o.mu.Unlock()
 		return false
 	}
 
+	// Check if this ack is above the current pointer to our next to deliver.
+	// This could happen on a cooperative takeover with high speed deliveries.
+	if sseq >= o.sseq {
+		// Let's make sure this is valid.
+		// This is only received on the consumer leader, so should never be higher
+		// than the last stream sequence.
+		var ss StreamState
+		mset.store.FastState(&ss)
+		if sseq > ss.LastSeq {
+			o.srv.Warnf("JetStream consumer '%s > %s > %s' ACK sequence %d past last stream sequence of %d",
+				o.acc.Name, o.stream, o.name, sseq, ss.LastSeq)
+			// FIXME(dlc) - For 2.11 onwards should we return an error here to the caller?
+			o.mu.Unlock()
+			return false
+		}
+		o.sseq = sseq + 1
+	}
+
 	// Let the owning stream know if we are interest or workqueue retention based.
 	// If this consumer is clustered (o.node != nil) this will be handled by
 	// processReplicatedAck after the ack has propagated.
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index cf5321dfb1d..acf42fc6943 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -5089,6 +5089,7 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
 				}
 				panic(err.Error())
 			}
+
 			if err = o.store.Update(state); err != nil {
 				o.mu.RLock()
 				s, acc, mset, name := o.srv, o.acc, o.mset, o.name
@@ -5101,17 +5102,10 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
 			if mset := o.getStream(); mset != nil {
 				var ss StreamState
 				mset.store.FastState(&ss)
-				if err := o.checkStateForInterestStream(&ss); err == errAckFloorHigherThanLastSeq {
-					// Register pre-acks unless no state at all for the stream and we would create alot of pre-acks.
-					mset.mu.Lock()
-					// Only register if we have a valid FirstSeq.
-					if ss.FirstSeq > 0 {
-						for seq := ss.FirstSeq; seq < state.AckFloor.Stream; seq++ {
-							mset.registerPreAck(o, seq)
-						}
-					}
-					mset.mu.Unlock()
-				}
+				// We used to register pre-acks here if our ack floor was higher than the last sequence.
+				// Now when streams catch up they properly call checkInterestState() and periodically run this as well.
+				// If our states drift, this could have allocated lots of pre-acks.
+				o.checkStateForInterestStream(&ss)
 			}
 		}
 
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index 14987a7a384..dc413959521 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -6664,6 +6664,63 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) {
 	}
 }
 
+// Make sure that if we receive acks that are out of bounds, meaning past our
+// last sequence or before our first, they are ignored and errored where applicable.
+func TestJetStreamConsumerAckOutOfBounds(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo.*"},
+		Retention: nats.WorkQueuePolicy,
+		Replicas:  3,
+	})
+	require_NoError(t, err)
+
+	for i := 0; i < 10; i++ {
+		_, err = js.Publish("foo.bar", []byte("OK"))
+		require_NoError(t, err)
+	}
+
+	sub, err := js.PullSubscribe("foo.*", "C")
+	require_NoError(t, err)
+
+	msgs, err := sub.Fetch(1)
+	require_NoError(t, err)
+	require_Equal(t, len(msgs), 1)
+	msgs[0].AckSync()
+
+	// Now ack way past the last sequence.
+	_, err = nc.Request("$JS.ACK.TEST.C.1.10000000000.0.0.0", nil, 250*time.Millisecond)
+	require_Error(t, err, nats.ErrTimeout)
+
+	// Make sure that no changes happened to our state.
+	ci, err := js.ConsumerInfo("TEST", "C")
+	require_NoError(t, err)
+	require_Equal(t, ci.Delivered.Consumer, 1)
+	require_Equal(t, ci.Delivered.Stream, 1)
+	require_Equal(t, ci.AckFloor.Consumer, 1)
+	require_Equal(t, ci.AckFloor.Stream, 1)
+
+	s := c.consumerLeader("$G", "TEST", "C")
+	s.Shutdown()
+	s.WaitForShutdown()
+	c.restartServer(s)
+	c.waitOnConsumerLeader(globalAccountName, "TEST", "C")
+
+	// Confirm the new leader has the same state for delivered and ack floor.
+	ci, err = js.ConsumerInfo("TEST", "C")
+	require_NoError(t, err)
+	require_Equal(t, ci.Delivered.Consumer, 1)
+	require_Equal(t, ci.Delivered.Stream, 1)
+	require_Equal(t, ci.AckFloor.Consumer, 1)
+	require_Equal(t, ci.AckFloor.Stream, 1)
+}
+
 //
 // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
 // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.

From 96ac2d630ec1734c30898d34b8e6ab27891a2174 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Mon, 11 Nov 2024 11:51:24 -0800
Subject: [PATCH 2/3] Use stream's cfg lock to avoid contention on stream list, etc.

Signed-off-by: Derek Collison
---
 server/stream.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/server/stream.go b/server/stream.go
index 729ba4c8ca7..a7217a98ab5 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -1692,8 +1692,8 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
 
 // Config returns the stream's configuration.
 func (mset *stream) config() StreamConfig {
-	mset.mu.RLock()
-	defer mset.mu.RUnlock()
+	mset.cfgMu.RLock()
+	defer mset.cfgMu.RUnlock()
 	return mset.cfg
 }
 
@@ -3654,7 +3654,6 @@ func (mset *stream) resetSourceInfo() {
 	}
 }
 
-// Lock should be held.
 // This will do a reverse scan on startup or leader election
 // searching for the starting sequence number.
 // This can be slow in degenerative cases.
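
Note on PATCH 2/3: reading the stream's configuration under a dedicated cfgMu keeps read-mostly paths, such as listing streams, from contending with writers that hold the main stream mutex. Below is a minimal, self-contained Go sketch of that general two-mutex pattern; the stream and config types and the method names are illustrative stand-ins, not the server's actual implementation.

package main

import (
	"fmt"
	"sync"
)

// config is a stand-in for a stream's configuration.
type config struct {
	Name     string
	Subjects []string
}

// stream pairs a main lock for mutable state with a dedicated lock for cfg.
type stream struct {
	mu    sync.RWMutex // guards message/delivery state (not shown here)
	cfgMu sync.RWMutex // guards cfg only
	cfg   config
}

// configCopy returns a shallow snapshot of the config under the config lock,
// without ever touching the main lock.
func (s *stream) configCopy() config {
	s.cfgMu.RLock()
	defer s.cfgMu.RUnlock()
	return s.cfg
}

// updateConfig mutates the config while holding both locks, so readers that
// only take cfgMu still observe a consistent value.
func (s *stream) updateConfig(c config) {
	s.mu.Lock()
	s.cfgMu.Lock()
	s.cfg = c
	s.cfgMu.Unlock()
	s.mu.Unlock()
}

func main() {
	s := &stream{cfg: config{Name: "TEST", Subjects: []string{"foo.*"}}}
	s.updateConfig(config{Name: "TEST", Subjects: []string{"foo.*", "bar.*"}})
	fmt.Println(s.configCopy().Subjects) // [foo.* bar.*], read without s.mu
}

The lock ordering in updateConfig (main lock first, then cfgMu) has to stay consistent everywhere to avoid deadlocks; that ordering is an assumption of this sketch, not necessarily what the server does.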
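Note on PATCH 1/3 above: the core of the change is that the consumer only advances its next-to-deliver pointer for an ack after validating the acked stream sequence against the stream's last sequence, so delivered and ack-floor state can never run ahead of the stream. A minimal, self-contained Go sketch of that guard follows; the types and names (streamState, consumerCursor, applyAck) are illustrative and not the server's own.

package main

import "fmt"

// streamState mirrors the idea of a stream's first and last sequence.
type streamState struct {
	FirstSeq, LastSeq uint64
}

// consumerCursor tracks the next stream sequence to deliver.
type consumerCursor struct {
	sseq uint64
}

// applyAck advances the cursor for an ack at stream sequence sseq, but only
// after checking it against the stream's last sequence. An out-of-bounds ack
// is rejected and leaves the cursor untouched.
func (c *consumerCursor) applyAck(sseq uint64, ss streamState) bool {
	if sseq >= c.sseq {
		if sseq > ss.LastSeq {
			// Past the last stream sequence: ignore instead of corrupting state.
			return false
		}
		c.sseq = sseq + 1
	}
	return true
}

func main() {
	c := &consumerCursor{sseq: 2}
	ss := streamState{FirstSeq: 1, LastSeq: 10}

	fmt.Println(c.applyAck(5, ss), c.sseq)              // true 6
	fmt.Println(c.applyAck(10_000_000_000, ss), c.sseq) // false 6 (unchanged)
}

In the real patch the same check also logs a warning, and the FIXME leaves open whether 2.11+ should return an error to the caller instead of silently dropping the ack.
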
From 75713792d65e40964012feca3f70a6f71f265f41 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Mon, 11 Nov 2024 11:53:07 -0800
Subject: [PATCH 3/3] Staticcheck fix

Signed-off-by: Derek Collison
---
 server/norace_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/norace_test.go b/server/norace_test.go
index 05ef611dff7..3472d16f1b5 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -6703,7 +6703,7 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
 		cc := sjs.cluster
 		sa := cc.streams[globalAccountName]["TEST"]
 		var consumers []string
-		for cName, _ := range sa.consumers {
+		for cName := range sa.consumers {
 			consumers = append(consumers, cName)
 		}
 		sjs.mu.Unlock()
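
Note on PATCH 3/3: the fix drops the redundant blank identifier from the map range, which is the simplification staticcheck reports for key-only iteration. A tiny self-contained sketch of the idiom (the map contents here are illustrative):

package main

import "fmt"

func main() {
	consumers := map[string]bool{"C1": true, "C2": true}

	// Preferred: key-only range, instead of "for name, _ := range consumers".
	var names []string
	for name := range consumers {
		names = append(names, name)
	}
	fmt.Println(len(names)) // 2 (map iteration order is not deterministic)
}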