diff --git a/Makefile b/Makefile index da53a4eeddc..da4502c3c68 100644 --- a/Makefile +++ b/Makefile @@ -81,6 +81,7 @@ check_third_party_binary: @which bin/pd-ctl @which bin/sync_diff_inspector @which bin/go-ycsb + @which bin/etcdctl integration_test_build: check_failpoint_ctl $(FAILPOINT_ENABLE) diff --git a/cdc/owner.go b/cdc/owner.go index 9499b1ac203..54fc76f4165 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -592,8 +592,20 @@ func (o *Owner) cleanUpStaleTasks(ctx context.Context) error { if err != nil { return errors.Trace(err) } - + positions, err := o.etcdClient.GetAllTaskPositions(ctx, changeFeedID) + if err != nil { + return errors.Trace(err) + } + // in most cases statuses and positions have the same keys + captureIDs := make(map[string]struct{}, len(statuses)) for captureID := range statuses { + captureIDs[captureID] = struct{}{} + } + for captureID := range positions { + captureIDs[captureID] = struct{}{} + } + + for captureID := range captureIDs { if _, ok := active[captureID]; !ok { if err := o.etcdClient.DeleteTaskStatus(ctx, changeFeedID, captureID); err != nil { return errors.Trace(err) diff --git a/scripts/jenkins_ci/integration_test_common.groovy b/scripts/jenkins_ci/integration_test_common.groovy index 1349c204233..3279261999e 100644 --- a/scripts/jenkins_ci/integration_test_common.groovy +++ b/scripts/jenkins_ci/integration_test_common.groovy @@ -56,6 +56,8 @@ def prepare_binaries() { curl http://download.pingcap.org/tiflash-nightly-linux-amd64.tar.gz | tar xz -C third_bin mv third_bin/tiflash-nightly-linux-amd64/* third_bin curl ${FILE_SERVER_URL}/download/builds/pingcap/go-ycsb/test-br/go-ycsb -o third_bin/go-ycsb + curl -L https://github.com/etcd-io/etcd/releases/download/v3.4.7/etcd-v3.4.7-linux-amd64.tar.gz | tar xz -C ./tmp + mv tmp/etcd-v3.4.7-linux-amd64/etcdctl third_bin curl https://download.pingcap.org/tidb-tools-v2.1.6-linux-amd64.tar.gz | tar xz -C ./tmp tidb-tools-v2.1.6-linux-amd64/bin/sync_diff_inspector mv tmp/tidb-tools-v2.1.6-linux-amd64/bin/* third_bin chmod a+x third_bin/* diff --git a/tests/availability/capture.sh b/tests/availability/capture.sh index e735c46812b..a103b6e0c5c 100755 --- a/tests/availability/capture.sh +++ b/tests/availability/capture.sh @@ -28,11 +28,19 @@ function check_result() { ensure $MAX_RETRIES sql_check } +function empty() { + sql=$* + run_sql "$sql" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} && + check_not_contains "id:" +} + function nonempty() { sql=$* run_sql "$sql" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} && check_contains "id:" } + +export -f empty export -f nonempty function test_capture_ha() { diff --git a/tests/availability/owner.sh b/tests/availability/owner.sh index c9c3ffba25b..c0eb106408d 100755 --- a/tests/availability/owner.sh +++ b/tests/availability/owner.sh @@ -13,6 +13,7 @@ function test_owner_ha() { test_kill_owner test_hang_up_owner test_expire_owner + test_owner_cleanup_stale_tasks } # test_kill_owner starts two captures and kill the owner # we expect the live capture will be elected as the new @@ -111,3 +112,41 @@ function test_expire_owner() { cleanup_process $CDC_BINARY } + +function test_owner_cleanup_stale_tasks() { + echo "run test case test_owner_cleanup_stale_tasks" + + # start a capture server + run_cdc_server $WORK_DIR $CDC_BINARY + # ensure the server become the owner + ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'" + owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') + owner_id=$($CDC_BINARY cli capture list 2>&1 | awk -F '"' '/id/{print $4}') + echo "owner pid:" $owner_pid + echo "owner id" $owner_id + + # run another server + run_cdc_server $WORK_DIR $CDC_BINARY + ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep -v \"$owner_id\" | grep id" + capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}' | grep -v "$owner_pid") + capture_id=$($CDC_BINARY cli capture list 2>&1 | awk -F '"' '/id/{print $4}' | grep -v "$owner_id") + echo "capture_id:" $capture_id + + kill -SIGKILL $owner_pid + kill -SIGKILL $capture_pid + # wait capture info expires + sleep 3 + + # simulate task status is deleted but task position stales + etcdctl del /tidb/cdc/task/status --prefix + run_cdc_server $WORK_DIR $CDC_BINARY + ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'" + + run_sql "INSERT INTO test.availability(id, val) VALUES (1, 1);" + ensure $MAX_RETRIES nonempty 'select id, val from test.availability where id=1 and val=1' + run_sql "UPDATE test.availability set val = 22 where id = 1;" + ensure $MAX_RETRIES nonempty 'select id, val from test.availability where id=1 and val=22' + run_sql "DELETE from test.availability where id=1;" + ensure $MAX_RETRIES empty 'select id, val from test.availability where id=1' + cleanup_process $CDC_BINARY +}