Skip to content

Commit

Permalink
backupccl: enable mvcc online restore
Browse files Browse the repository at this point in the history
This patch enables mvcc online restore for cluster, database, and table
restores by plumbing the batch request timestamp through the virtual addsstable
request.

Release note: none
  • Loading branch information
msbutler committed Feb 22, 2024
1 parent fd2663a commit 340cc72
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 12 deletions.
21 changes: 17 additions & 4 deletions pkg/ccl/backupccl/restore_online.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -76,9 +77,11 @@ func sendAddRemoteSSTs(
return genSpan(ctx, restoreSpanEntriesCh)
})

fromSystemTenant := isFromSystemTenant(dataToRestore.getTenantRekeys())

restoreWorkers := int(onlineRestoreLinkWorkers.Get(&execCtx.ExecCfg().Settings.SV))
for i := 0; i < restoreWorkers; i++ {
grp.GoCtx(sendAddRemoteSSTWorker(execCtx, restoreSpanEntriesCh, requestFinishedCh))
grp.GoCtx(sendAddRemoteSSTWorker(execCtx, restoreSpanEntriesCh, requestFinishedCh, fromSystemTenant))
}

if err := grp.Wait(); err != nil {
Expand Down Expand Up @@ -107,6 +110,7 @@ func sendAddRemoteSSTWorker(
execCtx sql.JobExecContext,
restoreSpanEntriesCh <-chan execinfrapb.RestoreSpanEntry,
requestFinishedCh chan<- struct{},
fromSystemTenant bool,
) func(context.Context) error {
return func(ctx context.Context) error {
var toAdd []execinfrapb.RestoreFileSpec
Expand All @@ -125,7 +129,7 @@ func sendAddRemoteSSTWorker(
}

for _, file := range toAdd {
if err := sendRemoteAddSSTable(ctx, execCtx, file); err != nil {
if err := sendRemoteAddSSTable(ctx, execCtx, file, fromSystemTenant); err != nil {
return err
}
}
Expand Down Expand Up @@ -210,7 +214,10 @@ func sendAdminScatter(
}

func sendRemoteAddSSTable(
ctx context.Context, execCtx sql.JobExecContext, file execinfrapb.RestoreFileSpec,
ctx context.Context,
execCtx sql.JobExecContext,
file execinfrapb.RestoreFileSpec,
fromSystemTenant bool,
) error {
ctx, sp := tracing.ChildSpan(ctx, "backupccl.sendRemoteAddSSTable")
defer sp.Finish()
Expand Down Expand Up @@ -248,8 +255,14 @@ func sendRemoteAddSSTable(
KeyCount: counts.Rows + counts.IndexEntries,
LiveCount: counts.Rows + counts.IndexEntries,
}

var batchTimestamp hlc.Timestamp
if writeAtBatchTS(ctx, file.BackupFileEntrySpan, fromSystemTenant) {
batchTimestamp = execCtx.ExecCfg().DB.Clock().Now()
}

_, _, err := execCtx.ExecCfg().DB.AddRemoteSSTable(
ctx, file.BackupFileEntrySpan, loc, fileStats)
ctx, file.BackupFileEntrySpan, loc, fileStats, batchTimestamp)
return err
}

Expand Down
93 changes: 93 additions & 0 deletions pkg/ccl/backupccl/restore_online_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@
package backupccl

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud/nodelocal"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -55,6 +62,74 @@ func TestOnlineRestoreBasic(t *testing.T) {
jobutils.WaitForJobToSucceed(t, rSQLDB, downloadJobID)
}

// TestOnlineRestoreTenant runs an online restore of a tenant and ensures the
// restore is not MVCC compliant.
func TestOnlineRestoreTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer nodelocal.ReplaceNodeLocalForTesting(t.TempDir())()

ctx := context.Background()

externalStorage := "nodelocal://1/backup"

params := base.TestClusterArgs{ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
TenantTestingKnobs: &sql.TenantTestingKnobs{
// The tests expect specific tenant IDs to show up.
EnableTenantIDReuse: true,
},
},

DefaultTestTenant: base.TestControlsTenantsExplicitly},
}
const numAccounts = 1

tc, systemDB, dir, cleanupFn := backupRestoreTestSetupWithParams(
t, singleNode, numAccounts, InitManualReplication, params,
)
_, _ = tc, systemDB
defer cleanupFn()
srv := tc.Server(0)

_ = securitytest.EmbeddedTenantIDs()

_, conn10 := serverutils.StartTenant(t, srv, base.TestTenantArgs{TenantID: roachpb.MustMakeTenantID(10)})
defer conn10.Close()
tenant10 := sqlutils.MakeSQLRunner(conn10)
tenant10.Exec(t, `CREATE DATABASE foo; CREATE TABLE foo.bar(i int primary key); INSERT INTO foo.bar VALUES (110), (210)`)

systemDB.Exec(t, fmt.Sprintf(`BACKUP TENANT 10 INTO '%s'`, externalStorage))

restoreTC, rSQLDB, cleanupFnRestored := backupRestoreTestSetupEmpty(t, 1, dir, InitManualReplication, params)
defer cleanupFnRestored()

var preRestoreTs float64
tenant10.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&preRestoreTs)

rSQLDB.Exec(t, fmt.Sprintf("RESTORE TENANT 10 FROM LATEST IN '%s' WITH EXPERIMENTAL DEFERRED COPY", externalStorage))

ten10Stopper := stop.NewStopper()
_, restoreConn10 := serverutils.StartTenant(
t, restoreTC.Server(0), base.TestTenantArgs{
TenantID: roachpb.MustMakeTenantID(10), Stopper: ten10Stopper,
},
)
defer func() {
restoreConn10.Close()
ten10Stopper.Stop(ctx)
}()
restoreTenant10 := sqlutils.MakeSQLRunner(restoreConn10)
restoreTenant10.CheckQueryResults(t, `select * from foo.bar`, tenant10.QueryStr(t, `select * from foo.bar`))

// Ensure the restore of a tenant was not mvcc
var maxRestoreMVCCTimestamp float64
restoreTenant10.QueryRow(t, "SELECT max(crdb_internal_mvcc_timestamp) FROM foo.bar").Scan(&maxRestoreMVCCTimestamp)
require.Greater(t, preRestoreTs, maxRestoreMVCCTimestamp)
}

func TestOnlineRestoreErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -116,6 +191,9 @@ func TestOnlineRestoreErrors(t *testing.T) {
func bankOnlineRestore(
t *testing.T, sqlDB *sqlutils.SQLRunner, numAccounts int, externalStorage string,
) {
var preRestoreTs float64
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&preRestoreTs)

sqlDB.Exec(t, "CREATE DATABASE data")
sqlDB.Exec(t, fmt.Sprintf("RESTORE TABLE data.bank FROM LATEST IN '%s' WITH EXPERIMENTAL DEFERRED COPY", externalStorage))

Expand All @@ -124,6 +202,21 @@ func bankOnlineRestore(
var restoreRowCount int
sqlDB.QueryRow(t, "SELECT count(*) FROM data.bank").Scan(&restoreRowCount)
require.Equal(t, numAccounts, restoreRowCount)

// Check that Online Restore was MVCC
var minRestoreMVCCTimestamp float64
sqlDB.QueryRow(t, "SELECT min(crdb_internal_mvcc_timestamp) FROM data.bank").Scan(&minRestoreMVCCTimestamp)
require.Greater(t, minRestoreMVCCTimestamp, preRestoreTs)

// Check that we can write on top of OR data
var maxRestoreMVCCTimestamp float64
sqlDB.QueryRow(t, "SELECT max(crdb_internal_mvcc_timestamp) FROM data.bank").Scan(&maxRestoreMVCCTimestamp)
sqlDB.Exec(t, "SET sql_safe_updates = false;")
sqlDB.Exec(t, "UPDATE data.bank SET balance = balance+1;")

var updateMVCCTimestamp float64
sqlDB.QueryRow(t, "SELECT min(crdb_internal_mvcc_timestamp) FROM data.bank").Scan(&updateMVCCTimestamp)
require.Greater(t, updateMVCCTimestamp, maxRestoreMVCCTimestamp)
}

func checkLinkingProgress(t *testing.T, sqlDB *sqlutils.SQLRunner) float32 {
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,7 +1852,6 @@ func doRestorePlan(
backupdest.DeprecatedResolveBackupManifestsExplicitIncrementals(ctx, &mem, mkStore, from,
endTime, encryption, &kmsEnv, p.User())
}

if err != nil {
return err
}
Expand All @@ -1878,6 +1877,10 @@ func doRestorePlan(
if err := checkManifestsForOnlineCompat(ctx, mainBackupManifests); err != nil {
return err
}
currentClusterVersion := p.ExecCfg().Settings.Version.ActiveVersion(ctx).Version
if currentClusterVersion.Less(clusterversion.V24_1.Version()) {
return errors.Newf("cluster must fully upgrade to version %s to run online restore", clusterversion.V24_1.String())
}
}

if restoreStmt.DescriptorCoverage == tree.AllDescriptors {
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,9 +807,10 @@ func (db *DB) AddRemoteSSTable(
span roachpb.Span,
file kvpb.AddSSTableRequest_RemoteFile,
stats *enginepb.MVCCStats,
batchTimestamp hlc.Timestamp,
) (roachpb.Span, int64, error) {
b := &Batch{}
b.addSSTable(span.Key, span.EndKey, nil, file, false, false, hlc.Timestamp{}, stats, false, hlc.Timestamp{})
b := &Batch{Header: kvpb.Header{Timestamp: batchTimestamp}}
b.addSSTable(span.Key, span.EndKey, nil, file, false, false, hlc.Timestamp{}, stats, false, batchTimestamp)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
return roachpb.Span{}, 0, err
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2122,7 +2122,6 @@ message AddSSTableRequest {
// checking, write-at-request-timestamp, or ingest-as-writes are unsupported.
//
// TODO(dt, msbutler, bilal): This is unsupported.
// TODO(dt, msbutler, bilal): support sst_timestamp_to_request_timestamp.
// TODO(msbutler): rename to ExternalFile.
message RemoteFile {
string locator = 1;
Expand All @@ -2132,7 +2131,7 @@ message AddSSTableRequest {
}
RemoteFile remote_file = 10 [(gogoproto.nullable) = false];

// AddSSTableRequest_PrefixReplacement is used to represent a prefix to be
// AddSStableRequest_PrefixReplacement is used to represent a prefix to be
// replaced and the prefix with which to replace it.
message PrefixReplacement {
bytes from = 1;
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func EvalAddSSTable(
ApproximatePhysicalSize: args.RemoteFile.ApproximatePhysicalSize,
BackingFileSize: args.RemoteFile.BackingFileSize,
Span: roachpb.Span{Key: start.Key, EndKey: end.Key},
RemoteRewriteTimestamp: sstToReqTS,
},
// Since the remote SST could contain keys at any timestamp, consider it
// a history mutation.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ message ReplicatedEvalResult {
string remote_file_path = 6;
uint64 backing_file_size = 7;
uint64 approximate_physical_size = 8;
util.hlc.Timestamp remote_rewrite_timestamp = 9 [(gogoproto.nullable) = false];
}
AddSSTable add_sstable = 17 [(gogoproto.customname) = "AddSSTable"];

Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,21 +648,28 @@ func addSSTablePreApply(
) bool {
if sst.RemoteFilePath != "" {
log.Infof(ctx,
"EXPERIMENTAL AddSSTABLE EXTERNAL %s (size %d, span %s) from %s (size %d)",
"EXPERIMENTAL AddSSTABLE EXTERNAL %s (size %d, span %s) from %s (size %d) at rewrite ts %s",
sst.RemoteFilePath,
sst.ApproximatePhysicalSize,
sst.Span,
sst.RemoteFileLoc,
sst.BackingFileSize,
sst.RemoteRewriteTimestamp,
)

start := storage.EngineKey{Key: sst.Span.Key}
end := storage.EngineKey{Key: sst.Span.EndKey}
var syntheticSuffix []byte
if sst.RemoteRewriteTimestamp.IsSet() {
syntheticSuffix = storage.EncodeMVCCTimestampSuffix(sst.RemoteRewriteTimestamp)
}
externalFile := pebble.ExternalFile{
Locator: remote.Locator(sst.RemoteFileLoc),
ObjName: sst.RemoteFilePath,
Size: sst.ApproximatePhysicalSize,
SmallestUserKey: start.Encode(),
LargestUserKey: end.Encode(),
SyntheticSuffix: syntheticSuffix,
// TODO(dt): pass pebble the backing file size to avoid a stat call.

// TODO(msbutler): I guess we need to figure out if the backing external
Expand Down
15 changes: 15 additions & 0 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,21 @@ var MVCCMerger = &pebble.Merger{
},
}

var _ sstable.BlockIntervalSyntheticReplacer = MVCCBlockIntervalSyntheticReplacer{}

type MVCCBlockIntervalSyntheticReplacer struct{}

func (mbsr MVCCBlockIntervalSyntheticReplacer) AdjustIntervalWithSyntheticSuffix(
lower uint64, upper uint64, suffix []byte,
) (adjustedLower uint64, adjustedUpper uint64, err error) {
// Remove the sentinel byte.
synthDecoded, _ := binary.Uvarint(suffix[1:])
if upper >= synthDecoded {
return 0, 0, errors.AssertionFailedf("the synthetic suffix %d is less than or equal to the original upper bound %d", synthDecoded, upper)
}
return synthDecoded, synthDecoded + 1, nil
}

// pebbleDataBlockMVCCTimeIntervalPointCollector implements
// pebble.DataBlockIntervalCollector for point keys.
type pebbleDataBlockMVCCTimeIntervalPointCollector struct {
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (p *pebbleIterator) setOptions(
p.rangeKeyMaskingBuf = encodeMVCCTimestampSuffixToBuf(
p.rangeKeyMaskingBuf, opts.RangeKeyMaskingBelow)
p.options.RangeKeyMasking.Suffix = p.rangeKeyMaskingBuf
p.maskFilter.BlockIntervalFilter.Init(mvccWallTimeIntervalCollector, 0, math.MaxUint64, nil /* syntheticReplacer */)
p.maskFilter.BlockIntervalFilter.Init(mvccWallTimeIntervalCollector, 0, math.MaxUint64, MVCCBlockIntervalSyntheticReplacer{})
p.options.RangeKeyMasking.Filter = p.getBlockPropertyFilterMask
}

Expand Down Expand Up @@ -317,7 +317,8 @@ func (p *pebbleIterator) setOptions(
sstable.NewBlockIntervalFilter(mvccWallTimeIntervalCollector,
uint64(opts.MinTimestamp.WallTime),
uint64(opts.MaxTimestamp.WallTime)+1,
nil /* syntheticReplacer */),
MVCCBlockIntervalSyntheticReplacer{},
),
}
p.options.PointKeyFilters = pkf[:1:2]
// NB: We disable range key block filtering because of complications in
Expand Down

0 comments on commit 340cc72

Please sign in to comment.