[DNM] kv: use write batch for small Raft snapshots
This was the original intention with the `SnapshotRequest_Strategy`
structure and was prototyped in cockroachdb#25134, but we never pushed it over
the finish line because we did not find cases where SST ingestion
was disruptive enough to warrant the extra complexity. cockroachdb#62700 tells
a different story.
nvanbenschoten committed Apr 2, 2021
1 parent 3f70efc commit 180cdcd
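For context on the mechanism: with this change the receiver can apply a small snapshot by staging its contents in an ordinary storage write batch and committing that batch, instead of building SST files and ingesting them into the LSM, where ingestion can trigger flushes and compactions. Below is a minimal Go sketch of a size-based strategy choice in the spirit of `SnapshotRequest_Strategy`; the threshold value and all names in it are illustrative assumptions, not code from this commit.

package main

import "fmt"

// smallSnapshotThreshold is a hypothetical cutoff; the real decision point
// and its value are not part of this commit.
const smallSnapshotThreshold = 512 << 10 // 512 KiB

type snapshotStrategy int

const (
	strategyKVBatch snapshotStrategy = iota // apply via one write batch
	strategySST                             // stream SSTs and ingest them
)

// chooseStrategy picks how a received snapshot should be applied.
func chooseStrategy(snapshotBytes int64) snapshotStrategy {
	if snapshotBytes <= smallSnapshotThreshold {
		// Small snapshots: a synced batch commit avoids the flush and
		// compaction churn that SST ingestion can cause.
		return strategyKVBatch
	}
	return strategySST
}

func main() {
	fmt.Println(chooseStrategy(4 << 10))  // 0 (strategyKVBatch)
	fmt.Println(chooseStrategy(64 << 20)) // 1 (strategySST)
}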
Showing 3 changed files with 271 additions and 135 deletions.
299 changes: 194 additions & 105 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -501,6 +501,8 @@ type IncomingSnapshot struct {
 	SnapUUID uuid.UUID
 	// The storage interface for the underlying SSTs.
 	SSTStorageScratch *SSTSnapshotStorageScratch
+	// Non-nil for small snapshots.
+	Batch storage.Batch
 	// The Raft log entries for this snapshot.
 	LogEntries [][]byte
 	// The replica state at the time the snapshot was generated (never nil).
@@ -835,77 +837,128 @@ func (r *Replica) applySnapshot(
 				stats.subsumedReplicas.Sub(start).Seconds()*1000,
 			)
 		}
-		ingestionLog := fmt.Sprintf(
-			"ingestion=%d@%0.0fms ",
-			len(inSnap.SSTStorageScratch.SSTs()),
-			stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000,
-		)
+		var ingestionLog string
+		if inSnap.SSTStorageScratch != nil {
+			ingestionLog = fmt.Sprintf(
+				"ingestion=%d@%0.0fms ",
+				len(inSnap.SSTStorageScratch.SSTs()),
+				stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000,
+			)
+		}
 		log.Infof(
 			ctx, "applied snapshot of type %s [%s%s%sid=%s index=%d]", inSnap.snapType, totalLog,
 			subsumedReplicasLog, ingestionLog, inSnap.SnapUUID.Short(), snap.Metadata.Index,
 		)
 	}(timeutil.Now())

-	unreplicatedSSTFile := &storage.MemFile{}
-	unreplicatedSST := storage.MakeIngestionSSTWriter(unreplicatedSSTFile)
-	defer unreplicatedSST.Close()
-
-	// Clearing the unreplicated state.
-	unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID)
-	unreplicatedStart := unreplicatedPrefixKey
-	unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
-	if err = unreplicatedSST.ClearRawRange(unreplicatedStart, unreplicatedEnd); err != nil {
-		return errors.Wrapf(err, "error clearing range of unreplicated SST writer")
-	}
-
-	// Update HardState.
-	if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
-		return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
-	}
-
-	// Update Raft entries.
-	var lastTerm uint64
-	var raftLogSize int64
-	if len(inSnap.LogEntries) > 0 {
-		logEntries := make([]raftpb.Entry, len(inSnap.LogEntries))
-		for i, bytes := range inSnap.LogEntries {
-			if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil {
-				return err
-			}
-		}
-		var sideloadedEntriesSize int64
-		var err error
-		logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)
-		if err != nil {
-			return err
-		}
-		raftLogSize += sideloadedEntriesSize
-		_, lastTerm, raftLogSize, err = r.append(ctx, &unreplicatedSST, 0, invalidLastTerm, raftLogSize, logEntries)
-		if err != nil {
-			return err
-		}
-	} else {
-		lastTerm = invalidLastTerm
-	}
-	r.store.raftEntryCache.Drop(r.RangeID)
-
-	// Update TruncatedState if it is unreplicated.
-	if inSnap.UsesUnreplicatedTruncatedState {
-		if err := r.raftMu.stateLoader.SetRaftTruncatedState(
-			ctx, &unreplicatedSST, s.TruncatedState,
-		); err != nil {
-			return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer")
-		}
-	}
-
-	if err := unreplicatedSST.Finish(); err != nil {
-		return err
-	}
-	if unreplicatedSST.DataSize > 0 {
-		// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
-		// buffering in a MemFile first.
-		if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
-			return err
-		}
-	}
+	var lastTerm uint64
+	var raftLogSize int64
+	if inSnap.SSTStorageScratch != nil {
+		unreplicatedSSTFile := &storage.MemFile{}
+		unreplicatedSST := storage.MakeIngestionSSTWriter(unreplicatedSSTFile)
+		defer unreplicatedSST.Close()
+
+		// Clearing the unreplicated state.
+		unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID)
+		unreplicatedStart := unreplicatedPrefixKey
+		unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
+		if err = unreplicatedSST.ClearRawRange(unreplicatedStart, unreplicatedEnd); err != nil {
+			return errors.Wrapf(err, "error clearing range of unreplicated SST writer")
+		}
+
+		// Update HardState.
+		if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
+			return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
+		}
+
+		// Update Raft entries.
+		if len(inSnap.LogEntries) > 0 {
+			logEntries := make([]raftpb.Entry, len(inSnap.LogEntries))
+			for i, bytes := range inSnap.LogEntries {
+				if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil {
+					return err
+				}
+			}
+			var sideloadedEntriesSize int64
+			var err error
+			logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)
+			if err != nil {
+				return err
+			}
+			raftLogSize += sideloadedEntriesSize
+			_, lastTerm, raftLogSize, err = r.append(ctx, &unreplicatedSST, 0, invalidLastTerm, raftLogSize, logEntries)
+			if err != nil {
+				return err
+			}
+		} else {
+			lastTerm = invalidLastTerm
+		}
+		r.store.raftEntryCache.Drop(r.RangeID)
+
+		// Update TruncatedState if it is unreplicated.
+		if inSnap.UsesUnreplicatedTruncatedState {
+			if err := r.raftMu.stateLoader.SetRaftTruncatedState(
+				ctx, &unreplicatedSST, s.TruncatedState,
+			); err != nil {
+				return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer")
+			}
+		}
+
+		if err := unreplicatedSST.Finish(); err != nil {
+			return err
+		}
+		if unreplicatedSST.DataSize > 0 {
+			// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
+			// buffering in a MemFile first.
+			if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
+				return err
+			}
+		}
+	} else {
+		// Clearing the unreplicated state.
+		unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID)
+		unreplicatedStart := unreplicatedPrefixKey
+		unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
+		if err = inSnap.Batch.ClearRawRange(unreplicatedStart, unreplicatedEnd); err != nil {
+			return errors.Wrapf(err, "error clearing range of batch")
+		}
+
+		// Update HardState.
+		if err := r.raftMu.stateLoader.SetHardState(ctx, inSnap.Batch, hs); err != nil {
+			return errors.Wrapf(err, "unable to write HardState to batch")
+		}
+
+		// Update Raft entries.
+		if len(inSnap.LogEntries) > 0 {
+			logEntries := make([]raftpb.Entry, len(inSnap.LogEntries))
+			for i, bytes := range inSnap.LogEntries {
+				if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil {
+					return err
+				}
+			}
+			var sideloadedEntriesSize int64
+			var err error
+			logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)
+			if err != nil {
+				return err
+			}
+			raftLogSize += sideloadedEntriesSize
+			_, lastTerm, raftLogSize, err = r.append(ctx, inSnap.Batch, 0, invalidLastTerm, raftLogSize, logEntries)
+			if err != nil {
+				return err
+			}
+		} else {
+			lastTerm = invalidLastTerm
+		}
+		r.store.raftEntryCache.Drop(r.RangeID)
+
+		// Update TruncatedState if it is unreplicated.
+		if inSnap.UsesUnreplicatedTruncatedState {
+			if err := r.raftMu.stateLoader.SetRaftTruncatedState(
+				ctx, inSnap.Batch, s.TruncatedState,
+			); err != nil {
+				return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to batch")
+			}
+		}
+	}
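Aside on the shape of the change above: both branches perform the same logical writes (clear the unreplicated range-ID span, write the HardState, append the log entries, write the TruncatedState) and differ only in the sink. That is why the same SetHardState and r.append calls accept either the SST writer or the batch. Here is a toy, self-contained sketch of that pattern using stand-in types, not CockroachDB's real storage.Writer:

package main

import "fmt"

// writer stands in for CockroachDB's storage.Writer interface, which both
// the ingestion SST writer and the write batch implement; the names and
// shapes here are simplified stand-ins, not the real types.
type writer interface {
	putKV(key, value string)
}

type sstWriter struct{ ops []string }

func (w *sstWriter) putKV(k, v string) { w.ops = append(w.ops, "sst:"+k) }

type batchWriter struct{ ops []string }

func (w *batchWriter) putKV(k, v string) { w.ops = append(w.ops, "batch:"+k) }

// writeSnapshotState is the shared body of both branches: the same
// HardState, log entry, and TruncatedState writes, parameterized on the sink.
func writeSnapshotState(w writer) {
	w.putKV("HardState", "...")
	w.putKV("LogEntry", "...")
	w.putKV("TruncatedState", "...")
}

func main() {
	sst, batch := &sstWriter{}, &batchWriter{}
	writeSnapshotState(sst)   // large snapshots: finish and ingest the SST
	writeSnapshotState(batch) // small snapshots: commit the batch
	fmt.Println(sst.ops, batch.ops)
}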

@@ -941,19 +994,30 @@ func (r *Replica) applySnapshot(
 	// problematic, as it would prevent this store from ever having a new replica
 	// of the removed range. In this case, however, it's copacetic, as subsumed
 	// ranges _can't_ have new replicas.
-	if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, s.Desc, subsumedRepls, mergedTombstoneReplicaID); err != nil {
+	if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, inSnap.Batch, s.Desc, subsumedRepls, mergedTombstoneReplicaID); err != nil {
 		return err
 	}
 	stats.subsumedReplicas = timeutil.Now()

 	// Ingest all SSTs atomically.
-	if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil {
-		if err := fn(inSnap, inSnap.snapType, inSnap.SSTStorageScratch.SSTs()); err != nil {
-			return err
-		}
-	}
-	if err := r.store.engine.IngestExternalFiles(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
-		return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
-	}
+	if inSnap.SSTStorageScratch != nil {
+		if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil {
+			if err := fn(inSnap, inSnap.snapType, inSnap.SSTStorageScratch.SSTs()); err != nil {
+				return err
+			}
+		}
+		if err := r.store.engine.IngestExternalFiles(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
+			return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
+		}
+	} else {
+		if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil {
+			if err := fn(inSnap, inSnap.snapType, nil); err != nil {
+				return err
+			}
+		}
+		if err := inSnap.Batch.Commit(true /* sync */); err != nil {
+			return errors.Wrapf(err, "while committing batch")
+		}
+	}
 	stats.ingestion = timeutil.Now()

@@ -1068,6 +1132,7 @@ func (r *Replica) applySnapshot(
 func (r *Replica) clearSubsumedReplicaDiskData(
 	ctx context.Context,
 	scratch *SSTSnapshotStorageScratch,
+	batch storage.Batch,
 	desc *roachpb.RangeDescriptor,
 	subsumedRepls []*Replica,
 	subsumedNextReplicaID roachpb.ReplicaID,
@@ -1087,31 +1152,44 @@ func (r *Replica) clearSubsumedReplicaDiskData(
 			destroyReasonRemoved)
 		sr.mu.Unlock()

-		// We have to create an SST for the subsumed replica's range-id local keys.
-		subsumedReplSSTFile := &storage.MemFile{}
-		subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
-		defer subsumedReplSST.Close()
-		// NOTE: We set mustClearRange to true because we are setting
-		// RangeTombstoneKey. Since Clears and Puts need to be done in increasing
-		// order of keys, it is not safe to use ClearRangeIter.
-		if err := sr.preDestroyRaftMuLocked(
-			ctx,
-			r.store.Engine(),
-			&subsumedReplSST,
-			subsumedNextReplicaID,
-			true, /* clearRangeIDLocalOnly */
-			true, /* mustClearRange */
-		); err != nil {
-			subsumedReplSST.Close()
-			return err
-		}
-		if err := subsumedReplSST.Finish(); err != nil {
-			return err
-		}
-		if subsumedReplSST.DataSize > 0 {
-			// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
-			// buffering in a MemFile first.
-			if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil {
-				return err
-			}
-		}
+		if scratch != nil {
+			// We have to create an SST for the subsumed replica's range-id local keys.
+			subsumedReplSSTFile := &storage.MemFile{}
+			subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
+			defer subsumedReplSST.Close()
+			// NOTE: We set mustClearRange to true because we are setting
+			// RangeTombstoneKey. Since Clears and Puts need to be done in increasing
+			// order of keys, it is not safe to use ClearRangeIter.
+			if err := sr.preDestroyRaftMuLocked(
+				ctx,
+				r.store.Engine(),
+				&subsumedReplSST,
+				subsumedNextReplicaID,
+				true, /* clearRangeIDLocalOnly */
+				true, /* mustClearRange */
+			); err != nil {
+				subsumedReplSST.Close()
+				return err
+			}
+			if err := subsumedReplSST.Finish(); err != nil {
+				return err
+			}
+			if subsumedReplSST.DataSize > 0 {
+				// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
+				// buffering in a MemFile first.
+				if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil {
+					return err
+				}
+			}
+		} else {
+			if err := sr.preDestroyRaftMuLocked(
+				ctx,
+				r.store.Engine(),
+				batch,
+				subsumedNextReplicaID,
+				true, /* clearRangeIDLocalOnly */
+				true, /* mustClearRange */
+			); err != nil {
+				return err
+			}
+		}
@@ -1144,25 +1222,36 @@ func (r *Replica) clearSubsumedReplicaDiskData(
 	// subsume both r1 and r2 in S1.
 	for i := range keyRanges {
 		if totalKeyRanges[i].End.Key.Compare(keyRanges[i].End.Key) > 0 {
-			subsumedReplSSTFile := &storage.MemFile{}
-			subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
-			defer subsumedReplSST.Close()
-			if err := storage.ClearRangeWithHeuristic(
-				r.store.Engine(),
-				&subsumedReplSST,
-				keyRanges[i].End.Key,
-				totalKeyRanges[i].End.Key,
-			); err != nil {
-				subsumedReplSST.Close()
-				return err
-			}
-			if err := subsumedReplSST.Finish(); err != nil {
-				return err
-			}
-			if subsumedReplSST.DataSize > 0 {
-				// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
-				// buffering in a MemFile first.
-				if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil {
-					return err
-				}
-			}
+			if scratch != nil {
+				subsumedReplSSTFile := &storage.MemFile{}
+				subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
+				defer subsumedReplSST.Close()
+				if err := storage.ClearRangeWithHeuristic(
+					r.store.Engine(),
+					&subsumedReplSST,
+					keyRanges[i].End.Key,
+					totalKeyRanges[i].End.Key,
+				); err != nil {
+					subsumedReplSST.Close()
+					return err
+				}
+				if err := subsumedReplSST.Finish(); err != nil {
+					return err
+				}
+				if subsumedReplSST.DataSize > 0 {
+					// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
+					// buffering in a MemFile first.
+					if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil {
+						return err
+					}
+				}
+			} else {
+				if err := storage.ClearRangeWithHeuristic(
+					r.store.Engine(),
+					batch,
+					keyRanges[i].End.Key,
+					totalKeyRanges[i].End.Key,
+				); err != nil {
+					return err
+				}
+			}
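At the storage engine level, the small-snapshot path added in this commit reduces to one synced batch commit instead of external-file ingestion. Here is a standalone sketch against Pebble, CockroachDB's storage engine, simplified relative to the storage.Batch wrapper that the diff actually uses:

package main

import "github.com/cockroachdb/pebble"

func main() {
	// Open (or create) a local Pebble store; the directory name is arbitrary.
	db, err := pebble.Open("demo-db", &pebble.Options{})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Stage the snapshot's writes in a batch, as the small-snapshot path does.
	b := db.NewBatch()
	defer b.Close()
	if err := b.Set([]byte("raft-hard-state"), []byte("..."), nil); err != nil {
		panic(err)
	}
	// Commit(pebble.Sync) mirrors Commit(true /* sync */) in the diff: the
	// writes become visible and durable atomically via one synced WAL write,
	// with no new SST files linked into the LSM.
	if err := b.Commit(pebble.Sync); err != nil {
		panic(err)
	}
}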