Better handling for stream mismatch scenarios. #2702

Merged · 5 commits · Nov 18, 2021
4 changes: 3 additions & 1 deletion server/filestore.go
@@ -1953,6 +1953,7 @@ func (mb *msgBlock) compact() {
}
// Always set last.
mb.last.seq = seq &^ ebit

// Advance to next record.
index += rl
}
@@ -2605,7 +2606,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
writeIndex := ts-mb.lwits > int64(2*time.Second)

// Accounting
-	mb.updateAccounting(seq, ts, rl)
+	mb.updateAccounting(seq&^ebit, ts, rl)

// Check if we are tracking per subject for our simple state.
if len(subj) > 0 && mb.fss != nil {
@@ -3840,6 +3841,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
}
fs.lmb.first.seq = fs.state.FirstSeq
fs.lmb.last.seq = fs.state.LastSeq

fs.lmb.writeIndexInfo()

cb := fs.scb
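
Both filestore changes hinge on the erase bit: the changed `updateAccounting` call and the `mb.last.seq` assignment mask it off with Go's AND NOT operator (`&^`) so accounting never sees the flag. A minimal runnable sketch of the masking, assuming `ebit` is the high bit of the sequence used by filestore.go to flag erased messages (consistent with its use in the diff):

```go
package main

import "fmt"

// Assumption: ebit is the high bit of a sequence number, used by the
// filestore to flag erased messages while preserving the sequence itself.
const ebit = uint64(1) << 63

func main() {
	seq := uint64(42) | ebit // sequence 42, flagged as erased

	// AND NOT (&^) clears the flag, which is what both changed lines do
	// before the value reaches accounting or mb.last.seq.
	fmt.Println(seq &^ ebit)   // 42
	fmt.Println(seq&ebit != 0) // true: the flag is still readable on the raw value
}
```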
62 changes: 54 additions & 8 deletions server/jetstream_cluster.go
@@ -1679,10 +1679,8 @@ func (mset *stream) resetClusteredState(err error) bool {
// This will reset the stream and consumers.
// Should be done in a separate Go routine.
func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) {
-	js.processClusterCreateStream(acc, sa)
-
-	// Check consumers.
-	js.mu.Lock()
+	// Check and collect consumers first.
+	js.mu.RLock()
var consumers []*consumerAssignment
if cc := js.cluster; cc != nil && cc.meta != nil {
ourID := cc.meta.ID()
@@ -1693,8 +1691,11 @@ func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) {
}
}
}
-	js.mu.Unlock()
+	js.mu.RUnlock()

+	// Reset stream.
+	js.processClusterCreateStream(acc, sa)
+	// Reset consumers.
for _, ca := range consumers {
js.processClusterCreateConsumer(ca, nil, false)
}
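
The reordering above is a collect-then-act pattern: gather the consumer assignments under a read lock, release it, and only then recreate the stream and its consumers, since those calls can take other locks themselves. A runnable sketch of the shape, with simplified stand-in types (not the server's actual ones):

```go
package main

import (
	"fmt"
	"sync"
)

type consumerAssignment struct{ name string }

var (
	mu          sync.RWMutex
	assignments = []*consumerAssignment{{"dur1"}, {"dur2"}}
)

// Stand-in for processClusterCreateConsumer, which may acquire other
// locks and so must run only after mu has been released.
func recreate(ca *consumerAssignment) { fmt.Println("recreate", ca.name) }

func main() {
	// Collect under a read lock only; nothing is mutated here.
	mu.RLock()
	work := make([]*consumerAssignment, len(assignments))
	copy(work, assignments)
	mu.RUnlock()

	// Act outside the lock: stream first, then consumers, matching the
	// new ordering in restartClustered.
	for _, ca := range work {
		recreate(ca)
	}
}
```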
@@ -1718,12 +1719,18 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco

subject, reply, hdr, msg, lseq, ts, err := decodeStreamMsg(buf[1:])
if err != nil {
+				if node := mset.raftNode(); node != nil {
+					s.Errorf("JetStream cluster could not decode stream msg for '%s > %s' [%s]",
+						mset.account(), mset.name(), node.Group())
+				}
panic(err.Error())
}

// Check for flowcontrol here.
-			if !isRecovering && len(msg) == 0 && len(hdr) > 0 && reply != _EMPTY_ && isControlHdr(hdr) {
-				mset.sendFlowControlReply(reply)
+			if len(msg) == 0 && len(hdr) > 0 && reply != _EMPTY_ && isControlHdr(hdr) {
+				if !isRecovering {
+					mset.sendFlowControlReply(reply)
+				}
continue
}
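
This hunk also introduces the pattern repeated through the rest of the file: before the pre-existing panic on a decode failure, log which account, stream, and raft group hit the bad entry so the panic is attributable. A small runnable sketch of that shape (the decoder, names, and log text are illustrative, not the server's exact output):

```go
package main

import (
	"errors"
	"log"
)

// Stand-in decoder that always fails, to exercise the error path.
func decodeStreamMsg(buf []byte) error { return errors.New("truncated entry") }

func main() {
	if err := decodeStreamMsg([]byte{0x1}); err != nil {
		// Log the context first (the PR uses account, stream name, and
		// raft group), then panic as before: a corrupt log entry must
		// never be half-applied. This program panics by design.
		log.Printf("could not decode stream msg for 'ACC > ORDERS' [S-R3F]: %v", err)
		panic(err.Error())
	}
}
```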

@@ -1761,6 +1768,11 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
case deleteMsgOp:
md, err := decodeMsgDelete(buf[1:])
if err != nil {
+				if node := mset.raftNode(); node != nil {
+					s := js.srv
+					s.Errorf("JetStream cluster could not decode delete msg for '%s > %s' [%s]",
+						mset.account(), mset.name(), node.Group())
+				}
panic(err.Error())
}
s, cc := js.server(), js.cluster
@@ -1802,6 +1814,11 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
case purgeStreamOp:
sp, err := decodeStreamPurge(buf[1:])
if err != nil {
+				if node := mset.raftNode(); node != nil {
+					s := js.srv
+					s.Errorf("JetStream cluster could not decode purge msg for '%s > %s' [%s]",
+						mset.account(), mset.name(), node.Group())
+				}
panic(err.Error())
}
// Ignore if we are recovering and we have already processed.
@@ -2896,6 +2913,16 @@ func (cc *jetStreamCluster) isConsumerAssigned(a *Account, stream, consumer stri
return false
}

+// Returns our stream and underlying raft node.
+func (o *consumer) streamAndNode() (*stream, RaftNode) {
+	if o == nil {
+		return nil, nil
+	}
+	o.mu.RLock()
+	defer o.mu.RUnlock()
+	return o.mset, o.node
+}
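
streamAndNode is the helper the consumer-side logging below leans on: it tolerates a nil consumer and returns both pointers under a single read lock so they form a consistent pair. A standalone sketch of the idiom with simplified types (not the server's actual structs):

```go
package main

import (
	"fmt"
	"sync"
)

type stream struct{ name string }
type raftNode struct{ group string }

type consumer struct {
	mu   sync.RWMutex
	mset *stream
	node *raftNode
}

// Nil-safe accessor: calling a pointer method on a nil receiver is legal
// in Go as long as the method checks for it, and the single RLock means
// both fields are read as a consistent pair.
func (o *consumer) streamAndNode() (*stream, *raftNode) {
	if o == nil {
		return nil, nil
	}
	o.mu.RLock()
	defer o.mu.RUnlock()
	return o.mset, o.node
}

func main() {
	var gone *consumer // e.g. a consumer that was already closed
	s, n := gone.streamAndNode()
	fmt.Println(s, n) // <nil> <nil>, no panic

	c := &consumer{mset: &stream{"ORDERS"}, node: &raftNode{"S-R3F"}}
	if s, n := c.streamAndNode(); s != nil && n != nil {
		fmt.Println(s.name, n.group) // ORDERS S-R3F
	}
}
```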

func (o *consumer) raftGroup() *raftGroup {
if o == nil {
return nil
@@ -3002,6 +3029,11 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
// No-op needed?
state, err := decodeConsumerState(e.Data)
if err != nil {
+			if mset, node := o.streamAndNode(); mset != nil && node != nil {
+				s := js.srv
+				s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]",
+					mset.account(), mset.name(), o, node.Group())
+			}
panic(err.Error())
}
o.store.Update(state)
@@ -3026,6 +3058,11 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
if !isLeader {
dseq, sseq, dc, ts, err := decodeDeliveredUpdate(buf[1:])
if err != nil {
+				if mset, node := o.streamAndNode(); mset != nil && node != nil {
+					s := js.srv
+					s.Errorf("JetStream cluster could not decode consumer delivered update for '%s > %s > %s' [%s]",
+						mset.account(), mset.name(), o, node.Group())
+				}
panic(err.Error())
}
if err := o.store.UpdateDelivered(dseq, sseq, dc, ts); err != nil {
@@ -3039,6 +3076,11 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
case updateAcksOp:
dseq, sseq, err := decodeAckUpdate(buf[1:])
if err != nil {
+			if mset, node := o.streamAndNode(); mset != nil && node != nil {
+				s := js.srv
+				s.Errorf("JetStream cluster could not decode consumer ack update for '%s > %s > %s' [%s]",
+					mset.account(), mset.name(), o, node.Group())
+			}
panic(err.Error())
}
o.processReplicatedAck(dseq, sseq)
@@ -4522,6 +4564,7 @@ type streamSnapshot struct {
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
LastSeq uint64 `json:"last_seq"`
+	Failed   uint64   `json:"clfs"`
Deleted []uint64 `json:"deleted,omitempty"`
}
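
The new Failed field rides along in the stream snapshot as "clfs", so a replica that applies a snapshot can adopt the leader's failed-proposal count instead of resetting it to zero (which is what the CLFS test below exercises). A runnable sketch of the wire shape; the Msgs field is assumed from the surrounding stateSnapshot code, and the values are made up:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Mirror of streamSnapshot from the diff. Msgs is assumed from the
// surrounding code; the values below are illustrative only.
type streamSnapshot struct {
	Msgs     uint64   `json:"messages"`
	Bytes    uint64   `json:"bytes"`
	FirstSeq uint64   `json:"first_seq"`
	LastSeq  uint64   `json:"last_seq"`
	Failed   uint64   `json:"clfs"`
	Deleted  []uint64 `json:"deleted,omitempty"`
}

func main() {
	snap := streamSnapshot{Msgs: 3, Bytes: 128, FirstSeq: 7, LastSeq: 10, Failed: 2}
	b, _ := json.Marshal(snap)
	fmt.Println(string(b))
	// {"messages":3,"bytes":128,"first_seq":7,"last_seq":10,"clfs":2}
}
```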

@@ -4536,6 +4579,7 @@ func (mset *stream) stateSnapshot() []byte {
Bytes: state.Bytes,
FirstSeq: state.FirstSeq,
LastSeq: state.LastSeq,
+		Failed:   mset.clfs,
Deleted: state.Deleted,
}
b, _ := json.Marshal(snap)
@@ -4638,7 +4682,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [

// Do proposal.
err := mset.node.Propose(esm)
-	if err != nil {
+	if err != nil && mset.clseq > 0 {
mset.clseq--
}
mset.clMu.Unlock()
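
The added `mset.clseq > 0` guard matters because clseq is unsigned: decrementing it at zero would not panic but silently wrap to 2^64-1, corrupting the proposed-sequence tracking. A tiny demonstration:

```go
package main

import "fmt"

func main() {
	var clseq uint64 // zero, as after a reset
	clseq--          // no panic in Go: unsigned arithmetic wraps
	fmt.Println(clseq) // 18446744073709551615

	// The guarded rollback from the diff:
	var seq uint64
	if seq > 0 {
		seq--
	}
	fmt.Println(seq) // still 0
}
```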
@@ -4686,6 +4730,7 @@ func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) {
if snap.FirstSeq > state.FirstSeq {
mset.store.Compact(snap.FirstSeq)
state = mset.store.State()
+		mset.setLastSeq(snap.LastSeq)
}
// Range the deleted and delete if applicable.
for _, dseq := range snap.Deleted {
@@ -4773,6 +4818,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) error {
mset.processSnapshotDeletes(snap)

mset.mu.Lock()
+	mset.clfs = snap.Failed
state := mset.store.State()
sreq := mset.calculateSyncRequest(&state, snap)
s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
3 changes: 1 addition & 2 deletions server/jetstream_cluster_test.go
@@ -10519,11 +10519,10 @@ func (c *cluster) waitOnServerCurrent(s *Server) {
c.t.Helper()
expires := time.Now().Add(20 * time.Second)
for time.Now().Before(expires) {
+		time.Sleep(100 * time.Millisecond)
		if s.JetStreamIsCurrent() {
-			time.Sleep(100 * time.Millisecond)
			return
		}
-		time.Sleep(100 * time.Millisecond)
}
c.t.Fatalf("Expected server %q to eventually be current", s)
}
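
The test-helper change moves the sleep to the top of the loop, so a just-restarted server gets a beat to settle before the first check and the loop sleeps exactly once per iteration. The same shape as a generic poll-until-deadline helper (a sketch, not a helper the repo defines):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFor polls cond until it succeeds or the deadline passes. Sleeping
// before each check mirrors the reworked waitOnServerCurrent loop.
func waitFor(timeout, interval time.Duration, cond func() bool) error {
	expires := time.Now().Add(timeout)
	for time.Now().Before(expires) {
		time.Sleep(interval)
		if cond() {
			return nil
		}
	}
	return errors.New("condition not met before deadline")
}

func main() {
	start := time.Now()
	err := waitFor(time.Second, 50*time.Millisecond, func() bool {
		return time.Since(start) > 200*time.Millisecond
	})
	fmt.Println(err) // <nil>
}
```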
171 changes: 171 additions & 0 deletions server/norace_test.go
@@ -3747,3 +3747,174 @@ func TestNoRaceJetStreamKeyValueCompaction(t *testing.T) {
}
}
}

// Trying to recreate an issue rip saw with KV and server restarts, where servers
// complained about a sequence mismatch for a few minutes and memory kept growing.
func TestNoRaceJetStreamClusterStreamSeqMismatchIssue(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "MM",
Replicas: 3,
TTL: 500 * time.Millisecond,
})
require_NoError(t, err)

for i := 1; i <= 10; i++ {
if _, err := kv.PutString("k", "1"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// Close in case we are connected here. Will recreate.
nc.Close()

// Shutdown a non-leader.
s := c.randomNonStreamLeader("$G", "KV_MM")
s.Shutdown()

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err = js.KeyValue("MM")
require_NoError(t, err)

// Now change the state of the stream such that we have to do a compact upon restart
// of the downed server.
for i := 1; i <= 10; i++ {
if _, err := kv.PutString("k", "2"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

// Raft could save us here, so we need to run a compact on the leader.
snapshotLeader := func() {
sl := c.streamLeader("$G", "KV_MM")
if sl == nil {
t.Fatalf("Did not get the leader")
}
mset, err := sl.GlobalAccount().lookupStream("KV_MM")
require_NoError(t, err)
node := mset.raftNode()
if node == nil {
t.Fatalf("Could not get stream group")
}
if err := node.InstallSnapshot(mset.stateSnapshot()); err != nil {
t.Fatalf("Error installing snapshot: %v", err)
}
}

// Now wait for expiration
time.Sleep(time.Second)

snapshotLeader()

s = c.restartServer(s)
c.waitOnServerCurrent(s)

// We want to make sure we do not reset the raft state on a catchup that yields no sync request.
// The bug was that if we did not actually request any help from the snapshot, we did not set mset.lseq properly.
// The next batch we sent would then cause a raft reset, due to a cluster reset for our stream.
mset, err := s.GlobalAccount().lookupStream("KV_MM")
require_NoError(t, err)

for i := 1; i <= 10; i++ {
if _, err := kv.PutString("k1", "X"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

c.waitOnStreamCurrent(s, "$G", "KV_MM")

// Make sure we did not reset our stream.
msetNew, err := s.GlobalAccount().lookupStream("KV_MM")
require_NoError(t, err)
if msetNew != mset {
t.Fatalf("Stream was reset")
}
}

func TestNoRaceJetStreamClusterStreamDropCLFS(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "CLFS",
Replicas: 3,
})
require_NoError(t, err)

// Will work
_, err = kv.Create("k.1", []byte("X"))
require_NoError(t, err)
// Drive up CLFS state on leader.
for i := 0; i < 10; i++ {
_, err = kv.Create("k.1", []byte("X"))
require_Error(t, err)
}
// Bookend with new key success.
_, err = kv.Create("k.2", []byte("Z"))
require_NoError(t, err)

// Close in case we are connected here. Will recreate.
nc.Close()

// Shutdown, which will also clear clfs.
s := c.randomNonStreamLeader("$G", "KV_CLFS")
s.Shutdown()

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err = js.KeyValue("CLFS")
require_NoError(t, err)

// Drive up CLFS state on leader.
for i := 0; i < 10; i++ {
_, err = kv.Create("k.1", []byte("X"))
require_Error(t, err)
}

sl := c.streamLeader("$G", "KV_CLFS")
if sl == nil {
t.Fatalf("Did not get the leader")
}
mset, err := sl.GlobalAccount().lookupStream("KV_CLFS")
require_NoError(t, err)
node := mset.raftNode()
if node == nil {
t.Fatalf("Could not get stream group")
}
if err := node.InstallSnapshot(mset.stateSnapshot()); err != nil {
t.Fatalf("Error installing snapshot: %v", err)
}

_, err = kv.Create("k.3", []byte("ZZZ"))
require_NoError(t, err)

s = c.restartServer(s)
c.waitOnServerCurrent(s)

mset, err = s.GlobalAccount().lookupStream("KV_CLFS")
require_NoError(t, err)

_, err = kv.Create("k.4", []byte("YYY"))
require_NoError(t, err)

c.waitOnStreamCurrent(s, "$G", "KV_CLFS")

// Make sure we did not reset our stream.
msetNew, err := s.GlobalAccount().lookupStream("KV_CLFS")
require_NoError(t, err)
if msetNew != mset {
t.Fatalf("Stream was reset")
}
}
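
Both tests drive up CLFS with kv.Create, which is create-only: the first call for a key succeeds and every repeat fails while the key exists, and on a replicated bucket each such failure is counted in clfs on the leader. A minimal sketch against the nats.go client, assuming a local nats-server running with JetStream enabled (bucket name is illustrative):

```go
package main

import (
	"fmt"

	"github.com/nats-io/nats.go"
)

func main() {
	// Assumes a local nats-server running with JetStream enabled.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	js, _ := nc.JetStream()
	kv, _ := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "CLFS_DEMO"})

	rev, err := kv.Create("k.1", []byte("X"))
	fmt.Println(rev, err) // 1 <nil>

	// Repeats fail while the key exists; in an R3 bucket each failed
	// proposal bumps the stream's clfs counter on the leader, which is
	// the state the tests drive up and snapshot.
	if _, err := kv.Create("k.1", []byte("X")); err != nil {
		fmt.Println("expected failure:", err)
	}
}
```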
2 changes: 1 addition & 1 deletion server/stream.go
@@ -2686,7 +2686,7 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Ac

hdr, msg := c.msgParts(rmsg)

-	// If we are not receiving directly from a client we should move this this Go routine.
+	// If we are not receiving directly from a client we should move this to another Go routine.
if c.kind != CLIENT {
mset.queueInboundMsg(subject, reply, hdr, msg)
return