Skip to content

Commit

Permalink
Merge #34548
Browse files Browse the repository at this point in the history
34548: storage: apply lease change side-effects on snapshot recipients r=nvanbenschoten a=nvanbenschoten

Fixes #34025.
Fixes #33624.
Fixes #33335.
Fixes #33151.
Fixes #33149.
Fixes #34159.
Fixes #34293.
Fixes #32813.
Fixes #30886.
Fixes #34228.
Fixes #34321.

It is rare but possible for a replica to become a leaseholder but not learn about this until it applies a snapshot. Immediately upon the snapshot application's `ReplicaState` update, the replica will begin operating as a standard leaseholder.

Before this change, leases acquired in this way would not trigger in-memory side-effects to be performed. This could result in a regression in the new leaseholder's timestamp cache compared to the previous leaseholder's cache, allowing write-skew like we saw in #34025. This could presumably result in other anomalies as well, because all of the steps in `leasePostApply` were skipped (as theorized by #34025 (comment)).

This PR fixes this bug by detecting lease updates when applying snapshots and making sure to react correctly to them. It also likely fixes the referenced issue. The new test demonstrates that without this fix, the serializable violation speculated about in the issue was possible.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Feb 5, 2019
2 parents fc72c94 + 2bfea76 commit 8202611
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 13 deletions.
134 changes: 134 additions & 0 deletions pkg/storage/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1822,3 +1822,137 @@ func TestClearRange(t *testing.T) {
verifyKeysWithPrefix(keys.LocalStoreSuggestedCompactionsMin,
[]roachpb.Key{keys.StoreSuggestedCompactionKey(lg1, lg3)})
}

// TestLeaseTransferInSnapshotUpdatesTimestampCache prevents a regression of
// #34025. A Replica is targeted for a lease transfer target when it needs a
// Raft snapshot to catch up. Normally we try to prevent this case, but it is
// possible and hard to prevent entirely. The Replica will only learn that it is
// the new leaseholder when it applies the snapshot. When doing so, it should
// make sure to apply the lease-related side-effects to its in-memory state.
func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
sc := storage.TestStoreConfig(nil)
// We'll control replication by hand.
sc.TestingKnobs.DisableReplicateQueue = true
// Avoid fighting with the merge queue while trying to reproduce this race.
sc.TestingKnobs.DisableMergeQueue = true
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 3)
store2 := mtc.Store(2)

keyA := roachpb.Key("a")
keyB := roachpb.Key("b")
keyC := roachpb.Key("c")

// First, do a couple of writes; we'll use these to determine when
// the dust has settled.
incA := incrementArgs(keyA, 1)
if _, pErr := client.SendWrapped(ctx, mtc.stores[0].TestSender(), incA); pErr != nil {
t.Fatal(pErr)
}
incC := incrementArgs(keyC, 2)
if _, pErr := client.SendWrapped(ctx, mtc.stores[0].TestSender(), incC); pErr != nil {
t.Fatal(pErr)
}

// Split the system range from the rest of the keyspace.
splitArgs := adminSplitArgs(keys.SystemMax)
if _, pErr := client.SendWrapped(ctx, mtc.stores[0].TestSender(), splitArgs); pErr != nil {
t.Fatal(pErr)
}

// Get the range's ID.
repl0 := mtc.stores[0].LookupReplica(roachpb.RKey(keyA))
rangeID := repl0.RangeID

// Replicate the range onto nodes 1 and 2.
// Wait for all replicas to be caught up.
mtc.replicateRange(rangeID, 1, 2)
mtc.waitForValues(keyA, []int64{1, 1, 1})
mtc.waitForValues(keyC, []int64{2, 2, 2})

// Create a transaction that will try to write "under" a served read.
// The read will have been served by the original leaseholder (node 0)
// and the write will be attempted on the new leaseholder (node 2).
// It should not succeed because it should run into the timestamp cache.
db := mtc.dbs[0]
txnOld := client.NewTxn(ctx, db, 0 /* gatewayNodeID */, client.RootTxn)

// Perform a write with txnOld so that its timestamp gets set and so
// that it writes its transaction record.
if err := txnOld.EagerRecord(); err != nil {
t.Fatal(err)
}
if _, err := txnOld.Inc(ctx, keyB, 3); err != nil {
t.Fatal(err)
}

// Read keyC with txnOld, which is updated below. This prevents the
// transaction from refreshing when it hits the serializable error.
if _, err := txnOld.Get(ctx, keyC); err != nil {
t.Fatal(err)
}

// Another client comes along at a higher timestamp and reads. We should
// never be able to write under this time or we would be rewriting history.
if _, err := db.Get(ctx, keyA); err != nil {
t.Fatal(err)
}

// Partition node 2 from the rest of its range. Once partitioned, perform
// another write and truncate the Raft log on the two connected nodes. This
// ensures that that when node 2 comes back up it will require a snapshot
// from Raft.
mtc.transport.Listen(store2.Ident.StoreID, &unreliableRaftHandler{
rangeID: rangeID,
RaftMessageHandler: store2,
})

if _, pErr := client.SendWrapped(ctx, mtc.stores[0].TestSender(), incC); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(keyC, []int64{4, 4, 2})

// Truncate the log at index+1 (log entries < N are removed, so this
// includes the increment). This necessitates a snapshot when the
// partitioned replica rejoins the rest of the range.
index, err := repl0.GetLastIndex()
if err != nil {
t.Fatal(err)
}
truncArgs := truncateLogArgs(index+1, rangeID)
truncArgs.Key = keyA
if _, err := client.SendWrapped(ctx, mtc.stores[0].TestSender(), truncArgs); err != nil {
t.Fatal(err)
}

// Finally, transfer the lease to node 2 while it is still unavailable and
// behind. We try to avoid this case when picking new leaseholders in practice,
// but we're never 100% successful.
if err := repl0.AdminTransferLease(ctx, store2.Ident.StoreID); err != nil {
t.Fatal(err)
}

// Remove the partition. A snapshot to node 2 should follow. This snapshot
// will inform node 2 that it is the new leaseholder for the range. Node 2
// should act accordingly and update its internal state to reflect this.
mtc.transport.Listen(store2.Ident.StoreID, store2)
mtc.waitForValues(keyC, []int64{4, 4, 4})

// Perform a write on the new leaseholder underneath the previously served
// read. This write should hit the timestamp cache and flag the txn for a
// restart when we try to commit it below. With the bug in #34025, the new
// leaseholder who heard about the lease transfer from a snapshot had an
// empty timestamp cache and would simply let us write under the previous
// read.
if _, err := txnOld.Inc(ctx, keyA, 4); err != nil {
t.Fatal(err)
}
const exp = `TransactionRetryError: retry txn \(RETRY_SERIALIZABLE\)`
if err := txnOld.Commit(ctx); !testutils.IsError(err, exp) {
t.Fatalf("expected retry error, got: %v; did we write under a read?", err)
}
}
23 changes: 16 additions & 7 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,22 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc storagepb.Com
}
}

// leasePostApply is called when a RequestLease or TransferLease
// request is executed for a range.
func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
// leasePostApply updates the Replica's internal state to reflect the
// application of a new Range lease. The method is idempotent, so it can be
// called repeatedly for the same lease safely. However, the method will panic
// if passed a lease with a lower sequence number than the current lease. By
// default, the method will also panic if passed a lease that indicates a
// forward sequence number jump (i.e. a skipped lease). This behavior can
// be disabled by passing permitJump as true.
func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, permitJump bool) {
r.mu.Lock()
replicaID := r.mu.replicaID
// Pull out the last lease known to this Replica. It's possible that this is
// not actually the last lease in the Range's lease sequence because the
// Replica may have missed the application of a lease between prevLease and
// newLease. However, this should only be possible if a snapshot includes a
// lease update. All other forms of lease updates should be continuous
// without jumps (see permitJump).
prevLease := *r.mu.state.Lease
r.mu.Unlock()

Expand Down Expand Up @@ -267,9 +278,7 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
}
case s2 == s1+1:
// Lease sequence incremented by 1. Expected case.
case s2 > s1+1:
// Snapshots will never call leasePostApply, so we always expect
// leases to increment one at a time here.
case s2 > s1+1 && !permitJump:
log.Fatalf(ctx, "lease sequence jump, prevLease=%s, newLease=%s",
log.Safe(prevLease), log.Safe(newLease))
}
Expand Down Expand Up @@ -634,7 +643,7 @@ func (r *Replica) handleReplicatedEvalResult(
}

if newLease := rResult.State.Lease; newLease != nil {
r.leasePostApply(ctx, *newLease)
r.leasePostApply(ctx, *newLease, false /* permitJump */)
rResult.State.Lease = nil
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,12 @@ func (r *Replica) applySnapshot(
}
r.store.mu.Unlock()

// Invoke the leasePostApply method to ensure we properly initialize the
// replica according to whether it holds the lease. We allow jumps in the
// lease sequence because there may be multiple lease changes accounted for
// in the snapshot.
r.leasePostApply(ctx, *s.Lease, true /* permitJump */)

r.mu.Lock()
// We set the persisted last index to the last applied index. This is
// not a correctness issue, but means that we may have just transferred
Expand All @@ -967,7 +973,8 @@ func (r *Replica) applySnapshot(
r.store.metrics.subtractMVCCStats(*r.mu.state.Stats)
r.store.metrics.addMVCCStats(*s.Stats)
// Update the rest of the Raft state. Changes to r.mu.state.Desc must be
// managed by r.setDesc, but we called that above, so now it's safe to
// managed by r.setDesc and changes to r.mu.state.Lease must be handled
// by r.leasePostApply, but we called those above, so now it's safe to
// wholesale replace r.mu.state.
r.mu.state = s
r.assertStateLocked(ctx, r.store.Engine())
Expand Down
11 changes: 7 additions & 4 deletions pkg/storage/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,9 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) {
assertRangefeedRetryErr(t, pErrRight, roachpb.RangeFeedRetryError_REASON_RANGE_MERGED)
})
t.Run(roachpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT.String(), func(t *testing.T) {
const partitionStore = 2
mtc, rangeID := setup(t)
defer mtc.Stop()
partitionStore := mtc.Store(2)

mtc.stores[0].SetReplicaGCQueueActive(false)
mtc.stores[1].SetReplicaGCQueueActive(false)
Expand All @@ -493,15 +493,18 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) {
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")},
}

pErr := mtc.Store(partitionStore).RangeFeed(ctx, &req, stream)
pErr := partitionStore.RangeFeed(ctx, &req, stream)
streamErrC <- pErr
}()

// Wait for the first checkpoint event.
waitForInitialCheckpoint(t, stream, streamErrC)

// Partition the replica from the rest of its range.
mtc.transport.GetCircuitBreaker(mtc.idents[partitionStore].NodeID).Break()
mtc.transport.Listen(partitionStore.Ident.StoreID, &unreliableRaftHandler{
rangeID: rangeID,
RaftMessageHandler: partitionStore,
})

// Perform a write on the range.
pArgs := putArgs(roachpb.Key("c"), []byte("val2"))
Expand Down Expand Up @@ -529,7 +532,7 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) {
}

// Remove the partition. Snapshot should follow.
mtc.transport.GetCircuitBreaker(mtc.idents[partitionStore].NodeID).Reset()
mtc.transport.Listen(partitionStore.Ident.StoreID, partitionStore)

// Check the error.
pErr := <-streamErrC
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2312,7 +2312,7 @@ func splitPostApply(
// Invoke the leasePostApply method to ensure we properly initialize
// the replica according to whether it holds the lease. This enables
// the txnWaitQueue.
rightRng.leasePostApply(ctx, rightLease)
rightRng.leasePostApply(ctx, rightLease, false /* permitJump */)

// Add the RHS replica to the store. This step atomically updates
// the EndKey of the LHS replica and also adds the RHS replica
Expand Down

0 comments on commit 8202611

Please sign in to comment.