kvserver: trim state used from snapshots #72314

Merged · 1 commit · Nov 3, 2021
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/client_merge_test.go
@@ -757,7 +757,7 @@ func mergeCheckingTimestampCaches(

// Install a filter to capture the Raft snapshot.
snapshotFilter = func(inSnap kvserver.IncomingSnapshot) {
- if inSnap.State.Desc.RangeID == lhsDesc.RangeID {
+ if inSnap.Desc.RangeID == lhsDesc.RangeID {
snapChan <- inSnap
}
}
@@ -809,7 +809,7 @@ func mergeCheckingTimestampCaches(
case <-time.After(45 * time.Second):
t.Fatal("timed out waiting for snapChan")
}
- inSnapDesc := inSnap.State.Desc
+ inSnapDesc := inSnap.Desc
require.Equal(t, lhsDesc.StartKey, inSnapDesc.StartKey)
require.Equal(t, rhsDesc.EndKey, inSnapDesc.EndKey)

@@ -3731,7 +3731,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
// on in the test. This function verifies that the subsumed replicas have
// been handled properly.
if snapType != kvserver.SnapshotRequest_VIA_SNAPSHOT_QUEUE ||
- inSnap.State.Desc.RangeID != rangeIds[string(keyA)] {
+ inSnap.Desc.RangeID != rangeIds[string(keyA)] {
return nil
}

@@ -3779,8 +3779,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {

// Construct SSTs for the first 4 bullets as numbered above, but only
// ultimately keep the last one.
- keyRanges := rditer.MakeReplicatedKeyRanges(inSnap.State.Desc)
- it := rditer.NewReplicaEngineDataIterator(inSnap.State.Desc, sendingEng, true /* replicatedOnly */)
+ keyRanges := rditer.MakeReplicatedKeyRanges(inSnap.Desc)
+ it := rditer.NewReplicaEngineDataIterator(inSnap.Desc, sendingEng, true /* replicatedOnly */)
defer it.Close()
// Write a range deletion tombstone to each of the SSTs then put in the
// kv entries from the sender of the snapshot.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
@@ -3648,7 +3648,7 @@ func TestTenantID(t *testing.T) {
request_type kvserver.SnapshotRequest_Type,
strings []string,
) error {
- if snapshot.State.Desc.RangeID == repl.RangeID {
+ if snapshot.Desc.RangeID == repl.RangeID {
select {
case sawSnapshot <- struct{}{}:
default:
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_raft.go
@@ -507,7 +507,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
ctx context.Context, inSnap IncomingSnapshot,
) (_ handleRaftReadyStats, _ string, foo error) {
var stats handleRaftReadyStats
- if inSnap.State != nil {
+ if inSnap.Desc != nil {
stats.snap.offered = true
}

@@ -584,7 +584,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
leaderID = roachpb.ReplicaID(rd.SoftState.Lead)
}

- if inSnap.State != nil {
+ if inSnap.Desc != nil {
if !raft.IsEmptySnap(rd.Snapshot) {
snapUUID, err := uuid.FromBytes(rd.Snapshot.Data)
if err != nil {
@@ -1719,7 +1719,7 @@ func (r *Replica) maybeAcquireSnapshotMergeLock(
// installed a placeholder for snapshot's keyspace. No merge lock needed.
return nil, func() {}
}
- for endKey.Less(inSnap.State.Desc.EndKey) {
+ for endKey.Less(inSnap.Desc.EndKey) {
sRepl := r.store.LookupReplica(endKey)
if sRepl == nil || !endKey.Equal(sRepl.Desc().StartKey) {
log.Fatalf(ctx, "snapshot widens existing replica, but no replica exists for subsumed key %s", endKey)
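
With the ReplicaState gone from IncomingSnapshot, the Raft ready path keys everything off the descriptor: a nil Desc means the ready carries no snapshot, and the descriptor's end key drives the search for replicas that a widened snapshot is about to subsume. Below is a minimal sketch of that shape; the package name, types, and the lookupEndKey helper are illustrative stand-ins, not the real kvserver definitions.

```go
package kvsnapshots // illustrative package, not part of the real kvserver

// Hypothetical, simplified stand-ins; the real types live in pkg/kv/kvserver.
type RangeDescriptor struct {
	StartKey, EndKey string
}

type IncomingSnapshot struct {
	Desc *RangeDescriptor // nil when the Raft ready carries no snapshot
}

// snapshotOffered mirrors the `inSnap.Desc != nil` checks above: the presence
// of the descriptor is now the signal that a snapshot was provided.
func snapshotOffered(inSnap IncomingSnapshot) bool {
	return inSnap.Desc != nil
}

// subsumedEndKeys mirrors the widening loop in maybeAcquireSnapshotMergeLock:
// starting from the existing replica's end key, keep looking up the next
// replica until the snapshot descriptor's end key is covered; every replica
// visited along the way is about to be subsumed. lookupEndKey stands in for
// the store's replica lookup and returns the end key of the replica whose
// start key is k (ok=false if no such replica exists).
func subsumedEndKeys(
	existingEndKey string,
	snapDesc *RangeDescriptor,
	lookupEndKey func(k string) (endKey string, ok bool),
) []string {
	var subsumed []string
	endKey := existingEndKey
	for endKey < snapDesc.EndKey {
		next, ok := lookupEndKey(endKey)
		if !ok {
			// The real code fatals here: the snapshot widens the replica, but
			// no replica exists for the subsumed key.
			return nil
		}
		subsumed = append(subsumed, next)
		endKey = next
	}
	return subsumed
}
```
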
56 changes: 33 additions & 23 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -515,10 +515,11 @@ type IncomingSnapshot struct {
SnapUUID uuid.UUID
// The storage interface for the underlying SSTs.
SSTStorageScratch *SSTSnapshotStorageScratch
- // The replica state at the time the snapshot was generated (never nil).
- State *kvserverpb.ReplicaState
- snapType SnapshotRequest_Type
- placeholder *ReplicaPlaceholder
+ // The descriptor in the snapshot, never nil.
+ Desc *roachpb.RangeDescriptor
+ snapType SnapshotRequest_Type
+ placeholder *ReplicaPlaceholder
+ raftAppliedIndex uint64 // logging only
}

func (s *IncomingSnapshot) String() string {
@@ -527,10 +528,10 @@ func (s *IncomingSnapshot) String() string {

// SafeFormat implements the redact.SafeFormatter interface.
func (s *IncomingSnapshot) SafeFormat(w redact.SafePrinter, _ rune) {
- w.Printf("%s snapshot %s at applied index %d", s.snapType, s.SnapUUID.Short(), s.State.RaftAppliedIndex)
+ w.Printf("%s snapshot %s at applied index %d", s.snapType, s.SnapUUID.Short(), s.raftAppliedIndex)
}

- // snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the
+ // snapshot creates an OutgoingSnapshot containing a pebble snapshot for the
// given range. Note that snapshot() is called without Replica.raftMu held.
func snapshot(
ctx context.Context,
@@ -758,9 +759,9 @@ func (r *Replica) applySnapshot(
hs raftpb.HardState,
subsumedRepls []*Replica,
) (err error) {
- s := *inSnap.State
- if s.Desc.RangeID != r.RangeID {
- log.Fatalf(ctx, "unexpected range ID %d", s.Desc.RangeID)
+ desc := inSnap.Desc
+ if desc.RangeID != r.RangeID {
+ log.Fatalf(ctx, "unexpected range ID %d", desc.RangeID)
}

isInitialSnap := !r.IsInitialized()
@@ -852,7 +853,11 @@ func (r *Replica) applySnapshot(
r.store.raftEntryCache.Drop(r.RangeID)

if err := r.raftMu.stateLoader.SetRaftTruncatedState(
- ctx, &unreplicatedSST, s.TruncatedState,
+ ctx, &unreplicatedSST,
+ &roachpb.RaftTruncatedState{
+ Index: nonemptySnap.Metadata.Index,
+ Term: nonemptySnap.Metadata.Term,
+ },
); err != nil {
return errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
}
@@ -868,19 +873,14 @@
}
}

- if s.RaftAppliedIndex != nonemptySnap.Metadata.Index {
- log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
- s.RaftAppliedIndex, nonemptySnap.Metadata.Index)
- }

// If we're subsuming a replica below, we don't have its last NextReplicaID,
// nor can we obtain it. That's OK: we can just be conservative and use the
// maximum possible replica ID. preDestroyRaftMuLocked will write a replica
// tombstone using this maximum possible replica ID, which would normally be
// problematic, as it would prevent this store from ever having a new replica
// of the removed range. In this case, however, it's copacetic, as subsumed
// ranges _can't_ have new replicas.
- if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, s.Desc, subsumedRepls, mergedTombstoneReplicaID); err != nil {
+ if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, desc, subsumedRepls, mergedTombstoneReplicaID); err != nil {
return err
}
stats.subsumedReplicas = timeutil.Now()
@@ -896,6 +896,16 @@
}
stats.ingestion = timeutil.Now()

+ state, err := stateloader.Make(desc.RangeID).Load(ctx, r.store.engine, desc)
+ if err != nil {
+ log.Fatalf(ctx, "unable to load replica state: %s", err)
+ }
+
+ if state.RaftAppliedIndex != nonemptySnap.Metadata.Index {
+ log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
+ state.RaftAppliedIndex, nonemptySnap.Metadata.Index)
+ }

// The on-disk state is now committed, but the corresponding in-memory state
// has not yet been updated. Any errors past this point must therefore be
// treated as fatal.
@@ -926,7 +936,7 @@ func (r *Replica) applySnapshot(
log.Fatalf(ctx, "unable to remove placeholder: %s", err)
}
}
- r.setDescLockedRaftMuLocked(ctx, s.Desc)
+ r.setDescLockedRaftMuLocked(ctx, desc)
if err := r.store.maybeMarkReplicaInitializedLockedReplLocked(ctx, r); err != nil {
log.Fatalf(ctx, "unable to mark replica initialized while applying snapshot: %+v", err)
}
@@ -942,18 +952,18 @@
// performance implications are not likely to be drastic. If our
// feelings about this ever change, we can add a LastIndex field to
// raftpb.SnapshotMetadata.
- r.mu.lastIndex = s.RaftAppliedIndex
+ r.mu.lastIndex = state.RaftAppliedIndex
r.mu.lastTerm = invalidLastTerm
r.mu.raftLogSize = 0
// Update the store stats for the data in the snapshot.
r.store.metrics.subtractMVCCStats(ctx, r.mu.tenantID, *r.mu.state.Stats)
- r.store.metrics.addMVCCStats(ctx, r.mu.tenantID, *s.Stats)
+ r.store.metrics.addMVCCStats(ctx, r.mu.tenantID, *state.Stats)
lastKnownLease := r.mu.state.Lease
// Update the rest of the Raft state. Changes to r.mu.state.Desc must be
// managed by r.setDescRaftMuLocked and changes to r.mu.state.Lease must be handled
// by r.leasePostApply, but we called those above, so now it's safe to
// wholesale replace r.mu.state.
- r.mu.state = s
+ r.mu.state = state
// Snapshots typically have fewer log entries than the leaseholder. The next
// time we hold the lease, recompute the log size before making decisions.
r.mu.raftLogSizeTrusted = false
@@ -962,13 +972,13 @@
// replica according to whether it holds the lease. We allow jumps in the
// lease sequence because there may be multiple lease changes accounted for
// in the snapshot.
- r.leasePostApplyLocked(ctx, lastKnownLease, s.Lease /* newLease */, prioReadSum, allowLeaseJump)
+ r.leasePostApplyLocked(ctx, lastKnownLease, state.Lease /* newLease */, prioReadSum, allowLeaseJump)

// Similarly, if we subsumed any replicas through the snapshot (meaning that
// we missed the application of a merge) and we are the new leaseholder, we
// make sure to update the timestamp cache using the prior read summary to
// account for any reads that were served on the right-hand side range(s).
- if len(subsumedRepls) > 0 && s.Lease.Replica.ReplicaID == r.mu.replicaID && prioReadSum != nil {
+ if len(subsumedRepls) > 0 && state.Lease.Replica.ReplicaID == r.mu.replicaID && prioReadSum != nil {
applyReadSummaryToTimestampCache(r.store.tsCache, r.descRLocked(), *prioReadSum)
}

@@ -995,7 +1005,7 @@
// Update the replica's cached byte thresholds. This is a no-op if the system
// config is not available, in which case we rely on the next gossip update
// to perform the update.
- if err := r.updateRangeInfo(ctx, s.Desc); err != nil {
+ if err := r.updateRangeInfo(ctx, desc); err != nil {
log.Fatalf(ctx, "unable to update range info while applying snapshot: %+v", err)
}

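
The heart of the change is in applySnapshot above: rather than trusting a ReplicaState shipped in the snapshot header, the replica writes a RaftTruncatedState derived from the snapshot's own Raft metadata, ingests the SSTs, and only then loads the authoritative ReplicaState back from the ingested data, checking that its RaftAppliedIndex matches the snapshot's metadata index. The sketch below shows that ordering in the same stand-in package as the previous sketch; Engine, IngestSSTs, LoadState, and applySnapshotSketch are invented simplifications of the real stateloader/Pebble plumbing, not the kvserver API.

```go
package kvsnapshots

import (
	"context"
	"fmt"
)

// More stand-ins (names invented for the sketch); SnapshotMetadata mirrors
// the Index/Term pair carried by raftpb.SnapshotMetadata.
type RaftTruncatedState struct{ Index, Term uint64 }

type SnapshotMetadata struct{ Index, Term uint64 }

type ReplicaState struct {
	Desc             *RangeDescriptor
	RaftAppliedIndex uint64
}

// Engine fakes the storage engine: ingesting the snapshot SSTs is what makes
// the replica state readable from disk afterwards.
type Engine struct{ ingested ReplicaState }

func (e *Engine) IngestSSTs(_ context.Context, _ [][]byte) error { return nil }

// LoadState mimics the stateloader: after ingestion, the state is read back
// from the engine rather than taken from the snapshot header.
func (e *Engine) LoadState(_ context.Context, desc *RangeDescriptor) (ReplicaState, error) {
	s := e.ingested
	s.Desc = desc
	return s, nil
}

func applySnapshotSketch(
	ctx context.Context, eng *Engine, desc *RangeDescriptor, meta SnapshotMetadata, ssts [][]byte,
) (RaftTruncatedState, error) {
	// 1. The truncated state now comes from the snapshot's Raft metadata,
	//    not from a ReplicaState field in the header.
	trunc := RaftTruncatedState{Index: meta.Index, Term: meta.Term}

	// 2. Ingest the snapshot SSTs into the engine.
	if err := eng.IngestSSTs(ctx, ssts); err != nil {
		return RaftTruncatedState{}, err
	}

	// 3. Reload the replica state from what was just ingested and cross-check
	//    it against the snapshot metadata before touching in-memory state.
	state, err := eng.LoadState(ctx, desc)
	if err != nil {
		return RaftTruncatedState{}, err
	}
	if state.RaftAppliedIndex != meta.Index {
		return RaftTruncatedState{}, fmt.Errorf(
			"snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
			state.RaftAppliedIndex, meta.Index)
	}
	return trunc, nil
}
```

Loading after ingestion is the point of the reordering: the on-disk bytes, not the sender's header, become the single source of truth for the replica's post-snapshot state.
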
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/store_snapshot.go
@@ -290,8 +290,9 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
inSnap := IncomingSnapshot{
SnapUUID: snapUUID,
SSTStorageScratch: kvSS.scratch,
- State: &header.State,
+ Desc: header.State.Desc,
snapType: header.Type,
+ raftAppliedIndex: header.State.RaftAppliedIndex,
}

kvSS.status = fmt.Sprintf("ssts: %d", len(kvSS.scratch.SSTs()))
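
On the receive path, Receive now copies only what the apply path still needs out of the snapshot header: the descriptor, plus the applied index that is retained purely for logging. A short sketch of that trim, reusing the stand-ins from the sketches above; SnapshotHeader, TrimmedIncomingSnapshot, and buildIncomingSnapshot are hypothetical names, while the real code populates kvserver.IncomingSnapshot from header.State.

```go
package kvsnapshots

// SnapshotHeader stands in for the request header, which still carries the
// full ReplicaState over the wire; the receiver simply stops using most of it.
type SnapshotHeader struct {
	State ReplicaState
}

// TrimmedIncomingSnapshot parallels the trimmed IncomingSnapshot above
// (renamed here only to avoid clashing with the earlier stand-in).
type TrimmedIncomingSnapshot struct {
	Desc             *RangeDescriptor // never nil
	raftAppliedIndex uint64           // logging only
}

// buildIncomingSnapshot mirrors the Receive change: only the descriptor and
// the applied index are pulled out of header.State.
func buildIncomingSnapshot(header SnapshotHeader) TrimmedIncomingSnapshot {
	return TrimmedIncomingSnapshot{
		Desc:             header.State.Desc,
		raftAppliedIndex: header.State.RaftAppliedIndex,
	}
}
```
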
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_test.go
@@ -2817,7 +2817,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {
if err := s.processRaftSnapshotRequest(ctx, req,
IncomingSnapshot{
SnapUUID: uuid.MakeV4(),
- State: &kvserverpb.ReplicaState{Desc: repl1.Desc()},
+ Desc: repl1.Desc(),
placeholder: placeholder,
},
); err != nil {