Skip to content

Commit

Permalink
backupccl: handle range keys in BACKUP
Browse files Browse the repository at this point in the history
Previously, BACKUP would not back up range tombstones. With this patch, BACKUPs
with revision_history will back up range tombstones. Non-revision-history backups
are not affected by this diff because MVCCExportToSST already filters all
tombstones out of the backup.

Specifically, this patch replaces the iterators used in the backup_processor
with the pebbleIterator, which has built-in range key support.

Fixes #71155

Release note: none
  • Loading branch information
msbutler committed Jul 21, 2022
1 parent 79edfce commit 1026431
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 25 deletions.
46 changes: 46 additions & 0 deletions pkg/ccl/backupccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -321,6 +323,15 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
// + expect-pausepoint: expects the schema change job to end up in a paused state because
// of a pausepoint error.
//
// - "kv" [args]
// Issues a kv request
//
// Supported arguments:
//
// + request: kv request type. Currently, only DeleteRange is supported.
//
// + target: SQL target. Currently, only table names are supported.
//
// - "nudge-and-wait-for-temp-cleanup"
// Nudges the temporary object reconciliation loop to run, and waits for completion.
func TestDataDriven(t *testing.T) {
Expand Down Expand Up @@ -620,6 +631,15 @@ func TestDataDriven(t *testing.T) {
}
return ""

case "kv":
var request string
d.ScanArgs(t, "request", &request)

var target string
d.ScanArgs(t, "target", &target)
handleKVRequest(ctx, t, lastCreatedServer, ds, request, target)
return ""

case "save-cluster-ts":
server := lastCreatedServer
user := "root"
Expand Down Expand Up @@ -650,6 +670,32 @@ func TestDataDriven(t *testing.T) {
})
}

// handleKVRequest issues the KV request named by request against the given
// server on behalf of the "kv" datadriven command.
//
// The only supported request is "DeleteRange": the ID recorded for target in
// system.namespace is looked up, converted to a span with makeTableSpan, and a
// DeleteRangeRequest with UseRangeTombstone set is sent over that span through
// the server's DistSender. Any other request value fails the test immediately.
func handleKVRequest(
	ctx context.Context, t *testing.T, server string, ds datadrivenTestState, request, target string,
) {
	// Reject unsupported requests up front and say which one was asked for, so
	// a typo in a test file is easy to diagnose.
	if request != "DeleteRange" {
		t.Fatalf("unknown kv request %q", request)
	}

	// NOTE(review): the lookup filters on name only; if objects with the same
	// name exist in several databases this may pick an arbitrary row -- confirm
	// that test files only use unique table names.
	var tableID uint32
	err := ds.getSQLDB(t, server, "root").QueryRow(
		`SELECT id FROM system.namespace WHERE name = $1`, target,
	).Scan(&tableID)
	require.NoError(t, err)

	// The request covers the span derived from the target table's ID.
	tableSpan := makeTableSpan(tableID)
	dr := roachpb.DeleteRangeRequest{
		RequestHeader: roachpb.RequestHeader{
			Key:    tableSpan.Key,
			EndKey: tableSpan.EndKey,
		},
		UseRangeTombstone: true,
	}
	sender := ds.servers[server].DistSenderI().(*kvcoord.DistSender)
	if _, pErr := kv.SendWrapped(ctx, sender, &dr); pErr != nil {
		t.Fatal(pErr)
	}
}

// findMostRecentJobWithType returns the most recently created job of `job_type`
// jobType.
func findMostRecentJobWithType(
Expand Down
89 changes: 65 additions & 24 deletions pkg/ccl/backupccl/file_sst_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,32 +268,13 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) error {

log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName)

// Copy SST content.
sst, err := storage.NewMemSSTIterator(resp.dataSST, false)
if err != nil {
// To speed up SST reading, surface all the point keys first, flush,
// then surface all the range keys and flush.
if err := s.copyPointKeys(resp.dataSST); err != nil {
return err
}
defer sst.Close()

sst.SeekGE(storage.MVCCKey{Key: keys.MinKey})
for {
if valid, err := sst.Valid(); !valid || err != nil {
if err != nil {
return err
}
break
}
k := sst.UnsafeKey()
if k.Timestamp.IsEmpty() {
if err := s.sst.PutUnversioned(k.Key, sst.UnsafeValue()); err != nil {
return err
}
} else {
if err := s.sst.PutRawMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil {
return err
}
}
sst.Next()
if err := s.copyRangeKeys(resp.dataSST); err != nil {
return err
}

// If this span extended the last span added -- that is, picked up where it
Expand Down Expand Up @@ -328,6 +309,66 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) error {
return nil
}

// copyPointKeys reads dataSST with a point-keys-only iterator and writes every
// key/value pair it surfaces into the sink's SST writer. Keys with an empty
// timestamp are written with PutUnversioned; all other keys are written with
// PutRawMVCC. The first iterator or writer error is returned.
func (s *fileSSTSink) copyPointKeys(dataSST []byte) error {
	pointIter, err := storage.NewPebbleMemSSTIterator(dataSST, false, storage.IterOptions{
		KeyTypes:   storage.IterKeyTypePointsOnly,
		LowerBound: keys.LocalMax,
		UpperBound: keys.MaxKey,
	})
	if err != nil {
		return err
	}
	defer pointIter.Close()

	pointIter.SeekGE(storage.MVCCKey{Key: keys.MinKey})
	for {
		ok, err := pointIter.Valid()
		if err != nil {
			return err
		}
		if !ok {
			// Iterator exhausted: every point key has been copied.
			return nil
		}

		key, value := pointIter.UnsafeKey(), pointIter.UnsafeValue()
		var putErr error
		if key.Timestamp.IsEmpty() {
			putErr = s.sst.PutUnversioned(key.Key, value)
		} else {
			putErr = s.sst.PutRawMVCC(key, value)
		}
		if putErr != nil {
			return putErr
		}
		pointIter.Next()
	}
}

// copyRangeKeys reads dataSST with a range-keys-only iterator and writes every
// range key it surfaces into the sink's SST writer via PutMVCCRangeKey. The
// first iterator or writer error is returned.
func (s *fileSSTSink) copyRangeKeys(dataSST []byte) error {
	rangeIter, err := storage.NewPebbleMemSSTIterator(dataSST, false, storage.IterOptions{
		KeyTypes:   storage.IterKeyTypeRangesOnly,
		LowerBound: keys.LocalMax,
		UpperBound: keys.MaxKey,
	})
	if err != nil {
		return err
	}
	defer rangeIter.Close()

	rangeIter.SeekGE(storage.MVCCKey{Key: keys.MinKey})
	for {
		valid, err := rangeIter.Valid()
		if err != nil {
			return err
		}
		if !valid {
			// Iterator exhausted: every range key has been copied.
			return nil
		}

		// NOTE(review): each range key is written with an empty MVCCValue, so
		// whatever value the source range key carried is not propagated --
		// confirm this is intended.
		for _, rkv := range rangeIter.RangeKeys() {
			if err := s.sst.PutMVCCRangeKey(rkv.RangeKey, storage.MVCCValue{}); err != nil {
				return err
			}
		}
		rangeIter.Next()
	}
}

func generateUniqueSSTName(nodeID base.SQLInstanceID) string {
// The data/ prefix, including a /, is intended to group SSTs in most of the
// common file/bucket browse UIs.
Expand Down
84 changes: 84 additions & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/rangekeys
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Tests that backups without revision history, and restores of those backups,
# properly handle range keys.

new-server name=s1
----

exec-sql
CREATE DATABASE orig;
USE orig;
CREATE TABLE foo (i INT PRIMARY KEY, s STRING);
INSERT INTO foo VALUES (1, 'x'),(2,'y');
CREATE TABLE baz (i INT PRIMARY KEY, s STRING);
INSERT INTO baz VALUES (11, 'xx'),(22,'yy');
----

# Ensure a full backup properly captures range keys
# - with foo, delete then insert, and ensure no original data surfaces in restore
# - with baz: leave it untouched for now

kv request=DeleteRange target=foo
----

exec-sql
INSERT INTO foo VALUES (3,'z');
----

exec-sql
BACKUP INTO 'nodelocal://0/test-root/';
----

exec-sql
RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' with new_db_name='orig1';
----

query-sql
SELECT count(*) from orig1.foo;
----
1

query-sql
SELECT count(*) from orig1.baz;
----
2

exec-sql
DROP DATABASE orig1 CASCADE
----

# Ensure incremental backup without revision history
# handles range tombstones:
# - with foo, insert and ensure latest data from previous backup surfaces in RESTORE
# - with baz, delete then insert, and ensure no data from previous backup surfaces in RESTORE

exec-sql
INSERT INTO foo VALUES (4,'a'),(5,'b');
----

kv request=DeleteRange target=baz
----

exec-sql
INSERT INTO baz VALUES (33,'zz');
----

exec-sql
BACKUP INTO LATEST IN 'nodelocal://0/test-root/';
----

exec-sql
RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' with new_db_name='orig1';
----

query-sql
SELECT count(*) from orig1.foo
----
3

query-sql
SELECT count(*) from orig1.baz
----
1



Loading

0 comments on commit 1026431

Please sign in to comment.