diff --git a/cdc/capture/http_validator.go b/cdc/capture/http_validator.go new file mode 100644 index 00000000000..37614f14b7a --- /dev/null +++ b/cdc/capture/http_validator.go @@ -0,0 +1,227 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package capture + +import ( + "context" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/entry" + "github.com/pingcap/ticdc/cdc/kv" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/cdc/sink" + "github.com/pingcap/ticdc/pkg/config" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/filter" + "github.com/pingcap/ticdc/pkg/txnutil/gc" + "github.com/pingcap/ticdc/pkg/util" + "github.com/pingcap/ticdc/pkg/version" + tidbkv "github.com/pingcap/tidb/kv" + "github.com/r3labs/diff" + "github.com/tikv/client-go/v2/oracle" +) + +// verifyCreateChangefeedConfig verify ChangefeedConfig for create a changefeed +func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.ChangefeedConfig, capture *Capture) (*model.ChangeFeedInfo, error) { + // verify sinkURI + if changefeedConfig.SinkURI == "" { + return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs("sink-uri is empty, can't not create a changefeed without sink-uri") + } + + // verify changefeedID + if err := model.ValidateChangefeedID(changefeedConfig.ID); err != nil { + return nil, cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedConfig.ID) + } + // check if the changefeed exists + cfStatus, err := capture.owner.StatusProvider().GetChangeFeedStatus(ctx, changefeedConfig.ID) + if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { + return nil, err + } + if cfStatus != nil { + return nil, cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(changefeedConfig.ID) + } + + // verify start-ts + if changefeedConfig.StartTS == 0 { + ts, logical, err := capture.pdClient.GetTS(ctx) + if err != nil { + return nil, cerror.ErrPDEtcdAPIError.GenWithStackByArgs("fail to get ts from pd client") + } + changefeedConfig.StartTS = oracle.ComposeTS(ts, logical) + } + + // Ensure the start ts is valid in the next 1 hour. + const ensureTTL = 60 * 60 + if err := gc.EnsureChangefeedStartTsSafety( + ctx, capture.pdClient, changefeedConfig.ID, ensureTTL, changefeedConfig.StartTS); err != nil { + if !cerror.ErrStartTsBeforeGC.Equal(err) { + return nil, cerror.ErrPDEtcdAPIError.Wrap(err) + } + return nil, err + } + + // verify target-ts + if changefeedConfig.TargetTS > 0 && changefeedConfig.TargetTS <= changefeedConfig.StartTS { + return nil, cerror.ErrTargetTsBeforeStartTs.GenWithStackByArgs(changefeedConfig.TargetTS, changefeedConfig.StartTS) + } + + // init replicaConfig + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.ForceReplicate = changefeedConfig.ForceReplicate + if changefeedConfig.MounterWorkerNum != 0 { + replicaConfig.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum + } + if changefeedConfig.SinkConfig != nil { + replicaConfig.Sink = changefeedConfig.SinkConfig + } + if len(changefeedConfig.IgnoreTxnStartTs) != 0 { + replicaConfig.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs + } + if len(changefeedConfig.FilterRules) != 0 { + replicaConfig.Filter.Rules = changefeedConfig.FilterRules + } + + captureInfos, err := capture.owner.StatusProvider().GetCaptures(ctx) + if err != nil { + return nil, err + } + // set sortEngine and EnableOldValue + cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos)) + if err != nil { + return nil, err + } + sortEngine := model.SortUnified + if !cdcClusterVer.ShouldEnableOldValueByDefault() { + replicaConfig.EnableOldValue = false + log.Warn("The TiCDC cluster is built from unknown branch or less than 5.0.0-rc, the old-value are disabled by default.") + if !cdcClusterVer.ShouldEnableUnifiedSorterByDefault() { + sortEngine = model.SortInMemory + } + } + + // init ChangefeedInfo + info := &model.ChangeFeedInfo{ + SinkURI: changefeedConfig.SinkURI, + Opts: make(map[string]string), + CreateTime: time.Now(), + StartTs: changefeedConfig.StartTS, + TargetTs: changefeedConfig.TargetTS, + Config: replicaConfig, + Engine: sortEngine, + State: model.StateNormal, + SyncPointEnabled: false, + SyncPointInterval: 10 * time.Minute, + CreatorVersion: version.ReleaseVersion, + } + + if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable { + ineligibleTables, _, err := verifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS) + if err != nil { + return nil, err + } + if len(ineligibleTables) != 0 { + return nil, cerror.ErrTableIneligible.GenWithStackByArgs(ineligibleTables) + } + } + + tz, err := util.GetTimezone(changefeedConfig.TimeZone) + if err != nil { + return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone)) + } + ctx = util.PutTimezoneInCtx(ctx, tz) + if err := sink.Validate(ctx, info.SinkURI, info.Config, info.Opts); err != nil { + return nil, err + } + + return info, nil +} + +// verifyUpdateChangefeedConfig verify ChangefeedConfig for update a changefeed +func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.ChangefeedConfig, oldInfo *model.ChangeFeedInfo) (*model.ChangeFeedInfo, error) { + newInfo, err := oldInfo.Clone() + if err != nil { + return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error()) + } + // verify target_ts + if changefeedConfig.TargetTS != 0 { + if changefeedConfig.TargetTS <= newInfo.StartTs { + return nil, cerror.ErrChangefeedUpdateRefused.GenWithStack("can not update target-ts:%d less than start-ts:%d", changefeedConfig.TargetTS, newInfo.StartTs) + } + newInfo.TargetTs = changefeedConfig.TargetTS + } + + // verify rules + if len(changefeedConfig.FilterRules) != 0 { + newInfo.Config.Filter.Rules = changefeedConfig.FilterRules + _, err = filter.VerifyRules(newInfo.Config) + if err != nil { + return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error()) + } + } + + if len(changefeedConfig.IgnoreTxnStartTs) != 0 { + newInfo.Config.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs + } + + if changefeedConfig.MounterWorkerNum != 0 { + newInfo.Config.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum + } + + if changefeedConfig.SinkConfig != nil { + newInfo.Config.Sink = changefeedConfig.SinkConfig + } + + // verify sink_uri + if changefeedConfig.SinkURI != "" { + newInfo.SinkURI = changefeedConfig.SinkURI + if err := sink.Validate(ctx, changefeedConfig.SinkURI, newInfo.Config, newInfo.Opts); err != nil { + return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err) + } + } + + if !diff.Changed(oldInfo, newInfo) { + return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs("changefeed config is the same with the old one, do nothing") + } + + return newInfo, nil +} + +func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) { + filter, err := filter.NewFilter(replicaConfig) + if err != nil { + return nil, nil, errors.Trace(err) + } + meta, err := kv.GetSnapshotMeta(storage, startTs) + if err != nil { + return nil, nil, errors.Trace(err) + } + snap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, false /* explicitTables */) + if err != nil { + return nil, nil, errors.Trace(err) + } + + for _, tableInfo := range snap.Tables() { + if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) { + continue + } + if !tableInfo.IsEligible(false /* forceReplicate */) { + ineligibleTables = append(ineligibleTables, tableInfo.TableName) + } else { + eligibleTables = append(eligibleTables, tableInfo.TableName) + } + } + return +} diff --git a/cdc/model/capture.go b/cdc/model/capture.go index f6594786211..e6859ba0567 100644 --- a/cdc/model/capture.go +++ b/cdc/model/capture.go @@ -43,3 +43,13 @@ func (c *CaptureInfo) Unmarshal(data []byte) error { return errors.Annotatef(cerror.WrapError(cerror.ErrUnmarshalFailed, err), "unmarshal data: %v", data) } + +// ListVersionsFromCaptureInfos returns the version list of the CaptureInfo list. +func ListVersionsFromCaptureInfos(captureInfos []*CaptureInfo) []string { + var captureVersions []string + for _, ci := range captureInfos { + captureVersions = append(captureVersions, ci.Version) + } + + return captureVersions +} diff --git a/cdc/model/capture_test.go b/cdc/model/capture_test.go index 688f0bf7726..30ccccf5d6a 100644 --- a/cdc/model/capture_test.go +++ b/cdc/model/capture_test.go @@ -38,3 +38,20 @@ func (s *captureSuite) TestMarshalUnmarshal(c *check.C) { c.Assert(err, check.IsNil) c.Assert(decodedInfo, check.DeepEquals, info) } + +func TestListVersionsFromCaptureInfos(t *testing.T) { + infos := []*CaptureInfo{ + { + ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891", + AdvertiseAddr: "127.0.0.1:8300", + Version: "dev", + }, + { + ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891", + AdvertiseAddr: "127.0.0.1:8300", + Version: "", + }, + } + + require.ElementsMatch(t, []string{"dev", ""}, ListVersionsFromCaptureInfos(infos)) +} diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 76a5eef2e19..a843e37249a 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -25,7 +25,13 @@ import ( "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/cyclic/mark" cerror "github.com/pingcap/ticdc/pkg/errors" +<<<<<<< HEAD "github.com/pingcap/tidb/store/tikv/oracle" +======= + cerrors "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/version" + "github.com/tikv/client-go/v2/oracle" +>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838)) "go.uber.org/zap" ) @@ -48,7 +54,7 @@ const ( StateError FeedState = "error" StateFailed FeedState = "failed" StateStopped FeedState = "stopped" - StateRemoved FeedState = "removed" // deprecated, will be removed in the next version + StateRemoved FeedState = "removed" StateFinished FeedState = "finished" ) @@ -208,10 +214,10 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) { return cloned, err } -// VerifyAndFix verifies changefeed info and may fillin some fields. -// If a must field is not provided, return an error. -// If some necessary filed is missing but can use a default value, fillin it. -func (info *ChangeFeedInfo) VerifyAndFix() error { +// VerifyAndComplete verifies changefeed info and may fill in some fields. +// If a required field is not provided, return an error. +// If some necessary filed is missing but can use a default value, fill in it. +func (info *ChangeFeedInfo) VerifyAndComplete() error { defaultConfig := config.GetDefaultReplicaConfig() if info.Engine == "" { info.Engine = SortUnified @@ -234,6 +240,54 @@ func (info *ChangeFeedInfo) VerifyAndFix() error { return nil } +// FixIncompatible fixes incompatible changefeed meta info. +func (info *ChangeFeedInfo) FixIncompatible() { + creatorVersionGate := version.NewCreatorVersionGate(info.CreatorVersion) + if creatorVersionGate.ChangefeedStateFromAdminJob() { + log.Info("Start fixing incompatible changefeed state", zap.Any("changefeed", info)) + info.fixState() + log.Info("Fix incompatibility changefeed state completed", zap.Any("changefeed", info)) + } +} + +// fixState attempts to fix state loss from upgrading the old owner to the new owner. +func (info *ChangeFeedInfo) fixState() { + // Notice: In the old owner we used AdminJobType field to determine if the task was paused or not, + // we need to handle this field in the new owner. + // Otherwise, we will see that the old version of the task is paused and then upgraded, + // and the task is automatically resumed after the upgrade. + state := info.State + // Upgrading from an old owner, we need to deal with cases where the state is normal, + // but actually contains errors and does not match the admin job type. + if state == StateNormal { + switch info.AdminJobType { + // This corresponds to the case of failure or error. + case AdminNone, AdminResume: + if info.Error != nil { + if cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) { + state = StateFailed + } else { + state = StateError + } + } + case AdminStop: + state = StateStopped + case AdminFinish: + state = StateFinished + case AdminRemove: + state = StateRemoved + } + } + + if state != info.State { + log.Info("handle old owner inconsistent state", + zap.String("old state", string(info.State)), + zap.String("admin job type", info.AdminJobType.String()), + zap.String("new state", string(state))) + info.State = state + } +} + // CheckErrorHistory checks error history of a changefeed // if having error record older than GC interval, set needSave to true. // if error counts reach threshold, set canInit to false. diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index fb82e75f715..4cfc772e126 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -21,7 +21,11 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" +<<<<<<< HEAD "github.com/pingcap/ticdc/pkg/util/testleak" +======= + cerrors "github.com/pingcap/ticdc/pkg/errors" +>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838)) filter "github.com/pingcap/tidb-tools/pkg/table-filter" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -165,8 +169,14 @@ func (s *configSuite) TestFillV1(c *check.C) { }) } +<<<<<<< HEAD func (s *configSuite) TestVerifyAndFix(c *check.C) { defer testleak.AfterTest(c)() +======= +func TestVerifyAndComplete(t *testing.T) { + t.Parallel() + +>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838)) info := &ChangeFeedInfo{ SinkURI: "blackhole://", Opts: map[string]string{}, @@ -178,9 +188,15 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) { }, } +<<<<<<< HEAD err := info.VerifyAndFix() c.Assert(err, check.IsNil) c.Assert(info.Engine, check.Equals, SortUnified) +======= + err := info.VerifyAndComplete() + require.Nil(t, err) + require.Equal(t, SortUnified, info.Engine) +>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838)) marshalConfig1, err := info.Config.Marshal() c.Assert(err, check.IsNil) @@ -190,8 +206,158 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) { c.Assert(marshalConfig1, check.Equals, marshalConfig2) } +<<<<<<< HEAD func (s *configSuite) TestChangeFeedInfoClone(c *check.C) { defer testleak.AfterTest(c)() +======= +func TestFixIncompatible(t *testing.T) { + // Test to fix incompatible states. + testCases := []struct { + info *ChangeFeedInfo + expectedState FeedState + }{ + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + CreatorVersion: "", + }, + expectedState: StateStopped, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + CreatorVersion: "4.0.14", + }, + expectedState: StateStopped, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + CreatorVersion: "5.0.5", + }, + expectedState: StateStopped, + }, + } + + for _, tc := range testCases { + tc.info.FixIncompatible() + require.Equal(t, tc.expectedState, tc.info.State) + } +} + +func TestFixState(t *testing.T) { + t.Parallel() + + testCases := []struct { + info *ChangeFeedInfo + expectedState FeedState + }{ + { + info: &ChangeFeedInfo{ + AdminJobType: AdminNone, + State: StateNormal, + Error: nil, + }, + expectedState: StateNormal, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminResume, + State: StateNormal, + Error: nil, + }, + expectedState: StateNormal, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminNone, + State: StateNormal, + Error: &RunningError{ + Code: string(cerrors.ErrGCTTLExceeded.RFCCode()), + }, + }, + expectedState: StateFailed, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminResume, + State: StateNormal, + Error: &RunningError{ + Code: string(cerrors.ErrGCTTLExceeded.RFCCode()), + }, + }, + expectedState: StateFailed, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminNone, + State: StateNormal, + Error: &RunningError{ + Code: string(cerrors.ErrClusterIDMismatch.RFCCode()), + }, + }, + expectedState: StateError, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminResume, + State: StateNormal, + Error: &RunningError{ + Code: string(cerrors.ErrClusterIDMismatch.RFCCode()), + }, + }, + expectedState: StateError, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + }, + expectedState: StateStopped, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminFinish, + State: StateNormal, + Error: nil, + }, + expectedState: StateFinished, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminRemove, + State: StateNormal, + Error: nil, + }, + expectedState: StateRemoved, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminRemove, + State: StateNormal, + Error: nil, + }, + expectedState: StateRemoved, + }, + } + + for _, tc := range testCases { + tc.info.fixState() + require.Equal(t, tc.expectedState, tc.info.State) + } +} + +func TestChangeFeedInfoClone(t *testing.T) { + t.Parallel() + +>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838)) info := &ChangeFeedInfo{ SinkURI: "blackhole://", Opts: map[string]string{}, diff --git a/cdc/model/reactor_state.go b/cdc/model/reactor_state.go index b4e4b8668a3..2ff0db155bf 100644 --- a/cdc/model/reactor_state.go +++ b/cdc/model/reactor_state.go @@ -209,7 +209,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er return errors.Trace(err) } if key.Tp == etcd.CDCKeyTypeChangefeedInfo { - if err := s.Info.VerifyAndFix(); err != nil { + if err := s.Info.VerifyAndComplete(); err != nil { return errors.Trace(err) } } diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 3be77b8ccbc..924705353a1 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -74,8 +74,16 @@ type Owner struct { ownerJobQueue []*ownerJob lastTickTime time.Time +<<<<<<< HEAD closed int32 +======= + closed int32 + // bootstrapped specifies whether the owner has been initialized. + // This will only be done when the owner starts the first Tick. + // NOTICE: Do not use it in a method other than tick unexpectedly, as it is not a thread-safe value. + bootstrapped bool +>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838)) newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed } @@ -97,6 +105,8 @@ func NewOwner4Test( pdClient pd.Client, ) *Owner { o := NewOwner(pdClient) + // Most tests do not need to test bootstrap. + o.bootstrapped = true o.newChangefeed = func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed { return newChangefeed4Test(id, gcManager, newDDLPuller, newSink) } @@ -109,8 +119,21 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) failpoint.Return(nil, errors.New("owner run with injected error")) }) failpoint.Inject("sleep-in-owner-tick", nil) +<<<<<<< HEAD ctx := stdCtx.(cdcContext.Context) state := rawState.(*model.GlobalReactorState) +======= + state := rawState.(*orchestrator.GlobalReactorState) + // At the first Tick, we need to do a bootstrap operation. + // Fix incompatible or incorrect meta information. + if !o.bootstrapped { + o.Bootstrap(state) + o.bootstrapped = true + return state, nil + } + + o.captures = state.Captures +>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838)) o.updateMetrics(state) if !o.clusterVersionConsistent(state.Captures) { // sleep one second to avoid printing too much log @@ -127,7 +150,11 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) return nil, errors.Trace(err) } +<<<<<<< HEAD o.handleJobs() +======= + ctx := stdCtx.(cdcContext.Context) +>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838)) for changefeedID, changefeedState := range state.Changefeeds { if changefeedState.Info == nil { o.cleanUpChangefeed(changefeedState) @@ -238,7 +265,29 @@ func (o *Owner) cleanUpChangefeed(state *model.ChangefeedReactorState) { } } +<<<<<<< HEAD func (o *Owner) updateMetrics(state *model.GlobalReactorState) { +======= +// Bootstrap checks if the state contains incompatible or incorrect information and tries to fix it. +func (o *Owner) Bootstrap(state *orchestrator.GlobalReactorState) { + log.Info("Start bootstrapping", zap.Any("state", state)) + fixChangefeedInfos(state) +} + +// fixChangefeedInfos attempts to fix incompatible or incorrect meta information in changefeed state. +func fixChangefeedInfos(state *orchestrator.GlobalReactorState) { + for _, changefeedState := range state.Changefeeds { + if changefeedState != nil { + changefeedState.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + info.FixIncompatible() + return info, true, nil + }) + } + } +} + +func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) { +>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838)) // Keep the value of prometheus expression `rate(counter)` = 1 // Please also change alert rule in ticdc.rules.yml when change the expression value. now := time.Now() diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index 94dba900d15..5de20892788 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -186,6 +186,44 @@ func (s *ownerSuite) TestStopChangefeed(c *check.C) { c.Assert(state.Changefeeds, check.Not(check.HasKey), changefeedID) } +func (s *ownerSuite) TestFixChangefeedInfos(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(false) + owner, state, tester := createOwner4Test(ctx, c) + // We need to do bootstrap. + owner.bootstrapped = false + changefeedID := "test-changefeed" + // Mismatched state and admin job. + changefeedInfo := &model.ChangeFeedInfo{ + State: model.StateNormal, + AdminJobType: model.AdminStop, + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + CreatorVersion: "4.0.14", + } + changefeedStr, err := changefeedInfo.Marshal() + c.Assert(err, check.IsNil) + cdcKey := etcd.CDCKey{ + Tp: etcd.CDCKeyTypeChangefeedInfo, + ChangefeedID: changefeedID, + } + tester.MustUpdate(cdcKey.String(), []byte(changefeedStr)) + // For the first tick, we do a bootstrap, and it tries to fix the meta information. + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.bootstrapped, check.IsTrue) + c.Assert(owner.changefeeds, check.Not(check.HasKey), changefeedID) + + // Start tick normally. + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.HasKey, changefeedID) + // The meta information is fixed correctly. + c.Assert(owner.changefeeds[changefeedID].state.Info.State, check.Equals, model.StateStopped) +} + func (s *ownerSuite) TestCheckClusterVersion(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go new file mode 100644 index 00000000000..b221c4cadaa --- /dev/null +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -0,0 +1,481 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cli + +import ( + "context" + "net/url" + "strings" + "time" + + "github.com/fatih/color" + "github.com/google/uuid" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/cdc/sink" + cmdcontext "github.com/pingcap/ticdc/pkg/cmd/context" + "github.com/pingcap/ticdc/pkg/cmd/factory" + "github.com/pingcap/ticdc/pkg/cmd/util" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/cyclic" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/filter" + "github.com/pingcap/ticdc/pkg/security" + "github.com/pingcap/ticdc/pkg/txnutil/gc" + ticdcutil "github.com/pingcap/ticdc/pkg/util" + "github.com/pingcap/ticdc/pkg/version" + "github.com/spf13/cobra" + "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" +) + +// forceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. +var forceEnableOldValueProtocols = []string{ + "canal", + "maxwell", +} + +// changefeedCommonOptions defines common changefeed flags. +type changefeedCommonOptions struct { + noConfirm bool + targetTs uint64 + sinkURI string + configFile string + opts []string + sortEngine string + sortDir string + cyclicReplicaID uint64 + cyclicFilterReplicaIDs []uint + cyclicSyncDDL bool + syncPointEnabled bool + syncPointInterval time.Duration +} + +// newChangefeedCommonOptions creates new changefeed common options. +func newChangefeedCommonOptions() *changefeedCommonOptions { + return &changefeedCommonOptions{} +} + +// addFlags receives a *cobra.Command reference and binds +// flags related to template printing to it. +func (o *changefeedCommonOptions) addFlags(cmd *cobra.Command) { + if o == nil { + return + } + + cmd.PersistentFlags().BoolVar(&o.noConfirm, "no-confirm", false, "Don't ask user whether to ignore ineligible table") + cmd.PersistentFlags().Uint64Var(&o.targetTs, "target-ts", 0, "Target ts of changefeed") + cmd.PersistentFlags().StringVar(&o.sinkURI, "sink-uri", "", "sink uri") + cmd.PersistentFlags().StringVar(&o.configFile, "config", "", "Path of the configuration file") + cmd.PersistentFlags().StringSliceVar(&o.opts, "opts", nil, "Extra options, in the `key=value` format") + cmd.PersistentFlags().StringVar(&o.sortEngine, "sort-engine", model.SortUnified, "sort engine used for data sort") + cmd.PersistentFlags().StringVar(&o.sortDir, "sort-dir", "", "directory used for data sort") + cmd.PersistentFlags().Uint64Var(&o.cyclicReplicaID, "cyclic-replica-id", 0, "(Experimental) Cyclic replication replica ID of changefeed") + cmd.PersistentFlags().UintSliceVar(&o.cyclicFilterReplicaIDs, "cyclic-filter-replica-ids", []uint{}, "(Experimental) Cyclic replication filter replica ID of changefeed") + cmd.PersistentFlags().BoolVar(&o.cyclicSyncDDL, "cyclic-sync-ddl", true, "(Experimental) Cyclic replication sync DDL of changefeed") + cmd.PersistentFlags().BoolVar(&o.syncPointEnabled, "sync-point", false, "(Experimental) Set and Record syncpoint in replication(default off)") + cmd.PersistentFlags().DurationVar(&o.syncPointInterval, "sync-interval", 10*time.Minute, "(Experimental) Set the interval for syncpoint in replication(default 10min)") + _ = cmd.PersistentFlags().MarkHidden("sort-dir") +} + +// strictDecodeConfig do strictDecodeFile check and only verify the rules for now. +func (o *changefeedCommonOptions) strictDecodeConfig(component string, cfg *config.ReplicaConfig) error { + err := util.StrictDecodeFile(o.configFile, component, cfg) + if err != nil { + return err + } + + _, err = filter.VerifyRules(cfg) + + return err +} + +// createChangefeedOptions defines common flags for the `cli changefeed crate` command. +type createChangefeedOptions struct { + commonChangefeedOptions *changefeedCommonOptions + + etcdClient *etcd.CDCEtcdClient + pdClient pd.Client + + pdAddr string + credential *security.Credential + + changefeedID string + disableGCSafePointCheck bool + startTs uint64 + timezone string + + cfg *config.ReplicaConfig +} + +// newCreateChangefeedOptions creates new options for the `cli changefeed create` command. +func newCreateChangefeedOptions(commonChangefeedOptions *changefeedCommonOptions) *createChangefeedOptions { + return &createChangefeedOptions{ + commonChangefeedOptions: commonChangefeedOptions, + } +} + +// addFlags receives a *cobra.Command reference and binds +// flags related to template printing to it. +func (o *createChangefeedOptions) addFlags(cmd *cobra.Command) { + if o == nil { + return + } + + o.commonChangefeedOptions.addFlags(cmd) + cmd.PersistentFlags().StringVarP(&o.changefeedID, "changefeed-id", "c", "", "Replication task (changefeed) ID") + cmd.PersistentFlags().BoolVarP(&o.disableGCSafePointCheck, "disable-gc-check", "", false, "Disable GC safe point check") + cmd.PersistentFlags().Uint64Var(&o.startTs, "start-ts", 0, "Start ts of changefeed") + cmd.PersistentFlags().StringVar(&o.timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)") +} + +// complete adapts from the command line args to the data and client required. +func (o *createChangefeedOptions) complete(ctx context.Context, f factory.Factory, cmd *cobra.Command) error { + etcdClient, err := f.EtcdClient() + if err != nil { + return err + } + + o.etcdClient = etcdClient + + pdClient, err := f.PdClient() + if err != nil { + return err + } + + o.pdClient = pdClient + + o.pdAddr = f.GetPdAddr() + o.credential = f.GetCredential() + + if o.startTs == 0 { + ts, logical, err := o.pdClient.GetTS(ctx) + if err != nil { + return err + } + o.startTs = oracle.ComposeTS(ts, logical) + } + + return o.completeCfg(ctx, cmd) +} + +// completeCfg complete the replica config from file and cmd flags. +func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Command) error { + _, captureInfos, err := o.etcdClient.GetCaptures(ctx) + if err != nil { + return err + } + + cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos)) + if err != nil { + return errors.Trace(err) + } + + cfg := config.GetDefaultReplicaConfig() + if len(o.commonChangefeedOptions.configFile) > 0 { + if err := o.commonChangefeedOptions.strictDecodeConfig("TiCDC changefeed", cfg); err != nil { + return err + } + } + + if !cdcClusterVer.ShouldEnableOldValueByDefault() { + cfg.EnableOldValue = false + log.Warn("The TiCDC cluster is built from an older version, disabling old value by default.", + zap.String("version", cdcClusterVer.String())) + } + + if !cfg.EnableOldValue { + sinkURIParsed, err := url.Parse(o.commonChangefeedOptions.sinkURI) + if err != nil { + return cerror.WrapError(cerror.ErrSinkURIInvalid, err) + } + + protocol := sinkURIParsed.Query().Get("protocol") + if protocol != "" { + cfg.Sink.Protocol = protocol + } + for _, fp := range forceEnableOldValueProtocols { + if cfg.Sink.Protocol == fp { + log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol)) + cfg.EnableOldValue = true + break + } + } + + if cfg.ForceReplicate { + log.Error("if use force replicate, old value feature must be enabled") + return cerror.ErrOldValueNotEnabled.GenWithStackByArgs() + } + } + + for _, rules := range cfg.Sink.DispatchRules { + switch strings.ToLower(rules.Dispatcher) { + case "rowid", "index-value": + if cfg.EnableOldValue { + cmd.Printf("[WARN] This index-value distribution mode "+ + "does not guarantee row-level orderliness when "+ + "switching on the old value, so please use caution! dispatch-rules: %#v", rules) + } + } + } + + if o.commonChangefeedOptions.sortEngine == model.SortUnified && !cdcClusterVer.ShouldEnableUnifiedSorterByDefault() { + o.commonChangefeedOptions.sortEngine = model.SortInMemory + log.Warn("The TiCDC cluster is built from an older version, disabling Unified Sorter by default", + zap.String("version", cdcClusterVer.String())) + } + + if o.disableGCSafePointCheck { + cfg.CheckGCSafePoint = false + } + + if o.commonChangefeedOptions.cyclicReplicaID != 0 || len(o.commonChangefeedOptions.cyclicFilterReplicaIDs) != 0 { + if !(o.commonChangefeedOptions.cyclicReplicaID != 0 && len(o.commonChangefeedOptions.cyclicFilterReplicaIDs) != 0) { + return errors.New("invalid cyclic config, please make sure using " + + "nonzero replica ID and specify filter replica IDs") + } + + filter := make([]uint64, 0, len(o.commonChangefeedOptions.cyclicFilterReplicaIDs)) + for _, id := range o.commonChangefeedOptions.cyclicFilterReplicaIDs { + filter = append(filter, uint64(id)) + } + + cfg.Cyclic = &config.CyclicConfig{ + Enable: true, + ReplicaID: o.commonChangefeedOptions.cyclicReplicaID, + FilterReplicaID: filter, + SyncDDL: o.commonChangefeedOptions.cyclicSyncDDL, + // TODO(neil) enable ID bucket. + } + } + // Complete cfg. + o.cfg = cfg + + return nil +} + +// validate checks that the provided attach options are specified. +func (o *createChangefeedOptions) validate(ctx context.Context, cmd *cobra.Command) error { + if o.commonChangefeedOptions.sinkURI == "" { + return errors.New("Creating changefeed without a sink-uri") + } + + err := o.cfg.Validate() + if err != nil { + return err + } + + if err := o.validateStartTs(ctx); err != nil { + return err + } + + if err := o.validateTargetTs(); err != nil { + return err + } + + // user is not allowed to set sort-dir at changefeed level + if o.commonChangefeedOptions.sortDir != "" { + cmd.Printf(color.HiYellowString("[WARN] --sort-dir is deprecated in changefeed settings. " + + "Please use `cdc server --data-dir` to start the cdc server if possible, sort-dir will be set automatically. " + + "The --sort-dir here will be no-op\n")) + return errors.New("Creating changefeed with `--sort-dir`, it's invalid") + } + + switch o.commonChangefeedOptions.sortEngine { + case model.SortUnified, model.SortInMemory: + case model.SortInFile: + // obsolete. But we keep silent here. We create a Unified Sorter when the owner/processor sees this option + // for backward-compatibility. + default: + return errors.Errorf("Creating changefeed with an invalid sort engine(%s), "+ + "`%s` and `%s` are the only valid options.", o.commonChangefeedOptions.sortEngine, model.SortUnified, model.SortInMemory) + } + + return nil +} + +// getInfo constructs the information for the changefeed. +func (o *createChangefeedOptions) getInfo(cmd *cobra.Command) *model.ChangeFeedInfo { + info := &model.ChangeFeedInfo{ + SinkURI: o.commonChangefeedOptions.sinkURI, + Opts: make(map[string]string), + CreateTime: time.Now(), + StartTs: o.startTs, + TargetTs: o.commonChangefeedOptions.targetTs, + Config: o.cfg, + Engine: o.commonChangefeedOptions.sortEngine, + State: model.StateNormal, + SyncPointEnabled: o.commonChangefeedOptions.syncPointEnabled, + SyncPointInterval: o.commonChangefeedOptions.syncPointInterval, + CreatorVersion: version.ReleaseVersion, + } + + if info.Engine == model.SortInFile { + cmd.Printf("[WARN] file sorter is deprecated. " + + "make sure that you DO NOT use it in production. " + + "Adjust \"sort-engine\" to make use of the right sorter.\n") + } + + for _, opt := range o.commonChangefeedOptions.opts { + s := strings.SplitN(opt, "=", 2) + if len(s) <= 0 { + cmd.Printf("omit opt: %s", opt) + continue + } + + var key string + var value string + + key = s[0] + if len(s) > 1 { + value = s[1] + } + info.Opts[key] = value + } + + return info +} + +// validateStartTs checks if startTs is a valid value. +func (o *createChangefeedOptions) validateStartTs(ctx context.Context) error { + if o.disableGCSafePointCheck { + return nil + } + // Ensure the start ts is validate in the next 1 hour. + const ensureTTL = 60 * 60. + return gc.EnsureChangefeedStartTsSafety( + ctx, o.pdClient, o.changefeedID, ensureTTL, o.startTs) +} + +// validateTargetTs checks if targetTs is a valid value. +func (o *createChangefeedOptions) validateTargetTs() error { + if o.commonChangefeedOptions.targetTs > 0 && o.commonChangefeedOptions.targetTs <= o.startTs { + return errors.Errorf("target-ts %d must be larger than start-ts: %d", o.commonChangefeedOptions.targetTs, o.startTs) + } + return nil +} + +// validateSink will create a sink and verify that the configuration is correct. +func (o *createChangefeedOptions) validateSink( + ctx context.Context, cfg *config.ReplicaConfig, opts map[string]string, +) error { + return sink.Validate(ctx, o.commonChangefeedOptions.sinkURI, cfg, opts) +} + +// run the `cli changefeed create` command. +func (o *createChangefeedOptions) run(ctx context.Context, cmd *cobra.Command) error { + id := o.changefeedID + if id == "" { + id = uuid.New().String() + } + if err := model.ValidateChangefeedID(id); err != nil { + return err + } + + if !o.commonChangefeedOptions.noConfirm { + currentPhysical, _, err := o.pdClient.GetTS(ctx) + if err != nil { + return err + } + + if err := confirmLargeDataGap(cmd, currentPhysical, o.startTs); err != nil { + return err + } + } + + ineligibleTables, eligibleTables, err := getTables(o.pdAddr, o.credential, o.cfg, o.startTs) + if err != nil { + return err + } + + if len(ineligibleTables) != 0 { + if o.cfg.ForceReplicate { + cmd.Printf("[WARN] force to replicate some ineligible tables, %#v\n", ineligibleTables) + } else { + cmd.Printf("[WARN] some tables are not eligible to replicate, %#v\n", ineligibleTables) + if !o.commonChangefeedOptions.noConfirm { + if err := confirmIgnoreIneligibleTables(cmd); err != nil { + return err + } + } + } + } + + if o.cfg.Cyclic.IsEnabled() && !cyclic.IsTablesPaired(eligibleTables) { + return errors.New("normal tables and mark tables are not paired, " + + "please run `cdc cli changefeed cyclic create-marktables`") + } + + info := o.getInfo(cmd) + + tz, err := ticdcutil.GetTimezone(o.timezone) + if err != nil { + return errors.Annotate(err, "can not load timezone, Please specify the time zone through environment variable `TZ` or command line parameters `--tz`") + } + + ctx = ticdcutil.PutTimezoneInCtx(ctx, tz) + err = o.validateSink(ctx, info.Config, info.Opts) + if err != nil { + return err + } + + infoStr, err := info.Marshal() + if err != nil { + return err + } + + err = o.etcdClient.CreateChangefeedInfo(ctx, info, id) + if err != nil { + return err + } + + cmd.Printf("Create changefeed successfully!\nID: %s\nInfo: %s\n", id, infoStr) + + return nil +} + +// newCmdCreateChangefeed creates the `cli changefeed create` command. +func newCmdCreateChangefeed(f factory.Factory) *cobra.Command { + commonChangefeedOptions := newChangefeedCommonOptions() + + o := newCreateChangefeedOptions(commonChangefeedOptions) + + command := &cobra.Command{ + Use: "create", + Short: "Create a new replication task (changefeed)", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmdcontext.GetDefaultContext() + + err := o.complete(ctx, f, cmd) + if err != nil { + return err + } + + err = o.validate(ctx, cmd) + if err != nil { + return err + } + + return o.run(ctx, cmd) + }, + } + + o.addFlags(command) + + return command +} diff --git a/pkg/cmd/util/helper.go b/pkg/cmd/util/helper.go new file mode 100644 index 00000000000..c10e13ef6d5 --- /dev/null +++ b/pkg/cmd/util/helper.go @@ -0,0 +1,195 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "encoding/json" + "net/url" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/BurntSushi/toml" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + cmdconetxt "github.com/pingcap/ticdc/pkg/cmd/context" + "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/logutil" + "github.com/pingcap/ticdc/pkg/version" + "github.com/spf13/cobra" + "go.uber.org/zap" + "golang.org/x/net/http/httpproxy" +) + +// Endpoint schemes. +const ( + HTTP = "http" + HTTPS = "https" +) + +// InitCmd initializes the logger, the default context and returns its cancel function. +func InitCmd(cmd *cobra.Command, logCfg *logutil.Config) context.CancelFunc { + // Init log. + err := logutil.InitLogger(logCfg) + if err != nil { + cmd.Printf("init logger error %v\n", errors.ErrorStack(err)) + os.Exit(1) + } + log.Info("init log", zap.String("file", logCfg.File), zap.String("level", logCfg.Level)) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + sig := <-sc + log.Info("got signal to exit", zap.Stringer("signal", sig)) + cancel() + }() + + cmdconetxt.SetDefaultContext(ctx) + + return cancel +} + +// LogHTTPProxies logs HTTP proxy relative environment variables. +func LogHTTPProxies() { + fields := findProxyFields() + if len(fields) > 0 { + log.Info("using proxy config", fields...) + } +} + +func findProxyFields() []zap.Field { + proxyCfg := httpproxy.FromEnvironment() + fields := make([]zap.Field, 0, 3) + if proxyCfg.HTTPProxy != "" { + fields = append(fields, zap.String("http_proxy", proxyCfg.HTTPProxy)) + } + if proxyCfg.HTTPSProxy != "" { + fields = append(fields, zap.String("https_proxy", proxyCfg.HTTPSProxy)) + } + if proxyCfg.NoProxy != "" { + fields = append(fields, zap.String("no_proxy", proxyCfg.NoProxy)) + } + return fields +} + +// StrictDecodeFile decodes the toml file strictly. If any item in confFile file is not mapped +// into the Config struct, issue an error and stop the server from starting. +func StrictDecodeFile(path, component string, cfg interface{}, ignoreCheckItems ...string) error { + metaData, err := toml.DecodeFile(path, cfg) + if err != nil { + return errors.Trace(err) + } + + // check if item is a ignoreCheckItem + hasIgnoreItem := func(item []string) bool { + for _, ignoreCheckItem := range ignoreCheckItems { + if item[0] == ignoreCheckItem { + return true + } + } + return false + } + + if undecoded := metaData.Undecoded(); len(undecoded) > 0 { + var b strings.Builder + hasUnknownConfigSize := 0 + for _, item := range undecoded { + if hasIgnoreItem(item) { + continue + } + + if hasUnknownConfigSize > 0 { + b.WriteString(", ") + } + b.WriteString(item.String()) + hasUnknownConfigSize++ + } + if hasUnknownConfigSize > 0 { + err = errors.Errorf("component %s's config file %s contained unknown configuration options: %s", + component, path, b.String()) + } + } + return errors.Trace(err) +} + +// VerifyPdEndpoint verifies whether the pd endpoint is a valid http or https URL. +// The certificate is required when using https. +func VerifyPdEndpoint(pdEndpoint string, useTLS bool) error { + u, err := url.Parse(pdEndpoint) + if err != nil { + return errors.Annotate(err, "parse PD endpoint") + } + if (u.Scheme != HTTP && u.Scheme != HTTPS) || u.Host == "" { + return errors.New("PD endpoint should be a valid http or https URL") + } + + if useTLS { + if u.Scheme == HTTP { + return errors.New("PD endpoint scheme should be https") + } + } else { + if u.Scheme == HTTPS { + return errors.New("PD endpoint scheme is https, please provide certificate") + } + } + return nil +} + +// JSONPrint will output the data in JSON format. +func JSONPrint(cmd *cobra.Command, v interface{}) error { + data, err := json.MarshalIndent(v, "", " ") + if err != nil { + return err + } + cmd.Printf("%s\n", data) + return nil +} + +// VerifyAndGetTiCDCClusterVersion verifies and gets the version of ticdc. +// If it is an incompatible version, an error is returned. +func VerifyAndGetTiCDCClusterVersion( + ctx context.Context, cdcEtcdCli *etcd.CDCEtcdClient, +) (version.TiCDCClusterVersion, error) { + _, captureInfos, err := cdcEtcdCli.GetCaptures(ctx) + if err != nil { + return version.TiCDCClusterVersion{}, err + } + + cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos)) + if err != nil { + return version.TiCDCClusterVersion{}, err + } + + // Check TiCDC cluster version. + isUnknownVersion, err := version.CheckTiCDCClusterVersion(cdcClusterVer) + if err != nil { + return version.TiCDCClusterVersion{}, err + } + + if isUnknownVersion { + return version.TiCDCClusterVersion{}, errors.NewNoStackError("TiCDC cluster is unknown, please start TiCDC cluster") + } + + return cdcClusterVer, nil +} diff --git a/pkg/version/check.go b/pkg/version/check.go index e4249c816a4..affb71752b0 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -180,16 +180,16 @@ func (v *TiCDCClusterVersion) ShouldEnableUnifiedSorterByDefault() bool { var TiCDCClusterVersionUnknown = TiCDCClusterVersion{isUnknown: true} // GetTiCDCClusterVersion returns the version of ticdc cluster -func GetTiCDCClusterVersion(captureInfos []*model.CaptureInfo) (TiCDCClusterVersion, error) { - if len(captureInfos) == 0 { +func GetTiCDCClusterVersion(captureVersion []string) (TiCDCClusterVersion, error) { + if len(captureVersion) == 0 { return TiCDCClusterVersionUnknown, nil } var minVer *semver.Version - for _, captureInfo := range captureInfos { + for _, versionStr := range captureVersion { var ver *semver.Version var err error - if captureInfo.Version != "" { - ver, err = semver.NewVersion(removeVAndHash(captureInfo.Version)) + if versionStr != "" { + ver, err = semver.NewVersion(removeVAndHash(versionStr)) } else { ver = MinTiCDCVersion } diff --git a/pkg/version/check_test.go b/pkg/version/check_test.go index 2d14b11c23f..dda8f0af595 100644 --- a/pkg/version/check_test.go +++ b/pkg/version/check_test.go @@ -24,8 +24,13 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" +<<<<<<< HEAD "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/util/testleak" +======= + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838)) pd "github.com/tikv/pd/client" "github.com/tikv/pd/pkg/tempurl" ) @@ -173,58 +178,64 @@ func (s *checkSuite) TestReleaseSemver(c *check.C) { func (s *checkSuite) TestGetTiCDCClusterVersion(c *check.C) { defer testleak.AfterTest(c)() testCases := []struct { - captureInfos []*model.CaptureInfo - expected TiCDCClusterVersion + captureVersions []string + expected TiCDCClusterVersion }{ { - captureInfos: []*model.CaptureInfo{}, - expected: TiCDCClusterVersionUnknown, + captureVersions: []string{}, + expected: TiCDCClusterVersionUnknown, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: ""}, - {ID: "capture2", Version: ""}, - {ID: "capture3", Version: ""}, + captureVersions: []string{ + "", + "", + "", }, expected: TiCDCClusterVersion{MinTiCDCVersion, false}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "5.0.1"}, - {ID: "capture2", Version: "4.0.7"}, - {ID: "capture3", Version: "5.0.0-rc"}, + captureVersions: []string{ + "5.0.1", + "4.0.7", + "5.0.0-rc", }, expected: TiCDCClusterVersion{semver.New("4.0.7"), false}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "5.0.0-rc"}, + captureVersions: []string{ + "5.0.0-rc", }, expected: TiCDCClusterVersion{semver.New("5.0.0-rc"), false}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "5.0.0"}, + captureVersions: []string{ + "5.0.0", }, expected: TiCDCClusterVersion{semver.New("5.0.0"), false}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "4.1.0"}, + captureVersions: []string{ + "4.1.0", }, expected: TiCDCClusterVersion{semver.New("4.1.0"), false}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "4.0.10"}, + captureVersions: []string{ + "4.0.10", }, expected: TiCDCClusterVersion{semver.New("4.0.10"), false}, }, } for _, tc := range testCases { +<<<<<<< HEAD ver, err := GetTiCDCClusterVersion(tc.captureInfos) c.Assert(err, check.IsNil) c.Assert(ver, check.DeepEquals, tc.expected) +======= + ver, err := GetTiCDCClusterVersion(tc.captureVersions) + require.Nil(t, err) + require.Equal(t, ver, tc.expected) +>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838)) } } diff --git a/pkg/version/creator_version_gate.go b/pkg/version/creator_version_gate.go new file mode 100644 index 00000000000..a1e235fe46c --- /dev/null +++ b/pkg/version/creator_version_gate.go @@ -0,0 +1,61 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "github.com/coreos/go-semver/semver" +) + +// CreatorVersionGate determines the introduced version and compatibility +// of some features based on the creator's version value. +type CreatorVersionGate struct { + version string +} + +// changefeedStateFromAdminJobVersions specifies the version before +// which we use the admin job type to control the state of the changefeed. +var changefeedStateFromAdminJobVersions = []semver.Version{ + // Introduced in https://github.com/pingcap/ticdc/pull/3014. + *semver.New("4.0.14"), + // Introduced in https://github.com/pingcap/ticdc/pull/2946. + *semver.New("5.0.5"), +} + +// NewCreatorVersionGate creates the creator version gate. +func NewCreatorVersionGate(version string) *CreatorVersionGate { + return &CreatorVersionGate{ + version: version, + } +} + +// ChangefeedStateFromAdminJob determines if admin job is the state +// of changefeed based on the version of the creator. +func (f *CreatorVersionGate) ChangefeedStateFromAdminJob() bool { + // Introduced in https://github.com/pingcap/ticdc/pull/1341. + // The changefeed before it was introduced was using the old owner. + if f.version == "" { + return true + } + + creatorVersion := semver.New(removeVAndHash(f.version)) + for _, version := range changefeedStateFromAdminJobVersions { + // NOTICE: To compare against the same major version. + if creatorVersion.Major == version.Major && + (creatorVersion.Equal(version) || creatorVersion.LessThan(version)) { + return true + } + } + + return false +} diff --git a/pkg/version/creator_version_gate_test.go b/pkg/version/creator_version_gate_test.go new file mode 100644 index 00000000000..5c5e0fa6041 --- /dev/null +++ b/pkg/version/creator_version_gate_test.go @@ -0,0 +1,71 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestChangefeedStateFromAdminJob(t *testing.T) { + t.Parallel() + + testCases := []struct { + creatorVersion string + expected bool + }{ + { + creatorVersion: "", + expected: true, + }, + { + creatorVersion: "4.0.12", + expected: true, + }, + { + creatorVersion: "4.0.14", + expected: true, + }, + { + creatorVersion: "4.0.15", + expected: false, + }, + { + creatorVersion: "5.0.0", + expected: true, + }, + { + creatorVersion: "5.0.1", + expected: true, + }, + { + creatorVersion: "5.1.0", + expected: false, + }, + { + creatorVersion: "5.2.0", + expected: false, + }, + { + creatorVersion: "5.3.0", + expected: false, + }, + } + + for _, tc := range testCases { + creatorVersionGate := CreatorVersionGate{version: tc.creatorVersion} + require.Equal(t, tc.expected, creatorVersionGate.ChangefeedStateFromAdminJob()) + } +}