From ca52c66469c9b9b663b499ca0aa9405fde97fc41 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Wed, 7 Jul 2021 03:31:29 -0500 Subject: [PATCH] changefeed: Fix changefeed fails immediately after being created successfully. (#2115) --- cdc/owner/changefeed.go | 8 ++------ cdc/owner/changefeed_test.go | 4 ++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 64ea4c40555..8a00a968368 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -183,11 +183,9 @@ LOOP: failpoint.Inject("NewChangefeedNoRetryError", func() { failpoint.Return(cerror.ErrStartTsBeforeGC.GenWithStackByArgs(checkpointTs-300, checkpointTs)) }) - failpoint.Inject("NewChangefeedRetryError", func() { failpoint.Return(errors.New("failpoint injected retriable error")) }) - if c.state.Info.Config.CheckGCSafePoint { err := util.CheckSafetyOfStartTs(ctx, ctx.GlobalVars().PDClient, c.state.ID, checkpointTs) if err != nil { @@ -262,10 +260,8 @@ func (c *changefeed) preflightCheck(captures map[model.CaptureID]*model.CaptureI if status == nil { status = &model.ChangeFeedStatus{ // the changefeed status is nil when the changefeed is just created. - // the txn in start ts is not replicated at that time, - // so the checkpoint ts and resolved ts should less than start ts. - ResolvedTs: c.state.Info.StartTs - 1, - CheckpointTs: c.state.Info.StartTs - 1, + ResolvedTs: c.state.Info.StartTs, + CheckpointTs: c.state.Info.StartTs, AdminJobType: model.AdminNone, } return status, true, nil diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 02819ab20c0..1707e39ebac 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -166,7 +166,7 @@ func (s *changefeedSuite) TestInitialize(c *check.C) { // initialize cf.Tick(ctx, state, captures) tester.MustApplyPatches() - c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs-1) + c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs) } func (s *changefeedSuite) TestHandleError(c *check.C) { @@ -186,7 +186,7 @@ func (s *changefeedSuite) TestHandleError(c *check.C) { // handle error cf.Tick(ctx, state, captures) tester.MustApplyPatches() - c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs-1) + c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs) c.Assert(state.Info.Error.Message, check.Equals, "fake error") }