Skip to content

Commit

Permalink
De-flake don't InstallSnapshot on stop tests (#6156)
Browse files Browse the repository at this point in the history
Example test failure:
```
=== RUN   TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer
    jetstream_cluster_4_test.go:4756: require uint64 equal, but got: 4 != 3
--- FAIL: TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer (1.21s)
```

Explicitly set the expected applied value for the stream/consumer.
Instead of having it be dynamically determined, which could flake.

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
derekcollison authored Nov 21, 2024
2 parents 6c744ba + db109af commit f66c73d
Showing 1 changed file with 13 additions and 23 deletions.
36 changes: 13 additions & 23 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4605,9 +4605,9 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
require_NoError(t, err)

// Wait for all servers to have applied everything.
var maxApplied uint64
// Expect 2: EntryPeerState, streamMsgOp
expectedApplied := uint64(2)
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
maxApplied = 0
for _, s := range c.servers {
acc, err := s.lookupAccount(globalAccountName)
if err != nil {
Expand All @@ -4617,13 +4617,8 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
if err != nil {
return err
}
_, _, applied := mset.node.Progress()
if maxApplied == 0 {
maxApplied = applied
} else if applied < maxApplied {
return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied)
} else if applied > maxApplied {
return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied)
if _, _, applied := mset.node.Progress(); applied != expectedApplied {
return fmt.Errorf("applied doesn't match, expected %d, got %d", expectedApplied, applied)
}
}
return nil
Expand All @@ -4641,7 +4636,7 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
// Validate the snapshot reflects applied.
validateStreamState := func(snap *snapshot) {
t.Helper()
require_Equal(t, snap.lastIndex, maxApplied)
require_Equal(t, snap.lastIndex, expectedApplied)
ss, err := DecodeStreamState(snap.data)
require_NoError(t, err)
require_Equal(t, ss.FirstSeq, 1)
Expand Down Expand Up @@ -4701,9 +4696,9 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) {
require_NoError(t, err)

// Wait for all servers to have applied everything.
var maxApplied uint64
// Expect 4: addPendingRequest, removePendingRequest, updateDeliveredOp, updateAcksOp
expectedApplied := uint64(4)
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
maxApplied = 0
for _, s := range c.servers {
acc, err := s.lookupAccount(globalAccountName)
if err != nil {
Expand All @@ -4717,13 +4712,8 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) {
if o == nil {
return errors.New("consumer not found")
}
_, _, applied := o.node.Progress()
if maxApplied == 0 {
maxApplied = applied
} else if applied < maxApplied {
return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied)
} else if applied > maxApplied {
return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied)
if _, _, applied := o.node.Progress(); applied != expectedApplied {
return fmt.Errorf("applied doesn't match, expected %d, got %d", expectedApplied, applied)
}
}
return nil
Expand All @@ -4743,17 +4733,17 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) {
require_NoError(t, err)

// Validate the snapshot reflects applied.
validateStreamState := func(snap *snapshot) {
validateConsumerState := func(snap *snapshot) {
t.Helper()
require_Equal(t, snap.lastIndex, maxApplied)
require_Equal(t, snap.lastIndex, expectedApplied)
state, err := decodeConsumerState(snap.data)
require_NoError(t, err)
require_Equal(t, state.Delivered.Consumer, 1)
require_Equal(t, state.Delivered.Stream, 1)
}
snap, err := o.node.(*raft).loadLastSnapshot()
require_NoError(t, err)
validateStreamState(snap)
validateConsumerState(snap)

// Simulate a message being delivered, but not calling Applied yet.
err = o.store.UpdateDelivered(2, 2, 1, time.Now().UnixNano())
Expand All @@ -4768,5 +4758,5 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) {
// Validate the snapshot is the same as before.
snap, err = o.node.(*raft).loadLastSnapshot()
require_NoError(t, err)
validateStreamState(snap)
validateConsumerState(snap)
}

0 comments on commit f66c73d

Please sign in to comment.