Merge #120048
120048: storage: add new ImportEpoch DeleteRangePredicate  r=msbutler a=stevendanna

When set, the ImportEpoch DeleteRangePredicate directs
MVCCPredicateDeleteRange to delete any value whose
MVCCValueHeader.ImportEpoch equals the given ImportEpoch.

Release note: none
Epic: None


Co-authored-by: Steven Danna <[email protected]>
craig[bot] and stevendanna committed Mar 15, 2024
2 parents ebf92d2 + bf282fc commit e8fda58
Showing 6 changed files with 230 additions and 4 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/kvpb/api.proto
@@ -498,6 +498,10 @@ message DeleteRangeRequest {
//
// To pass DeleteRangePredicates, the client must also pass UseRangeTombstone.
message DeleteRangePredicates {
// ImportEpoch specifies that all keys with a non-zero
// MVCCValueHeader.ImportEpoch == ImportEpoch should be deleted.
uint32 import_epoch = 1;

// StartTime specifies an exclusive lower bound to surface keys
// for deletion. If specified, DeleteRange will only issue tombstones to keys
// within the span [startKey, endKey) that also have MVCC versions with
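As a usage sketch (not part of this commit), here is how a caller might combine the new ImportEpoch predicate with UseRangeTombstone, which the proto comment above requires; the helper name and key bounds are illustrative assumptions.

// Hypothetical helper; assumes the kvpb and roachpb packages are imported.
func makeImportEpochDeleteRange(start, end roachpb.Key, epoch uint32) *kvpb.DeleteRangeRequest {
	return &kvpb.DeleteRangeRequest{
		RequestHeader: kvpb.RequestHeader{Key: start, EndKey: end},
		// DeleteRangePredicates may only be passed together with UseRangeTombstone.
		UseRangeTombstone: true,
		Predicates: kvpb.DeleteRangePredicates{
			// Only keys whose MVCCValueHeader.ImportEpoch equals epoch are deleted.
			ImportEpoch: epoch,
		},
	}
}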
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_delete_range.go
@@ -121,6 +121,11 @@ func DeleteRange(
"GCRangeHint must only be used together with UseRangeTombstone")
}

if args.Predicates.ImportEpoch > 0 && !args.Predicates.StartTime.IsEmpty() {
return result.Result{}, errors.AssertionFailedf(
"DeleteRangePredicate should not have both non-zero ImportEpoch and non-empty StartTime")
}

// Use MVCC range tombstone if requested.
if args.UseRangeTombstone {
if cArgs.Header.Txn != nil {
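The new assertion above makes ImportEpoch and StartTime mutually exclusive, and predicates as a whole still require UseRangeTombstone. A rough restatement of the accepted combinations as a hypothetical standalone check (not code from this commit):

// Hypothetical validation sketch mirroring the checks in DeleteRange above;
// assumes kvpb and a standard errors package are imported.
func validateDeleteRangePredicates(p kvpb.DeleteRangePredicates, useRangeTombstone bool) error {
	if (p.ImportEpoch > 0 || !p.StartTime.IsEmpty()) && !useRangeTombstone {
		return errors.New("DeleteRangePredicates require UseRangeTombstone")
	}
	if p.ImportEpoch > 0 && !p.StartTime.IsEmpty() {
		return errors.New("ImportEpoch and StartTime cannot both be set")
	}
	return nil
}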
30 changes: 28 additions & 2 deletions pkg/storage/mvcc.go
@@ -2662,6 +2662,7 @@ func mvccPutInternal(
versionValue.Value = value
versionValue.LocalTimestamp = opts.LocalTimestamp
versionValue.OmitInRangefeeds = opts.OmitInRangefeeds
versionValue.ImportEpoch = opts.ImportEpoch

if buildutil.CrdbTestBuild {
if seq, seqOK := kvnemesisutil.FromContext(ctx); seqOK {
@@ -3786,6 +3787,29 @@ func MVCCPredicateDeleteRange(
return nil, &kvpb.LockConflictError{Locks: locks}
}

var stopRunBasedOnPredicate func(k MVCCKey, iter *MVCCIncrementalIterator) (bool, error)
if predicates.ImportEpoch > 0 {
// TODO(ssd): We will likely eventually want something
// that constructs our iterator options based on the
// predicate so that we can use a block-property
// filter for import epochs.
stopRunBasedOnPredicate = func(k MVCCKey, it *MVCCIncrementalIterator) (bool, error) {
rawV, err := it.UnsafeValue()
if err != nil {
return true, err
}
v, err := DecodeMVCCValue(rawV)
if err != nil {
return true, err
}
return v.ImportEpoch != predicates.ImportEpoch, nil
}
} else {
stopRunBasedOnPredicate = func(k MVCCKey, _ *MVCCIncrementalIterator) (bool, error) {
return k.Timestamp.LessEq(predicates.StartTime), nil
}
}

// continueRun returns three bools: the first is true if the current run
// should continue; the second is true if the latest key is a point tombstone;
// the third is true if the latest key is a range tombstone. If a non-nil
@@ -3837,11 +3861,12 @@
}

// The latest key is a live point key. Conduct predicate filtering.
if k.Timestamp.LessEq(predicates.StartTime) {
if stop, err := stopRunBasedOnPredicate(k, iter); err != nil {
return false, false, false, err
} else if stop {
return false, false, false, nil
}

// TODO (msbutler): use MVCCValueHeader to match on job ID predicate
return true, false, false, nil
}
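Decoupled from the incremental iterator, the run-stopping decision introduced above reduces to the following (hypothetical illustration; k and v are the latest live version's key and decoded value):

// Hypothetical sketch of stopRunBasedOnPredicate's decision; assumes the
// storage and kvpb packages are imported. With an ImportEpoch predicate, a
// run of deletions continues only while values carry that epoch; otherwise
// it continues only while versions are newer than the StartTime predicate.
func stopsRun(p kvpb.DeleteRangePredicates, k storage.MVCCKey, v storage.MVCCValue) bool {
	if p.ImportEpoch > 0 {
		return v.ImportEpoch != p.ImportEpoch
	}
	return k.Timestamp.LessEq(p.StartTime)
}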

@@ -4580,6 +4605,7 @@ type MVCCWriteOptions struct {
Stats *enginepb.MVCCStats
ReplayWriteTimestampProtection bool
OmitInRangefeeds bool
ImportEpoch uint32
// MaxLockConflicts is a maximum number of conflicting locks collected before
// returning LockConflictError. Even single-key writes can encounter multiple
// conflicting shared locks, so the limit is important to bound the number of
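On the write side, ImportEpoch now travels through MVCCWriteOptions into the value's MVCCValueHeader (see mvccPutInternal above). A minimal sketch of a writer tagging its values, with the helper name and stats pointer assumed:

// Hypothetical helper, not part of this diff; assumes the storage and
// enginepb packages are imported.
func writeOptsForImport(ms *enginepb.MVCCStats, epoch uint32) storage.MVCCWriteOptions {
	return storage.MVCCWriteOptions{
		ImportEpoch: epoch, // copied into the MVCCValueHeader by mvccPutInternal
		Stats:       ms,
	}
}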
13 changes: 12 additions & 1 deletion pkg/storage/mvcc_history_test.go
@@ -1457,8 +1457,13 @@ func cmdDeleteRangePredicate(e *evalCtx) error {
if e.hasArg("maxBytes") {
e.scanArg("maxBytes", &maxBytes)
}
importEpoch := 0
if e.hasArg("import_epoch") {
e.scanArg("import_epoch", &importEpoch)
}
predicates := kvpb.DeleteRangePredicates{
StartTime: e.getTsWithName("startTime"),
StartTime: e.getTsWithName("startTime"),
ImportEpoch: uint32(importEpoch),
}
rangeThreshold := 64
if e.hasArg("rangeThreshold") {
@@ -1595,12 +1600,18 @@ func cmdPut(e *evalCtx) error {
val.InitChecksum(key)
}

importEpoch := 0
if e.hasArg("import_epoch") {
e.scanArg("import_epoch", &importEpoch)
}

resolve, resolveStatus := e.getResolve()

return e.withWriter("put", func(rw storage.ReadWriter) error {
opts := storage.MVCCWriteOptions{
Txn: txn,
LocalTimestamp: localTs,
ImportEpoch: uint32(importEpoch),
Stats: e.ms,
ReplayWriteTimestampProtection: e.getAmbiguousReplay(),
MaxLockConflicts: e.getMaxLockConflicts(),
2 changes: 1 addition & 1 deletion pkg/storage/mvcc_stats_test.go
@@ -2028,7 +2028,7 @@ func TestMVCCStatsRandomized(t *testing.T) {
)
} else {
rangeTombstoneThreshold := s.rng.Int63n(5)
desc = fmt.Sprintf("mvccPredicateDeleteRange=%s, predicates=%s, rangeTombstoneThreshold=%d",
desc = fmt.Sprintf("mvccPredicateDeleteRange=%s, predicates=%#v rangeTombstoneThreshold=%d",
roachpb.Span{Key: mvccRangeDelKey, EndKey: mvccRangeDelEndKey}, predicates, rangeTombstoneThreshold)
const maxLockConflicts = 0 // unlimited
const targetLockConflictBytes = 0 // unlimited
@@ -0,0 +1,180 @@
# Tests MVCCPredicateDeleteRange with ImportEpoch predicates
#
# We set up data that represents a timeline like:
#
# Import 1 from t [1,2]
# More writes at t3
# Import 2 from t [4,5]
#
# T E|
# 5 2| c5
# 4 2| e4 f4 g4 h4
# 3 | x d3
# 2 1| b2 c2 j2
# 1 1| a1 d1 i1
# | a b c d e f g h i j

run stats ok
put k=a ts=1 v=a1 import_epoch=1
put k=b ts=2 v=b2 import_epoch=1
put k=c ts=2 v=c2 import_epoch=1
del k=c ts=3
put k=c ts=5 v=c5 import_epoch=2
put k=d ts=1 v=d1 import_epoch=1
put k=d ts=3 v=d3
put k=e ts=4 v=e4 import_epoch=2
put k=f ts=4 v=f4 import_epoch=2
put k=g ts=4 v=g4 import_epoch=2
put k=h ts=5 v=h4 import_epoch=2
put k=i ts=1 v=i1 import_epoch=1
put k=j ts=2 v=j2 import_epoch=1
----
>> put k=a ts=1 v=a1 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=b ts=2 v=b2 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=c ts=2 v=c2 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> del k=c ts=3
del: "c": found key true
stats: key_bytes=+12 val_count=+1 live_count=-1 live_bytes=-30 gc_bytes_age=+4074
>> put k=c ts=5 v=c5 import_epoch=2
stats: key_bytes=+12 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 gc_bytes_age=-194
>> put k=d ts=1 v=d1 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=d ts=3 v=d3
stats: key_bytes=+12 val_count=+1 val_bytes=+7 live_bytes=-9 gc_bytes_age=+2716
>> put k=e ts=4 v=e4 import_epoch=2
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=f ts=4 v=f4 import_epoch=2
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=g ts=4 v=g4 import_epoch=2
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=h ts=5 v=h4 import_epoch=2
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=i ts=1 v=i1 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=j ts=2 v=j2 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> at end:
data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a1
data: "b"/2.000000000,0 -> {importEpoch=1}/BYTES/b2
data: "c"/5.000000000,0 -> {importEpoch=2}/BYTES/c5
data: "c"/3.000000000,0 -> /<empty>
data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c2
data: "d"/3.000000000,0 -> /BYTES/d3
data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d1
data: "e"/4.000000000,0 -> {importEpoch=2}/BYTES/e4
data: "f"/4.000000000,0 -> {importEpoch=2}/BYTES/f4
data: "g"/4.000000000,0 -> {importEpoch=2}/BYTES/g4
data: "h"/5.000000000,0 -> {importEpoch=2}/BYTES/h4
data: "i"/1.000000000,0 -> {importEpoch=1}/BYTES/i1
data: "j"/2.000000000,0 -> {importEpoch=1}/BYTES/j2
stats: key_count=10 key_bytes=176 val_count=13 val_bytes=183 live_count=10 live_bytes=291 gc_bytes_age=6596

# Now let's del_range_pred on epoch 2
run stats ok log-ops
del_range_pred k=a end=z ts=10 import_epoch=2 rangeThreshold=3
----
>> del_range_pred k=a end=z ts=10 import_epoch=2 rangeThreshold=3
stats: key_bytes=+12 val_count=+1 range_key_count=+1 range_key_bytes=+14 range_val_count=+1 live_count=-5 live_bytes=-150 gc_bytes_age=+15840
>> at end:
rangekey: {e-h\x00}/[10.000000000,0=/<empty>]
data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a1
data: "b"/2.000000000,0 -> {importEpoch=1}/BYTES/b2
data: "c"/10.000000000,0 -> /<empty>
data: "c"/5.000000000,0 -> {importEpoch=2}/BYTES/c5
data: "c"/3.000000000,0 -> /<empty>
data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c2
data: "d"/3.000000000,0 -> /BYTES/d3
data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d1
data: "e"/4.000000000,0 -> {importEpoch=2}/BYTES/e4
data: "f"/4.000000000,0 -> {importEpoch=2}/BYTES/f4
data: "g"/4.000000000,0 -> {importEpoch=2}/BYTES/g4
data: "h"/5.000000000,0 -> {importEpoch=2}/BYTES/h4
data: "i"/1.000000000,0 -> {importEpoch=1}/BYTES/i1
data: "j"/2.000000000,0 -> {importEpoch=1}/BYTES/j2
logical op: write_value: key="c", ts=10.000000000,0
logical op: delete_range: startKey="e" endKey="h\x00" ts=10.000000000,0
stats: key_count=10 key_bytes=188 val_count=14 val_bytes=183 range_key_count=1 range_key_bytes=14 range_val_count=1 live_count=5 live_bytes=141 gc_bytes_age=22436

# Let's try it again at t11
run stats ok log-ops
del_range_pred k=a end=z ts=11 import_epoch=2 rangeThreshold=3
----
>> del_range_pred k=a end=z ts=11 import_epoch=2 rangeThreshold=3
stats: no change
>> at end:
rangekey: {e-h\x00}/[10.000000000,0=/<empty>]
data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a1
data: "b"/2.000000000,0 -> {importEpoch=1}/BYTES/b2
data: "c"/10.000000000,0 -> /<empty>
data: "c"/5.000000000,0 -> {importEpoch=2}/BYTES/c5
data: "c"/3.000000000,0 -> /<empty>
data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c2
data: "d"/3.000000000,0 -> /BYTES/d3
data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d1
data: "e"/4.000000000,0 -> {importEpoch=2}/BYTES/e4
data: "f"/4.000000000,0 -> {importEpoch=2}/BYTES/f4
data: "g"/4.000000000,0 -> {importEpoch=2}/BYTES/g4
data: "h"/5.000000000,0 -> {importEpoch=2}/BYTES/h4
data: "i"/1.000000000,0 -> {importEpoch=1}/BYTES/i1
data: "j"/2.000000000,0 -> {importEpoch=1}/BYTES/j2
stats: key_count=10 key_bytes=188 val_count=14 val_bytes=183 range_key_count=1 range_key_bytes=14 range_val_count=1 live_count=5 live_bytes=141 gc_bytes_age=22436

run ok
clear_range k=a end=z
----
>> at end:
<no data>

# Restored Import
#
# We set up data that contains 2 imports that have all been restored
# at ts [1,3]
#
run ok
put k=a ts=1 v=a import_epoch=1
put k=b ts=2 v=b import_epoch=2
put k=c ts=2 v=c import_epoch=1
put k=d ts=1 v=d import_epoch=1
put k=e ts=3 v=e import_epoch=2
put k=f ts=1 v=f import_epoch=2
put k=g ts=2 v=g import_epoch=2
put k=h ts=3 v=h import_epoch=2
put k=i ts=4 v=i import_epoch=1
put k=j ts=1 v=j import_epoch=1
----
>> at end:
data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a
data: "b"/2.000000000,0 -> {importEpoch=2}/BYTES/b
data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c
data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d
data: "e"/3.000000000,0 -> {importEpoch=2}/BYTES/e
data: "f"/1.000000000,0 -> {importEpoch=2}/BYTES/f
data: "g"/2.000000000,0 -> {importEpoch=2}/BYTES/g
data: "h"/3.000000000,0 -> {importEpoch=2}/BYTES/h
data: "i"/4.000000000,0 -> {importEpoch=1}/BYTES/i
data: "j"/1.000000000,0 -> {importEpoch=1}/BYTES/j

run stats ok log-ops
del_range_pred k=a end=z ts=5 import_epoch=2 rangeThreshold=3
----
>> del_range_pred k=a end=z ts=5 import_epoch=2 rangeThreshold=3
stats: key_bytes=+12 val_count=+1 range_key_count=+1 range_key_bytes=+14 range_val_count=+1 live_count=-5 live_bytes=-145 gc_bytes_age=+16245
>> at end:
rangekey: {e-h\x00}/[5.000000000,0=/<empty>]
data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a
data: "b"/5.000000000,0 -> /<empty>
data: "b"/2.000000000,0 -> {importEpoch=2}/BYTES/b
data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c
data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d
data: "e"/3.000000000,0 -> {importEpoch=2}/BYTES/e
data: "f"/1.000000000,0 -> {importEpoch=2}/BYTES/f
data: "g"/2.000000000,0 -> {importEpoch=2}/BYTES/g
data: "h"/3.000000000,0 -> {importEpoch=2}/BYTES/h
data: "i"/4.000000000,0 -> {importEpoch=1}/BYTES/i
data: "j"/1.000000000,0 -> {importEpoch=1}/BYTES/j
logical op: write_value: key="b", ts=5.000000000,0
logical op: delete_range: startKey="e" endKey="h\x00" ts=5.000000000,0
stats: key_count=10 key_bytes=152 val_count=11 val_bytes=150 range_key_count=1 range_key_bytes=14 range_val_count=1 live_count=5 live_bytes=145 gc_bytes_age=16245
