diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 51eaefed7fa..033c557d2bf 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -4605,9 +4605,9 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) { require_NoError(t, err) // Wait for all servers to have applied everything. - var maxApplied uint64 + // Expect 2: EntryPeerState, streamMsgOp + expectedApplied := uint64(2) checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { - maxApplied = 0 for _, s := range c.servers { acc, err := s.lookupAccount(globalAccountName) if err != nil { @@ -4617,13 +4617,8 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) { if err != nil { return err } - _, _, applied := mset.node.Progress() - if maxApplied == 0 { - maxApplied = applied - } else if applied < maxApplied { - return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied) - } else if applied > maxApplied { - return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied) + if _, _, applied := mset.node.Progress(); applied != expectedApplied { + return fmt.Errorf("applied doesn't match, expected %d, got %d", expectedApplied, applied) } } return nil @@ -4641,7 +4636,7 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) { // Validate the snapshot reflects applied. validateStreamState := func(snap *snapshot) { t.Helper() - require_Equal(t, snap.lastIndex, maxApplied) + require_Equal(t, snap.lastIndex, expectedApplied) ss, err := DecodeStreamState(snap.data) require_NoError(t, err) require_Equal(t, ss.FirstSeq, 1) @@ -4701,9 +4696,9 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) { require_NoError(t, err) // Wait for all servers to have applied everything. - var maxApplied uint64 + // Expect 4: addPendingRequest, removePendingRequest, updateDeliveredOp, updateAcksOp + expectedApplied := uint64(4) checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { - maxApplied = 0 for _, s := range c.servers { acc, err := s.lookupAccount(globalAccountName) if err != nil { @@ -4717,13 +4712,8 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) { if o == nil { return errors.New("consumer not found") } - _, _, applied := o.node.Progress() - if maxApplied == 0 { - maxApplied = applied - } else if applied < maxApplied { - return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied) - } else if applied > maxApplied { - return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied) + if _, _, applied := o.node.Progress(); applied != expectedApplied { + return fmt.Errorf("applied doesn't match, expected %d, got %d", expectedApplied, applied) } } return nil @@ -4743,9 +4733,9 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) { require_NoError(t, err) // Validate the snapshot reflects applied. - validateStreamState := func(snap *snapshot) { + validateConsumerState := func(snap *snapshot) { t.Helper() - require_Equal(t, snap.lastIndex, maxApplied) + require_Equal(t, snap.lastIndex, expectedApplied) state, err := decodeConsumerState(snap.data) require_NoError(t, err) require_Equal(t, state.Delivered.Consumer, 1) @@ -4753,7 +4743,7 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) { } snap, err := o.node.(*raft).loadLastSnapshot() require_NoError(t, err) - validateStreamState(snap) + validateConsumerState(snap) // Simulate a message being delivered, but not calling Applied yet. err = o.store.UpdateDelivered(2, 2, 1, time.Now().UnixNano()) @@ -4768,5 +4758,5 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) { // Validate the snapshot is the same as before. snap, err = o.node.(*raft).loadLastSnapshot() require_NoError(t, err) - validateStreamState(snap) + validateConsumerState(snap) }