From bf282fc978ee0e762a3763cb7e954784bf837694 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 7 Mar 2024 13:06:29 +0000 Subject: [PATCH] storage: add new ImportEpoch DeleteRangePredicate When set, MVCCDeleteRangePredicate will delete any values with ImportEpoch's in their MVCCValueHeader that is >= the given ImportEpoch predicate. Release note: none Epic: None --- pkg/kv/kvpb/api.proto | 4 + pkg/kv/kvserver/batcheval/cmd_delete_range.go | 5 + pkg/storage/mvcc.go | 30 ++- pkg/storage/mvcc_history_test.go | 13 +- pkg/storage/mvcc_stats_test.go | 2 +- .../delete_range_predicate_import_epoch | 180 ++++++++++++++++++ 6 files changed, 230 insertions(+), 4 deletions(-) create mode 100644 pkg/storage/testdata/mvcc_histories/delete_range_predicate_import_epoch diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 4fcedd9f5dab..e260c386d8a4 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -498,6 +498,10 @@ message DeleteRangeRequest { // // To pass DeleteRangePredicates, the client must also pass UseRangeTombstone. message DeleteRangePredicates { + // ImportEpoch specifies that all keys with a non-zero + // MVCCValueHeader.ImportEpoch == ImportEmpoch should be deleted. + uint32 import_epoch = 1; + // StartTime specifies an exclusive lower bound to surface keys // for deletion. If specified, DeleteRange will only issue tombstones to keys // within the span [startKey, endKey) that also have MVCC versions with diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range.go b/pkg/kv/kvserver/batcheval/cmd_delete_range.go index 159dd0ce4346..966f25a6bb1c 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range.go @@ -121,6 +121,11 @@ func DeleteRange( "GCRangeHint must only be used together with UseRangeTombstone") } + if args.Predicates.ImportEpoch > 0 && !args.Predicates.StartTime.IsEmpty() { + return result.Result{}, errors.AssertionFailedf( + "DeleteRangePredicate should not have both non-zero ImportEpoch and non-empty StartTime") + } + // Use MVCC range tombstone if requested. if args.UseRangeTombstone { if cArgs.Header.Txn != nil { diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 79386bbc5a0b..ac230f9105ea 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2662,6 +2662,7 @@ func mvccPutInternal( versionValue.Value = value versionValue.LocalTimestamp = opts.LocalTimestamp versionValue.OmitInRangefeeds = opts.OmitInRangefeeds + versionValue.ImportEpoch = opts.ImportEpoch if buildutil.CrdbTestBuild { if seq, seqOK := kvnemesisutil.FromContext(ctx); seqOK { @@ -3786,6 +3787,29 @@ func MVCCPredicateDeleteRange( return nil, &kvpb.LockConflictError{Locks: locks} } + var stopRunBasedOnPredicate func(k MVCCKey, iter *MVCCIncrementalIterator) (bool, error) + if predicates.ImportEpoch > 0 { + // TODO(ssd): We will likely eventually want something + // that consturcts our iterator opetions based on the + // predicate so that we can use a block-property + // filter for import epochs. + stopRunBasedOnPredicate = func(k MVCCKey, it *MVCCIncrementalIterator) (bool, error) { + rawV, err := it.UnsafeValue() + if err != nil { + return true, err + } + v, err := DecodeMVCCValue(rawV) + if err != nil { + return true, err + } + return v.ImportEpoch != predicates.ImportEpoch, nil + } + } else { + stopRunBasedOnPredicate = func(k MVCCKey, _ *MVCCIncrementalIterator) (bool, error) { + return k.Timestamp.LessEq(predicates.StartTime), nil + } + } + // continueRun returns three bools: the first is true if the current run // should continue; the second is true if the latest key is a point tombstone; // the third is true if the latest key is a range tombstone. If a non-nil @@ -3837,11 +3861,12 @@ func MVCCPredicateDeleteRange( } // The latest key is a live point key. Conduct predicate filtering. - if k.Timestamp.LessEq(predicates.StartTime) { + if stop, err := stopRunBasedOnPredicate(k, iter); err != nil { + return false, false, false, err + } else if stop { return false, false, false, nil } - // TODO (msbutler): use MVCCValueHeader to match on job ID predicate return true, false, false, nil } @@ -4580,6 +4605,7 @@ type MVCCWriteOptions struct { Stats *enginepb.MVCCStats ReplayWriteTimestampProtection bool OmitInRangefeeds bool + ImportEpoch uint32 // MaxLockConflicts is a maximum number of conflicting locks collected before // returning LockConflictError. Even single-key writes can encounter multiple // conflicting shared locks, so the limit is important to bound the number of diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index d15a6741ef63..dd788983aa34 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -1457,8 +1457,13 @@ func cmdDeleteRangePredicate(e *evalCtx) error { if e.hasArg("maxBytes") { e.scanArg("maxBytes", &maxBytes) } + importEpoch := 0 + if e.hasArg("import_epoch") { + e.scanArg("import_epoch", &importEpoch) + } predicates := kvpb.DeleteRangePredicates{ - StartTime: e.getTsWithName("startTime"), + StartTime: e.getTsWithName("startTime"), + ImportEpoch: uint32(importEpoch), } rangeThreshold := 64 if e.hasArg("rangeThreshold") { @@ -1595,12 +1600,18 @@ func cmdPut(e *evalCtx) error { val.InitChecksum(key) } + importEpoch := 0 + if e.hasArg("import_epoch") { + e.scanArg("import_epoch", &importEpoch) + } + resolve, resolveStatus := e.getResolve() return e.withWriter("put", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ Txn: txn, LocalTimestamp: localTs, + ImportEpoch: uint32(importEpoch), Stats: e.ms, ReplayWriteTimestampProtection: e.getAmbiguousReplay(), MaxLockConflicts: e.getMaxLockConflicts(), diff --git a/pkg/storage/mvcc_stats_test.go b/pkg/storage/mvcc_stats_test.go index e08ed50c54dc..fc505bab5b7b 100644 --- a/pkg/storage/mvcc_stats_test.go +++ b/pkg/storage/mvcc_stats_test.go @@ -2028,7 +2028,7 @@ func TestMVCCStatsRandomized(t *testing.T) { ) } else { rangeTombstoneThreshold := s.rng.Int63n(5) - desc = fmt.Sprintf("mvccPredicateDeleteRange=%s, predicates=%s, rangeTombstoneThreshold=%d", + desc = fmt.Sprintf("mvccPredicateDeleteRange=%s, predicates=%#v rangeTombstoneThreshold=%d", roachpb.Span{Key: mvccRangeDelKey, EndKey: mvccRangeDelEndKey}, predicates, rangeTombstoneThreshold) const maxLockConflicts = 0 // unlimited const targetLockConflictBytes = 0 // unlimited diff --git a/pkg/storage/testdata/mvcc_histories/delete_range_predicate_import_epoch b/pkg/storage/testdata/mvcc_histories/delete_range_predicate_import_epoch new file mode 100644 index 000000000000..b8358b003b34 --- /dev/null +++ b/pkg/storage/testdata/mvcc_histories/delete_range_predicate_import_epoch @@ -0,0 +1,180 @@ +# Tests MVCCPredicateDeleteRange with ImportEpoch predicates +# +# We set up data the represents a timeline like. +# +# Import 1 from t [1,2] +# More writes at t3 +# Import 2 from t [4,5] +# +# T E| +# 5 2| c5 +# 4 2| e4 f4 g4 h4 +# 3 | x d3 +# 2 1| b2 c2 j2 +# 1 1| a1 d1 i1 +# | a b c d e f g h i j + +run stats ok +put k=a ts=1 v=a1 import_epoch=1 +put k=b ts=2 v=b2 import_epoch=1 +put k=c ts=2 v=c2 import_epoch=1 +del k=c ts=3 +put k=c ts=5 v=c5 import_epoch=2 +put k=d ts=1 v=d1 import_epoch=1 +put k=d ts=3 v=d3 +put k=e ts=4 v=e4 import_epoch=2 +put k=f ts=4 v=f4 import_epoch=2 +put k=g ts=4 v=g4 import_epoch=2 +put k=h ts=5 v=h4 import_epoch=2 +put k=i ts=1 v=i1 import_epoch=1 +put k=j ts=2 v=j2 import_epoch=1 +---- +>> put k=a ts=1 v=a1 import_epoch=1 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 +>> put k=b ts=2 v=b2 import_epoch=1 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 +>> put k=c ts=2 v=c2 import_epoch=1 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 +>> del k=c ts=3 +del: "c": found key true +stats: key_bytes=+12 val_count=+1 live_count=-1 live_bytes=-30 gc_bytes_age=+4074 +>> put k=c ts=5 v=c5 import_epoch=2 +stats: key_bytes=+12 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 gc_bytes_age=-194 +>> put k=d ts=1 v=d1 import_epoch=1 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 +>> put k=d ts=3 v=d3 +stats: key_bytes=+12 val_count=+1 val_bytes=+7 live_bytes=-9 gc_bytes_age=+2716 +>> put k=e ts=4 v=e4 import_epoch=2 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 +>> put k=f ts=4 v=f4 import_epoch=2 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 +>> put k=g ts=4 v=g4 import_epoch=2 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 +>> put k=h ts=5 v=h4 import_epoch=2 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 +>> put k=i ts=1 v=i1 import_epoch=1 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 +>> put k=j ts=2 v=j2 import_epoch=1 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 +>> at end: +data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a1 +data: "b"/2.000000000,0 -> {importEpoch=1}/BYTES/b2 +data: "c"/5.000000000,0 -> {importEpoch=2}/BYTES/c5 +data: "c"/3.000000000,0 -> / +data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c2 +data: "d"/3.000000000,0 -> /BYTES/d3 +data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d1 +data: "e"/4.000000000,0 -> {importEpoch=2}/BYTES/e4 +data: "f"/4.000000000,0 -> {importEpoch=2}/BYTES/f4 +data: "g"/4.000000000,0 -> {importEpoch=2}/BYTES/g4 +data: "h"/5.000000000,0 -> {importEpoch=2}/BYTES/h4 +data: "i"/1.000000000,0 -> {importEpoch=1}/BYTES/i1 +data: "j"/2.000000000,0 -> {importEpoch=1}/BYTES/j2 +stats: key_count=10 key_bytes=176 val_count=13 val_bytes=183 live_count=10 live_bytes=291 gc_bytes_age=6596 + +# Now lets del_range_pred on epoch 2 +run stats ok log-ops +del_range_pred k=a end=z ts=10 import_epoch=2 rangeThreshold=3 +---- +>> del_range_pred k=a end=z ts=10 import_epoch=2 rangeThreshold=3 +stats: key_bytes=+12 val_count=+1 range_key_count=+1 range_key_bytes=+14 range_val_count=+1 live_count=-5 live_bytes=-150 gc_bytes_age=+15840 +>> at end: +rangekey: {e-h\x00}/[10.000000000,0=/] +data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a1 +data: "b"/2.000000000,0 -> {importEpoch=1}/BYTES/b2 +data: "c"/10.000000000,0 -> / +data: "c"/5.000000000,0 -> {importEpoch=2}/BYTES/c5 +data: "c"/3.000000000,0 -> / +data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c2 +data: "d"/3.000000000,0 -> /BYTES/d3 +data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d1 +data: "e"/4.000000000,0 -> {importEpoch=2}/BYTES/e4 +data: "f"/4.000000000,0 -> {importEpoch=2}/BYTES/f4 +data: "g"/4.000000000,0 -> {importEpoch=2}/BYTES/g4 +data: "h"/5.000000000,0 -> {importEpoch=2}/BYTES/h4 +data: "i"/1.000000000,0 -> {importEpoch=1}/BYTES/i1 +data: "j"/2.000000000,0 -> {importEpoch=1}/BYTES/j2 +logical op: write_value: key="c", ts=10.000000000,0 +logical op: delete_range: startKey="e" endKey="h\x00" ts=10.000000000,0 +stats: key_count=10 key_bytes=188 val_count=14 val_bytes=183 range_key_count=1 range_key_bytes=14 range_val_count=1 live_count=5 live_bytes=141 gc_bytes_age=22436 + +# Let's try it again at t11 +run stats ok log-ops +del_range_pred k=a end=z ts=11 import_epoch=2 rangeThreshold=3 +---- +>> del_range_pred k=a end=z ts=11 import_epoch=2 rangeThreshold=3 +stats: no change +>> at end: +rangekey: {e-h\x00}/[10.000000000,0=/] +data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a1 +data: "b"/2.000000000,0 -> {importEpoch=1}/BYTES/b2 +data: "c"/10.000000000,0 -> / +data: "c"/5.000000000,0 -> {importEpoch=2}/BYTES/c5 +data: "c"/3.000000000,0 -> / +data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c2 +data: "d"/3.000000000,0 -> /BYTES/d3 +data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d1 +data: "e"/4.000000000,0 -> {importEpoch=2}/BYTES/e4 +data: "f"/4.000000000,0 -> {importEpoch=2}/BYTES/f4 +data: "g"/4.000000000,0 -> {importEpoch=2}/BYTES/g4 +data: "h"/5.000000000,0 -> {importEpoch=2}/BYTES/h4 +data: "i"/1.000000000,0 -> {importEpoch=1}/BYTES/i1 +data: "j"/2.000000000,0 -> {importEpoch=1}/BYTES/j2 +stats: key_count=10 key_bytes=188 val_count=14 val_bytes=183 range_key_count=1 range_key_bytes=14 range_val_count=1 live_count=5 live_bytes=141 gc_bytes_age=22436 + +run ok +clear_range k=a end=z +---- +>> at end: + + +# Restored Import +# +# We set up data that contains 2 imports that have all been restored +# at ts [1,3] +# +run ok +put k=a ts=1 v=a import_epoch=1 +put k=b ts=2 v=b import_epoch=2 +put k=c ts=2 v=c import_epoch=1 +put k=d ts=1 v=d import_epoch=1 +put k=e ts=3 v=e import_epoch=2 +put k=f ts=1 v=f import_epoch=2 +put k=g ts=2 v=g import_epoch=2 +put k=h ts=3 v=h import_epoch=2 +put k=i ts=4 v=i import_epoch=1 +put k=j ts=1 v=j import_epoch=1 +---- +>> at end: +data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a +data: "b"/2.000000000,0 -> {importEpoch=2}/BYTES/b +data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c +data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d +data: "e"/3.000000000,0 -> {importEpoch=2}/BYTES/e +data: "f"/1.000000000,0 -> {importEpoch=2}/BYTES/f +data: "g"/2.000000000,0 -> {importEpoch=2}/BYTES/g +data: "h"/3.000000000,0 -> {importEpoch=2}/BYTES/h +data: "i"/4.000000000,0 -> {importEpoch=1}/BYTES/i +data: "j"/1.000000000,0 -> {importEpoch=1}/BYTES/j + +run stats ok log-ops +del_range_pred k=a end=z ts=5 import_epoch=2 rangeThreshold=3 +---- +>> del_range_pred k=a end=z ts=5 import_epoch=2 rangeThreshold=3 +stats: key_bytes=+12 val_count=+1 range_key_count=+1 range_key_bytes=+14 range_val_count=+1 live_count=-5 live_bytes=-145 gc_bytes_age=+16245 +>> at end: +rangekey: {e-h\x00}/[5.000000000,0=/] +data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a +data: "b"/5.000000000,0 -> / +data: "b"/2.000000000,0 -> {importEpoch=2}/BYTES/b +data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c +data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d +data: "e"/3.000000000,0 -> {importEpoch=2}/BYTES/e +data: "f"/1.000000000,0 -> {importEpoch=2}/BYTES/f +data: "g"/2.000000000,0 -> {importEpoch=2}/BYTES/g +data: "h"/3.000000000,0 -> {importEpoch=2}/BYTES/h +data: "i"/4.000000000,0 -> {importEpoch=1}/BYTES/i +data: "j"/1.000000000,0 -> {importEpoch=1}/BYTES/j +logical op: write_value: key="b", ts=5.000000000,0 +logical op: delete_range: startKey="e" endKey="h\x00" ts=5.000000000,0 +stats: key_count=10 key_bytes=152 val_count=11 val_bytes=150 range_key_count=1 range_key_bytes=14 range_val_count=1 live_count=5 live_bytes=145 gc_bytes_age=16245