Skip to content

Commit

Permalink
api(ticdc): only update upstreamInfo that has changed (pingcap#10422) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and CharlesCheung96 committed Feb 26, 2024
1 parent b562c80 commit 90759c5
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 137 deletions.
4 changes: 1 addition & 3 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,7 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
CAPath: up.SecurityConfig.CAPath,
CertAllowedCN: up.SecurityConfig.CertAllowedCN,
}
err = h.capture.GetEtcdClient().CreateChangefeedInfo(
ctx, upstreamInfo,
info, model.DefaultChangeFeedID(changefeedConfig.ID))
err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx, upstreamInfo, info)
if err != nil {
_ = c.Error(err)
return
Expand Down
7 changes: 2 additions & 5 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
return
}

err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx,
upstreamInfo,
info,
model.DefaultChangeFeedID(info.ID))
err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx, upstreamInfo, info)
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(err)
Expand Down Expand Up @@ -453,7 +450,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
zap.Any("upstreamInfo", newUpInfo))

err = h.capture.GetEtcdClient().
UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo, changefeedID)
UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo)
if err != nil {
_ = c.Error(errors.Trace(err))
return
Expand Down
6 changes: 3 additions & 3 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func TestUpdateChangefeed(t *testing.T) {
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil).
Times(1)
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(cerrors.ErrEtcdAPIError).Times(1)

w = httptest.NewRecorder()
Expand All @@ -484,7 +484,7 @@ func TestUpdateChangefeed(t *testing.T) {
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)

w = httptest.NewRecorder()
Expand All @@ -501,7 +501,7 @@ func TestUpdateChangefeed(t *testing.T) {
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)

w = httptest.NewRecorder()
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,11 @@ error = '''
meta operation fail
'''

["DFLOW:ErrMetaOpFailed"]
error = '''
unexpected meta operation failure: %s
'''

["DFLOW:ErrMetaOptionConflict"]
error = '''
WithRange/WithPrefix/WithFromKey, more than one option are used
Expand Down
5 changes: 5 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,4 +880,9 @@ var (
"add `ignore-txn-start-ts=[%d]` to the changefeed in the filter configuration.",
errors.RFCCodeText("CDC:ErrHandleDDLFailed"),
)

ErrMetaOpFailed = errors.Normalize(
"unexpected meta operation failure: %s",
errors.RFCCodeText("DFLOW:ErrMetaOpFailed"),
)
)
11 changes: 10 additions & 1 deletion pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (m mockWatcher) RequestProgress(ctx context.Context) error {
}

func TestRetry(t *testing.T) {
t.Parallel()

originValue := maxTries
// to speedup the test
maxTries = 2
Expand Down Expand Up @@ -116,6 +118,8 @@ func TestRetry(t *testing.T) {
}

func TestDelegateLease(t *testing.T) {
t.Parallel()

ctx := context.Background()
url, server, err := SetupEmbedEtcd(t.TempDir())
defer func() {
Expand Down Expand Up @@ -148,6 +152,8 @@ func TestDelegateLease(t *testing.T) {

// test no data lost when WatchCh blocked
func TestWatchChBlocked(t *testing.T) {
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
Expand Down Expand Up @@ -209,6 +215,8 @@ func TestWatchChBlocked(t *testing.T) {

// test no data lost when OutCh blocked
func TestOutChBlocked(t *testing.T) {
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
Expand Down Expand Up @@ -260,8 +268,9 @@ func TestOutChBlocked(t *testing.T) {
}

func TestRevisionNotFallBack(t *testing.T) {
cli := clientv3.NewCtxClient(context.TODO())
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
rev := int64(0)
Expand Down
Loading

0 comments on commit 90759c5

Please sign in to comment.