Skip to content

Commit

Permalink
externalconn,backupccl: add KMS support to External Connections
Browse files Browse the repository at this point in the history
In cockroachdb#84931 we taught the `ExternalStorage` infrastructure to
recognize the `external` URI scheme. In this change we do the same
but for the `KMS` infrastructure.

Concretely, a user is now able to create an External Connection
object that represents a GCP KMS URI. This can be done using the
`CREATE EXTERNAL CONNECTION` syntax. The user is then able to point
an operation in CockroachDB such as BACKUP, RESTORE, or SHOW BACKUP to
that KMS using an `external` URI. For example:

```
CREATE EXTERNAL CONNECTION backup AS 'nodelocal://1/foo';
CREATE EXTERNAL CONNECTION backupkms AS 'gs:///cmk?AUTH=implicit';

BACKUP INTO 'external://backup' WITH kms='external://backupkms';
```

Under the hood, we implement the `ConnectionDetails` interface
for GCP KMS. This allows us to persist a row in the `external_connections`
table when the object is created, and to `Dial` the underlying resource
when the object is being used. The interfaces had to be tweaked slightly
to accommodate the fact that they are now being implemented by two
different infrastructures: `ExternalStorage` and `KMS`. This is an
expected evolution, and will pave the way for the third and final piece:
support for changefeed `Sinks`. A large portion of this diff is just plumbing
the appropriate environments through the backup/restore code.

This diff also adds KMS specific tests to `pkg/ccl/cloudccl/gcp` and tweaks
our nightly job to run these tests with the configured credentials.

Informs: cockroachdb#84753

Release note (sql change): GCP KMS can be represented as an External Connection
object that can be used during a backup or restore using the `external` URI.
  • Loading branch information
adityamaru committed Jul 29, 2022
1 parent 7c7baeb commit 9df1e7c
Show file tree
Hide file tree
Showing 48 changed files with 1,035 additions and 163 deletions.
4 changes: 2 additions & 2 deletions build/teamcity/cockroach/nightlies/cloud_unit_tests_impl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export GOOGLE_APPLICATION_CREDENTIALS="$PWD/.google-credentials.json"

exit_status=0
$BAZEL_BIN/pkg/cmd/bazci/bazci_/bazci --config=ci \
test //pkg/cloud/gcp:gcp_test //pkg/cloud/amazon:amazon_test -- \
test //pkg/cloud/gcp:gcp_test //pkg/cloud/amazon:amazon_test //pkg/ccl/cloudccl/gcp:gcp_test -- \
--test_env=GO_TEST_WRAP_TESTV=1 \
--test_env=GO_TEST_WRAP=1 \
--test_env=GO_TEST_JSON_OUTPUT_FILE=$GO_TEST_JSON_OUTPUT_FILE \
Expand All @@ -37,7 +37,7 @@ $BAZEL_BIN/pkg/cmd/bazci/bazci_/bazci --config=ci \
--test_env=AWS_KMS_REGION="$AWS_KMS_REGION" \
--test_env=AWS_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID" \
--test_env=AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY" \
--test_timeout=60 \
--test_timeout=900 \
|| exit_status=$?

process_test_json \
Expand Down
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ ALL_TESTS = [
"//pkg/ccl/changefeedccl:changefeedccl_test",
"//pkg/ccl/cliccl:cliccl_test",
"//pkg/ccl/cloudccl/externalconn:externalconn_test",
"//pkg/ccl/cloudccl/gcp:gcp_test",
"//pkg/ccl/importerccl:importerccl_test",
"//pkg/ccl/jobsccl/jobsprotectedtsccl:jobsprotectedtsccl_test",
"//pkg/ccl/kvccl/kvfollowerreadsccl:kvfollowerreadsccl_test",
Expand Down Expand Up @@ -656,6 +657,7 @@ GO_TARGETS = [
"//pkg/ccl/cliccl:cliccl",
"//pkg/ccl/cliccl:cliccl_test",
"//pkg/ccl/cloudccl/externalconn:externalconn_test",
"//pkg/ccl/cloudccl/gcp:gcp_test",
"//pkg/ccl/cmdccl/enc_utils:enc_utils",
"//pkg/ccl/cmdccl/enc_utils:enc_utils_lib",
"//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry",
Expand Down Expand Up @@ -2061,6 +2063,7 @@ GET_X_DATA_TARGETS = [
"//pkg/ccl/cliccl:get_x_data",
"//pkg/ccl/cliccl/cliflagsccl:get_x_data",
"//pkg/ccl/cloudccl/externalconn:get_x_data",
"//pkg/ccl/cloudccl/gcp:get_x_data",
"//pkg/ccl/cmdccl/enc_utils:get_x_data",
"//pkg/ccl/cmdccl/stub-schema-registry:get_x_data",
"//pkg/ccl/gssapiccl:get_x_data",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ go_test(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/storage",
"//pkg/testutils",
Expand Down
14 changes: 5 additions & 9 deletions pkg/ccl/backupccl/alter_backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func doAlterBackupPlan(
}

ioConf := baseStore.ExternalIOConf()
kmsEnv := backupencryption.MakeBackupKMSEnv(baseStore.Settings(), &ioConf, p.ExecCfg().DB,
p.User(), p.ExecCfg().InternalExecutor)

// Check that at least one of the old keys has been used to encrypt the backup in the past.
// Use the first one that works to decrypt the ENCRYPTION-INFO file(s).
Expand All @@ -158,10 +160,7 @@ func doAlterBackupPlan(
for _, encFile := range opts {
defaultKMSInfo, err = backupencryption.ValidateKMSURIsAgainstFullBackup(ctx, []string{old},
backupencryption.NewEncryptedDataKeyMapFromProtoMap(encFile.EncryptedDataKeyByKMSMasterKeyID),
&backupencryption.BackupKMSEnv{
Settings: baseStore.Settings(),
Conf: &ioConf,
})
&kmsEnv)

if err == nil {
oldKMSFound = true
Expand All @@ -182,20 +181,17 @@ func doAlterBackupPlan(

// Recover the encryption key using the old key, so we can encrypt it again with the new keys.
var plaintextDataKey []byte
plaintextDataKey, err = backupencryption.GetEncryptionKey(ctx, encryption, baseStore.Settings(),
baseStore.ExternalIOConf())
plaintextDataKey, err = backupencryption.GetEncryptionKey(ctx, encryption, &kmsEnv)
if err != nil {
return err
}

kmsEnv := &backupencryption.BackupKMSEnv{Settings: p.ExecCfg().Settings, Conf: &p.ExecCfg().ExternalIODirConfig}

encryptedDataKeyByKMSMasterKeyID := backupencryption.NewEncryptedDataKeyMap()

// Add each new key user wants to add to a new data key map.
for _, kmsURI := range newKms {
masterKeyID, encryptedDataKey, err := backupencryption.GetEncryptedDataKeyFromURI(ctx,
plaintextDataKey, kmsURI, kmsEnv)
plaintextDataKey, kmsURI, &kmsEnv)
if err != nil {
return errors.Wrap(err, "failed to encrypt data key when adding new KMS")
}
Expand Down
38 changes: 24 additions & 14 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func backup(
execCtx sql.JobExecContext,
defaultURI string,
urisByLocalityKV map[string]string,
db *kv.DB,
settings *cluster.Settings,
defaultStore cloud.ExternalStorage,
storageByLocalityKV map[string]*cloudpb.ExternalStorage,
Expand Down Expand Up @@ -174,6 +173,10 @@ func backup(
return roachpb.RowCount{}, errors.Wrap(err, "failed to determine nodes on which to run")
}

kmsEnv := backupencryption.MakeBackupKMSEnv(execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig, execCtx.ExecCfg().DB, execCtx.User(),
execCtx.ExecCfg().InternalExecutor)

backupSpecs, err := distBackupPlanSpecs(
ctx,
planCtx,
Expand All @@ -186,6 +189,7 @@ func backup(
defaultURI,
urisByLocalityKV,
encryption,
&kmsEnv,
roachpb.MVCCFilter(backupManifest.MVCCFilter),
backupManifest.StartTime,
backupManifest.EndTime,
Expand Down Expand Up @@ -246,7 +250,7 @@ func backup(
})

err := backupinfo.WriteBackupManifestCheckpoint(
ctx, defaultURI, encryption, backupManifest, execCtx.ExecCfg(), execCtx.User(),
ctx, defaultURI, encryption, &kmsEnv, backupManifest, execCtx.ExecCfg(), execCtx.User(),
)
if err != nil {
log.Errorf(ctx, "unable to checkpoint backup descriptor: %+v", err)
Expand Down Expand Up @@ -308,16 +312,17 @@ func backup(
return err
}
defer store.Close()
return backupinfo.WriteBackupPartitionDescriptor(ctx, store, filename, encryption, &desc)
return backupinfo.WriteBackupPartitionDescriptor(ctx, store, filename,
encryption, &kmsEnv, &desc)
}(); err != nil {
return roachpb.RowCount{}, err
}
}
}

resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup manifest"})
if err := backupinfo.WriteBackupManifest(ctx, settings, defaultStore, backupbase.BackupManifestName,
encryption, backupManifest); err != nil {
if err := backupinfo.WriteBackupManifest(ctx, defaultStore, backupbase.BackupManifestName,
encryption, &kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, err
}
var tableStatistics []*stats.TableStatisticProto
Expand Down Expand Up @@ -347,12 +352,13 @@ func backup(
}

resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup table statistics"})
if err := backupinfo.WriteTableStatistics(ctx, defaultStore, encryption, &statsTable); err != nil {
if err := backupinfo.WriteTableStatistics(ctx, defaultStore, encryption, &kmsEnv, &statsTable); err != nil {
return roachpb.RowCount{}, err
}

if backupinfo.WriteMetadataSST.Get(&settings.SV) {
if err := backupinfo.WriteBackupMetadataSST(ctx, defaultStore, encryption, backupManifest, tableStatistics); err != nil {
if err := backupinfo.WriteBackupMetadataSST(ctx, defaultStore, encryption, &kmsEnv, backupManifest,
tableStatistics); err != nil {
err = errors.Wrap(err, "writing forward-compat metadata sst")
if !build.IsRelease() {
return roachpb.RowCount{}, err
Expand Down Expand Up @@ -403,6 +409,8 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
resumerSpan := tracing.SpanFromContext(ctx)
details := b.job.Details().(jobspb.BackupDetails)
p := execCtx.(sql.JobExecContext)
kmsEnv := backupencryption.MakeBackupKMSEnv(p.ExecCfg().Settings,
&p.ExecCfg().ExternalIODirConfig, p.ExecCfg().DB, p.User(), p.ExecCfg().InternalExecutor)

var backupManifest *backuppb.BackupManifest

Expand Down Expand Up @@ -452,7 +460,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
}

if err := backupinfo.WriteBackupManifestCheckpoint(
ctx, details.URI, details.EncryptionOptions, backupManifest, p.ExecCfg(), p.User(),
ctx, details.URI, details.EncryptionOptions, &kmsEnv, backupManifest, p.ExecCfg(), p.User(),
); err != nil {
return err
}
Expand Down Expand Up @@ -540,7 +548,8 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
}()

if backupManifest == nil || forceReadBackupManifest {
backupManifest, memSize, err = b.readManifestOnResume(ctx, &mem, p.ExecCfg(), defaultStore, details, p.User())
backupManifest, memSize, err = b.readManifestOnResume(ctx, &mem, p.ExecCfg(), defaultStore,
details, p.User(), &kmsEnv)
if err != nil {
return err
}
Expand Down Expand Up @@ -575,7 +584,6 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
p,
details.URI,
details.URIsByLocalityKV,
p.ExecCfg().DB,
p.ExecCfg().Settings,
defaultStore,
storageByLocalityKV,
Expand All @@ -600,7 +608,8 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
var reloadBackupErr error
mem.Shrink(ctx, memSize)
memSize = 0
backupManifest, memSize, reloadBackupErr = b.readManifestOnResume(ctx, &mem, p.ExecCfg(), defaultStore, details, p.User())
backupManifest, memSize, reloadBackupErr = b.readManifestOnResume(ctx, &mem, p.ExecCfg(),
defaultStore, details, p.User(), &kmsEnv)
if reloadBackupErr != nil {
return errors.Wrap(reloadBackupErr, "could not reload backup manifest when retrying")
}
Expand Down Expand Up @@ -728,27 +737,28 @@ func (b *backupResumer) readManifestOnResume(
defaultStore cloud.ExternalStorage,
details jobspb.BackupDetails,
user username.SQLUsername,
kmsEnv cloud.KMSEnv,
) (*backuppb.BackupManifest, int64, error) {
// We don't read the table descriptors from the backup descriptor, but
// they could be using either the new or the old foreign key
// representations. We should just preserve whatever representation the
// table descriptors were using and leave them alone.
desc, memSize, err := backupinfo.ReadBackupCheckpointManifest(ctx, mem, defaultStore,
backupinfo.BackupManifestCheckpointName, details.EncryptionOptions)
backupinfo.BackupManifestCheckpointName, details.EncryptionOptions, kmsEnv)
if err != nil {
if !errors.Is(err, cloud.ErrFileDoesNotExist) {
return nil, 0, errors.Wrapf(err, "reading backup checkpoint")
}
// Try reading temp checkpoint.
tmpCheckpoint := backupinfo.TempCheckpointFileNameForJob(b.job.ID())
desc, memSize, err = backupinfo.ReadBackupCheckpointManifest(ctx, mem, defaultStore,
tmpCheckpoint, details.EncryptionOptions)
tmpCheckpoint, details.EncryptionOptions, kmsEnv)
if err != nil {
return nil, 0, err
}
// "Rename" temp checkpoint.
if err := backupinfo.WriteBackupManifestCheckpoint(
ctx, details.URI, details.EncryptionOptions, &desc, cfg, user,
ctx, details.URI, details.EncryptionOptions, kmsEnv, &desc, cfg, user,
); err != nil {
mem.Shrink(ctx, memSize)
return nil, 0, errors.Wrapf(err, "renaming temp checkpoint file")
Expand Down
13 changes: 10 additions & 3 deletions pkg/ccl/backupccl/backup_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils"
Expand Down Expand Up @@ -89,7 +90,12 @@ func checkMetadata(
t.Fatal(err)
}

bm, err := backupinfo.NewBackupMetadata(ctx, store, backupinfo.MetadataSSTName, nil)
srv := tc.Servers[0]
execCfg := srv.ExecutorConfig().(sql.ExecutorConfig)
kmsEnv := backupencryption.MakeBackupKMSEnv(srv.ClusterSettings(), &base.ExternalIODirConfig{},
srv.DB(), username.RootUserName(), execCfg.InternalExecutor)
bm, err := backupinfo.NewBackupMetadata(ctx, store, backupinfo.MetadataSSTName,
nil /* encryption */, &kmsEnv)
if err != nil {
t.Fatal(err)
}
Expand All @@ -110,7 +116,7 @@ func checkMetadata(
}
checkFiles(ctx, t, m, bm)
checkTenants(ctx, t, m, bm)
checkStats(ctx, t, store, m, bm)
checkStats(ctx, t, store, m, bm, &kmsEnv)
}

func checkManifest(t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata) {
Expand Down Expand Up @@ -245,8 +251,9 @@ func checkStats(
store cloud.ExternalStorage,
m *backuppb.BackupManifest,
bm *backupinfo.BackupMetadata,
kmsEnv cloud.KMSEnv,
) {
expectedStats, err := backupinfo.GetStatisticsFromBackup(ctx, store, nil, *m)
expectedStats, err := backupinfo.GetStatisticsFromBackup(ctx, store, nil, kmsEnv, *m)
if err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,13 +1098,14 @@ func getBackupDetailAndManifest(
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}

kmsEnv := &backupencryption.BackupKMSEnv{Settings: execCfg.Settings, Conf: &execCfg.ExternalIODirConfig}
kmsEnv := backupencryption.MakeBackupKMSEnv(execCfg.Settings, &execCfg.ExternalIODirConfig,
execCfg.DB, user, execCfg.InternalExecutor)

mem := execCfg.RootMemoryMonitor.MakeBoundAccount()
defer mem.Close(ctx)

prevBackups, encryptionOptions, memSize, err := backupinfo.FetchPreviousBackups(ctx, &mem, user,
makeCloudStorage, prevs, *initialDetails.EncryptionOptions, kmsEnv)
makeCloudStorage, prevs, *initialDetails.EncryptionOptions, &kmsEnv)

if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
Expand Down Expand Up @@ -1185,7 +1186,7 @@ func getBackupDetailAndManifest(
urisByLocalityKV,
prevBackups,
encryptionOptions,
kmsEnv)
&kmsEnv)
if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}
Expand Down
15 changes: 9 additions & 6 deletions pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand All @@ -24,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
Expand All @@ -40,6 +40,7 @@ func distBackupPlanSpecs(
defaultURI string,
urisByLocalityKV map[string]string,
encryption *jobspb.BackupEncryptionOptions,
kmsEnv cloud.KMSEnv,
mvccFilter roachpb.MVCCFilter,
startTime, endTime hlc.Timestamp,
) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) {
Expand All @@ -48,7 +49,6 @@ func distBackupPlanSpecs(
_ = ctx // ctx is currently unused, but this new ctx should be used below in the future.
defer span.Finish()
user := execCtx.User()
execCfg := execCtx.ExecCfg()

var spanPartitions []sql.SpanPartition
var introducedSpanPartitions []sql.SpanPartition
Expand All @@ -67,13 +67,16 @@ func distBackupPlanSpecs(
}

if encryption != nil && encryption.Mode == jobspb.EncryptionMode_KMS {
kms, err := cloud.KMSFromURI(ctx, encryption.KMSInfo.Uri, &backupencryption.BackupKMSEnv{
Settings: execCfg.Settings,
Conf: &execCfg.ExternalIODirConfig,
})
kms, err := cloud.KMSFromURI(ctx, encryption.KMSInfo.Uri, kmsEnv)
if err != nil {
return nil, err
}
defer func() {
err := kms.Close()
if err != nil {
log.Infof(ctx, "failed to close KMS: %+v", err)
}
}()

encryption.Key, err = kms.Decrypt(planCtx.EvalContext().Context,
encryption.KMSInfo.EncryptedDataKey)
Expand Down
Loading

0 comments on commit 9df1e7c

Please sign in to comment.