Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: add new ImportEpoch DeleteRangePredicate #120048

Merged
merged 1 commit into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ message DeleteRangeRequest {
//
// To pass DeleteRangePredicates, the client must also pass UseRangeTombstone.
message DeleteRangePredicates {
// ImportEpoch specifies that all keys with a non-zero
// MVCCValueHeader.ImportEpoch == ImportEpoch should be deleted.
uint32 import_epoch = 1;
stevendanna marked this conversation as resolved.
Show resolved Hide resolved

// StartTime specifies an exclusive lower bound to surface keys
// for deletion. If specified, DeleteRange will only issue tombstones to keys
// within the span [startKey, endKey) that also have MVCC versions with
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func DeleteRange(
"GCRangeHint must only be used together with UseRangeTombstone")
}

if args.Predicates.ImportEpoch > 0 && !args.Predicates.StartTime.IsEmpty() {
return result.Result{}, errors.AssertionFailedf(
"DeleteRangePredicate should not have both non-zero ImportEpoch and non-empty StartTime")
}

// Use MVCC range tombstone if requested.
if args.UseRangeTombstone {
if cArgs.Header.Txn != nil {
Expand Down
30 changes: 28 additions & 2 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2662,6 +2662,7 @@ func mvccPutInternal(
versionValue.Value = value
versionValue.LocalTimestamp = opts.LocalTimestamp
versionValue.OmitInRangefeeds = opts.OmitInRangefeeds
versionValue.ImportEpoch = opts.ImportEpoch

if buildutil.CrdbTestBuild {
if seq, seqOK := kvnemesisutil.FromContext(ctx); seqOK {
Expand Down Expand Up @@ -3786,6 +3787,29 @@ func MVCCPredicateDeleteRange(
return nil, &kvpb.LockConflictError{Locks: locks}
}

var stopRunBasedOnPredicate func(k MVCCKey, iter *MVCCIncrementalIterator) (bool, error)
if predicates.ImportEpoch > 0 {
		// TODO(ssd): We will likely eventually want something
		// that constructs our iterator options based on the
		// predicate so that we can use a block-property
		// filter for import epochs.
stopRunBasedOnPredicate = func(k MVCCKey, it *MVCCIncrementalIterator) (bool, error) {
rawV, err := it.UnsafeValue()
if err != nil {
return true, err
}
v, err := DecodeMVCCValue(rawV)
if err != nil {
return true, err
}
return v.ImportEpoch != predicates.ImportEpoch, nil
}
} else {
stevendanna marked this conversation as resolved.
Show resolved Hide resolved
stopRunBasedOnPredicate = func(k MVCCKey, _ *MVCCIncrementalIterator) (bool, error) {
return k.Timestamp.LessEq(predicates.StartTime), nil
}
}

// continueRun returns three bools: the first is true if the current run
// should continue; the second is true if the latest key is a point tombstone;
// the third is true if the latest key is a range tombstone. If a non-nil
msbutler marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -3837,11 +3861,12 @@ func MVCCPredicateDeleteRange(
}

// The latest key is a live point key. Conduct predicate filtering.
if k.Timestamp.LessEq(predicates.StartTime) {
if stop, err := stopRunBasedOnPredicate(k, iter); err != nil {
return false, false, false, err
} else if stop {
return false, false, false, nil
}

// TODO (msbutler): use MVCCValueHeader to match on job ID predicate
return true, false, false, nil
}

Expand Down Expand Up @@ -4580,6 +4605,7 @@ type MVCCWriteOptions struct {
Stats *enginepb.MVCCStats
ReplayWriteTimestampProtection bool
OmitInRangefeeds bool
ImportEpoch uint32
// MaxLockConflicts is a maximum number of conflicting locks collected before
// returning LockConflictError. Even single-key writes can encounter multiple
// conflicting shared locks, so the limit is important to bound the number of
Expand Down
13 changes: 12 additions & 1 deletion pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,8 +1457,13 @@ func cmdDeleteRangePredicate(e *evalCtx) error {
if e.hasArg("maxBytes") {
e.scanArg("maxBytes", &maxBytes)
}
importEpoch := 0
if e.hasArg("import_epoch") {
e.scanArg("import_epoch", &importEpoch)
}
predicates := kvpb.DeleteRangePredicates{
StartTime: e.getTsWithName("startTime"),
StartTime: e.getTsWithName("startTime"),
ImportEpoch: uint32(importEpoch),
}
rangeThreshold := 64
if e.hasArg("rangeThreshold") {
Expand Down Expand Up @@ -1595,12 +1600,18 @@ func cmdPut(e *evalCtx) error {
val.InitChecksum(key)
}

importEpoch := 0
if e.hasArg("import_epoch") {
e.scanArg("import_epoch", &importEpoch)
}

resolve, resolveStatus := e.getResolve()

return e.withWriter("put", func(rw storage.ReadWriter) error {
opts := storage.MVCCWriteOptions{
Txn: txn,
LocalTimestamp: localTs,
ImportEpoch: uint32(importEpoch),
Stats: e.ms,
ReplayWriteTimestampProtection: e.getAmbiguousReplay(),
MaxLockConflicts: e.getMaxLockConflicts(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/mvcc_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2028,7 +2028,7 @@ func TestMVCCStatsRandomized(t *testing.T) {
)
} else {
rangeTombstoneThreshold := s.rng.Int63n(5)
desc = fmt.Sprintf("mvccPredicateDeleteRange=%s, predicates=%s, rangeTombstoneThreshold=%d",
desc = fmt.Sprintf("mvccPredicateDeleteRange=%s, predicates=%#v rangeTombstoneThreshold=%d",
roachpb.Span{Key: mvccRangeDelKey, EndKey: mvccRangeDelEndKey}, predicates, rangeTombstoneThreshold)
const maxLockConflicts = 0 // unlimited
const targetLockConflictBytes = 0 // unlimited
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# Tests MVCCPredicateDeleteRange with ImportEpoch predicates
#
# We set up data that represents a timeline like:
#
# Import 1 from t [1,2]
# More writes at t3
# Import 2 from t [4,5]
#
# T E|
# 5 2| c5
stevendanna marked this conversation as resolved.
Show resolved Hide resolved
# 4 2| e4 f4 g4 h4
# 3 | x d3
# 2 1| b2 c2 j2
# 1 1| a1 d1 i1
# | a b c d e f g h i j

run stats ok
put k=a ts=1 v=a1 import_epoch=1
put k=b ts=2 v=b2 import_epoch=1
put k=c ts=2 v=c2 import_epoch=1
del k=c ts=3
put k=c ts=5 v=c5 import_epoch=2
put k=d ts=1 v=d1 import_epoch=1
put k=d ts=3 v=d3
put k=e ts=4 v=e4 import_epoch=2
put k=f ts=4 v=f4 import_epoch=2
put k=g ts=4 v=g4 import_epoch=2
put k=h ts=5 v=h4 import_epoch=2
put k=i ts=1 v=i1 import_epoch=1
put k=j ts=2 v=j2 import_epoch=1
----
>> put k=a ts=1 v=a1 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=b ts=2 v=b2 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=c ts=2 v=c2 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> del k=c ts=3
del: "c": found key true
stats: key_bytes=+12 val_count=+1 live_count=-1 live_bytes=-30 gc_bytes_age=+4074
>> put k=c ts=5 v=c5 import_epoch=2
stats: key_bytes=+12 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30 gc_bytes_age=-194
>> put k=d ts=1 v=d1 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=d ts=3 v=d3
stats: key_bytes=+12 val_count=+1 val_bytes=+7 live_bytes=-9 gc_bytes_age=+2716
>> put k=e ts=4 v=e4 import_epoch=2
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=f ts=4 v=f4 import_epoch=2
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=g ts=4 v=g4 import_epoch=2
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=h ts=5 v=h4 import_epoch=2
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=i ts=1 v=i1 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> put k=j ts=2 v=j2 import_epoch=1
stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+16 live_count=+1 live_bytes=+30
>> at end:
data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a1
data: "b"/2.000000000,0 -> {importEpoch=1}/BYTES/b2
data: "c"/5.000000000,0 -> {importEpoch=2}/BYTES/c5
data: "c"/3.000000000,0 -> /<empty>
data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c2
data: "d"/3.000000000,0 -> /BYTES/d3
data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d1
data: "e"/4.000000000,0 -> {importEpoch=2}/BYTES/e4
data: "f"/4.000000000,0 -> {importEpoch=2}/BYTES/f4
data: "g"/4.000000000,0 -> {importEpoch=2}/BYTES/g4
data: "h"/5.000000000,0 -> {importEpoch=2}/BYTES/h4
data: "i"/1.000000000,0 -> {importEpoch=1}/BYTES/i1
data: "j"/2.000000000,0 -> {importEpoch=1}/BYTES/j2
stats: key_count=10 key_bytes=176 val_count=13 val_bytes=183 live_count=10 live_bytes=291 gc_bytes_age=6596

# Now lets del_range_pred on epoch 2
run stats ok log-ops
del_range_pred k=a end=z ts=10 import_epoch=2 rangeThreshold=3
----
>> del_range_pred k=a end=z ts=10 import_epoch=2 rangeThreshold=3
stats: key_bytes=+12 val_count=+1 range_key_count=+1 range_key_bytes=+14 range_val_count=+1 live_count=-5 live_bytes=-150 gc_bytes_age=+15840
>> at end:
rangekey: {e-h\x00}/[10.000000000,0=/<empty>]
data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a1
data: "b"/2.000000000,0 -> {importEpoch=1}/BYTES/b2
data: "c"/10.000000000,0 -> /<empty>
data: "c"/5.000000000,0 -> {importEpoch=2}/BYTES/c5
data: "c"/3.000000000,0 -> /<empty>
data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c2
data: "d"/3.000000000,0 -> /BYTES/d3
data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d1
data: "e"/4.000000000,0 -> {importEpoch=2}/BYTES/e4
data: "f"/4.000000000,0 -> {importEpoch=2}/BYTES/f4
data: "g"/4.000000000,0 -> {importEpoch=2}/BYTES/g4
data: "h"/5.000000000,0 -> {importEpoch=2}/BYTES/h4
data: "i"/1.000000000,0 -> {importEpoch=1}/BYTES/i1
data: "j"/2.000000000,0 -> {importEpoch=1}/BYTES/j2
logical op: write_value: key="c", ts=10.000000000,0
logical op: delete_range: startKey="e" endKey="h\x00" ts=10.000000000,0
stats: key_count=10 key_bytes=188 val_count=14 val_bytes=183 range_key_count=1 range_key_bytes=14 range_val_count=1 live_count=5 live_bytes=141 gc_bytes_age=22436

# Let's try it again at t11
run stats ok log-ops
del_range_pred k=a end=z ts=11 import_epoch=2 rangeThreshold=3
----
>> del_range_pred k=a end=z ts=11 import_epoch=2 rangeThreshold=3
stats: no change
>> at end:
rangekey: {e-h\x00}/[10.000000000,0=/<empty>]
data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a1
data: "b"/2.000000000,0 -> {importEpoch=1}/BYTES/b2
data: "c"/10.000000000,0 -> /<empty>
data: "c"/5.000000000,0 -> {importEpoch=2}/BYTES/c5
data: "c"/3.000000000,0 -> /<empty>
data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c2
data: "d"/3.000000000,0 -> /BYTES/d3
data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d1
data: "e"/4.000000000,0 -> {importEpoch=2}/BYTES/e4
data: "f"/4.000000000,0 -> {importEpoch=2}/BYTES/f4
data: "g"/4.000000000,0 -> {importEpoch=2}/BYTES/g4
data: "h"/5.000000000,0 -> {importEpoch=2}/BYTES/h4
data: "i"/1.000000000,0 -> {importEpoch=1}/BYTES/i1
data: "j"/2.000000000,0 -> {importEpoch=1}/BYTES/j2
stats: key_count=10 key_bytes=188 val_count=14 val_bytes=183 range_key_count=1 range_key_bytes=14 range_val_count=1 live_count=5 live_bytes=141 gc_bytes_age=22436

run ok
clear_range k=a end=z
----
>> at end:
<no data>

# Restored Import
#
# We set up data that contains 2 imports that have all been restored
# at ts [1,3]
#
run ok
put k=a ts=1 v=a import_epoch=1
put k=b ts=2 v=b import_epoch=2
put k=c ts=2 v=c import_epoch=1
put k=d ts=1 v=d import_epoch=1
put k=e ts=3 v=e import_epoch=2
put k=f ts=1 v=f import_epoch=2
put k=g ts=2 v=g import_epoch=2
put k=h ts=3 v=h import_epoch=2
put k=i ts=4 v=i import_epoch=1
put k=j ts=1 v=j import_epoch=1
----
>> at end:
data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a
data: "b"/2.000000000,0 -> {importEpoch=2}/BYTES/b
data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c
data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d
data: "e"/3.000000000,0 -> {importEpoch=2}/BYTES/e
data: "f"/1.000000000,0 -> {importEpoch=2}/BYTES/f
data: "g"/2.000000000,0 -> {importEpoch=2}/BYTES/g
data: "h"/3.000000000,0 -> {importEpoch=2}/BYTES/h
data: "i"/4.000000000,0 -> {importEpoch=1}/BYTES/i
data: "j"/1.000000000,0 -> {importEpoch=1}/BYTES/j

run stats ok log-ops
del_range_pred k=a end=z ts=5 import_epoch=2 rangeThreshold=3
----
>> del_range_pred k=a end=z ts=5 import_epoch=2 rangeThreshold=3
stats: key_bytes=+12 val_count=+1 range_key_count=+1 range_key_bytes=+14 range_val_count=+1 live_count=-5 live_bytes=-145 gc_bytes_age=+16245
>> at end:
rangekey: {e-h\x00}/[5.000000000,0=/<empty>]
data: "a"/1.000000000,0 -> {importEpoch=1}/BYTES/a
data: "b"/5.000000000,0 -> /<empty>
data: "b"/2.000000000,0 -> {importEpoch=2}/BYTES/b
data: "c"/2.000000000,0 -> {importEpoch=1}/BYTES/c
data: "d"/1.000000000,0 -> {importEpoch=1}/BYTES/d
data: "e"/3.000000000,0 -> {importEpoch=2}/BYTES/e
data: "f"/1.000000000,0 -> {importEpoch=2}/BYTES/f
data: "g"/2.000000000,0 -> {importEpoch=2}/BYTES/g
data: "h"/3.000000000,0 -> {importEpoch=2}/BYTES/h
data: "i"/4.000000000,0 -> {importEpoch=1}/BYTES/i
data: "j"/1.000000000,0 -> {importEpoch=1}/BYTES/j
logical op: write_value: key="b", ts=5.000000000,0
logical op: delete_range: startKey="e" endKey="h\x00" ts=5.000000000,0
stats: key_count=10 key_bytes=152 val_count=11 val_bytes=150 range_key_count=1 range_key_bytes=14 range_val_count=1 live_count=5 live_bytes=145 gc_bytes_age=16245
Loading