From 56edbd718c3d9acf9c74532d834659abe761180f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 7 Feb 2022 12:39:34 +0800 Subject: [PATCH 1/2] tests(ticdc): add tests for kafka max-message-bytes (#4125) (#4153) close pingcap/tiflow#4124 --- testing_utils/gen_kafka_big_messages/main.go | 145 ++++++++++++++++++ .../gen_kafka_big_messages/main_test.go | 121 +++++++++++++++ .../kafka_big_messages/conf/diff_config.toml | 29 ++++ .../kafka_big_messages/run.sh | 55 +++++++ .../kafka_messages/conf/diff_config.toml | 2 +- tests/integration_tests/kafka_messages/run.sh | 8 +- 6 files changed, 357 insertions(+), 3 deletions(-) create mode 100644 testing_utils/gen_kafka_big_messages/main.go create mode 100644 testing_utils/gen_kafka_big_messages/main_test.go create mode 100644 tests/integration_tests/kafka_big_messages/conf/diff_config.toml create mode 100755 tests/integration_tests/kafka_big_messages/run.sh diff --git a/testing_utils/gen_kafka_big_messages/main.go b/testing_utils/gen_kafka_big_messages/main.go new file mode 100644 index 00000000000..a3eaa25c643 --- /dev/null +++ b/testing_utils/gen_kafka_big_messages/main.go @@ -0,0 +1,145 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "fmt" + "log" + "os" + "strings" + + "github.com/pingcap/errors" +) + +// See: https://docs.pingcap.com/tidb/stable/tidb-limitations/#limitations-on-string-types +const varcharColumnMaxLen = 16383 + +// Value of col. Defined as a variable for testing. +var colValue = strings.Repeat("a", varcharColumnMaxLen) + +type options struct { + // The size of each row. + // The default is 1MiB. + // FIXME: Currently it does not have precise control over the size of each row. + // The overhead needs to be calculated and processed accurately. + rowBytes int + // Total number of rows. + // The default is 1 line. + rowCount int + // Sql file path. + // The default is `./test.sql`. + sqlFilePath string + // Database name. + // The default is `kafka_big_messages`. + databaseName string + // Table name. + // The default is `test`. + tableName string +} + +func (o *options) validate() error { + if o.rowBytes <= 0 { + return errors.New("rowBytes must be greater than zero") + } + + if o.rowCount <= 0 { + return errors.New("rowCount must be greater than zero") + } + + if o.sqlFilePath == "" { + return errors.New("please specify the correct file path") + } + + if o.databaseName == "" { + return errors.New("please specify the database name") + } + + if o.tableName == "" { + return errors.New("please specify the table name") + } + + return nil +} + +func gatherOptions() *options { + o := &options{} + + fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError) + fs.IntVar(&o.rowBytes, "row-bytes", 1024*1024, "Number of bytes per row.") + fs.IntVar(&o.rowCount, "row-count", 1, "Count of rows.") + fs.StringVar(&o.sqlFilePath, "sql-file-path", "./test.sql", "Sql file path.") + fs.StringVar(&o.databaseName, "database-name", "kafka_big_messages", "Database name.") + fs.StringVar(&o.tableName, "table-name", "test", "Table name.") + + _ = fs.Parse(os.Args[1:]) + return o +} + +func main() { + o := gatherOptions() + if err := o.validate(); err != nil { + log.Panicf("Invalid options: %v", err) + } + + file, err := os.OpenFile(o.sqlFilePath, os.O_CREATE|os.O_RDWR, os.ModePerm) + if err != nil { + log.Panicf("Open sql file failed: %v", err) + } + + _, err = file.Write([]byte(genDatabaseSQL(o.databaseName))) + if err != nil { + log.Panicf("Wirte create database sql failed: %v", err) + } + + _, err = file.Write([]byte(genCreateTableSQL(o.rowBytes, o.tableName))) + if err != nil { + log.Panicf("Wirte create table sql failed: %v", err) + } + + for i := 0; i < o.rowCount; i++ { + _, err = file.Write([]byte(genInsertSQL(o.rowBytes, o.tableName, i))) + if err != nil { + log.Panicf("Wirte insert sql failed: %v", err) + } + } +} + +func genDatabaseSQL(databaseName string) string { + return fmt.Sprintf(`DROP DATABASE IF EXISTS %s; +CREATE DATABASE %s; +USE %s; + +`, databaseName, databaseName, databaseName) +} + +func genCreateTableSQL(rawBytes int, tableName string) string { + var cols string + + for i := 0; i < rawBytes/varcharColumnMaxLen; i++ { + cols = fmt.Sprintf("%s, a%d VARCHAR(%d)", cols, i, varcharColumnMaxLen) + } + + return fmt.Sprintf("CREATE TABLE %s(id int primary key %s);\n", tableName, cols) +} + +func genInsertSQL(rawBytes int, tableName string, id int) string { + var values string + + for i := 0; i < rawBytes/varcharColumnMaxLen; i++ { + values = fmt.Sprintf("%s, '%s'", values, colValue) + } + + return fmt.Sprintf("INSERT INTO %s VALUES (%d%s);\n", tableName, id, values) +} diff --git a/testing_utils/gen_kafka_big_messages/main_test.go b/testing_utils/gen_kafka_big_messages/main_test.go new file mode 100644 index 00000000000..c7e4d1537b7 --- /dev/null +++ b/testing_utils/gen_kafka_big_messages/main_test.go @@ -0,0 +1,121 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestValidateOptions(t *testing.T) { + testCases := []struct { + o *options + expectedErr string + }{ + { + &options{ + rowBytes: 0, + }, + ".*rowBytes must be greater than zero.*", + }, + { + &options{ + rowBytes: 1024 * 1024, + rowCount: 0, + }, + ".*rowCount must be greater than zero.*", + }, + { + &options{ + rowBytes: 1024 * 1024, + rowCount: 1, + sqlFilePath: "", + }, + ".*please specify the correct file path.*", + }, + { + &options{ + rowBytes: 1024 * 1024, + rowCount: 1, + sqlFilePath: "./test.sql", + databaseName: "", + }, + ".*please specify the database name.*", + }, + { + &options{ + rowBytes: 1024 * 1024, + rowCount: 1, + sqlFilePath: "./test.sql", + databaseName: "kafka-big-messages", + tableName: "", + }, + ".*please specify the table name.*", + }, + { + &options{ + rowBytes: 1024 * 1024, + rowCount: 10, + sqlFilePath: "./test.sql", + databaseName: "kafka-big-messages", + tableName: "test", + }, + "", + }, + } + + for _, tc := range testCases { + err := tc.o.validate() + if tc.expectedErr != "" { + require.Error(t, err) + require.Regexp(t, tc.expectedErr, tc.o.validate().Error()) + } else { + require.Nil(t, err) + } + } +} + +func TestGenDatabaseSql(t *testing.T) { + database := "test" + + sql := genDatabaseSQL(database) + + require.Equal(t, "DROP DATABASE IF EXISTS test;\nCREATE DATABASE test;\nUSE test;\n\n", sql) +} + +func TestGenCreateTableSql(t *testing.T) { + rawBytes := varcharColumnMaxLen + tableName := "test" + + sql := genCreateTableSQL(rawBytes, tableName) + require.Equal(t, "CREATE TABLE test(id int primary key , a0 VARCHAR(16383));\n", sql) +} + +func TestGenInsertSql(t *testing.T) { + // Override the col value to test. + oldColValue := colValue + colValue = "a" + defer func() { + colValue = oldColValue + }() + + rawBytes := varcharColumnMaxLen + tableName := "test" + id := 1 + + sql := genInsertSQL(rawBytes, tableName, id) + println(sql) + require.Equal(t, "INSERT INTO test VALUES (1, 'a');\n", sql) +} diff --git a/tests/integration_tests/kafka_big_messages/conf/diff_config.toml b/tests/integration_tests/kafka_big_messages/conf/diff_config.toml new file mode 100644 index 00000000000..0082a370281 --- /dev/null +++ b/tests/integration_tests/kafka_big_messages/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/kafka_big_messages/sync_diff/output" + +source-instances = ["mysql1"] + +target-instance = "tidb0" + +target-check-tables = ["kafka_big_messages.test"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" diff --git a/tests/integration_tests/kafka_big_messages/run.sh b/tests/integration_tests/kafka_big_messages/run.sh new file mode 100755 index 00000000000..420a1ff2fc8 --- /dev/null +++ b/tests/integration_tests/kafka_big_messages/run.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +set -e + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + # test kafka sink only in this case + if [ "$SINK_TYPE" == "mysql" ]; then + return + fi + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + # Use a max-message-bytes parameter that is larger than the kafka topic max message bytes. + # Test if TiCDC automatically uses the max-message-bytes of the topic. + # See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L178 + # Use a topic that has already been created. + # See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L180 + SINK_URI="kafka://127.0.0.1:9092/big-message-test?partition-num=1&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912" + cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/big-message-test?partition-num=1&version=${KAFKA_VERSION}" + + echo "Starting generate kafka big messages..." + cd $CUR/../../../testing_utils/gen_kafka_big_messages + if [ ! -f ./gen_kafka_big_messages ]; then + GO111MODULE=on go build + fi + # Generate data larger than kafka broker max.message.bytes. We can send this data correctly. + ./gen_kafka_big_messages --row-count=15 --sql-file-path=$CUR/test.sql + + run_sql_file $CUR/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + table="kafka_big_messages.test" + check_table_exists $table ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/kafka_messages/conf/diff_config.toml b/tests/integration_tests/kafka_messages/conf/diff_config.toml index f4a6d29c149..f471166e80c 100644 --- a/tests/integration_tests/kafka_messages/conf/diff_config.toml +++ b/tests/integration_tests/kafka_messages/conf/diff_config.toml @@ -7,7 +7,7 @@ export-fix-sql = true check-struct-only = false [task] - output-dir = "/tmp/tidb_cdc_test/kafka_message/sync_diff/output" + output-dir = "/tmp/tidb_cdc_test/kafka_messages/sync_diff/output" source-instances = ["mysql1"] diff --git a/tests/integration_tests/kafka_messages/run.sh b/tests/integration_tests/kafka_messages/run.sh index 9f952728c9d..60ac56ae820 100755 --- a/tests/integration_tests/kafka_messages/run.sh +++ b/tests/integration_tests/kafka_messages/run.sh @@ -31,10 +31,14 @@ function run_length_limit() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info" TOPIC_NAME="ticdc-kafka-message-test-$RANDOM" - SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" + + # Use a max-message-bytes parameter that is larger than the kafka broker max message bytes. + # Test if TiCDC automatically uses the max-message-bytes of the broker. + # See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L178 + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912" run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" if [ "$SINK_TYPE" == "kafka" ]; then - run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}" fi # Add a check table to reduce check time, or if we check data with sync diff From 0f8e1b4971a7149ee252cfe404cbf7b682d1f636 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 18 Feb 2022 17:27:40 +0800 Subject: [PATCH 2/2] capture(cdc): add owner info to help debug etcd_worker, and also some in sink. (#4325) (#4365) close pingcap/tiflow#4331 --- cdc/capture/capture.go | 14 +++++++---- cdc/processor/manager.go | 4 +++- cdc/processor/processor.go | 23 ++++++++++++++++++- cdc/sink/manager.go | 13 ++++++++++- cdc/sink/producer/kafka/kafka.go | 18 ++++++++++----- pkg/etcd/client.go | 21 ++++++++++------- pkg/etcd/client_test.go | 4 ++-- pkg/orchestrator/etcd_worker.go | 12 ++++++---- pkg/orchestrator/etcd_worker_bank_test.go | 2 +- pkg/orchestrator/etcd_worker_test.go | 20 +++++++++------- .../cdc_state_checker/cdc_monitor.go | 2 +- 11 files changed, 95 insertions(+), 38 deletions(-) diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 0f928e56f86..dfab9fb2afa 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -191,7 +191,7 @@ func (c *Capture) run(stdCtx context.Context) error { // when the etcd worker of processor returns an error, it means that the the processor throws an unrecoverable serious errors // (recoverable errors are intercepted in the processor tick) // so we should also stop the owner and let capture restart or exit - processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval) + processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval, "processor") log.Info("the processor routine has exited", zap.Error(processorErr)) }() go func() { @@ -259,7 +259,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error { log.Info("campaign owner successfully", zap.String("capture-id", c.info.ID)) owner := c.newOwner(c.pdClient) c.setOwner(owner) - err = c.runEtcdWorker(ctx, owner, model.NewGlobalState(), ownerFlushInterval) + err = c.runEtcdWorker(ctx, owner, model.NewGlobalState(), ownerFlushInterval, "owner") c.setOwner(nil) log.Info("run owner exited", zap.Error(err)) // if owner exits, resign the owner key @@ -275,13 +275,19 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error { } } -func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Reactor, reactorState orchestrator.ReactorState, timerInterval time.Duration) error { +func (c *Capture) runEtcdWorker( + ctx cdcContext.Context, + reactor orchestrator.Reactor, + reactorState orchestrator.ReactorState, + timerInterval time.Duration, + role string, +) error { etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, kv.EtcdKeyBase, reactor, reactorState) if err != nil { return errors.Trace(err) } captureAddr := c.info.AdvertiseAddr - if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr); err != nil { + if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr, role); err != nil { // We check ttl of lease instead of check `session.Done`, because // `session.Done` is only notified when etcd client establish a // new keepalive request, there could be a time window as long as diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index b2c31566c0d..c57e8edafab 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -132,7 +132,9 @@ func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) { if processor, exist := m.processors[changefeedID]; exist { err := processor.Close() if err != nil { - log.Warn("failed to close processor", zap.Error(err)) + log.Warn("failed to close processor", + zap.String("changefeed", changefeedID), + zap.Error(err)) } delete(m.processors, changefeedID) } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index b889adea691..7bea2cf5156 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -295,10 +295,19 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { } opts[sink.OptChangefeedID] = p.changefeed.ID opts[sink.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr + log.Info("processor try new sink", zap.String("changefeed", p.changefeed.ID)) + + start := time.Now() s, err := sink.NewSink(stdCtx, p.changefeed.ID, p.changefeed.Info.SinkURI, p.filter, p.changefeed.Info.Config, opts, errCh) if err != nil { + log.Info("processor new sink failed", + zap.String("changefeed", p.changefeed.ID), + zap.Duration("duration", time.Since(start))) return errors.Trace(err) } + log.Info("processor try new sink success", + zap.Duration("duration", time.Since(start))) + checkpointTs := p.changefeed.Info.GetCheckpointTs(p.changefeed.Status) captureAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr p.sinkManager = sink.NewManager(stdCtx, s, errCh, checkpointTs, captureAddr, p.changefeedID) @@ -779,6 +788,7 @@ func (p *processor) doGCSchemaStorage() { } func (p *processor) Close() error { + log.Info("processor closing ...", zap.String("changefeed", p.changefeedID)) for _, tbl := range p.tables { tbl.Cancel() } @@ -799,7 +809,18 @@ func (p *processor) Close() error { // pass a canceled context is ok here, since we don't need to wait Close ctx, cancel := context.WithCancel(context.Background()) cancel() - return p.sinkManager.Close(ctx) + log.Info("processor try to close the sinkManager", + zap.String("changefeed", p.changefeedID)) + start := time.Now() + if err := p.sinkManager.Close(ctx); err != nil { + log.Info("processor close sinkManager failed", + zap.String("changefeed", p.changefeedID), + zap.Duration("duration", time.Since(start))) + return errors.Trace(err) + } + log.Info("processor close sinkManager success", + zap.String("changefeed", p.changefeedID), + zap.Duration("duration", time.Since(start))) } return nil } diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 78959047142..07ad794f6c4 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -89,7 +89,18 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) func (m *Manager) Close(ctx context.Context) error { tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID) if m.backendSink != nil { - return m.backendSink.Close(ctx) + log.Info("sinkManager try close bufSink", + zap.String("changefeed", m.changefeedID)) + start := time.Now() + if err := m.backendSink.Close(ctx); err != nil { + log.Info("close bufSink failed", + zap.String("changefeed", m.changefeedID), + zap.Duration("duration", time.Since(start))) + return err + } + log.Info("close bufSink success", + zap.String("changefeed", m.changefeedID), + zap.Duration("duration", time.Since(start))) } return nil } diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index 39c5168bcaa..2d71b5b4e55 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -343,13 +343,19 @@ func (k *kafkaSaramaProducer) Close() error { // In fact close sarama sync client doesn't return any error. // But close async client returns error if error channel is not empty, we // don't populate this error to the upper caller, just add a log here. - err1 := k.syncClient.Close() - err2 := k.asyncClient.Close() - if err1 != nil { - log.Error("close sync client with error", zap.Error(err1)) + start := time.Now() + err := k.asyncClient.Close() + if err != nil { + log.Error("close async client with error", zap.Error(err), zap.Duration("duration", time.Since(start))) + } else { + log.Info("async client closed", zap.Duration("duration", time.Since(start))) } - if err2 != nil { - log.Error("close async client with error", zap.Error(err2)) + start = time.Now() + err = k.syncClient.Close() + if err != nil { + log.Error("close sync client with error", zap.Error(err), zap.Duration("duration", time.Since(start))) + } else { + log.Info("sync client closed", zap.Duration("duration", time.Since(start))) } return nil } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index e1e286decb5..de287e6c39e 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -43,10 +43,10 @@ const ( backoffBaseDelayInMs = 500 // in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second backoffMaxDelayInMs = 60 * 1000 - // If no msg comes from a etcd watchCh for etcdWatchChTimeoutDuration long, + // If no msg comes from an etcd watchCh for etcdWatchChTimeoutDuration long, // we should cancel the watchCh and request a new watchCh from etcd client etcdWatchChTimeoutDuration = 10 * time.Second - // If no msg comes from a etcd watchCh for etcdRequestProgressDuration long, + // If no msg comes from an etcd watchCh for etcdRequestProgressDuration long, // we should call RequestProgress of etcd client etcdRequestProgressDuration = 1 * time.Second // etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future @@ -176,17 +176,17 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts .. } // Watch delegates request to clientv3.Watcher.Watch -func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { +func (c *Client) Watch(ctx context.Context, key string, role string, opts ...clientv3.OpOption) clientv3.WatchChan { watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize) - go c.WatchWithChan(ctx, watchCh, key, opts...) + go c.WatchWithChan(ctx, watchCh, key, role, opts...) return watchCh } // WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh -func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) { +func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, role string, opts ...clientv3.OpOption) { defer func() { close(outCh) - log.Info("WatchWithChan exited") + log.Info("WatchWithChan exited", zap.String("role", role)) }() var lastRevision int64 watchCtx, cancel := context.WithCancel(ctx) @@ -220,7 +220,9 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR break Loop case <-ticker.C: if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration { - log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime))) + log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", + zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), + zap.String("role", role)) } } } @@ -230,7 +232,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR } if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration { // cancel the last cancel func to reset it - log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack")) + log.Warn("etcd client watchCh blocking too long, reset the watchCh", + zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), + zap.Stack("stack"), + zap.String("role", role)) cancel() watchCtx, cancel = context.WithCancel(ctx) watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go index ba14f9e5ebe..786d855921e 100644 --- a/pkg/etcd/client_test.go +++ b/pkg/etcd/client_test.go @@ -146,7 +146,7 @@ func (s *etcdSuite) TestWatchChBlocked(c *check.C) { defer cancel() go func() { - watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision)) + watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision)) }() receivedRes := make([]clientv3.WatchResponse, 0) // wait for WatchWithChan set up @@ -203,7 +203,7 @@ func (s *etcdSuite) TestOutChBlocked(c *check.C) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() go func() { - watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision)) + watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision)) }() receivedRes := make([]clientv3.WatchResponse, 0) // wait for WatchWithChan set up diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index cb402edac0c..edb53c4c49e 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -119,9 +119,8 @@ func (worker *EtcdWorker) initMetrics(captureAddr string) { // A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event. // If the specified etcd session is Done, this Run function will exit with cerrors.ErrEtcdSessionDone. // And the specified etcd session is nil-safety. -func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string) error { +func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string, role string) error { defer worker.cleanUp() - worker.initMetrics(captureAddr) err := worker.syncRawState(ctx) @@ -134,7 +133,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, watchCtx, cancel := context.WithCancel(ctx) defer cancel() - watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1)) + watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), role, clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1)) var ( pendingPatches [][]DataPatch @@ -190,7 +189,8 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, log.Info("Stale Etcd event dropped", zap.Int64("event-revision", response.Header.GetRevision()), zap.Int64("previous-revision", worker.revision), - zap.Any("events", response.Events)) + zap.Any("events", response.Events), + zap.String("role", role)) continue } worker.revision = response.Header.GetRevision() @@ -239,7 +239,9 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, nextState, err := worker.reactor.Tick(ctx, worker.state) costTime := time.Since(startTime) if costTime > etcdWorkerLogsWarnDuration { - log.Warn("EtcdWorker reactor tick took too long", zap.Duration("duration", costTime)) + log.Warn("EtcdWorker reactor tick took too long", + zap.Duration("duration", costTime), + zap.String("role", role)) } worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime.Seconds()) if err != nil { diff --git a/pkg/orchestrator/etcd_worker_bank_test.go b/pkg/orchestrator/etcd_worker_bank_test.go index 7321ce6033c..f47fff954ac 100644 --- a/pkg/orchestrator/etcd_worker_bank_test.go +++ b/pkg/orchestrator/etcd_worker_bank_test.go @@ -157,7 +157,7 @@ func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) { accountNumber: totalAccountNumber, }, &bankReactorState{c: c, index: i, account: make([]int, totalAccountNumber)}) c.Assert(err, check.IsNil) - err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1") + err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1", "") if err == nil || err.Error() == "etcdserver: request timed out" { continue } diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index d7715eb8d0b..d5b00f2f3c7 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -267,7 +267,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) { return errors.Trace(err) } - return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")) + return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")) }) } @@ -352,7 +352,7 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) { c.Assert(err, check.IsNil) errg := &errgroup.Group{} errg.Go(func() error { - return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") + return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "") }) time.Sleep(500 * time.Millisecond) @@ -437,7 +437,8 @@ func (s *etcdWorkerSuite) TestFinished(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") + + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -506,7 +507,8 @@ func (s *etcdWorkerSuite) TestCover(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") + + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -585,7 +587,8 @@ func (s *etcdWorkerSuite) TestEmptyTxn(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") + + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -652,7 +655,8 @@ func (s *etcdWorkerSuite) TestEmptyOrNil(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") + + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -733,7 +737,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) { wg.Add(1) go func() { defer wg.Done() - err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1") + err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1", "") c.Assert(err, check.IsNil) }() @@ -748,7 +752,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) { }) c.Assert(err, check.IsNil) - err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1") + err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1", "") c.Assert(err, check.IsNil) modifyReactor.waitOnCh <- struct{}{} diff --git a/testing_utils/cdc_state_checker/cdc_monitor.go b/testing_utils/cdc_state_checker/cdc_monitor.go index c628c4e67a0..7ef244013cd 100644 --- a/testing_utils/cdc_state_checker/cdc_monitor.go +++ b/testing_utils/cdc_state_checker/cdc_monitor.go @@ -92,7 +92,7 @@ func newCDCMonitor(ctx context.Context, pd string, credential *security.Credenti func (m *cdcMonitor) run(ctx context.Context) error { log.Debug("start running cdcMonitor") - err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1") + err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1", "") log.Error("etcdWorker exited: test-case-failed", zap.Error(err)) log.Info("CDC state", zap.Reflect("state", m.reactor.state)) return err