Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeed: Fix changefeed fails immediately after being created successfully. (#2115) #2238

Merged
8 changes: 2 additions & 6 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,9 @@ LOOP:
failpoint.Inject("NewChangefeedNoRetryError", func() {
failpoint.Return(cerror.ErrStartTsBeforeGC.GenWithStackByArgs(checkpointTs-300, checkpointTs))
})

failpoint.Inject("NewChangefeedRetryError", func() {
failpoint.Return(errors.New("failpoint injected retriable error"))
})

if c.state.Info.Config.CheckGCSafePoint {
err := util.CheckSafetyOfStartTs(ctx, ctx.GlobalVars().PDClient, c.state.ID, checkpointTs)
if err != nil {
Expand Down Expand Up @@ -262,10 +260,8 @@ func (c *changefeed) preflightCheck(captures map[model.CaptureID]*model.CaptureI
if status == nil {
status = &model.ChangeFeedStatus{
// the changefeed status is nil when the changefeed is just created.
// the txn in start ts is not replicated at that time,
// so the checkpoint ts and resolved ts should less than start ts.
ResolvedTs: c.state.Info.StartTs - 1,
CheckpointTs: c.state.Info.StartTs - 1,
ResolvedTs: c.state.Info.StartTs,
CheckpointTs: c.state.Info.StartTs,
AdminJobType: model.AdminNone,
}
return status, true, nil
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *changefeedSuite) TestInitialize(c *check.C) {
// initialize
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs-1)
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs)
}

func (s *changefeedSuite) TestHandleError(c *check.C) {
Expand All @@ -186,7 +186,7 @@ func (s *changefeedSuite) TestHandleError(c *check.C) {
// handle error
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs-1)
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs)
c.Assert(state.Info.Error.Message, check.Equals, "fake error")
}

Expand Down