From e2ffffc5016fb032508d35870d849aa988f3693b Mon Sep 17 00:00:00 2001
From: Oleg Afanasyev
Date: Tue, 7 Feb 2023 17:24:53 +0000
Subject: [PATCH] loqrecovery: support mixed version recovery

This commit adds mixed version support for the half-online loss of quorum
recovery service and cli tools. This change allows loq recovery to be used
in partially upgraded clusters by tracking the version that generated the
collected data and producing recovery plans with an identical version, so
that versions can be verified at every step of recovery.

Release note: None
---
 docs/generated/http/full.md                   |   4 +-
 pkg/cli/BUILD.bazel                           |   1 +
 pkg/cli/cliflags/flags.go                     |  10 +
 pkg/cli/debug.go                              |   2 +
 pkg/cli/debug_recover_loss_of_quorum.go       |  85 +++---
 pkg/cli/debug_recover_loss_of_quorum_test.go  | 143 ++++++----
 pkg/kv/kvserver/loqrecovery/BUILD.bazel       |   8 +
 pkg/kv/kvserver/loqrecovery/apply.go          |  22 ++
 pkg/kv/kvserver/loqrecovery/apply_test.go     | 100 +++++++
 pkg/kv/kvserver/loqrecovery/collect.go        |  41 ++-
 pkg/kv/kvserver/loqrecovery/collect_test.go   |  45 ++++
 .../loqrecovery/loqrecoverypb/recovery.go     |   5 +
 .../loqrecovery/loqrecoverypb/recovery.proto  |  33 ++-
 .../loqrecoverypb/recovery_test.go            |  21 +-
 pkg/kv/kvserver/loqrecovery/marshalling.go    | 116 ++++++++
 .../kvserver/loqrecovery/marshalling_test.go  | 176 ++++++++++++
 pkg/kv/kvserver/loqrecovery/plan.go           |  32 ++-
 pkg/kv/kvserver/loqrecovery/plan_test.go      | 100 +++++++
 .../kvserver/loqrecovery/recovery_env_test.go |  16 +-
 pkg/kv/kvserver/loqrecovery/server.go         |  45 +++-
 .../loqrecovery/server_integration_test.go    |  87 +++++-
 pkg/kv/kvserver/loqrecovery/store.go          |   8 +-
 pkg/kv/kvserver/loqrecovery/version.go        | 108 ++++++++
 pkg/kv/kvserver/loqrecovery/version_test.go   | 254 ++++++++++++++++++
 pkg/server/serverpb/admin.proto               |   8 +-
 25 files changed, 1343 insertions(+), 127 deletions(-)
 create mode 100644 pkg/kv/kvserver/loqrecovery/apply_test.go
 create mode 100644 pkg/kv/kvserver/loqrecovery/collect_test.go
 create mode 100644 pkg/kv/kvserver/loqrecovery/marshalling.go
 create mode 100644 pkg/kv/kvserver/loqrecovery/marshalling_test.go
 create mode 100644 pkg/kv/kvserver/loqrecovery/plan_test.go
 create mode 100644 pkg/kv/kvserver/loqrecovery/version.go
 create mode 100644 pkg/kv/kvserver/loqrecovery/version_test.go

diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md
index 924ba3ff4d8f..9445685f498c 100644
--- a/docs/generated/http/full.md
+++ b/docs/generated/http/full.md
@@ -7450,6 +7450,7 @@ Support status: [reserved](#support-status)
 | range_descriptor | [cockroach.roachpb.RangeDescriptor](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-cockroach.roachpb.RangeDescriptor) |  |  | [reserved](#support-status) |
 | replica_info | [cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ReplicaInfo](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ReplicaInfo) |  |  | [reserved](#support-status) |
 | node_stream_restarted | [RecoveryCollectReplicaRestartNodeStream](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-cockroach.server.serverpb.RecoveryCollectReplicaRestartNodeStream) |  |  | [reserved](#support-status) |
+| metadata | [cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ClusterMetadata](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ClusterMetadata) |  |  | [reserved](#support-status) |
@@ -7538,7 +7539,8 @@ Support status: [reserved](#support-status)
 | ----- | ---- | ----- | ----------- | -------------- |
 | plan | [cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ReplicaUpdatePlan](#cockroach.server.serverpb.RecoveryStagePlanRequest-cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ReplicaUpdatePlan) |  | Plan is replica update plan to stage for application on next restart. Plan could be empty in that case existing plan is removed if present. | [reserved](#support-status) |
 | all_nodes | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) |  | If all nodes is true, then receiver should act as a coordinator and perform a fan-out to stage plan on all nodes of the cluster. | [reserved](#support-status) |
-| force_plan | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) |  | Force plan tells receiver to ignore any plan already staged on the node if it is present and replace it with new plan (including empty one). | [reserved](#support-status) |
+| force_plan | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) |  | ForcePlan tells receiver to ignore any plan already staged on the node if it is present and replace it with new plan (including empty one). | [reserved](#support-status) |
+| force_local_internal_version | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) |  | ForceLocalInternalVersion tells server to update internal component of plan version to the one of active cluster version. This option needs to be set if target cluster is stuck in recovery where only part of nodes were successfully migrated. | [reserved](#support-status) |
diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel
index 5447de319bc5..12b99c0d1e0f 100644
--- a/pkg/cli/BUILD.bazel
+++ b/pkg/cli/BUILD.bazel
@@ -367,6 +367,7 @@ go_test(
         "//pkg/kv/kvserver",
         "//pkg/kv/kvserver/allocator/storepool",
         "//pkg/kv/kvserver/liveness/livenesspb",
+        "//pkg/kv/kvserver/loqrecovery",
         "//pkg/kv/kvserver/loqrecovery/loqrecoverypb",
         "//pkg/kv/kvserver/stateloader",
         "//pkg/roachpb",
diff --git a/pkg/cli/cliflags/flags.go b/pkg/cli/cliflags/flags.go
index 60845dad8b17..0c06a8bfb85e 100644
--- a/pkg/cli/cliflags/flags.go
+++ b/pkg/cli/cliflags/flags.go
@@ -1913,6 +1913,16 @@ p - prompt interactively for a confirmation
 `,
 	}
 
+	RecoverIgnoreInternalVersion = FlagInfo{
+		Name: "ignore-internal-version",
+		Description: `
+When set, staging and local store plan application commands will ignore the
+internal cluster version. This option must only be used to bypass the version
+check if the cluster is stuck in the middle of an upgrade, locally stored
+versions differ from node to node, and a previous application or staging attempt failed.
+`, + } + PrintKeyLength = FlagInfo{ Name: "print-key-max-length", Description: ` diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index 0b2dbe3b6795..85488b4fc69f 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -1451,6 +1451,8 @@ func init() { cliflags.ConfirmActions.Usage()) f.UintVar(&formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Name, formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Usage()) + f.BoolVar(&debugRecoverExecuteOpts.ignoreInternalVersion, cliflags.RecoverIgnoreInternalVersion.Name, + debugRecoverExecuteOpts.ignoreInternalVersion, cliflags.RecoverIgnoreInternalVersion.Usage()) f = debugMergeLogsCmd.Flags() f.Var(flagutil.Time(&debugMergeLogsOpts.from), "from", diff --git a/pkg/cli/debug_recover_loss_of_quorum.go b/pkg/cli/debug_recover_loss_of_quorum.go index 4c50e8c5e3f9..b792069976b4 100644 --- a/pkg/cli/debug_recover_loss_of_quorum.go +++ b/pkg/cli/debug_recover_loss_of_quorum.go @@ -21,6 +21,7 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" @@ -28,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/strutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -353,10 +353,9 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error { defer outFile.Close() writer = outFile } - jsonpb := protoutil.JSONPb{Indent: " "} - out, err := jsonpb.Marshal(&replicaInfo) + out, err := loqrecovery.MarshalReplicaInfo(replicaInfo) if err != nil { - return errors.Wrap(err, "failed to marshal collected replica info") + return err } if _, err := writer.Write(out); err != nil { return errors.Wrap(err, "failed to write collected replica info") @@ -564,33 +563,41 @@ Discarded live replicas: %d planFile = path.Base(debugRecoverPlanOpts.outputFileName) } - jsonpb := protoutil.JSONPb{Indent: " "} var out []byte - if out, err = jsonpb.Marshal(&plan); err != nil { - return errors.Wrap(err, "failed to marshal recovery plan") + if out, err = loqrecovery.MarshalPlan(plan); err != nil { + return err } if _, err = writer.Write(out); err != nil { return errors.Wrap(err, "failed to write recovery plan") } - // No args means we collected connection info from cluster and need to - // preserve flags for subsequent invocation. - remoteArgs := getCLIClusterFlags(len(args) == 0, cmd, func(flag string) bool { - _, filter := planSpecificFlags[flag] - return filter - }) + v := clusterversion.ClusterVersion{ + Version: plan.Version, + } + if v.IsActive(clusterversion.V23_1) { + // No args means we collected connection info from cluster and need to + // preserve flags for subsequent invocation. + remoteArgs := getCLIClusterFlags(len(args) == 0, cmd, func(flag string) bool { + _, filter := planSpecificFlags[flag] + return filter + }) - _, _ = fmt.Fprintf(stderr, `Plan created. + _, _ = fmt.Fprintf(stderr, `Plan created. 
To stage recovery application in half-online mode invoke: cockroach debug recover apply-plan %s %s Alternatively distribute plan to below nodes and invoke 'debug recover apply-plan --store= %s' on: `, remoteArgs, planFile, planFile) + } else { + _, _ = fmt.Fprintf(stderr, `Plan created. +To complete recovery, distribute plan to below nodes and invoke 'debug recover apply-plan --store= %s' on: +`, planFile) + } for _, node := range report.UpdatedNodes { - _, _ = fmt.Fprintf(stderr, "- node n%d, store(s) %s\n", node.NodeID, strutil.JoinIDs("s", node.StoreIDs)) + _, _ = fmt.Fprintf(stderr, "- node n%d, store(s) %s\n", node.NodeID, + strutil.JoinIDs("s", node.StoreIDs)) } - return nil } @@ -602,9 +609,8 @@ func readReplicaInfoData(fileNames []string) (loqrecoverypb.ClusterReplicaInfo, return loqrecoverypb.ClusterReplicaInfo{}, errors.Wrapf(err, "failed to read replica info file %q", filename) } - var nodeReplicas loqrecoverypb.ClusterReplicaInfo - jsonpb := protoutil.JSONPb{} - if err = jsonpb.Unmarshal(data, &nodeReplicas); err != nil { + nodeReplicas, err := loqrecovery.UnmarshalReplicaInfo(data) + if err != nil { return loqrecoverypb.ClusterReplicaInfo{}, errors.WithHint(errors.Wrapf(err, "failed to unmarshal replica info from file %q", filename), "Ensure that replica info file is generated with the same binary version and file is not corrupted.") @@ -633,8 +639,9 @@ See debug recover command help for more details on how to use this command. } var debugRecoverExecuteOpts struct { - Stores base.StoreSpecList - confirmAction confirmActionFlag + Stores base.StoreSpecList + confirmAction confirmActionFlag + ignoreInternalVersion bool } // runDebugExecuteRecoverPlan is using the following pattern when performing command @@ -655,20 +662,24 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error { return errors.Wrapf(err, "failed to read plan file %q", planFile) } - var nodeUpdates loqrecoverypb.ReplicaUpdatePlan - jsonpb := protoutil.JSONPb{Indent: " "} - if err = jsonpb.Unmarshal(data, &nodeUpdates); err != nil { + nodeUpdates, err := loqrecovery.UnmarshalPlan(data) + if err != nil { return errors.Wrapf(err, "failed to unmarshal plan from file %q", planFile) } if len(debugRecoverExecuteOpts.Stores.Specs) == 0 { - return stageRecoveryOntoCluster(ctx, cmd, planFile, nodeUpdates) + return stageRecoveryOntoCluster(ctx, cmd, planFile, nodeUpdates, + debugRecoverExecuteOpts.ignoreInternalVersion) } - return applyRecoveryToLocalStore(ctx, nodeUpdates) + return applyRecoveryToLocalStore(ctx, nodeUpdates, debugRecoverExecuteOpts.ignoreInternalVersion) } func stageRecoveryOntoCluster( - ctx context.Context, cmd *cobra.Command, planFile string, plan loqrecoverypb.ReplicaUpdatePlan, + ctx context.Context, + cmd *cobra.Command, + planFile string, + plan loqrecoverypb.ReplicaUpdatePlan, + ignoreInternalVersion bool, ) error { c, finish, err := getAdminClient(ctx, serverCfg) if err != nil { @@ -747,7 +758,11 @@ func stageRecoveryOntoCluster( return err } } - sr, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + sr, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &plan, + AllNodes: true, + ForceLocalInternalVersion: ignoreInternalVersion, + }) if err := maybeWrapStagingError("failed to stage loss of quorum recovery plan on cluster", sr, err); err != nil { return err @@ -787,19 +802,21 @@ func sortedKeys[T ~int | ~int32 | ~int64](set map[T]any) []T { } func applyRecoveryToLocalStore( - ctx context.Context, 
nodeUpdates loqrecoverypb.ReplicaUpdatePlan, + ctx context.Context, nodeUpdates loqrecoverypb.ReplicaUpdatePlan, ignoreInternalVersion bool, ) error { stopper := stop.NewStopper() defer stopper.Stop(ctx) var localNodeID roachpb.NodeID batches := make(map[roachpb.StoreID]storage.Batch) - for _, storeSpec := range debugRecoverExecuteOpts.Stores.Specs { + stores := make([]storage.Engine, len(debugRecoverExecuteOpts.Stores.Specs)) + for i, storeSpec := range debugRecoverExecuteOpts.Stores.Specs { store, err := OpenEngine(storeSpec.Path, stopper, storage.MustExist) if err != nil { return errors.Wrapf(err, "failed to open store at path %q. ensure that store path is "+ "correct and that it is not used by another process", storeSpec.Path) } + stores[i] = store batch := store.NewBatch() defer store.Close() defer batch.Close() @@ -818,6 +835,10 @@ func applyRecoveryToLocalStore( batches[storeIdent.StoreID] = batch } + if err := loqrecovery.CheckEnginesVersion(ctx, stores, nodeUpdates, ignoreInternalVersion); err != nil { + return err + } + updateTime := timeutil.Now() prepReport, err := loqrecovery.PrepareUpdateReplicas( ctx, nodeUpdates, uuid.DefaultGenerator, updateTime, localNodeID, batches) @@ -911,8 +932,8 @@ func runDebugVerify(cmd *cobra.Command, args []string) error { if err != nil { return errors.Wrapf(err, "failed to read plan file %q", planFile) } - jsonpb := protoutil.JSONPb{Indent: " "} - if err = jsonpb.Unmarshal(data, &updatePlan); err != nil { + updatePlan, err = loqrecovery.UnmarshalPlan(data) + if err != nil { return errors.Wrapf(err, "failed to unmarshal plan from file %q", planFile) } } diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go index dcfc1376291a..f617dea3f5fb 100644 --- a/pkg/cli/debug_recover_loss_of_quorum_test.go +++ b/pkg/cli/debug_recover_loss_of_quorum_test.go @@ -18,10 +18,12 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -86,6 +88,8 @@ func TestCollectInfoFromMultipleStores(t *testing.T) { stores[r.StoreID] = struct{}{} } require.Equal(t, 2, len(stores), "collected replicas from stores") + require.Equal(t, clusterversion.ByKey(clusterversion.BinaryVersionKey), replicas.Version, + "collected version info from stores") } // TestCollectInfoFromOnlineCluster verifies that given a test cluster with @@ -147,6 +151,8 @@ func TestCollectInfoFromOnlineCluster(t *testing.T) { require.Equal(t, totalRanges*2, totalReplicas, "number of collected replicas") require.Equal(t, totalRanges, len(replicas.Descriptors), "number of collected descriptors from metadata") + require.Equal(t, clusterversion.ByKey(clusterversion.BinaryVersionKey), replicas.Version, + "collected version info from stores") } // TestLossOfQuorumRecovery performs a sanity check on end to end recovery workflow. @@ -231,7 +237,7 @@ func TestLossOfQuorumRecovery(t *testing.T) { planFile}) require.NoError(t, err, "failed to run apply plan") // Check that there were at least one mention of replica being promoted. 
- require.Contains(t, out, "will be updated", "no replica updated were recorded") + require.Contains(t, out, "will be updated", "no replica updates were recorded") require.Contains(t, out, fmt.Sprintf("Updated store(s): s%d", node1ID), "apply plan was not executed on requested node") @@ -303,6 +309,88 @@ func TestLossOfQuorumRecovery(t *testing.T) { "failed to write value to scratch range after recovery") } +// TestStageVersionCheck verifies that we can force plan with different internal +// version onto cluster. To do this, we create a plan with internal version +// above current but matching major and minor. Then we check that staging fails +// and that force flag will update plan version to match local node. +func TestStageVersionCheck(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderDeadlock(t, "slow under deadlock") + + ctx := context.Background() + _, cleanupFn := testutils.TempDir(t) + defer cleanupFn() + + c := NewCLITest(TestCLIParams{ + NoServer: true, + }) + defer c.Cleanup() + + storeReg := server.NewStickyInMemEnginesRegistry() + defer storeReg.CloseAllStickyInMemEngines() + tc := testcluster.NewTestCluster(t, 4, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: map[int]base.TestServerArgs{ + 0: { + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: storeReg, + }, + }, + StoreSpecs: []base.StoreSpec{ + {InMemory: true, StickyInMemoryEngineID: "1"}, + }, + }, + }, + }) + tc.Start(t) + defer tc.Stopper().Stop(ctx) + tc.StopServer(3) + + grpcConn, err := tc.Server(0).RPCContext().GRPCDialNode(tc.Server(0).ServingRPCAddr(), + tc.Server(0).NodeID(), rpc.DefaultClass).Connect(ctx) + require.NoError(t, err, "Failed to create test cluster after recovery") + adminClient := serverpb.NewAdminClient(grpcConn) + v := clusterversion.ByKey(clusterversion.BinaryVersionKey) + v.Internal++ + // To avoid crafting real replicas we use StaleLeaseholderNodeIDs to force + // node to stage plan for verification. + p := loqrecoverypb.ReplicaUpdatePlan{ + PlanID: uuid.FastMakeV4(), + Version: v, + ClusterID: tc.Server(0).StorageClusterID().String(), + DecommissionedNodeIDs: []roachpb.NodeID{4}, + StaleLeaseholderNodeIDs: []roachpb.NodeID{1}, + } + // Attempts to stage plan with different internal version must fail. + _, err = adminClient.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &p, + AllNodes: true, + ForcePlan: false, + ForceLocalInternalVersion: false, + }) + require.ErrorContains(t, err, "doesn't match cluster active version") + // Enable "stuck upgrade bypass" to stage plan on the cluster. + _, err = adminClient.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &p, + AllNodes: true, + ForcePlan: false, + ForceLocalInternalVersion: true, + }) + require.NoError(t, err, "force local must fix incorrect version") + // Check that stored plan has version matching cluster version. 
+ fs, err := storeReg.GetUnderlyingFS(base.StoreSpec{InMemory: true, StickyInMemoryEngineID: "1"}) + require.NoError(t, err, "failed to get shared store fs") + ps := loqrecovery.NewPlanStore("", fs) + p, ok, err := ps.LoadPlan() + require.NoError(t, err, "failed to read node 0 plan") + require.True(t, ok, "plan was not staged") + require.Equal(t, clusterversion.ByKey(clusterversion.BinaryVersionKey), p.Version, + "plan version was not updated") +} + func createIntentOnRangeDescriptor( ctx context.Context, t *testing.T, tcBefore *testcluster.TestCluster, sk roachpb.Key, ) { @@ -517,59 +605,6 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { require.NoError(t, err, "failed to split range after recovery") } -// TestJsonSerialization verifies that all fields serialized in JSON could be -// read back. This specific test addresses issues where default naming scheme -// may not work in combination with other tags correctly. e.g. repeated used -// with omitempty seem to use camelcase unless explicitly specified. -func TestJsonSerialization(t *testing.T) { - defer leaktest.AfterTest(t)() - - nr := loqrecoverypb.ClusterReplicaInfo{ - ClusterID: "id1", - LocalInfo: []loqrecoverypb.NodeReplicaInfo{ - { - Replicas: []loqrecoverypb.ReplicaInfo{ - { - NodeID: 1, - StoreID: 2, - Desc: roachpb.RangeDescriptor{ - RangeID: 3, - StartKey: roachpb.RKey(keys.MetaMin), - EndKey: roachpb.RKey(keys.MetaMax), - InternalReplicas: []roachpb.ReplicaDescriptor{ - { - NodeID: 1, - StoreID: 2, - ReplicaID: 3, - Type: roachpb.VOTER_INCOMING, - }, - }, - NextReplicaID: 4, - Generation: 7, - }, - RaftAppliedIndex: 13, - RaftCommittedIndex: 19, - RaftLogDescriptorChanges: []loqrecoverypb.DescriptorChangeInfo{ - { - ChangeType: 1, - Desc: &roachpb.RangeDescriptor{}, - OtherDesc: &roachpb.RangeDescriptor{}, - }, - }, - }, - }, - }, - }, - } - jsonpb := protoutil.JSONPb{Indent: " "} - data, err := jsonpb.Marshal(&nr) - require.NoError(t, err) - - var crFromJSON loqrecoverypb.ClusterReplicaInfo - require.NoError(t, jsonpb.Unmarshal(data, &crFromJSON)) - require.Equal(t, nr, crFromJSON, "objects before and after serialization") -} - func TestUpdatePlanVsClusterDiff(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index 1cb3e649fe05..f8fc26565f7f 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -6,12 +6,14 @@ go_library( srcs = [ "apply.go", "collect.go", + "marshalling.go", "plan.go", "record.go", "server.go", "store.go", "testing_knobs.go", "utils.go", + "version.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery", visibility = ["//visibility:public"], @@ -53,20 +55,26 @@ go_library( go_test( name = "loqrecovery_test", srcs = [ + "apply_test.go", "collect_raft_log_test.go", + "collect_test.go", "main_test.go", + "marshalling_test.go", + "plan_test.go", "record_test.go", "recovery_env_test.go", "recovery_test.go", "server_integration_test.go", "server_test.go", "store_test.go", + "version_test.go", ], args = ["-test.timeout=295s"], data = glob(["testdata/**"]), embed = [":loqrecovery"], deps = [ "//pkg/base", + "//pkg/clusterversion", "//pkg/keys", "//pkg/kv/kvpb", "//pkg/kv/kvserver", diff --git a/pkg/kv/kvserver/loqrecovery/apply.go b/pkg/kv/kvserver/loqrecovery/apply.go index 3a1ef9516e62..615ea5eb8c16 100644 --- a/pkg/kv/kvserver/loqrecovery/apply.go +++ b/pkg/kv/kvserver/loqrecovery/apply.go @@ -16,6 +16,7 @@ import ( "strings" "time" + 
"github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" @@ -365,6 +366,10 @@ func MaybeApplyPendingRecoveryPlan( } applyPlan := func(nodeID roachpb.NodeID, plan loqrecoverypb.ReplicaUpdatePlan) error { + if err := CheckEnginesVersion(ctx, engines, plan, false); err != nil { + return errors.Wrap(err, "failed to check cluster version against storage") + } + log.Infof(ctx, "applying staged loss of quorum recovery plan %s", plan.PlanID) batches := make(map[roachpb.StoreID]storage.Batch) for _, e := range engines { @@ -436,3 +441,20 @@ func MaybeApplyPendingRecoveryPlan( } return nil } + +func CheckEnginesVersion( + ctx context.Context, + engines []storage.Engine, + plan loqrecoverypb.ReplicaUpdatePlan, + ignoreInternal bool, +) error { + binaryVersion := clusterversion.ByKey(clusterversion.BinaryVersionKey) + binaryMinSupportedVersion := clusterversion.ByKey(clusterversion.BinaryMinSupportedVersionKey) + clusterVersion, err := kvstorage.SynthesizeClusterVersionFromEngines( + ctx, engines, binaryVersion, binaryMinSupportedVersion, + ) + if err != nil { + return errors.Wrap(err, "failed to get cluster version from storage") + } + return checkPlanVersionMatches(plan.Version, clusterVersion.Version, ignoreInternal) +} diff --git a/pkg/kv/kvserver/loqrecovery/apply_test.go b/pkg/kv/kvserver/loqrecovery/apply_test.go new file mode 100644 index 000000000000..0cc83adfcdce --- /dev/null +++ b/pkg/kv/kvserver/loqrecovery/apply_test.go @@ -0,0 +1,100 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package loqrecovery + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/pebble/vfs" + "github.com/stretchr/testify/require" +) + +func TestApplyVerifiesVersion(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + testTime, _ := time.Parse(time.RFC3339, "2022-02-24T01:40:00Z") + clock := timeutil.NewManualTime(testTime) + ps := NewPlanStore("", vfs.NewMem()) + cid := uuid.MakeV4() + engines := []storage.Engine{ + createEngineOrFatal(ctx, t, cid, 1), + createEngineOrFatal(ctx, t, cid, 2), + } + defer engines[1].Close() + defer engines[0].Close() + + assertPlanError := func(t *testing.T, plan loqrecoverypb.ReplicaUpdatePlan, errMsg string) { + require.NoError(t, ps.SavePlan(plan), "failed to stage plan") + err := MaybeApplyPendingRecoveryPlan(ctx, ps, engines, clock) + require.NoError(t, err, "fatal error applying recovery plan") + report, ok, err := readNodeRecoveryStatusInfo(ctx, engines[0]) + require.NoError(t, err, "failed to read application outcome") + require.True(t, ok, "didn't find application outcome in engine") + require.NotEmpty(t, report.AppliedPlanID, "plan was not processed") + require.Contains(t, report.Error, errMsg, "version error not registered") + } + + t.Run("apply plan version is higher than cluster", func(t *testing.T) { + aboveCurrent := clusterversion.ByKey(clusterversion.BinaryVersionKey) + aboveCurrent.Major += 1 + plan := loqrecoverypb.ReplicaUpdatePlan{ + PlanID: uuid.MakeV4(), + ClusterID: cid.String(), + Version: aboveCurrent, + } + assertPlanError(t, plan, "doesn't match cluster active version") + }) + + t.Run("apply plan version lower than current", func(t *testing.T) { + belowMin := clusterversion.ByKey(clusterversion.BinaryMinSupportedVersionKey) + belowMin.Minor -= 1 + plan := loqrecoverypb.ReplicaUpdatePlan{ + PlanID: uuid.MakeV4(), + ClusterID: cid.String(), + Version: belowMin, + } + assertPlanError(t, plan, "doesn't match cluster active version") + }) +} + +func createEngineOrFatal(ctx context.Context, t *testing.T, uuid uuid.UUID, id int) storage.Engine { + eng, err := storage.Open(ctx, + storage.InMemory(), + cluster.MakeClusterSettings(), + storage.CacheSize(1<<20 /* 1 MiB */)) + if err != nil { + t.Fatalf("failed to crate in mem store: %v", err) + } + sIdent := roachpb.StoreIdent{ + ClusterID: uuid, + NodeID: roachpb.NodeID(id), + StoreID: roachpb.StoreID(id), + } + if err = storage.MVCCPutProto( + context.Background(), eng, nil, keys.StoreIdentKey(), hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &sIdent, + ); err != nil { + t.Fatalf("failed to populate test store ident: %v", err) + } + return eng +} diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go index c021ff7a5c2e..487c5d31e90b 100644 --- a/pkg/kv/kvserver/loqrecovery/collect.go +++ b/pkg/kv/kvserver/loqrecovery/collect.go @@ -15,6 +15,7 @@ import ( "io" "math" + "github.com/cockroachdb/cockroach/pkg/clusterversion" 
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" @@ -36,10 +37,6 @@ type CollectionStats struct { func CollectRemoteReplicaInfo( ctx context.Context, c serverpb.AdminClient, ) (loqrecoverypb.ClusterReplicaInfo, CollectionStats, error) { - cInfo, err := c.Cluster(ctx, &serverpb.ClusterRequest{}) - if err != nil { - return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err - } cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{}) if err != nil { return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err @@ -50,6 +47,7 @@ func CollectRemoteReplicaInfo( var clusterReplInfo []loqrecoverypb.NodeReplicaInfo var nodeReplicas []loqrecoverypb.ReplicaInfo var currentNode roachpb.NodeID + var metadata loqrecoverypb.ClusterMetadata for { info, err := cc.Recv() if err != nil { @@ -80,12 +78,23 @@ func CollectRemoteReplicaInfo( if s.NodeID == currentNode { nodeReplicas = nil } + } else if m := info.GetMetadata(); m != nil { + metadata = *m } } + // We don't want to process data outside of safe version range for this CLI + // binary. RPC allows us to communicate with a cluster that is newer than + // the binary, but it will not version gate the data to binary version so we + // can receive entries that we won't be able to persist and process correctly. + if err := checkVersionAllowedByBinary(metadata.Version); err != nil { + return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.Wrap(err, + "unsupported cluster info version") + } return loqrecoverypb.ClusterReplicaInfo{ - ClusterID: cInfo.ClusterID, + ClusterID: metadata.ClusterID, Descriptors: descriptors, LocalInfo: clusterReplInfo, + Version: metadata.Version, }, CollectionStats{ Nodes: len(nodes), Stores: len(stores), @@ -100,6 +109,19 @@ func CollectStoresReplicaInfo( if len(stores) == 0 { return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.New("no stores were provided for info collection") } + + // Synthesizing version from engine ensures that binary is compatible with + // the store, so we don't need to do any extra checks. 
+ binaryVersion := clusterversion.ByKey(clusterversion.BinaryVersionKey) + binaryMinSupportedVersion := clusterversion.ByKey(clusterversion.BinaryMinSupportedVersionKey) + version, err := kvstorage.SynthesizeClusterVersionFromEngines( + ctx, stores, binaryVersion, binaryMinSupportedVersion, + ) + if err != nil { + return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.WithHint(err, + "ensure that used cli has compatible version with storage") + } + var clusterUUID uuid.UUID nodes := make(map[roachpb.NodeID]struct{}) var replicas []loqrecoverypb.ReplicaInfo @@ -115,7 +137,7 @@ func CollectStoresReplicaInfo( return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.New("can't collect info from stored that belong to different clusters") } nodes[ident.NodeID] = struct{}{} - if err := visitStoreReplicas(ctx, reader, ident.StoreID, ident.NodeID, + if err := visitStoreReplicas(ctx, reader, ident.StoreID, ident.NodeID, version, func(info loqrecoverypb.ReplicaInfo) error { replicas = append(replicas, info) return nil @@ -126,6 +148,7 @@ func CollectStoresReplicaInfo( return loqrecoverypb.ClusterReplicaInfo{ ClusterID: clusterUUID.String(), LocalInfo: []loqrecoverypb.NodeReplicaInfo{{Replicas: replicas}}, + Version: version.Version, }, CollectionStats{ Nodes: len(nodes), Stores: len(stores), @@ -137,6 +160,7 @@ func visitStoreReplicas( reader storage.Reader, storeID roachpb.StoreID, nodeID roachpb.NodeID, + targetVersion clusterversion.ClusterVersion, send func(info loqrecoverypb.ReplicaInfo) error, ) error { if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, reader, func(desc roachpb.RangeDescriptor) error { @@ -160,7 +184,10 @@ func visitStoreReplicas( return err } - localIsLeaseholder := rstate.Lease != nil && rstate.Lease.Replica.StoreID == storeID + var localIsLeaseholder bool + if targetVersion.IsActive(clusterversion.V23_1) { + localIsLeaseholder = rstate.Lease != nil && rstate.Lease.Replica.StoreID == storeID + } return send(loqrecoverypb.ReplicaInfo{ StoreID: storeID, diff --git a/pkg/kv/kvserver/loqrecovery/collect_test.go b/pkg/kv/kvserver/loqrecovery/collect_test.go new file mode 100644 index 000000000000..f41aada36bed --- /dev/null +++ b/pkg/kv/kvserver/loqrecovery/collect_test.go @@ -0,0 +1,45 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package loqrecovery + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +func TestCollectChecksStoreInfo(t *testing.T) { + leaktest.AfterTest(t) + ctx := context.Background() + + eng, err := storage.Open(ctx, + storage.InMemory(), + cluster.MakeClusterSettings(), + storage.CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err, "failed to create engine") + defer eng.Close() + v := roachpb.Version{Major: 21, Minor: 2} + if err := kvstorage.WriteClusterVersionToEngines(ctx, []storage.Engine{eng}, + clusterversion.ClusterVersion{Version: v}); err != nil { + t.Fatalf("failed to populate test store cluster version: %v", err) + } + + _, _, err = CollectStoresReplicaInfo(ctx, []storage.Engine{eng}) + require.ErrorContains(t, err, "is too old for running version", + "engine version check not triggered") +} diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go index 78481b7f908a..6313e4825b05 100644 --- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go +++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go @@ -103,8 +103,13 @@ func (m *ClusterReplicaInfo) Merge(o ClusterReplicaInfo) error { return errors.Newf("can't merge cluster info from different cluster: %s != %s", m.ClusterID, o.ClusterID) } + if !m.Version.Equal(o.Version) { + return errors.Newf("can't merge cluster info from different version: %s != %s", m.Version, + o.Version) + } } else { m.ClusterID = o.ClusterID + m.Version = o.Version } if len(o.Descriptors) > 0 { if len(m.Descriptors) > 0 { diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto index 0ba052b2d64a..9e78dd63e261 100644 --- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto +++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto @@ -40,6 +40,11 @@ message DescriptorChangeInfo { // recovery. This information should be enough for recovery algorithm to pick a // survivor replica in when not replicas are available. // Information includes range descriptor as well as parts of raft state. +// When changing this structure, care must be exercised to keep it compatible +// with previous versions when serializing to json. See +// loqrecovery/marshalling.go for details how format is preserved and +// loqrecovery/collect.go on how replica info is generated with respect of +// active cluster version. message ReplicaInfo { int32 node_id = 1 [(gogoproto.customname) = "NodeID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; @@ -51,7 +56,17 @@ message ReplicaInfo { repeated DescriptorChangeInfo raft_log_descriptor_changes = 6 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "raft_log_descriptor_changes,omitempty"]; bool local_assumes_leaseholder = 7 [ - (gogoproto.jsontag) = "raft_log_descriptor_changes,omitempty"]; + (gogoproto.jsontag) = "local_assumes_leaseholder,omitempty"]; +} + +// ClusterMetadata contains info about cluster that planner can use when +// creating a plan. +message ClusterMetadata { + // ClusterID contains id of the cluster from which info was collected. 
+ string cluster_id = 1 [(gogoproto.customname) = "ClusterID"]; + // Version contains effective cluster version of the cluster from which + // info was collected. + roachpb.Version version = 2 [(gogoproto.nullable) = false]; } // Collection of replica information gathered in a collect-info run. @@ -64,6 +79,9 @@ message NodeReplicaInfo { } // Replica info collected from one or more nodes of a cluster. +// When changing this structure, care must be exercised to keep it compatible +// with previous versions when serializing to json. See +// loqrecovery/marshalling.go for details how format is preserved. message ClusterReplicaInfo { // ClusterID contains id of the cluster from which info was collected. string cluster_id = 1 [(gogoproto.customname) = "ClusterID"]; @@ -76,11 +94,17 @@ message ClusterReplicaInfo { // subset of full info. They are not guaranteed to be split by node, each // element should contain disjoint subset of replica infos. repeated NodeReplicaInfo local_info = 3 [(gogoproto.nullable) = false]; + // Version contains effective cluster version of the cluster from which + // info was collected. + roachpb.Version version = 4 [(gogoproto.nullable) = false]; } // ReplicaUpdate contains information that needs to be updated on replica on the node // to make it a designated survivor so that replica could act as a source of truth when // doing loss of quorum recovery. +// When changing this structure, care must be exercised to keep it compatible +// with previous versions when serializing to json. See +// loqrecovery/marshalling.go for details how format is preserved. message ReplicaUpdate { option (gogoproto.goproto_stringer) = false; @@ -101,6 +125,9 @@ message ReplicaUpdate { } // ReplicaUpdatePlan Collection of updates for all recoverable replicas in the cluster. +// When changing this structure, care must be exercised to keep it compatible +// with previous versions when serializing to json. See +// loqrecovery/marshalling.go for details how format is preserved. message ReplicaUpdatePlan { repeated ReplicaUpdate updates = 1 [(gogoproto.nullable) = false]; // PlanID contains ID generated by cli when generating recovery plan and is subsequently @@ -119,6 +146,10 @@ message ReplicaUpdatePlan { // that they hold and that can't be shed because quorum is lost on the ranges. repeated int32 stale_leaseholder_node_ids = 5 [(gogoproto.customname) = "StaleLeaseholderNodeIDs", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + // Version contains version of the plan which is equal to the active cluster + // version of the data that this plan is based on. Plan could only be applied + // to the cluster of the same version. 
+ roachpb.Version version = 6 [(gogoproto.nullable) = false]; } // ReplicaRecoveryRecord is a struct that loss of quorum recovery commands diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery_test.go b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery_test.go index 2db4c86c7238..caab3609a034 100644 --- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery_test.go +++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery_test.go @@ -124,12 +124,13 @@ func TestClusterInfoMergeSameClusterID(t *testing.T) { "should be able to merge partial info with equal cluster ids") } -func TestClusterInfoMergeRejectDifferentClusterIDs(t *testing.T) { +func TestClusterInfoMergeRejectDifferentMetadata(t *testing.T) { uuid1 := uuid.FastMakeV4() uuid2 := uuid.FastMakeV4() info1 := ClusterReplicaInfo{ ClusterID: uuid1.String(), Descriptors: []roachpb.RangeDescriptor{{RangeID: 1}}, + Version: roachpb.Version{Major: 22}, LocalInfo: []NodeReplicaInfo{ { Replicas: []ReplicaInfo{ @@ -139,8 +140,20 @@ func TestClusterInfoMergeRejectDifferentClusterIDs(t *testing.T) { }, }, } - info3 := ClusterReplicaInfo{ + info2 := ClusterReplicaInfo{ ClusterID: uuid2.String(), + Version: roachpb.Version{Major: 22}, + LocalInfo: []NodeReplicaInfo{ + { + Replicas: []ReplicaInfo{ + {StoreID: 3, NodeID: 2}, + }, + }, + }, + } + info3 := ClusterReplicaInfo{ + ClusterID: uuid1.String(), + Version: roachpb.Version{Major: 23}, LocalInfo: []NodeReplicaInfo{ { Replicas: []ReplicaInfo{ @@ -150,12 +163,15 @@ func TestClusterInfoMergeRejectDifferentClusterIDs(t *testing.T) { }, } require.Error(t, info1.Merge(info3), "reject merging of info from different clusters") + require.Error(t, info1.Merge(info2), "reject merging of info from different versions") } func TestClusterInfoInitializeByMerge(t *testing.T) { uuid1 := uuid.FastMakeV4().String() + version1 := roachpb.Version{Major: 22, Minor: 2} info := ClusterReplicaInfo{ ClusterID: uuid1, + Version: version1, Descriptors: []roachpb.RangeDescriptor{{RangeID: 1}}, LocalInfo: []NodeReplicaInfo{ { @@ -169,4 +185,5 @@ func TestClusterInfoInitializeByMerge(t *testing.T) { empty := ClusterReplicaInfo{} require.NoError(t, empty.Merge(info), "should be able to merge into empty struct") require.Equal(t, empty.ClusterID, uuid1, "merge should update empty info fields") + require.Equal(t, empty.Version, version1, "merge should update empty info fields") } diff --git a/pkg/kv/kvserver/loqrecovery/marshalling.go b/pkg/kv/kvserver/loqrecovery/marshalling.go new file mode 100644 index 000000000000..ab889387a598 --- /dev/null +++ b/pkg/kv/kvserver/loqrecovery/marshalling.go @@ -0,0 +1,116 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package loqrecovery + +import ( + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" +) + +// MarshalReplicaInfo serializes info into version dependent format. This is +// needed to create replica info files in format compatible with analyzed +// cluster version. 
For example if cluster is partially upgraded and node with +// new binary is collecting offline info for processing it should be created +// in a format compatible with node that will produce a plan which must create +// a plan compatible with offline application and could be either old or new +// binary. +func MarshalReplicaInfo(replicaInfo loqrecoverypb.ClusterReplicaInfo) ([]byte, error) { + jsonpb := protoutil.JSONPb{Indent: " "} + + v := clusterversion.ClusterVersion{ + Version: replicaInfo.Version, + } + if v.IsActive(clusterversion.V23_1) { + out, err := jsonpb.Marshal(&replicaInfo) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal replica info") + } + return out, nil + } + + var combined []loqrecoverypb.ReplicaInfo + for _, i := range replicaInfo.LocalInfo { + combined = append(combined, i.Replicas...) + } + // NB: this marshalling is incorrect, but we preserve a bug for backward + // compatibility. Message pointer is implementing interface, not struct + // itself. See Marshal below on how it must be done. + out, err := jsonpb.Marshal(loqrecoverypb.NodeReplicaInfo{Replicas: combined}) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal replica info in legacy (22.1) format") + } + return out, nil +} + +// UnmarshalReplicaInfo tries to guess format and deserialize data. Old format +// expects only a single field, so we try that first, if it fails, fall back to +// current format. +func UnmarshalReplicaInfo(data []byte) (loqrecoverypb.ClusterReplicaInfo, error) { + jsonpb := protoutil.JSONPb{} + var nodeReplicas loqrecoverypb.NodeReplicaInfo + if err := jsonpb.Unmarshal(data, &nodeReplicas); err == nil { + return loqrecoverypb.ClusterReplicaInfo{ + LocalInfo: []loqrecoverypb.NodeReplicaInfo{nodeReplicas}, + Version: legacyInfoFormatVersion, + }, nil + } + + var clusterReplicas loqrecoverypb.ClusterReplicaInfo + if err := jsonpb.Unmarshal(data, &clusterReplicas); err != nil { + return loqrecoverypb.ClusterReplicaInfo{}, err + } + if err := checkVersionAllowedByBinary(clusterReplicas.Version); err != nil { + return loqrecoverypb.ClusterReplicaInfo{}, errors.Wrap(err, "unsupported cluster info version") + } + return clusterReplicas, nil +} + +// legacyPlan mimics serialization of ReplicaUpdatePlan when using value instead +// of pointer and excludes all new fields that would be serialized as empty +// values otherwise. +type legacyPlan struct { + Updates []loqrecoverypb.ReplicaUpdate `json:"updates"` +} + +// MarshalPlan writes replica update plan in format compatible with target +// version. +func MarshalPlan(plan loqrecoverypb.ReplicaUpdatePlan) ([]byte, error) { + jsonpb := protoutil.JSONPb{Indent: " "} + + v := clusterversion.ClusterVersion{ + Version: plan.Version, + } + if v.IsActive(clusterversion.V23_1) { + out, err := jsonpb.Marshal(&plan) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal recovery plan") + } + return out, nil + } + out, err := jsonpb.Marshal(legacyPlan{Updates: plan.Updates}) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal recovery plan in v1 format") + } + return out, nil +} + +// UnmarshalPlan reads json containing replica update plan into struct. There's +// no special version handling for reading plans as formats are compatible. 
+func UnmarshalPlan(data []byte) (loqrecoverypb.ReplicaUpdatePlan, error) { + var nodeUpdates loqrecoverypb.ReplicaUpdatePlan + jsonpb := protoutil.JSONPb{Indent: " "} + if err := jsonpb.Unmarshal(data, &nodeUpdates); err != nil { + return loqrecoverypb.ReplicaUpdatePlan{}, err + } + return nodeUpdates, nil +} diff --git a/pkg/kv/kvserver/loqrecovery/marshalling_test.go b/pkg/kv/kvserver/loqrecovery/marshalling_test.go new file mode 100644 index 000000000000..8f529495ce3d --- /dev/null +++ b/pkg/kv/kvserver/loqrecovery/marshalling_test.go @@ -0,0 +1,176 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package loqrecovery + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +// TestJsonSerialization verifies that all fields serialized in JSON could be +// read back. This specific test addresses issues where default naming scheme +// may not work in combination with other tags correctly. e.g. repeated used +// with omitempty seem to use camelcase unless explicitly specified. 
+func TestJsonSerialization(t *testing.T) { + defer leaktest.AfterTest(t)() + + newVersion := clusterversion.ByKey(clusterversion.BinaryVersionKey) + + rs := []loqrecoverypb.ReplicaInfo{ + { + NodeID: 1, + StoreID: 2, + Desc: roachpb.RangeDescriptor{ + RangeID: 3, + StartKey: roachpb.RKey(keys.MetaMin), + EndKey: roachpb.RKey(keys.MetaMax), + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + NodeID: 1, + StoreID: 2, + ReplicaID: 3, + Type: roachpb.VOTER_INCOMING, + }, + }, + NextReplicaID: 4, + Generation: 7, + }, + RaftAppliedIndex: 13, + RaftCommittedIndex: 19, + RaftLogDescriptorChanges: []loqrecoverypb.DescriptorChangeInfo{ + { + ChangeType: 1, + Desc: &roachpb.RangeDescriptor{}, + OtherDesc: &roachpb.RangeDescriptor{}, + }, + }, + LocalAssumesLeaseholder: true, + }, + } + + cr := loqrecoverypb.ClusterReplicaInfo{ + ClusterID: "id1", + Version: newVersion, + LocalInfo: []loqrecoverypb.NodeReplicaInfo{ + { + Replicas: rs, + }, + }, + Descriptors: []roachpb.RangeDescriptor{ + { + RangeID: 1, + StartKey: roachpb.RKey(keys.MetaMin), + EndKey: roachpb.RKey(keys.MetaMax), + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + NodeID: 1, + StoreID: 1, + ReplicaID: 3, + Type: 1, + }, + }, + NextReplicaID: 5, + Generation: 4, + StickyBit: hlc.Timestamp{}, + }, + }, + } + + lcr := loqrecoverypb.ClusterReplicaInfo{ + ClusterID: "id1", + Version: legacyInfoFormatVersion, + LocalInfo: []loqrecoverypb.NodeReplicaInfo{ + { + Replicas: rs, + }, + }, + } + + rup := []loqrecoverypb.ReplicaUpdate{ + { + RangeID: 53, + StartKey: loqrecoverypb.RecoveryKey(keys.MetaMin), + OldReplicaID: 7, + NewReplica: roachpb.ReplicaDescriptor{ + NodeID: 1, + StoreID: 1, + ReplicaID: 17, + Type: 0, + }, + NextReplicaID: 18, + }, + } + + pl := loqrecoverypb.ReplicaUpdatePlan{ + Updates: rup, + PlanID: uuid.FromStringOrNil("00000001-0000-4000-8000-000000000000"), + DecommissionedNodeIDs: []roachpb.NodeID{4, 5}, + ClusterID: "abc", + StaleLeaseholderNodeIDs: []roachpb.NodeID{3}, + Version: newVersion, + } + + lpl := loqrecoverypb.ReplicaUpdatePlan{ + Updates: rup, + } + + t.Run("cluster replica info", func(t *testing.T) { + out, err := MarshalReplicaInfo(cr) + require.NoError(t, err, "failed to marshal replica info") + ucr, err := UnmarshalReplicaInfo(out) + require.NoError(t, err, "failed to unmarshal replica info") + require.Equal(t, cr, ucr, "replica info before and after serialization") + }) + + t.Run("cluster replica info with v1 format", func(t *testing.T) { + out, err := MarshalReplicaInfo(lcr) + require.NoError(t, err, "failed to marshal legacy replica info") + ucr, err := UnmarshalReplicaInfo(out) + require.NoError(t, err, "failed to unmarshal replica info") + require.Equal(t, ucr.Version, legacyInfoFormatVersion, "legacy format should have generated version") + require.Empty(t, ucr.ClusterID, "legacy format should have no cluster id") + require.Nil(t, ucr.Descriptors, "legacy format should have no descriptors") + require.Equal(t, ucr.LocalInfo, ucr.LocalInfo, "replica info") + + // For collected replica info we want to check if raw object could be loaded + // with a legacy loader in previous versions. 
+ jsonpb := protoutil.JSONPb{} + var nr loqrecoverypb.NodeReplicaInfo + require.NoError(t, jsonpb.Unmarshal(out, &nr), "failed to unmarshal with legacy unmashaler") + require.Equal(t, lcr.LocalInfo[0], nr, "replica info before and after serialization") + }) + + t.Run("update plan", func(t *testing.T) { + out, err := MarshalPlan(pl) + require.NoError(t, err, "failed to marshal plan") + upl, err := UnmarshalPlan(out) + require.NoError(t, err, "failed to unmarshal plan") + require.Equal(t, pl, upl, "plan before and after serialization") + }) + + t.Run("update plan with old version", func(t *testing.T) { + out, err := MarshalPlan(lpl) + require.NoError(t, err, "failed to marshal plan") + upl, err := UnmarshalPlan(out) + require.NoError(t, err, "failed to unmarshal plan") + require.Equal(t, lpl, upl, "plan before and after serialization") + require.Contains(t, string(out), "updates", "legacy plan format uses snake naming") + }) +} diff --git a/pkg/kv/kvserver/loqrecovery/plan.go b/pkg/kv/kvserver/loqrecovery/plan.go index 8937b1842d51..45a61991ba08 100644 --- a/pkg/kv/kvserver/loqrecovery/plan.go +++ b/pkg/kv/kvserver/loqrecovery/plan.go @@ -14,6 +14,7 @@ import ( "context" "sort" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -118,6 +119,14 @@ func (u PlannedReplicaUpdate) asReplicaUpdate() loqrecoverypb.ReplicaUpdate { // information not being atomically captured. // An error is returned in case of unrecoverable error in the collected data // that prevents creation of any sane plan or correctable user error. +// Note on versions in clusterInfo. +// Plan expects info collection and info reading functions to ensure that +// provided clusterInfo is compatible with current binary. +// It is planners responsibility to only use features supported by cluster +// version specified in the clusterInfo. Newer features must not be used as it +// may break the cluster if they are not backward compatible. Target versions +// is copied into resulting plan so that cluster could reject higher versions +// of plans. func PlanReplicas( ctx context.Context, clusterInfo loqrecoverypb.ClusterReplicaInfo, @@ -182,12 +191,22 @@ func PlanReplicas( } sort.Sort(roachpb.NodeIDSlice(staleLeaseholderNodes)) + v := clusterversion.ClusterVersion{ + Version: clusterInfo.Version, + } + if v.IsActive(clusterversion.V23_1) { + return loqrecoverypb.ReplicaUpdatePlan{ + Updates: updates, + PlanID: planID, + DecommissionedNodeIDs: decommissionNodeIDs, + ClusterID: clusterInfo.ClusterID, + StaleLeaseholderNodeIDs: staleLeaseholderNodes, + Version: clusterInfo.Version, + }, report, err + } + return loqrecoverypb.ReplicaUpdatePlan{ - Updates: updates, - PlanID: planID, - DecommissionedNodeIDs: decommissionNodeIDs, - ClusterID: clusterInfo.ClusterID, - StaleLeaseholderNodeIDs: staleLeaseholderNodes, + Updates: updates, }, report, err } @@ -258,13 +277,10 @@ func planReplicasWithoutMeta( for _, p := range proposedSurvivors { u, ok := makeReplicaUpdateIfNeeded(ctx, p, availableStoreIDs) if !ok { - log.Infof(ctx, "range r%d didn't lose quorum", p.rangeID()) continue } problems = append(problems, checkDescriptor(p)...) 
updates = append(updates, u) - log.Infof(ctx, "replica has lost quorum, recovering: %s -> %s", - p.survivor().Desc, u.NewReplica) } sort.Slice(problems, func(i, j int) bool { diff --git a/pkg/kv/kvserver/loqrecovery/plan_test.go b/pkg/kv/kvserver/loqrecovery/plan_test.go new file mode 100644 index 000000000000..45cebe20094d --- /dev/null +++ b/pkg/kv/kvserver/loqrecovery/plan_test.go @@ -0,0 +1,100 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package loqrecovery + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +func TestVersionIsPreserved(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + current := clusterversion.ByKey(clusterversion.BinaryVersionKey) + current.Patch += 1 + + replicaInfo := infoWithVersion(current) + + plan, _, err := PlanReplicas(ctx, replicaInfo, nil, nil, uuid.DefaultGenerator) + require.NoError(t, err, "good version is rejected") + require.Equal(t, current, plan.Version, "plan version was not preserved") +} + +func TestV1PlanContainOnlyRelevantInfo(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + replicaInfo := infoWithVersion(legacyInfoFormatVersion) + // Create a plan that could contain decom node ids and other optional fields + // and check that none of newer fields leak to resulting plan. + replicaInfo.LocalInfo = []loqrecoverypb.NodeReplicaInfo{ + { + Replicas: []loqrecoverypb.ReplicaInfo{ + { + NodeID: 1, + StoreID: 1, + Desc: roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + NodeID: 1, + StoreID: 1, + ReplicaID: 1, + }, + { + NodeID: 2, + StoreID: 2, + ReplicaID: 2, + }, + { + NodeID: 3, + StoreID: 3, + ReplicaID: 3, + }, + }, + NextReplicaID: 4, + Generation: 4, + }, + RaftAppliedIndex: 0, + RaftCommittedIndex: 0, + RaftLogDescriptorChanges: nil, + LocalAssumesLeaseholder: false, + }, + }, + }, + } + + plan, _, err := PlanReplicas(ctx, replicaInfo, nil, nil, uuid.DefaultGenerator) + require.NoError(t, err, "good version is rejected") + require.Nil(t, plan.DecommissionedNodeIDs, "v1 plan should have no decom nodes ids") + require.Nil(t, plan.StaleLeaseholderNodeIDs, "v1 plan should have no stale node ids") + require.Equal(t, uuid.UUID{}, plan.PlanID, "v1 plan should have no plan id") + require.Empty(t, plan.ClusterID, "v1 plan should have no cluster id") + require.Equal(t, roachpb.Version{}, plan.Version, "v1 plan should have no version") +} + +// infoWithVersion creates a skeleton info that passes all checks beside version. 
+func infoWithVersion(v roachpb.Version) loqrecoverypb.ClusterReplicaInfo { + return loqrecoverypb.ClusterReplicaInfo{ + ClusterID: uuid.FastMakeV4().String(), + Version: v, + } +} diff --git a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go index f96d0dbea785..556acf11e9f7 100644 --- a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go +++ b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -559,10 +560,7 @@ func (e *quorumRecoveryEnv) handleDescriptorData(t *testing.T, d datadriven.Test func (e *quorumRecoveryEnv) handleMakePlan(t *testing.T, d datadriven.TestData) (string, error) { stores := e.parseStoresArg(t, d, false /* defaultToAll */) nodes := e.parseNodesArg(t, d) - plan, report, err := PlanReplicas(context.Background(), loqrecoverypb.ClusterReplicaInfo{ - Descriptors: e.replicas.Descriptors, - LocalInfo: e.replicas.LocalInfo, - }, stores, nodes, e.uuidGen) + plan, report, err := PlanReplicas(context.Background(), e.replicas, stores, nodes, e.uuidGen) if err != nil { return "", err } @@ -620,6 +618,10 @@ func (e *quorumRecoveryEnv) getOrCreateStore( ); err != nil { t.Fatalf("failed to populate test store ident: %v", err) } + v := clusterversion.ByKey(clusterversion.BinaryVersionKey) + if err := kvstorage.WriteClusterVersionToEngines(ctx, []storage.Engine{eng}, clusterversion.ClusterVersion{Version: v}); err != nil { + t.Fatalf("failed to populate test store cluster version: %v", err) + } wrapped.engine = eng wrapped.nodeID = nodeID e.stores[storeID] = wrapped @@ -644,6 +646,12 @@ func (e *quorumRecoveryEnv) handleCollectReplicas( return "", err } } + if len(e.replicas.LocalInfo) == 0 { + // This is unrealistic as we don't have metadata. We need to fake it here + // to pass planner checks. + e.replicas.ClusterID = e.clusterID.String() + e.replicas.Version = clusterversion.ByKey(clusterversion.BinaryVersionKey) + } e.replicas.Descriptors = e.meta return "ok", nil } diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go index 11b05934da92..049f0abe404e 100644 --- a/pkg/kv/kvserver/loqrecovery/server.go +++ b/pkg/kv/kvserver/loqrecovery/server.go @@ -119,10 +119,11 @@ func (s Server) ServeLocalReplicas( _ *serverpb.RecoveryCollectLocalReplicaInfoRequest, stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer, ) error { + v := s.settings.Version.ActiveVersion(ctx) return s.stores.VisitStores(func(s *kvserver.Store) error { reader := s.TODOEngine().NewSnapshot() defer reader.Close() - return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), + return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), v, func(info loqrecoverypb.ReplicaInfo) error { return stream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info}) }) @@ -136,6 +137,9 @@ func (s Server) ServeClusterReplicas( kvDB *kv.DB, ) (err error) { // Block requests that require fan-out to other nodes until upgrade is finalized. + // We can't assume that caller is up-to-date with cluster version and process + // regardless of version known by current node as recommended for RPC + // requests because caller is a CLI which that only knows its binary version. 
 if !s.settings.Version.IsActive(ctx, clusterversion.V23_1) {
 return errors.Newf("loss of quorum recovery service requires cluster upgraded to 23.1")
 }
@@ -150,6 +154,18 @@ func (s Server) ServeClusterReplicas(
 }
 }()

+ v := s.settings.Version.ActiveVersion(ctx)
+ if err = outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{
+ Info: &serverpb.RecoveryCollectReplicaInfoResponse_Metadata{
+ Metadata: &loqrecoverypb.ClusterMetadata{
+ ClusterID: s.clusterIDContainer.String(),
+ Version: v.Version,
+ },
+ },
+ }); err != nil {
+ return err
+ }
+
 err = contextutil.RunWithTimeout(ctx, "scan range descriptors", s.metadataQueryTimeout,
 func(txnCtx context.Context) error {
 txn := kvDB.NewTxn(txnCtx, "scan-range-descriptors")
@@ -245,6 +261,9 @@ func (s Server) StagePlan(
 ctx context.Context, req *serverpb.RecoveryStagePlanRequest,
) (*serverpb.RecoveryStagePlanResponse, error) {
 // Block requests that require fan-out to other nodes until upgrade is finalized.
+ // We can't assume that the caller is up to date with the cluster version and
+ // process the request regardless of this node's version, as is recommended
+ // for RPC requests, because the caller is a CLI tool that only knows its own binary version.
 if !s.settings.Version.IsActive(ctx, clusterversion.V23_1) {
 return nil, errors.Newf("loss of quorum recovery service requires cluster upgraded to 23.1")
 }
@@ -252,10 +271,19 @@ func (s Server) StagePlan(
 if !req.ForcePlan && req.Plan == nil {
 return nil, errors.New("stage plan request can't be used with empty plan without force flag")
 }
- clusterID := s.clusterIDContainer.Get().String()
- if req.Plan != nil && req.Plan.ClusterID != clusterID {
- return nil, errors.Newf("attempting to stage plan from cluster %s on cluster %s",
- req.Plan.ClusterID, clusterID)
+ if p := req.Plan; p != nil {
+ clusterID := s.clusterIDContainer.Get().String()
+ if p.ClusterID != clusterID {
+ return nil, errors.Newf("attempting to stage plan from cluster %s on cluster %s",
+ p.ClusterID, clusterID)
+ }
+ version := s.settings.Version.ActiveVersion(ctx)
+ if err := checkPlanVersionMatches(p.Version, version.Version, req.ForceLocalInternalVersion); err != nil {
+ return nil, errors.Wrap(err, "incompatible plan")
+ }
+ // It is safe to always overwrite the plan's internal version with the
+ // active one: either the check above allowed it, or the versions already match.
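+ // The staged plan's version is checked against the store's cluster version
+ // once more when the plan is applied on node restart.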
+ p.Version.Internal = version.Internal } localNodeID := s.nodeIDContainer.Get() @@ -320,9 +348,10 @@ func (s Server) StagePlan( func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { delete(foundNodes, nodeID) res, err := client.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ - Plan: req.Plan, - AllNodes: false, - ForcePlan: req.ForcePlan, + Plan: req.Plan, + AllNodes: false, + ForcePlan: req.ForcePlan, + ForceLocalInternalVersion: req.ForceLocalInternalVersion, }) if err != nil { nodeErrors = append(nodeErrors, diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go index b369fccaae6a..6f2c780deaf3 100644 --- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" @@ -73,7 +74,7 @@ func TestReplicaCollection(t *testing.T) { // Collect and assert replica metadata. For expectMeta case we sometimes have // meta and sometimes doesn't depending on which node holds the lease. // We just ignore descriptor counts if we are not expecting meta. - assertReplicas := func(liveNodes int, expectMeta bool) { + assertReplicas := func(liveNodes int, expectRangeMeta bool) { var replicas loqrecoverypb.ClusterReplicaInfo var stats loqrecovery.CollectionStats @@ -84,7 +85,7 @@ func TestReplicaCollection(t *testing.T) { cnt := getInfoCounters(replicas) require.Equal(t, liveNodes, cnt.stores, "collected replicas from stores") require.Equal(t, liveNodes, cnt.nodes, "collected replicas from nodes") - if expectMeta { + if expectRangeMeta { require.Equal(t, totalRanges, cnt.descriptors, "number of collected descriptors from metadata") } @@ -92,10 +93,13 @@ func TestReplicaCollection(t *testing.T) { // Check stats counters as well. 
require.Equal(t, liveNodes, stats.Nodes, "node counter stats") require.Equal(t, liveNodes, stats.Stores, "store counter stats") - if expectMeta { + if expectRangeMeta { require.Equal(t, totalRanges, stats.Descriptors, "range descriptor counter stats") } require.NotEqual(t, replicas.ClusterID, uuid.UUID{}.String(), "cluster UUID must not be empty") + require.Equal(t, replicas.Version, + clusterversion.ByKey(clusterversion.BinaryVersionKey), + "replica info version must match current binary version") } tc.StopServer(0) @@ -291,6 +295,36 @@ func TestStageRecoveryPlans(t *testing.T) { require.Equal(t, &plan.PlanID, statuses[3].PendingPlanID, "incorrect plan id on node 3") } +func TestStageBadVersions(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, _, lReg := prepTestCluster(t, 1) + defer lReg.Close() + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + sk := tc.ScratchRange(t) + plan := makeTestRecoveryPlan(ctx, t, adm) + plan.Updates = []loqrecoverypb.ReplicaUpdate{ + createRecoveryForRange(t, tc, sk, 1), + } + plan.Version = clusterversion.ByKey(clusterversion.BinaryMinSupportedVersionKey) + plan.Version.Major -= 1 + + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.Error(t, err, "shouldn't stage plan with old version") + + plan.Version.Major += 2 + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + require.Error(t, err, "shouldn't stage plan with future version") +} + func TestStageConflictingPlans(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -619,6 +653,50 @@ func TestRetrieveApplyStatus(t *testing.T) { require.Equal(t, len(planDetails.UpdatedNodes), applied, "number of applied plans") } +func TestRejectBadVersionApplication(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc, reg, pss, lReg := prepTestCluster(t, 3) + defer lReg.Close() + defer reg.CloseAllStickyInMemEngines() + defer tc.Stopper().Stop(ctx) + + adm, err := tc.GetAdminClient(ctx, t, 0) + require.NoError(t, err, "failed to get admin client") + + var replicas loqrecoverypb.ClusterReplicaInfo + testutils.SucceedsSoon(t, func() error { + var err error + replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm) + return err + }) + plan, _, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator) + require.NoError(t, err, "failed to create a plan") + // Lower plan version below compatible one to trigger + plan.Version.Major = 19 + + tc.StopServer(1) + require.NoError(t, pss[1].SavePlan(plan), "failed to inject plan into storage") + lReg.ReopenOrFail(t, 1) + require.NoError(t, tc.RestartServer(1), "failed to restart server") + + r, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + require.NoError(t, err, "failed to run recovery verify") + found := false + for _, s := range r.Statuses { + if s.NodeID == 2 { + require.NotNil(t, s.AppliedPlanID) + require.Equal(t, plan.PlanID, *s.AppliedPlanID) + require.Contains(t, s.Error, "failed to check cluster version against storage") + found = true + } + } + require.True(t, found, "restarted node not found in verify status") +} + func prepTestCluster( t *testing.T, nodes int, ) ( @@ -680,7 +758,7 @@ func createRecoveryForRange( rngD, err := 
tc.LookupRange(key) require.NoError(t, err, "can't find range for key %s", key) replD, ok := rngD.GetReplicaDescriptor(roachpb.StoreID(storeID)) - require.True(t, ok, "expecting scratch replica on node 3") + require.True(t, ok, "expecting scratch replica on store %d", storeID) replD.ReplicaID += 10 return loqrecoverypb.ReplicaUpdate{ RangeID: rngD.RangeID, @@ -700,5 +778,6 @@ func makeTestRecoveryPlan( return loqrecoverypb.ReplicaUpdatePlan{ PlanID: uuid.MakeV4(), ClusterID: cr.ClusterID, + Version: clusterversion.ByKey(clusterversion.BinaryVersionKey), } } diff --git a/pkg/kv/kvserver/loqrecovery/store.go b/pkg/kv/kvserver/loqrecovery/store.go index 51fedc8b5bbd..0f961327d516 100644 --- a/pkg/kv/kvserver/loqrecovery/store.go +++ b/pkg/kv/kvserver/loqrecovery/store.go @@ -21,7 +21,7 @@ import ( ) const recoveryPlanDir = "loq-recovery-plans" -const planFileName = "staged.json" +const planFileName = "staged.bin" type PlanStore struct { path string @@ -59,9 +59,8 @@ func (s PlanStore) SavePlan(plan loqrecoverypb.ReplicaUpdatePlan) error { return errors.Wrapf(err, "failed to create file %q", tmpFileName) } defer func() { _ = outFile.Close() }() - jsonpb := protoutil.JSONPb{Indent: " "} var out []byte - if out, err = jsonpb.Marshal(&plan); err != nil { + if out, err = protoutil.Marshal(&plan); err != nil { return errors.Wrap(err, "failed to marshal recovery plan") } if _, err = outFile.Write(out); err != nil { @@ -101,8 +100,7 @@ func (s PlanStore) LoadPlan() (loqrecoverypb.ReplicaUpdatePlan, bool, error) { fileName) } var nodeUpdates loqrecoverypb.ReplicaUpdatePlan - jsonpb := protoutil.JSONPb{Indent: " "} - if err = jsonpb.Unmarshal(data, &nodeUpdates); err != nil { + if err = protoutil.Unmarshal(data, &nodeUpdates); err != nil { return loqrecoverypb.ReplicaUpdatePlan{}, false, errors.Wrapf(err, "failed to unmarshal plan from file %q", fileName) } diff --git a/pkg/kv/kvserver/loqrecovery/version.go b/pkg/kv/kvserver/loqrecovery/version.go new file mode 100644 index 000000000000..c82172586b7c --- /dev/null +++ b/pkg/kv/kvserver/loqrecovery/version.go @@ -0,0 +1,108 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package loqrecovery + +import ( + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/errors" +) + +/* +Loss of quorum replica recovery supports multi-version clusters during recovery +process. + +There are a number of areas that are specific for LOQ recovery in terms of +version handling. +Tools are using json as an intermediate format to collect replica info and to +store replica update plan on the CLI side. That could cause problems when +underlying protobufs change. We can't deserialize json with unknown fields or +enums safely. +To address this problem, we have a cluster version embedded in the file and +whenever we generate some data, we need to ensure that generated structs doesn't +contain any new fields that are not supported by the version of the target file. 
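+For example, an older binary has no safe way to interpret a plan or replica
+info file that carries a field or enum value introduced in a newer release.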
+The reason we need to version files is that the cluster could have a mixed set
+of binaries, and if a new binary is operating in a cluster at a previous
+version, it should never communicate or write files in a newer format.
+
+We support versioning for both the offline and the half-online approach.
+
+For the offline approach, collection will produce replica files with data
+compatible with the current active cluster version. Collection could be done by
+the binary present on the node, either old or new.
+The planner will respect the cluster version and produce a plan with a version
+equal to the active cluster version, containing only features that are
+compatible with it. Planning could be done by either the new or the old binary.
+Plan application will ensure that stores contain a version equal to the version
+present in the plan. Application could be performed by the binary present on
+the node.
+
+For the half-online approach, the collecting CLI will ensure that it supports
+the version of the cluster reported in the collection metadata. Admin RPC
+replica collection endpoints will ensure that only data compatible with the
+active cluster version is collected. The CLI will produce a recovery plan with
+a version equal to the active cluster version, containing only features that
+are compatible with it.
+Staging will verify that the plan version is equal to the active cluster
+version down to the internal version, since upgrade steps could gate underlying
+storage format changes that must be reflected in recovery.
+Application will verify that the plan version is equal to the active cluster
+version upon restart.
+*/
+
+// legacyInfoFormatVersion is the version used internally when processing data
+// loaded from legacy format files that contain no version info, or data
+// collected from old clusters.
+var legacyInfoFormatVersion = clusterversion.ByKey(clusterversion.V22_2)
+
+// checkVersionAllowedByBinary checks if this binary can handle data of the
+// given version, whether loaded from files or received from the cluster.
+func checkVersionAllowedByBinary(version roachpb.Version) error {
+ return checkVersionAllowedImpl(version,
+ clusterversion.ByKey(clusterversion.BinaryMinSupportedVersionKey),
+ clusterversion.ByKey(clusterversion.BinaryVersionKey))
+}
+
+func checkVersionAllowedImpl(version, minSupported, binaryVersion roachpb.Version) error {
+ if version.Less(minSupported) {
+ return errors.Newf("version is too old, minimum supported %s, found %s", minSupported, version)
+ }
+ if binaryVersion.Less(version) {
+ return errors.Newf("version is too new, maximum supported %s, found %s", binaryVersion, version)
+ }
+ return nil
+}
+
+// checkPlanVersionMatches verifies that a plan with the given version can be
+// staged or applied on a cluster whose active version is current.
+// Note that patch versions must be zero in accordance with roachpb.Version
+// documentation.
+func checkPlanVersionMatches(version, current roachpb.Version, ignoreInternal bool) error {
+ if current.Equal(version) {
+ return nil
+ }
+
+ if ignoreInternal {
+ newVersion := version
+ newVersion.Internal = current.Internal
+ if newVersion.Equal(current) {
+ return nil
+ }
+ }
+
+ // We should never have a plan or cluster with a non-zero patch version.
+ // If the plan has one, call it out explicitly. If the cluster is broken and
+ // somehow got a non-zero patch, then a plan generated for that cluster would
+ // preserve it and pass the checks above.
+ if version.Patch != 0 { + return errors.Newf("recovery plan must not use patch versions %s, active cluster version %s", version, current) + } + return errors.Newf("recovery plan version %s, doesn't match cluster active version %s", version, current) +} diff --git a/pkg/kv/kvserver/loqrecovery/version_test.go b/pkg/kv/kvserver/loqrecovery/version_test.go new file mode 100644 index 000000000000..639bc0ce2ade --- /dev/null +++ b/pkg/kv/kvserver/loqrecovery/version_test.go @@ -0,0 +1,254 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package loqrecovery + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/stretchr/testify/require" +) + +func TestVersionCompatibility(t *testing.T) { + for _, d := range []struct { + name string + version roachpb.Version + minSupported roachpb.Version + current roachpb.Version + err bool + isV1 bool + }{ + { + name: "current version", + version: roachpb.Version{ + Major: 23, + Minor: 1, + }, + minSupported: roachpb.Version{ + Major: 22, + Minor: 2, + }, + current: roachpb.Version{ + Major: 23, + Minor: 1, + }, + }, + { + name: "v1", + version: roachpb.Version{ + Major: 22, + Minor: 2, + }, + minSupported: roachpb.Version{ + Major: 22, + Minor: 2, + }, + current: roachpb.Version{ + Major: 23, + Minor: 2, + }, + isV1: true, + }, + { + name: "in between", + version: roachpb.Version{ + Major: 23, + Minor: 1, + }, + minSupported: roachpb.Version{ + Major: 22, + Minor: 2, + }, + current: roachpb.Version{ + Major: 23, + Minor: 2, + }, + }, + { + name: "too old", + version: roachpb.Version{ + Major: 22, + Minor: 1, + }, + minSupported: roachpb.Version{ + Major: 22, + Minor: 2, + }, + current: roachpb.Version{ + Major: 23, + Minor: 1, + }, + err: true, + }, + { + name: "dev build with release data file", + version: roachpb.Version{ + Major: 22, + Minor: 2, + Internal: 50, + }, + minSupported: roachpb.Version{ + Major: 22 + clusterversion.DevOffset, + Minor: 2, + }, + current: roachpb.Version{ + Major: 22 + clusterversion.DevOffset, + Minor: 2, + Internal: 50, + }, + err: true, + }, + { + name: "dev build with dev data file", + version: roachpb.Version{ + Major: 22 + clusterversion.DevOffset, + Minor: 2, + Internal: 50, + }, + minSupported: roachpb.Version{ + Major: 22 + clusterversion.DevOffset, + Minor: 2, + }, + current: roachpb.Version{ + Major: 22 + clusterversion.DevOffset, + Minor: 2, + Internal: 50, + }, + }, + { + name: "upgrade from v1", + version: roachpb.Version{ + Major: 22, + Minor: 2, + Internal: 1, + }, + minSupported: roachpb.Version{ + Major: 22, + Minor: 2, + }, + current: roachpb.Version{ + Major: 22, + Minor: 2, + Internal: 50, + }, + }, + } { + t.Run(d.name, func(t *testing.T) { + err := checkVersionAllowedImpl(d.version, d.minSupported, d.current) + if d.err { + require.Error(t, err, "version check must fail") + } else { + require.NoError(t, err, "version check must pass") + } + }) + } +} + +func TestPlanVersionMatching(t *testing.T) { + for _, d := range []struct { + name string + plan, cluster roachpb.Version + ignoreInternal bool + err string + }{ + { + name: "matching", + plan: roachpb.Version{ + Major: 23, + Minor: 
1, + Internal: 2, + }, + cluster: roachpb.Version{ + Major: 23, + Minor: 1, + Internal: 2, + }, + }, + { + name: "non-matching", + plan: roachpb.Version{ + Major: 23, + Minor: 1, + }, + cluster: roachpb.Version{ + Major: 23, + Minor: 2, + }, + err: "doesn't match cluster active version", + }, + { + name: "non-matching patch", + plan: roachpb.Version{ + Major: 23, + Minor: 1, + Patch: 4, + }, + cluster: roachpb.Version{ + Major: 23, + Minor: 1, + }, + err: "recovery plan must not use patch versions", + }, + { + name: "non-matching internal", + plan: roachpb.Version{ + Major: 23, + Minor: 1, + Internal: 2, + }, + cluster: roachpb.Version{ + Major: 23, + Minor: 1, + Internal: 4, + }, + err: "doesn't match cluster active version", + }, + { + name: "ignore internal", + plan: roachpb.Version{ + Major: 23, + Minor: 1, + Internal: 2, + }, + cluster: roachpb.Version{ + Major: 23, + Minor: 1, + Internal: 4, + }, + ignoreInternal: true, + }, + { + name: "ignore with wrong version", + plan: roachpb.Version{ + Major: 23, + Minor: 1, + Internal: 2, + }, + cluster: roachpb.Version{ + Major: 23, + Minor: 2, + Internal: 4, + }, + ignoreInternal: true, + err: "doesn't match cluster active version", + }, + } { + t.Run(d.name, func(t *testing.T) { + if d.err != "" { + require.ErrorContains(t, checkPlanVersionMatches(d.plan, d.cluster, d.ignoreInternal), + d.err) + } else { + require.NoError(t, checkPlanVersionMatches(d.plan, d.cluster, d.ignoreInternal)) + } + }) + } +} diff --git a/pkg/server/serverpb/admin.proto b/pkg/server/serverpb/admin.proto index 9d7de893ab28..2ea6d0062c87 100644 --- a/pkg/server/serverpb/admin.proto +++ b/pkg/server/serverpb/admin.proto @@ -923,6 +923,7 @@ message RecoveryCollectReplicaInfoResponse { roachpb.RangeDescriptor range_descriptor = 1; cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ReplicaInfo replica_info = 2; RecoveryCollectReplicaRestartNodeStream node_stream_restarted = 3; + cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ClusterMetadata metadata = 4; } } @@ -940,9 +941,14 @@ message RecoveryStagePlanRequest { // If all nodes is true, then receiver should act as a coordinator and perform // a fan-out to stage plan on all nodes of the cluster. bool all_nodes = 2; - // Force plan tells receiver to ignore any plan already staged on the node if it + // ForcePlan tells receiver to ignore any plan already staged on the node if it // is present and replace it with new plan (including empty one). bool force_plan = 3; + // ForceLocalInternalVersion tells server to update internal component of plan + // version to the one of active cluster version. This option needs to be set + // if target cluster is stuck in recovery where only part of nodes were + // successfully migrated. + bool force_local_internal_version = 4; } message RecoveryStagePlanResponse {