Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Pre-acks handling, detecting ack for out of bounds sequence. #6109

Merged
merged 3 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3062,18 +3062,30 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
return false
}

// Check if this ack is above the current pointer to our next to deliver.
// This could happen on a cooperative takeover with high speed deliveries.
if sseq >= o.sseq {
o.sseq = sseq + 1
}

mset := o.mset
if mset == nil || mset.closed.Load() {
o.mu.Unlock()
return false
}

// Check if this ack is above the current pointer to our next to deliver.
// This could happen on a cooperative takeover with high speed deliveries.
if sseq >= o.sseq {
// Let's make sure this is valid.
// This is only received on the consumer leader, so should never be higher
// than the last stream sequence.
var ss StreamState
mset.store.FastState(&ss)
if sseq > ss.LastSeq {
o.srv.Warnf("JetStream consumer '%s > %s > %s' ACK sequence %d past last stream sequence of %d",
o.acc.Name, o.stream, o.name, sseq, ss.LastSeq)
// FIXME(dlc) - For 2.11 onwards should we return an error here to the caller?
o.mu.Unlock()
return false
}
o.sseq = sseq + 1
}

// Let the owning stream know if we are interest or workqueue retention based.
// If this consumer is clustered (o.node != nil) this will be handled by
// processReplicatedAck after the ack has propagated.
Expand Down
16 changes: 5 additions & 11 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5089,6 +5089,7 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
}
panic(err.Error())
}

if err = o.store.Update(state); err != nil {
o.mu.RLock()
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
Expand All @@ -5101,17 +5102,10 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
if mset := o.getStream(); mset != nil {
var ss StreamState
mset.store.FastState(&ss)
if err := o.checkStateForInterestStream(&ss); err == errAckFloorHigherThanLastSeq {
// Register pre-acks unless no state at all for the stream and we would create alot of pre-acks.
mset.mu.Lock()
// Only register if we have a valid FirstSeq.
if ss.FirstSeq > 0 {
for seq := ss.FirstSeq; seq < state.AckFloor.Stream; seq++ {
mset.registerPreAck(o, seq)
}
}
mset.mu.Unlock()
}
// We used to register preacks here if our ack floor was higher than the last sequence.
// Now when streams catch up they properly call checkInterestState() and periodically run this as well.
// If our states drift this could have allocated lots of pre-acks.
o.checkStateForInterestStream(&ss)
}
}

Expand Down
57 changes: 57 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6664,6 +6664,63 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) {
}
}

// TestJetStreamConsumerAckOutOfBounds makes sure that an ack for a stream
// sequence that is out of bounds (here: far past the last stream sequence)
// is ignored by the consumer leader and does not alter the consumer's
// delivered or ack floor state, including after a leader restart.
func TestJetStreamConsumerAckOutOfBounds(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo.*"},
		Retention: nats.WorkQueuePolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	for i := 0; i < 10; i++ {
		_, err = js.Publish("foo.bar", []byte("OK"))
		require_NoError(t, err)
	}

	sub, err := js.PullSubscribe("foo.*", "C")
	require_NoError(t, err)

	msgs, err := sub.Fetch(1)
	require_NoError(t, err)
	require_Equal(t, len(msgs), 1)
	// The state assertions below depend on this ack having been processed,
	// so fail right here if it was not rather than at a misleading place later.
	require_NoError(t, msgs[0].AckSync())

	// checkConsumerState asserts that exactly one message has been delivered
	// and acknowledged, i.e. that only the legitimate ack above was applied.
	checkConsumerState := func() {
		t.Helper()
		ci, err := js.ConsumerInfo("TEST", "C")
		require_NoError(t, err)
		require_Equal(t, ci.Delivered.Consumer, 1)
		require_Equal(t, ci.Delivered.Stream, 1)
		require_Equal(t, ci.AckFloor.Consumer, 1)
		require_Equal(t, ci.AckFloor.Stream, 1)
	}

	// Now ack way past the last sequence.
	// The request is expected to time out since the ack should not be answered.
	_, err = nc.Request("$JS.ACK.TEST.C.1.10000000000.0.0.0", nil, 250*time.Millisecond)
	require_Error(t, err, nats.ErrTimeout)

	// Make sure that no changes happened to our state.
	checkConsumerState()

	// Restart the current consumer leader and wait for a leader to be in place.
	s := c.consumerLeader(globalAccountName, "TEST", "C")
	s.Shutdown()
	s.WaitForShutdown()
	c.restartServer(s)
	c.waitOnConsumerLeader(globalAccountName, "TEST", "C")

	// Confirm new leader has same state for delivered and ack floor.
	checkConsumerState()
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
2 changes: 1 addition & 1 deletion server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6703,7 +6703,7 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
cc := sjs.cluster
sa := cc.streams[globalAccountName]["TEST"]
var consumers []string
for cName, _ := range sa.consumers {
for cName := range sa.consumers {
consumers = append(consumers, cName)
}
sjs.mu.Unlock()
Expand Down
5 changes: 2 additions & 3 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1692,8 +1692,8 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo

// config returns the stream's configuration (mset.cfg).
// The read lock is acquired for the duration of the call and released on return.
func (mset *stream) config() StreamConfig {
mset.mu.RLock()
defer mset.mu.RUnlock()
mset.cfgMu.RLock()
defer mset.cfgMu.RUnlock()
return mset.cfg
}

Expand Down Expand Up @@ -3654,7 +3654,6 @@ func (mset *stream) resetSourceInfo() {
}
}

// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
Expand Down