From 9fe79860d0c6e8958869170d52bb19c82ea4826b Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Wed, 6 Jan 2021 17:51:57 +0800 Subject: [PATCH 01/30] check update safepoint error --- cdc/owner.go | 11 ++++++++--- pkg/errors/errors.go | 1 + 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index 1914c72e5b7..971bd3cde32 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -696,12 +696,17 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { } } if time.Since(o.gcSafepointLastUpdate) > GCSafepointUpdateInterval { - _, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs) + actual, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs) if err != nil { log.Warn("failed to update service safe point", zap.Error(err)) - } else { - o.gcSafepointLastUpdate = time.Now() } + + if actual > minCheckpointTs { + // UpdateServiceGCSafePoint has failed. + log.Warn("updating service safe point failed", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("min-safepoint", actual)) + return cerror.ErrUpdateSafepointFailed.GenWithStackByArgs(actual) + } + o.gcSafepointLastUpdate = time.Now() } return nil } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index f8b76664f85..402ecd594f3 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -175,6 +175,7 @@ var ( ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType")) ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch")) ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted")) + ErrUpdateSafepointFailed = errors.Normalize("updating service safepoint failed. current safepoint is %d, please remove failed changefeed(s)", errors.RFCCodeText("CDC:ErrUpdateSafepointFailed")) // EtcdWorker related errors. Internal use only. // ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort. From adc746deba36f5c5bf8bce48f7e6be01ec594203 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 8 Jan 2021 14:46:16 +0800 Subject: [PATCH 02/30] integration tests --- cdc/owner.go | 5 +++++ pkg/errors/errors.go | 2 +- tests/gc_safepoint/run.sh | 22 ++++++++++++++++++++++ 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/cdc/owner.go b/cdc/owner.go index 971bd3cde32..70d60ac13dd 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -707,6 +707,11 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { return cerror.ErrUpdateSafepointFailed.GenWithStackByArgs(actual) } o.gcSafepointLastUpdate = time.Now() + + failpoint.Inject("ClearGCSafepoint", func() { + // cause an error for integration testing + _, _ = o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, -1, 0) + }) } return nil } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 402ecd594f3..466dda2d71d 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -175,7 +175,7 @@ var ( ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType")) ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch")) ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted")) - ErrUpdateSafepointFailed = errors.Normalize("updating service safepoint failed. current safepoint is %d, please remove failed changefeed(s)", errors.RFCCodeText("CDC:ErrUpdateSafepointFailed")) + ErrUpdateSafepointFailed = errors.Normalize("updating service safepoint failed. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint", errors.RFCCodeText("CDC:ErrUpdateSafepointFailed")) // EtcdWorker related errors. Internal use only. // ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort. diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index bf77cb26fec..79118b514dd 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -62,6 +62,23 @@ function check_changefeed_state() { fi } +function check_changefeed_mark_failed() { + endpoints=$1 + changefeedid=$2 + error_msg=$3 + info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s) + state=$(echo $info|jq -r '.state') + if [[ ! "$state" == "failed" ]]; then + echo "changefeed state $state does not equal to failed" + exit 1 + fi + message=$(echo $info|jq -r '.error.message') + if [[ ! "$message" =~ "$error_msg" ]]; then + echo "error message '$message' is not as expected '$error_msg'" + exit 1 + fi +} + export -f get_safepoint export -f check_safepoint_forward export -f check_safepoint_cleared @@ -121,6 +138,11 @@ function run() { ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "removed" ensure $MAX_RETRIES check_safepoint_cleared $pd_addr $pd_cluster_id + # test error handling + changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') + pd-ctl service-gc-safepoint delete ticdc --pd=$pd_addr + ensure $MAX_RETRIES check_changefeed_mark_failed $pd_addr $changefeed_id "ErrUpdateSafepointFailed" + cleanup_process $CDC_BINARY } From 078178f2b2e2b0af5e41e8a0703d5f0b4e84d075 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 8 Jan 2021 15:47:24 +0800 Subject: [PATCH 03/30] fix integration test --- tests/gc_safepoint/run.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index 79118b514dd..100f26134db 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -84,6 +84,7 @@ export -f check_safepoint_forward export -f check_safepoint_cleared export -f check_safepoint_equal export -f check_changefeed_state +export -f check_changefeed_mark_failed function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR From f9f6ed7515b02540f249fc0dbd8ad344776ae801 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 8 Jan 2021 16:59:40 +0800 Subject: [PATCH 04/30] fix integration test --- tests/gc_safepoint/run.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index 100f26134db..0b59f171a36 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -141,6 +141,7 @@ function run() { # test error handling changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') + ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id pd-ctl service-gc-safepoint delete ticdc --pd=$pd_addr ensure $MAX_RETRIES check_changefeed_mark_failed $pd_addr $changefeed_id "ErrUpdateSafepointFailed" From 3c277db001d5cce8ac846d721302d183dac89a00 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 8 Jan 2021 17:26:20 +0800 Subject: [PATCH 05/30] fix clear gc safepoint --- tests/gc_safepoint/run.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index 0b59f171a36..139e4dc8c06 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -143,6 +143,8 @@ function run() { changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id pd-ctl service-gc-safepoint delete ticdc --pd=$pd_addr + pd-ctl service-gc-safepoint delete ticdc-changefeed-creating --pd=$pd_addr || true + pd-ctl service-gc-safepoint delete gc_worker --pd=$pd_addr || true ensure $MAX_RETRIES check_changefeed_mark_failed $pd_addr $changefeed_id "ErrUpdateSafepointFailed" cleanup_process $CDC_BINARY From d2ee837020d0d2dea8449a7b4965735ca8e8fe3d Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 8 Jan 2021 19:48:31 +0800 Subject: [PATCH 06/30] fix integration test --- cdc/owner.go | 13 +++++----- pkg/errors/errors.go | 53 ++++++++++++++++++++------------------- tests/gc_safepoint/run.sh | 9 +++---- 3 files changed, 38 insertions(+), 37 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index 70d60ac13dd..0e20afe2743 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -699,19 +699,20 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { actual, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs) if err != nil { log.Warn("failed to update service safe point", zap.Error(err)) + return cerror.ErrUpdateServiceSafepointFailed.Wrap(err) } + failpoint.Inject("MockClearGCSafepoint", func() { + // cause an error for integration testing + failpoint.Return(cerror.ErrServiceSafepointLost.GenWithStackByArgs(0)) + }) + if actual > minCheckpointTs { // UpdateServiceGCSafePoint has failed. log.Warn("updating service safe point failed", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("min-safepoint", actual)) - return cerror.ErrUpdateSafepointFailed.GenWithStackByArgs(actual) + return cerror.ErrServiceSafepointLost.GenWithStackByArgs(actual) } o.gcSafepointLastUpdate = time.Now() - - failpoint.Inject("ClearGCSafepoint", func() { - // cause an error for integration testing - _, _ = o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, -1, 0) - }) } return nil } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 466dda2d71d..9f3453cfdbe 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -150,32 +150,33 @@ var ( ErrFileSorterInvalidData = errors.Normalize("invalid data", errors.RFCCodeText("CDC:ErrFileSorterInvalidData")) // server related errors - ErrCaptureSuicide = errors.Normalize("capture suicide", errors.RFCCodeText("CDC:ErrCaptureSuicide")) - ErrNewCaptureFailed = errors.Normalize("new capture failed", errors.RFCCodeText("CDC:ErrNewCaptureFailed")) - ErrCaptureRegister = errors.Normalize("capture register to etcd failed", errors.RFCCodeText("CDC:ErrCaptureRegister")) - ErrNewProcessorFailed = errors.Normalize("new processor failed", errors.RFCCodeText("CDC:ErrNewProcessorFailed")) - ErrProcessorUnknown = errors.Normalize("processor running unknown error", errors.RFCCodeText("CDC:ErrProcessorUnknown")) - ErrProcessorTableNotFound = errors.Normalize("table not found in processor cache", errors.RFCCodeText("CDC:ErrProcessorTableNotFound")) - ErrProcessorEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrProcessorEtcdWatch")) - ErrProcessorSortDir = errors.Normalize("sort dir error", errors.RFCCodeText("CDC:ErrProcessorSortDir")) - ErrUnknownSortEngine = errors.Normalize("unknown sort engine %s", errors.RFCCodeText("CDC:ErrUnknownSortEngine")) - ErrInvalidTaskKey = errors.Normalize("invalid task key: %s", errors.RFCCodeText("CDC:ErrInvalidTaskKey")) - ErrInvalidServerOption = errors.Normalize("invalid server option", errors.RFCCodeText("CDC:ErrInvalidServerOption")) - ErrServerNewPDClient = errors.Normalize("server creates pd client failed", errors.RFCCodeText("CDC:ErrServerNewPDClient")) - ErrServeHTTP = errors.Normalize("serve http error", errors.RFCCodeText("CDC:ErrServeHTTP")) - ErrCaptureCampaignOwner = errors.Normalize("campaign owner failed", errors.RFCCodeText("CDC:ErrCaptureCampaignOwner")) - ErrCaptureResignOwner = errors.Normalize("resign owner failed", errors.RFCCodeText("CDC:ErrCaptureResignOwner")) - ErrWaitHandleOperationTimeout = errors.Normalize("waiting processor to handle the operation finished timeout", errors.RFCCodeText("CDC:ErrWaitHandleOperationTimeout")) - ErrSupportPostOnly = errors.Normalize("this api supports POST method only", errors.RFCCodeText("CDC:ErrSupportPostOnly")) - ErrAPIInvalidParam = errors.Normalize("invalid api parameter", errors.RFCCodeText("CDC:ErrAPIInvalidParam")) - ErrInternalServerError = errors.Normalize("internal server error", errors.RFCCodeText("CDC:ErrInternalServerError")) - ErrOwnerSortDir = errors.Normalize("owner sort dir", errors.RFCCodeText("CDC:ErrOwnerSortDir")) - ErrOwnerChangefeedNotFound = errors.Normalize("changefeed %s not found in owner cache", errors.RFCCodeText("CDC:ErrOwnerChangefeedNotFound")) - ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState")) - ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType")) - ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch")) - ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted")) - ErrUpdateSafepointFailed = errors.Normalize("updating service safepoint failed. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint", errors.RFCCodeText("CDC:ErrUpdateSafepointFailed")) + ErrCaptureSuicide = errors.Normalize("capture suicide", errors.RFCCodeText("CDC:ErrCaptureSuicide")) + ErrNewCaptureFailed = errors.Normalize("new capture failed", errors.RFCCodeText("CDC:ErrNewCaptureFailed")) + ErrCaptureRegister = errors.Normalize("capture register to etcd failed", errors.RFCCodeText("CDC:ErrCaptureRegister")) + ErrNewProcessorFailed = errors.Normalize("new processor failed", errors.RFCCodeText("CDC:ErrNewProcessorFailed")) + ErrProcessorUnknown = errors.Normalize("processor running unknown error", errors.RFCCodeText("CDC:ErrProcessorUnknown")) + ErrProcessorTableNotFound = errors.Normalize("table not found in processor cache", errors.RFCCodeText("CDC:ErrProcessorTableNotFound")) + ErrProcessorEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrProcessorEtcdWatch")) + ErrProcessorSortDir = errors.Normalize("sort dir error", errors.RFCCodeText("CDC:ErrProcessorSortDir")) + ErrUnknownSortEngine = errors.Normalize("unknown sort engine %s", errors.RFCCodeText("CDC:ErrUnknownSortEngine")) + ErrInvalidTaskKey = errors.Normalize("invalid task key: %s", errors.RFCCodeText("CDC:ErrInvalidTaskKey")) + ErrInvalidServerOption = errors.Normalize("invalid server option", errors.RFCCodeText("CDC:ErrInvalidServerOption")) + ErrServerNewPDClient = errors.Normalize("server creates pd client failed", errors.RFCCodeText("CDC:ErrServerNewPDClient")) + ErrServeHTTP = errors.Normalize("serve http error", errors.RFCCodeText("CDC:ErrServeHTTP")) + ErrCaptureCampaignOwner = errors.Normalize("campaign owner failed", errors.RFCCodeText("CDC:ErrCaptureCampaignOwner")) + ErrCaptureResignOwner = errors.Normalize("resign owner failed", errors.RFCCodeText("CDC:ErrCaptureResignOwner")) + ErrWaitHandleOperationTimeout = errors.Normalize("waiting processor to handle the operation finished timeout", errors.RFCCodeText("CDC:ErrWaitHandleOperationTimeout")) + ErrSupportPostOnly = errors.Normalize("this api supports POST method only", errors.RFCCodeText("CDC:ErrSupportPostOnly")) + ErrAPIInvalidParam = errors.Normalize("invalid api parameter", errors.RFCCodeText("CDC:ErrAPIInvalidParam")) + ErrInternalServerError = errors.Normalize("internal server error", errors.RFCCodeText("CDC:ErrInternalServerError")) + ErrOwnerSortDir = errors.Normalize("owner sort dir", errors.RFCCodeText("CDC:ErrOwnerSortDir")) + ErrOwnerChangefeedNotFound = errors.Normalize("changefeed %s not found in owner cache", errors.RFCCodeText("CDC:ErrOwnerChangefeedNotFound")) + ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState")) + ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType")) + ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch")) + ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted")) + ErrServiceSafepointLost = errors.Normalize("service safepoint lost. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint", errors.RFCCodeText("CDC:ErrServiceSafepointLost")) + ErrUpdateServiceSafepointFailed = errors.Normalize("updating service safepoint failed", errors.RFCCodeText("CDC:ErrUpdateServiceSafepointFailed")) // EtcdWorker related errors. Internal use only. // ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort. diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index 139e4dc8c06..2f219f9c0b6 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -138,14 +138,13 @@ function run() { cdc cli changefeed remove --changefeed-id=$changefeed_id2 --pd=$pd_addr ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "removed" ensure $MAX_RETRIES check_safepoint_cleared $pd_addr $pd_cluster_id + cleanup_process $CDC_BINARY + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr \ + --failpoint 'github.com/pingcap/ticdc/cdc/MockClearGCSafepoint=return(true)' # test error handling changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') - ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id - pd-ctl service-gc-safepoint delete ticdc --pd=$pd_addr - pd-ctl service-gc-safepoint delete ticdc-changefeed-creating --pd=$pd_addr || true - pd-ctl service-gc-safepoint delete gc_worker --pd=$pd_addr || true - ensure $MAX_RETRIES check_changefeed_mark_failed $pd_addr $changefeed_id "ErrUpdateSafepointFailed" + ensure $MAX_RETRIES check_changefeed_mark_failed $pd_addr $changefeed_id "ErrServiceSafepointLost" cleanup_process $CDC_BINARY } From c4e9832de1aa68b802f3b16e9dd08a7374dcfd80 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Sat, 9 Jan 2021 00:48:12 +0800 Subject: [PATCH 07/30] fix integration test --- tests/gc_safepoint/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index 2f219f9c0b6..d0cfc1a7b5d 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -143,8 +143,8 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr \ --failpoint 'github.com/pingcap/ticdc/cdc/MockClearGCSafepoint=return(true)' # test error handling - changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') - ensure $MAX_RETRIES check_changefeed_mark_failed $pd_addr $changefeed_id "ErrServiceSafepointLost" + changefeed_id3=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') + ensure $MAX_RETRIES check_changefeed_mark_failed ${pd_addr} ${changefeed_id3} "ErrServiceSafepointLost" cleanup_process $CDC_BINARY } From 96d4933b28ebbd510a86f0918b30be535d3b0291 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Sat, 9 Jan 2021 19:10:37 +0800 Subject: [PATCH 08/30] fix integration test --- tests/gc_safepoint/run.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index d0cfc1a7b5d..9bde459aa69 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -67,6 +67,7 @@ function check_changefeed_mark_failed() { changefeedid=$2 error_msg=$3 info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s) + echo $info state=$(echo $info|jq -r '.state') if [[ ! "$state" == "failed" ]]; then echo "changefeed state $state does not equal to failed" From 8264f55d996b773fdf633506ec029014466d41e2 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Sat, 9 Jan 2021 22:44:31 +0800 Subject: [PATCH 09/30] remove integration test --- cdc/owner.go | 9 ++++----- tests/gc_safepoint/run.sh | 26 -------------------------- 2 files changed, 4 insertions(+), 31 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index 0e20afe2743..bf80b75a418 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -702,14 +702,13 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { return cerror.ErrUpdateServiceSafepointFailed.Wrap(err) } - failpoint.Inject("MockClearGCSafepoint", func() { - // cause an error for integration testing - failpoint.Return(cerror.ErrServiceSafepointLost.GenWithStackByArgs(0)) - }) - if actual > minCheckpointTs { // UpdateServiceGCSafePoint has failed. log.Warn("updating service safe point failed", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("min-safepoint", actual)) + + // Returning error here crashes the entire owner, but this seems necessary to prevent data corruption. + // This error should be rare. + // TODO better way to notify the user when the owner is crashed. return cerror.ErrServiceSafepointLost.GenWithStackByArgs(actual) } o.gcSafepointLastUpdate = time.Now() diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index 9bde459aa69..bf77cb26fec 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -62,30 +62,11 @@ function check_changefeed_state() { fi } -function check_changefeed_mark_failed() { - endpoints=$1 - changefeedid=$2 - error_msg=$3 - info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s) - echo $info - state=$(echo $info|jq -r '.state') - if [[ ! "$state" == "failed" ]]; then - echo "changefeed state $state does not equal to failed" - exit 1 - fi - message=$(echo $info|jq -r '.error.message') - if [[ ! "$message" =~ "$error_msg" ]]; then - echo "error message '$message' is not as expected '$error_msg'" - exit 1 - fi -} - export -f get_safepoint export -f check_safepoint_forward export -f check_safepoint_cleared export -f check_safepoint_equal export -f check_changefeed_state -export -f check_changefeed_mark_failed function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR @@ -139,13 +120,6 @@ function run() { cdc cli changefeed remove --changefeed-id=$changefeed_id2 --pd=$pd_addr ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "removed" ensure $MAX_RETRIES check_safepoint_cleared $pd_addr $pd_cluster_id - cleanup_process $CDC_BINARY - - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr \ - --failpoint 'github.com/pingcap/ticdc/cdc/MockClearGCSafepoint=return(true)' - # test error handling - changefeed_id3=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') - ensure $MAX_RETRIES check_changefeed_mark_failed ${pd_addr} ${changefeed_id3} "ErrServiceSafepointLost" cleanup_process $CDC_BINARY } From 398a1f5ca93f77c7aeaab74822348f86dbeb06a0 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Sat, 9 Jan 2021 22:44:31 +0800 Subject: [PATCH 10/30] fix error.toml --- cdc/owner.go | 9 ++++----- tests/gc_safepoint/run.sh | 26 -------------------------- 2 files changed, 4 insertions(+), 31 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index 0e20afe2743..bf80b75a418 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -702,14 +702,13 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { return cerror.ErrUpdateServiceSafepointFailed.Wrap(err) } - failpoint.Inject("MockClearGCSafepoint", func() { - // cause an error for integration testing - failpoint.Return(cerror.ErrServiceSafepointLost.GenWithStackByArgs(0)) - }) - if actual > minCheckpointTs { // UpdateServiceGCSafePoint has failed. log.Warn("updating service safe point failed", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("min-safepoint", actual)) + + // Returning error here crashes the entire owner, but this seems necessary to prevent data corruption. + // This error should be rare. + // TODO better way to notify the user when the owner is crashed. return cerror.ErrServiceSafepointLost.GenWithStackByArgs(actual) } o.gcSafepointLastUpdate = time.Now() diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index 9bde459aa69..bf77cb26fec 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -62,30 +62,11 @@ function check_changefeed_state() { fi } -function check_changefeed_mark_failed() { - endpoints=$1 - changefeedid=$2 - error_msg=$3 - info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s) - echo $info - state=$(echo $info|jq -r '.state') - if [[ ! "$state" == "failed" ]]; then - echo "changefeed state $state does not equal to failed" - exit 1 - fi - message=$(echo $info|jq -r '.error.message') - if [[ ! "$message" =~ "$error_msg" ]]; then - echo "error message '$message' is not as expected '$error_msg'" - exit 1 - fi -} - export -f get_safepoint export -f check_safepoint_forward export -f check_safepoint_cleared export -f check_safepoint_equal export -f check_changefeed_state -export -f check_changefeed_mark_failed function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR @@ -139,13 +120,6 @@ function run() { cdc cli changefeed remove --changefeed-id=$changefeed_id2 --pd=$pd_addr ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "removed" ensure $MAX_RETRIES check_safepoint_cleared $pd_addr $pd_cluster_id - cleanup_process $CDC_BINARY - - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr \ - --failpoint 'github.com/pingcap/ticdc/cdc/MockClearGCSafepoint=return(true)' - # test error handling - changefeed_id3=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') - ensure $MAX_RETRIES check_changefeed_mark_failed ${pd_addr} ${changefeed_id3} "ErrServiceSafepointLost" cleanup_process $CDC_BINARY } From a5a038fd89d6977f2ffce80951e223e0b4a1d4f5 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 11 Jan 2021 12:11:21 +0800 Subject: [PATCH 11/30] fix error.toml --- errors.toml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/errors.toml b/errors.toml index 875bfea3f2d..813019703ac 100755 --- a/errors.toml +++ b/errors.toml @@ -616,6 +616,11 @@ error = ''' server creates pd client failed ''' +["CDC:ErrServiceSafepointLost"] +error = ''' +service safepoint lost. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint +''' + ["CDC:ErrSinkURIInvalid"] error = ''' sink uri invalid @@ -696,6 +701,11 @@ error = ''' unmarshal failed ''' +["CDC:ErrUpdateServiceSafepointFailed"] +error = ''' +updating service safepoint failed +''' + ["CDC:ErrVersionIncompatible"] error = ''' version is incompatible: %s From c011ea1f872f2ffdb172b3f44687f219edf10024 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 12 Jan 2021 14:20:25 +0800 Subject: [PATCH 12/30] failing changefeeds --- cdc/owner.go | 27 +++++++++++++++---- cdc/owner_test.go | 67 ++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 86 insertions(+), 8 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index bf80b75a418..25d24f52b8a 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -699,17 +699,34 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { actual, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs) if err != nil { log.Warn("failed to update service safe point", zap.Error(err)) - return cerror.ErrUpdateServiceSafepointFailed.Wrap(err) + // We do not throw an error unless updating GC safepoint has been failing for more than GCSafepointUpdateInterval. + if time.Since(o.gcSafepointLastUpdate) >= GCSafepointUpdateInterval { + return cerror.ErrUpdateServiceSafepointFailed.Wrap(err) + } } if actual > minCheckpointTs { // UpdateServiceGCSafePoint has failed. log.Warn("updating service safe point failed", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("min-safepoint", actual)) - // Returning error here crashes the entire owner, but this seems necessary to prevent data corruption. - // This error should be rare. - // TODO better way to notify the user when the owner is crashed. - return cerror.ErrServiceSafepointLost.GenWithStackByArgs(actual) + for cfID, cf := range o.changeFeeds { + if cf.status.CheckpointTs < actual { + // Mark unrecoverable changefeeds as Failed. + cf.info.State = model.StateFailed + cf.info.Error = &model.RunningError{ + Addr: util.CaptureAddrFromCtx(ctx), + Code: "CDC-owner-1001", + Message: cerror.ErrServiceSafepointLost.GenWithStackByArgs(actual).Error(), + } + cf.info.ErrorHis = append(cf.info.ErrorHis, time.Now().UnixNano()/1e6) + + err := o.etcdClient.SaveChangeFeedInfo(ctx, cf.info, cfID) + if err != nil { + return err + } + } + } + return nil } o.gcSafepointLastUpdate = time.Now() } diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 8462774927e..ca2d67fdb11 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -83,12 +83,18 @@ func (s *ownerSuite) TearDownTest(c *check.C) { type mockPDClient struct { pd.Client - invokeCounter int + invokeCounter int + mockSafePointLost bool } func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { m.invokeCounter++ - return 0, errors.New("mock error") + + if m.mockSafePointLost { + return 1000, nil + } + + return safePoint, nil } func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) { @@ -99,13 +105,68 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) { gcSafepointLastUpdate: time.Now(), } - // Owner should ignore UpdateServiceGCSafePoint error. err := mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) c.Assert(mockPDCli.invokeCounter, check.Equals, 1) s.TearDownTest(c) } +func (s *ownerSuite) TestOwnerUploadGCSafePointFailed(c *check.C) { + defer testleak.AfterTest(c)() + mockPDCli := &mockPDClient{ + mockSafePointLost: true, + } + + changeFeeds := map[model.ChangeFeedID]*changeFeed{ + "test_change_feed_1": { + info: &model.ChangeFeedInfo{}, + status: &model.ChangeFeedStatus{ + CheckpointTs: 100, + }, + targetTs: 2000, + ddlState: model.ChangeFeedSyncDML, + taskStatus: model.ProcessorsInfos{ + "capture_1": {}, + "capture_2": {}, + }, + taskPositions: map[string]*model.TaskPosition{ + "capture_1": {}, + "capture_2": {}, + }, + }, + "test_change_feed_2": { + info: &model.ChangeFeedInfo{}, + status: &model.ChangeFeedStatus{ + CheckpointTs: 1100, + }, + targetTs: 2000, + ddlState: model.ChangeFeedSyncDML, + taskStatus: model.ProcessorsInfos{ + "capture_1": {}, + "capture_2": {}, + }, + taskPositions: map[string]*model.TaskPosition{ + "capture_1": {}, + "capture_2": {}, + }, + }, + } + + mockOwner := Owner{ + pdClient: mockPDCli, + gcSafepointLastUpdate: time.Now(), + changeFeeds: changeFeeds, + } + + err := mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 1) + + c.Assert(changeFeeds["test_change_feed_1"].info.State, check.Equals, model.StateFailed) + c.Assert(changeFeeds["test_change_feed_2"].info.State, check.Equals, model.StateNormal) + s.TearDownTest(c) +} + /* type handlerForPrueDMLTest struct { mu sync.RWMutex From db320ca61697c7411fe785e9d454213aa3cb0f4f Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 12 Jan 2021 14:50:52 +0800 Subject: [PATCH 13/30] add unit tests --- cdc/owner_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 8d5980e4555..e373c6c101a 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -120,7 +120,7 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointFailed(c *check.C) { changeFeeds := map[model.ChangeFeedID]*changeFeed{ "test_change_feed_1": { - info: &model.ChangeFeedInfo{}, + info: &model.ChangeFeedInfo{State: model.StateNormal}, status: &model.ChangeFeedStatus{ CheckpointTs: 100, }, @@ -136,7 +136,7 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointFailed(c *check.C) { }, }, "test_change_feed_2": { - info: &model.ChangeFeedInfo{}, + info: &model.ChangeFeedInfo{State: model.StateNormal}, status: &model.ChangeFeedStatus{ CheckpointTs: 1100, }, @@ -154,9 +154,11 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointFailed(c *check.C) { } mockOwner := Owner{ - pdClient: mockPDCli, - gcSafepointLastUpdate: time.Now(), - changeFeeds: changeFeeds, + pdClient: mockPDCli, + etcdClient: s.client, + lastFlushChangefeeds: time.Now(), + flushChangefeedInterval: 1 * time.Hour, + changeFeeds: changeFeeds, } err := mockOwner.flushChangeFeedInfos(s.ctx) From 1c4ac8c18419d03751c1a7330a4d0967dde0d6ad Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 12 Jan 2021 15:17:34 +0800 Subject: [PATCH 14/30] add integration test --- cdc/owner.go | 4 ++++ tests/changefeed_error/run.sh | 30 ++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/cdc/owner.go b/cdc/owner.go index f2a7d970b07..798d95b33bb 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -708,6 +708,10 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { } } + failpoint.Inject("InjectActualGCSafePoint", func(val failpoint.Value) { + actual = uint64(val.(int)) + }) + if actual > minCheckpointTs { // UpdateServiceGCSafePoint has failed. log.Warn("updating service safe point failed", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("min-safepoint", actual)) diff --git a/tests/changefeed_error/run.sh b/tests/changefeed_error/run.sh index 6c6f5518ba2..ccf9960199b 100755 --- a/tests/changefeed_error/run.sh +++ b/tests/changefeed_error/run.sh @@ -27,6 +27,24 @@ function check_changefeed_mark_failed() { fi } +function check_changefeed_mark_failed_regex() { + endpoints=$1 + changefeedid=$2 + error_msg=$3 + info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s) + echo "$info" + state=$(echo $info|jq -r '.state') + if [[ ! "$state" == "failed" ]]; then + echo "changefeed state $state does not equal to failed" + exit 1 + fi + message=$(echo $info|jq -r '.error.message') + if [[ ! "$message" =~ $error_msg ]]; then + echo "error message '$message' does not match '$error_msg'" + exit 1 + fi +} + function check_changefeed_mark_stopped() { endpoints=$1 changefeedid=$2 @@ -113,6 +131,7 @@ function run() { export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + # owner DDL error case export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectChangefeedDDLError=return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY changefeedid_1=$(cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') @@ -120,6 +139,17 @@ function run() { run_sql "CREATE table changefeed_error.DDLERROR(id int primary key, val int);" ensure $MAX_RETRIES check_changefeed_mark_stopped http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_1} "[CDC:ErrExecDDLFailed]exec DDL failed" + cdc cli changefeed remove -c $changefeedid_1 + cleanup_process $CDC_BINARY + + # updating GC safepoint failure case + export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectActualGCSafePoint=3*off->return(0)' + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + changefeedid_2=$(cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') + ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "service safepoint lost" + + cdc cli changefeed remove -c $changefeedid_2 export GO_FAILPOINTS='' cleanup_process $CDC_BINARY } From 012bbf4efca5cb0d2925d0d7ef9d0be67145dee2 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 12 Jan 2021 15:51:50 +0800 Subject: [PATCH 15/30] fix integration test --- tests/changefeed_error/run.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/changefeed_error/run.sh b/tests/changefeed_error/run.sh index ccf9960199b..0994f3d767a 100755 --- a/tests/changefeed_error/run.sh +++ b/tests/changefeed_error/run.sh @@ -80,6 +80,7 @@ function check_no_capture() { } export -f check_changefeed_mark_failed +export -f check_changefeed_mark_failed_regex export -f check_changefeed_mark_stopped export -f check_no_changefeed export -f check_no_capture From da6dd92ae50acc0b636480802d34e9f6330b8641 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 12 Jan 2021 16:27:50 +0800 Subject: [PATCH 16/30] fix integration test --- tests/changefeed_error/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/changefeed_error/run.sh b/tests/changefeed_error/run.sh index 0994f3d767a..9d8d975ac3f 100755 --- a/tests/changefeed_error/run.sh +++ b/tests/changefeed_error/run.sh @@ -144,7 +144,7 @@ function run() { cleanup_process $CDC_BINARY # updating GC safepoint failure case - export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectActualGCSafePoint=3*off->return(0)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectActualGCSafePoint=return(0)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY changefeedid_2=$(cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') From 9d8a250c841649cd47326ed693e8e477343cc0f9 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 12 Jan 2021 16:33:27 +0800 Subject: [PATCH 17/30] fix integration test --- tests/changefeed_error/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/changefeed_error/run.sh b/tests/changefeed_error/run.sh index 9d8d975ac3f..5499c653261 100755 --- a/tests/changefeed_error/run.sh +++ b/tests/changefeed_error/run.sh @@ -144,7 +144,7 @@ function run() { cleanup_process $CDC_BINARY # updating GC safepoint failure case - export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectActualGCSafePoint=return(0)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectActualGCSafePoint=return(9223372036854775807)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY changefeedid_2=$(cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') From 06999332bd2ccb1a154b4563a28e2965454a8cc4 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 12 Jan 2021 18:18:45 +0800 Subject: [PATCH 18/30] fix gc safepoint error handling --- cdc/owner.go | 13 +++++++----- cdc/owner_test.go | 54 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index 798d95b33bb..3e837519193 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -701,11 +701,15 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { if time.Since(o.gcSafepointLastUpdate) > GCSafepointUpdateInterval { actual, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs) if err != nil { - log.Warn("failed to update service safe point", zap.Error(err)) - // We do not throw an error unless updating GC safepoint has been failing for more than GCSafepointUpdateInterval. - if time.Since(o.gcSafepointLastUpdate) >= GCSafepointUpdateInterval { + sinceLastUpdate := time.Since(o.gcSafepointLastUpdate) + log.Warn("failed to update service safe point", zap.Error(err), + zap.Duration("since-last-update", sinceLastUpdate)) + // We do not throw an error unless updating GC safepoint has been failing for more than gcTTL. + if sinceLastUpdate >= time.Second*time.Duration(o.gcTTL) { return cerror.ErrUpdateServiceSafepointFailed.Wrap(err) } + } else { + o.gcSafepointLastUpdate = time.Now() } failpoint.Inject("InjectActualGCSafePoint", func(val failpoint.Value) { @@ -714,7 +718,7 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { if actual > minCheckpointTs { // UpdateServiceGCSafePoint has failed. - log.Warn("updating service safe point failed", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("min-safepoint", actual)) + log.Warn("updating an outdated service safe point", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("actual-safepoint", actual)) for cfID, cf := range o.changeFeeds { if cf.status.CheckpointTs < actual { @@ -735,7 +739,6 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { } return nil } - o.gcSafepointLastUpdate = time.Now() } return nil } diff --git a/cdc/owner_test.go b/cdc/owner_test.go index e373c6c101a..a9cc8095170 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -86,6 +86,7 @@ type mockPDClient struct { pd.Client invokeCounter int mockSafePointLost bool + mockPDFailure bool } func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { @@ -94,6 +95,9 @@ func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s if m.mockSafePointLost { return 1000, nil } + if m.mockPDFailure { + return 0, errors.New("injected PD failure") + } return safePoint, nil } @@ -112,7 +116,55 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) { s.TearDownTest(c) } -func (s *ownerSuite) TestOwnerUploadGCSafePointFailed(c *check.C) { +func (s *ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) { + defer testleak.AfterTest(c)() + mockPDCli := &mockPDClient{ + mockPDFailure: true, + } + + changeFeeds := map[model.ChangeFeedID]*changeFeed{ + "test_change_feed_1": { + info: &model.ChangeFeedInfo{State: model.StateNormal}, + status: &model.ChangeFeedStatus{ + CheckpointTs: 100, + }, + targetTs: 2000, + ddlState: model.ChangeFeedSyncDML, + taskStatus: model.ProcessorsInfos{ + "capture_1": {}, + "capture_2": {}, + }, + taskPositions: map[string]*model.TaskPosition{ + "capture_1": {}, + "capture_2": {}, + }, + }, + } + + mockOwner := Owner{ + pdClient: mockPDCli, + etcdClient: s.client, + lastFlushChangefeeds: time.Now(), + flushChangefeedInterval: 1 * time.Hour, + gcSafepointLastUpdate: time.Now(), + gcTTL: 6, // 6 seconds + changeFeeds: changeFeeds, + } + + time.Sleep(3 * time.Second) + err := mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 1) + + time.Sleep(6 * time.Second) + err = mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.ErrorMatches, ".*CDC:ErrUpdateServiceSafepointFailed.*") + c.Assert(mockPDCli.invokeCounter, check.Equals, 2) + + s.TearDownTest(c) +} + +func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) { defer testleak.AfterTest(c)() mockPDCli := &mockPDClient{ mockSafePointLost: true, From e2af41fefb8f191039ac5503d8eb8489f3df5975 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Wed, 13 Jan 2021 17:27:11 +0800 Subject: [PATCH 19/30] remove unnecessary return --- cdc/owner.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cdc/owner.go b/cdc/owner.go index 3e837519193..b67d9c3863c 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -737,7 +737,6 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { } } } - return nil } } return nil From a417ac809d9126d1fab9e13062892e1d3a1375d2 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 15 Jan 2021 14:32:04 +0800 Subject: [PATCH 20/30] stop changefeeds properly --- cdc/owner.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cdc/owner.go b/cdc/owner.go index b67d9c3863c..dbf082a408a 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -735,6 +735,14 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { if err != nil { return err } + + err = o.EnqueueJob(model.AdminJob{ + CfID: cf.id, + Type: model.AdminStop, + }) + if err != nil { + return err + } } } } From f59999474a863ff7494b005b5d1d7d0dbcb22c02 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 15 Jan 2021 17:38:25 +0800 Subject: [PATCH 21/30] stop changefeeds properly --- cdc/owner.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index dbf082a408a..3cc16441707 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -941,10 +941,12 @@ func (o *Owner) handleAdminJob(ctx context.Context) error { continue } - cf.info.AdminJobType = model.AdminStop - cf.info.Error = job.Error - if job.Error != nil { - cf.info.ErrorHis = append(cf.info.ErrorHis, time.Now().UnixNano()/1e6) + if cf.info.State != model.StateFailed { + cf.info.AdminJobType = model.AdminStop + cf.info.Error = job.Error + if job.Error != nil { + cf.info.ErrorHis = append(cf.info.ErrorHis, time.Now().UnixNano()/1e6) + } } err := o.etcdClient.SaveChangeFeedInfo(ctx, cf.info, job.CfID) if err != nil { From f97af2e1893e5861c1a7f9fccbe869aaa08099f9 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 15 Jan 2021 19:53:41 +0800 Subject: [PATCH 22/30] WIP --- cdc/owner.go | 3 +-- pkg/errors/errors.go | 1 + pkg/util/gc_service.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index 3cc16441707..f4d52c6c40b 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/ticdc/pkg/scheduler" "github.com/pingcap/ticdc/pkg/security" "github.com/pingcap/ticdc/pkg/util" - "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" pd "github.com/tikv/pd/client" "go.etcd.io/etcd/clientv3" @@ -231,7 +230,7 @@ func (o *Owner) newChangeFeed( } } failpoint.Inject("NewChangefeedNoRetryError", func() { - failpoint.Return(nil, tikv.ErrGCTooEarly.GenWithStackByArgs(checkpointTs-300, checkpointTs)) + failpoint.Return(nil, cerror.ErrStartTsBeforeGC.GenWithStackByArgs(checkpointTs-300, checkpointTs)) }) failpoint.Inject("NewChangefeedRetryError", func() { diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 9f3453cfdbe..afc5b99b1e5 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -177,6 +177,7 @@ var ( ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted")) ErrServiceSafepointLost = errors.Normalize("service safepoint lost. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint", errors.RFCCodeText("CDC:ErrServiceSafepointLost")) ErrUpdateServiceSafepointFailed = errors.Normalize("updating service safepoint failed", errors.RFCCodeText("CDC:ErrUpdateServiceSafepointFailed")) + ErrStartTsBeforeGC = errors.Normalize("fail to create changefeed because start-ts %d is earlier than GC safepoint at %d", errors.RFCCodeText("CDC:ErrStartTsBeforeGC")) // EtcdWorker related errors. Internal use only. // ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort. diff --git a/pkg/util/gc_service.go b/pkg/util/gc_service.go index e398d92ad93..24b6fb2cf5c 100644 --- a/pkg/util/gc_service.go +++ b/pkg/util/gc_service.go @@ -17,7 +17,7 @@ import ( "context" "github.com/pingcap/errors" - "github.com/pingcap/tidb/store/tikv" + cerrors "github.com/pingcap/ticdc/pkg/errors" pd "github.com/tikv/pd/client" ) @@ -37,7 +37,7 @@ func CheckSafetyOfStartTs(ctx context.Context, pdCli pd.Client, startTs uint64) return errors.Trace(err) } if startTs < minServiceGCTs { - return errors.Wrap(tikv.ErrGCTooEarly.GenWithStackByArgs(startTs, minServiceGCTs), "startTs less than gcSafePoint") + return cerrors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs, minServiceGCTs) } return nil } From f951edbe34fa096d897ca931b5ff1f23051cb74b Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 18 Jan 2021 12:24:29 +0800 Subject: [PATCH 23/30] replace tikv error --- errors.toml | 5 +++++ tests/changefeed_error/run.sh | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/errors.toml b/errors.toml index 813019703ac..65afa14daeb 100755 --- a/errors.toml +++ b/errors.toml @@ -646,6 +646,11 @@ error = ''' table %d not found in schema snapshot ''' +["CDC:ErrStartTsBeforeGC"] +error = ''' +fail to create changefeed because start-ts %d is earlier than GC safepoint at %d +''' + ["CDC:ErrSupportPostOnly"] error = ''' this api supports POST method only diff --git a/tests/changefeed_error/run.sh b/tests/changefeed_error/run.sh index 5499c653261..acaad62ddd0 100755 --- a/tests/changefeed_error/run.sh +++ b/tests/changefeed_error/run.sh @@ -109,7 +109,7 @@ function run() { run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" fi - ensure $MAX_RETRIES check_changefeed_mark_failed http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "\[tikv:9006\]GC life time is shorter than transaction duration.*" + ensure $MAX_RETRIES check_changefeed_mark_failed http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} ".*CDC:ErrStartTsBeforeGC.*" changefeed_info=$(ETCDCTL_API=3 etcdctl --endpoints=${UP_PD_HOST_1}:${UP_PD_PORT_1} get /tidb/cdc/changefeed/info/${changefeedid}|tail -n 1) new_info=$(echo $changefeed_info|sed 's/"state":"failed"/"state":"normal"/g') ETCDCTL_API=3 etcdctl --endpoints=${UP_PD_HOST_1}:${UP_PD_PORT_1} put /tidb/cdc/changefeed/info/${changefeedid} "$new_info" From d1827f5e0b56f13eaa9bfa315cad1fdd331a901e Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 18 Jan 2021 12:41:11 +0800 Subject: [PATCH 24/30] fix unit test --- pkg/util/gc_service_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/gc_service_test.go b/pkg/util/gc_service_test.go index 30d80a4bd12..322f1903eb5 100644 --- a/pkg/util/gc_service_test.go +++ b/pkg/util/gc_service_test.go @@ -35,7 +35,7 @@ func (s *gcServiceSuite) TestCheckSafetyOfStartTs(c *check.C) { ctx := context.Background() s.pdCli.UpdateServiceGCSafePoint(ctx, "service1", 10, 60) //nolint:errcheck err := CheckSafetyOfStartTs(ctx, s.pdCli, 50) - c.Assert(err.Error(), check.Equals, "startTs less than gcSafePoint: [tikv:9006]GC life time is shorter than transaction duration, transaction starts at 50, GC safe point is 60") + c.Assert(err.Error(), check.Equals, "[CDC:ErrStartTsBeforeGC]fail to create changefeed because start-ts 50 is earlier than GC safepoint at 60") s.pdCli.UpdateServiceGCSafePoint(ctx, "service2", 10, 80) //nolint:errcheck s.pdCli.UpdateServiceGCSafePoint(ctx, "service3", 10, 70) //nolint:errcheck err = CheckSafetyOfStartTs(ctx, s.pdCli, 65) From 58233a10c9563171573b7e410b4b8f939b86844d Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 18 Jan 2021 13:37:42 +0800 Subject: [PATCH 25/30] fix failing changefeed --- cdc/owner.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index f4d52c6c40b..3e545c73a7b 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -734,14 +734,6 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { if err != nil { return err } - - err = o.EnqueueJob(model.AdminJob{ - CfID: cf.id, - Type: model.AdminStop, - }) - if err != nil { - return err - } } } } @@ -940,13 +932,12 @@ func (o *Owner) handleAdminJob(ctx context.Context) error { continue } - if cf.info.State != model.StateFailed { - cf.info.AdminJobType = model.AdminStop - cf.info.Error = job.Error - if job.Error != nil { - cf.info.ErrorHis = append(cf.info.ErrorHis, time.Now().UnixNano()/1e6) - } + cf.info.AdminJobType = model.AdminStop + cf.info.Error = job.Error + if job.Error != nil { + cf.info.ErrorHis = append(cf.info.ErrorHis, time.Now().UnixNano()/1e6) } + err := o.etcdClient.SaveChangeFeedInfo(ctx, cf.info, job.CfID) if err != nil { return errors.Trace(err) From ea7295a66b7c02fdc52d502f05702e4e8e6c8eab Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 18 Jan 2021 14:22:37 +0800 Subject: [PATCH 26/30] fix error handling --- pkg/filter/errors.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/filter/errors.go b/pkg/filter/errors.go index cb734b216c1..4cd65bfb67b 100644 --- a/pkg/filter/errors.go +++ b/pkg/filter/errors.go @@ -14,12 +14,12 @@ package filter import ( - "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/errors" + cerror "github.com/pingcap/ticdc/pkg/errors" ) // ChangefeedFastFailError checks the error, returns true if it is meaningless // to retry on this error func ChangefeedFastFailError(err error) bool { - return terror.ErrorEqual(err, tikv.ErrGCTooEarly) + return cerror.ErrStartTsBeforeGC.Equal(errors.Cause(err)) } From 7047deb1d2e299dfc0005b53f216c7b24e8d5a90 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 18 Jan 2021 15:44:10 +0800 Subject: [PATCH 27/30] fix integration test --- tests/changefeed_error/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/changefeed_error/run.sh b/tests/changefeed_error/run.sh index acaad62ddd0..173ec92b95a 100755 --- a/tests/changefeed_error/run.sh +++ b/tests/changefeed_error/run.sh @@ -109,7 +109,7 @@ function run() { run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" fi - ensure $MAX_RETRIES check_changefeed_mark_failed http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} ".*CDC:ErrStartTsBeforeGC.*" + ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} ".*CDC:ErrStartTsBeforeGC.*" changefeed_info=$(ETCDCTL_API=3 etcdctl --endpoints=${UP_PD_HOST_1}:${UP_PD_PORT_1} get /tidb/cdc/changefeed/info/${changefeedid}|tail -n 1) new_info=$(echo $changefeed_info|sed 's/"state":"failed"/"state":"normal"/g') ETCDCTL_API=3 etcdctl --endpoints=${UP_PD_HOST_1}:${UP_PD_PORT_1} put /tidb/cdc/changefeed/info/${changefeedid} "$new_info" From a6be966fe1ae7f7a876c6127a1541c6a9ed7443a Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 25 Jan 2021 11:54:32 +0800 Subject: [PATCH 28/30] stop the feeds properly --- cdc/owner.go | 13 +++++++------ tests/changefeed_error/run.sh | 8 ++++---- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index ed4800c8216..13785dbc463 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -736,18 +736,19 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { for cfID, cf := range o.changeFeeds { if cf.status.CheckpointTs < actual { - // Mark unrecoverable changefeeds as Failed. - cf.info.State = model.StateFailed - cf.info.Error = &model.RunningError{ + runningError := &model.RunningError{ Addr: util.CaptureAddrFromCtx(ctx), Code: "CDC-owner-1001", Message: cerror.ErrServiceSafepointLost.GenWithStackByArgs(actual).Error(), } - cf.info.ErrorHis = append(cf.info.ErrorHis, time.Now().UnixNano()/1e6) - err := o.etcdClient.SaveChangeFeedInfo(ctx, cf.info, cfID) + err := o.EnqueueJob(model.AdminJob{ + CfID: cfID, + Type: model.AdminStop, + Error: runningError, + }) if err != nil { - return err + return errors.Trace(err) } } } diff --git a/tests/changefeed_error/run.sh b/tests/changefeed_error/run.sh index 2b0ab537055..982c170fc5d 100755 --- a/tests/changefeed_error/run.sh +++ b/tests/changefeed_error/run.sh @@ -27,14 +27,14 @@ function check_changefeed_mark_failed() { fi } -function check_changefeed_mark_failed_regex() { +function check_changefeed_mark_stopped_regex() { endpoints=$1 changefeedid=$2 error_msg=$3 info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s) echo "$info" state=$(echo $info|jq -r '.state') - if [[ ! "$state" == "failed" ]]; then + if [[ ! "$state" == "stopped" ]]; then echo "changefeed state $state does not equal to failed" exit 1 fi @@ -80,7 +80,7 @@ function check_no_capture() { } export -f check_changefeed_mark_failed -export -f check_changefeed_mark_failed_regex +export -f check_changefeed_mark_stopped_regex export -f check_changefeed_mark_stopped export -f check_no_changefeed export -f check_no_capture @@ -148,7 +148,7 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY changefeedid_2=$(cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') - ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "service safepoint lost" + ensure $MAX_RETRIES check_changefeed_mark_stopped_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "service safepoint lost" cdc cli changefeed remove -c $changefeedid_2 export GO_FAILPOINTS='' From 4ff620349d7ba8fc2d8b23ea73f97ce147bce1eb Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 25 Jan 2021 12:47:57 +0800 Subject: [PATCH 29/30] fix unit tests --- cdc/changefeed.go | 25 +++++++++++++++++-------- cdc/owner_test.go | 13 ++++++++++--- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index ebe71d73ecd..d0141877a4b 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -987,20 +987,29 @@ func (c *changeFeed) startSyncPointTicker(ctx context.Context, interval time.Dur } func (c *changeFeed) Close() { - err := c.ddlHandler.Close() - if err != nil && errors.Cause(err) != context.Canceled { - log.Warn("failed to close ddl handler", zap.Error(err)) + if c.ddlHandler != nil { + err := c.ddlHandler.Close() + if err != nil && errors.Cause(err) != context.Canceled { + log.Warn("failed to close ddl handler", zap.Error(err)) + } } - err = c.sink.Close() - if err != nil && errors.Cause(err) != context.Canceled { - log.Warn("failed to close owner sink", zap.Error(err)) + + if c.sink != nil { + err := c.sink.Close() + if err != nil && errors.Cause(err) != context.Canceled { + log.Warn("failed to close owner sink", zap.Error(err)) + } } + if c.syncpointStore != nil { - err = c.syncpointStore.Close() + err := c.syncpointStore.Close() if err != nil && errors.Cause(err) != context.Canceled { log.Warn("failed to close owner sink", zap.Error(err)) } } - c.cancel() + + if c.cancel != nil { + c.cancel() + } log.Info("changefeed closed", zap.String("id", c.id)) } diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 49ac0ddccb3..052223b756b 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -172,7 +172,8 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) { changeFeeds := map[model.ChangeFeedID]*changeFeed{ "test_change_feed_1": { - info: &model.ChangeFeedInfo{State: model.StateNormal}, + info: &model.ChangeFeedInfo{State: model.StateNormal}, + etcdCli: s.client, status: &model.ChangeFeedStatus{ CheckpointTs: 100, }, @@ -188,7 +189,8 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) { }, }, "test_change_feed_2": { - info: &model.ChangeFeedInfo{State: model.StateNormal}, + info: &model.ChangeFeedInfo{State: model.StateNormal}, + etcdCli: s.client, status: &model.ChangeFeedStatus{ CheckpointTs: 1100, }, @@ -211,13 +213,18 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) { lastFlushChangefeeds: time.Now(), flushChangefeedInterval: 1 * time.Hour, changeFeeds: changeFeeds, + cfRWriter: s.client, + stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus), } err := mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) c.Assert(mockPDCli.invokeCounter, check.Equals, 1) - c.Assert(changeFeeds["test_change_feed_1"].info.State, check.Equals, model.StateFailed) + err = mockOwner.handleAdminJob(s.ctx) + c.Assert(err, check.IsNil) + + c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.NotNil) c.Assert(changeFeeds["test_change_feed_2"].info.State, check.Equals, model.StateNormal) s.TearDownTest(c) } From 27839596ccb7571b88399a75a9c0f52a7039c2a7 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 25 Jan 2021 17:39:20 +0800 Subject: [PATCH 30/30] fix integration test --- tests/changefeed_error/run.sh | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/tests/changefeed_error/run.sh b/tests/changefeed_error/run.sh index 982c170fc5d..0ca2342524f 100755 --- a/tests/changefeed_error/run.sh +++ b/tests/changefeed_error/run.sh @@ -27,6 +27,24 @@ function check_changefeed_mark_failed() { fi } +function check_changefeed_mark_failed_regex() { + endpoints=$1 + changefeedid=$2 + error_msg=$3 + info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s) + echo "$info" + state=$(echo $info|jq -r '.state') + if [[ ! "$state" == "failed" ]]; then + echo "changefeed state $state does not equal to failed" + exit 1 + fi + message=$(echo $info|jq -r '.error.message') + if [[ ! "$message" =~ $error_msg ]]; then + echo "error message '$message' does not match '$error_msg'" + exit 1 + fi +} + function check_changefeed_mark_stopped_regex() { endpoints=$1 changefeedid=$2 @@ -35,7 +53,7 @@ function check_changefeed_mark_stopped_regex() { echo "$info" state=$(echo $info|jq -r '.state') if [[ ! "$state" == "stopped" ]]; then - echo "changefeed state $state does not equal to failed" + echo "changefeed state $state does not equal to stopped" exit 1 fi message=$(echo $info|jq -r '.error.message') @@ -80,6 +98,7 @@ function check_no_capture() { } export -f check_changefeed_mark_failed +export -f check_changefeed_mark_failed_regex export -f check_changefeed_mark_stopped_regex export -f check_changefeed_mark_stopped export -f check_no_changefeed