From d5d5ab93198a711d1daab5bfe5df74d9a0d41548 Mon Sep 17 00:00:00 2001
From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
Date: Mon, 17 Jan 2022 18:25:44 +0800
Subject: [PATCH] capture(cdc): add owner info to help debug etcd_worker, and
 also some in sink. (#4325)

close pingcap/tiflow#4331
---
 cdc/capture/capture.go                       | 14 ++++++++++----
 cdc/owner/ddl_sink.go                        |  6 ++++--
 cdc/processor/manager.go                     |  4 +++-
 cdc/processor/processor.go                   | 19 +++++++++++++++++++
 cdc/sink/manager.go                          | 14 +++++++++++++-
 cdc/sink/producer/kafka/kafka.go             | 18 ++++++++++++------
 pkg/etcd/client.go                           | 21 +++++++++++++--------
 pkg/etcd/client_test.go                      |  6 +++---
 pkg/orchestrator/etcd_worker.go              | 12 +++++++-----
 pkg/orchestrator/etcd_worker_bank_test.go    |  2 +-
 pkg/orchestrator/etcd_worker_test.go         | 20 ++++++++++++--------
 tests/utils/cdc_state_checker/cdc_monitor.go |  2 +-
 12 files changed, 98 insertions(+), 40 deletions(-)

diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index 413e7d8f1bc..97b17835f2d 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -313,7 +313,7 @@ func (c *Capture) run(stdCtx context.Context) error {
 		// when the etcd worker of processor returns an error, it means that the processor throws an unrecoverable serious errors
 		// (recoverable errors are intercepted in the processor tick)
 		// so we should also stop the processor and let capture restart or exit
-		processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval)
+		processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, "processor")
 		log.Info("the processor routine has exited", zap.Error(processorErr))
 	}()
 	wg.Add(1)
@@ -425,7 +425,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
 			})
 		}
 
-		err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval)
+		err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, "owner")
 		c.setOwner(nil)
 		log.Info("run owner exited", zap.Error(err))
 		// if owner exits, resign the owner key
@@ -441,13 +441,19 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
 	}
 }
 
-func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Reactor, reactorState orchestrator.ReactorState, timerInterval time.Duration) error {
+func (c *Capture) runEtcdWorker(
+	ctx cdcContext.Context,
+	reactor orchestrator.Reactor,
+	reactorState orchestrator.ReactorState,
+	timerInterval time.Duration,
+	role string,
+) error {
 	etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, etcd.EtcdKeyBase, reactor, reactorState)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	captureAddr := c.info.AdvertiseAddr
-	if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr); err != nil {
+	if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr, role); err != nil {
 		// We check ttl of lease instead of check `session.Done`, because
 		// `session.Done` is only notified when etcd client establish a
 		// new keepalive request, there could be a time window as long as
diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go
index 3ecdc2535e6..415bfcc5d88 100644
--- a/cdc/owner/ddl_sink.go
+++ b/cdc/owner/ddl_sink.go
@@ -121,11 +121,13 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
 
 		start := time.Now()
 		if err := s.sinkInitHandler(ctx, s, id, info); err != nil {
-			log.Warn("ddl sink initialize failed", zap.Duration("duration", time.Since(start)))
+			log.Warn("ddl sink initialize failed",
+				zap.Duration("duration", time.Since(start)))
 			ctx.Throw(err)
 			return
 		}
-		log.Info("ddl sink initialized, start processing...", zap.Duration("duration", time.Since(start)))
+		log.Info("ddl sink initialized, start processing...",
+			zap.Duration("duration", time.Since(start)))
 
 		// TODO make the tick duration configurable
 		ticker := time.NewTicker(time.Second)
diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go
index fd5f84ca527..935e484bb0e 100644
--- a/cdc/processor/manager.go
+++ b/cdc/processor/manager.go
@@ -131,7 +131,9 @@ func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) {
 	if processor, exist := m.processors[changefeedID]; exist {
 		err := processor.Close()
 		if err != nil {
-			log.Warn("failed to close processor", zap.Error(err))
+			log.Warn("failed to close processor",
+				zap.String("changefeed", changefeedID),
+				zap.Error(err))
 		}
 		delete(m.processors, changefeedID)
 	}
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 37b69a7e157..ffe0d848f57 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -473,10 +473,19 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
 	}
 	opts[sink.OptChangefeedID] = p.changefeed.ID
 	opts[sink.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr
+	log.Info("processor try new sink", zap.String("changefeed", p.changefeed.ID))
+
+	start := time.Now()
 	s, err := sink.New(stdCtx, p.changefeed.ID, p.changefeed.Info.SinkURI, p.filter, p.changefeed.Info.Config, opts, errCh)
 	if err != nil {
+		log.Info("processor new sink failed",
+			zap.String("changefeed", p.changefeed.ID),
+			zap.Duration("duration", time.Since(start)))
 		return errors.Trace(err)
 	}
+	log.Info("processor try new sink success",
+		zap.Duration("duration", time.Since(start)))
+
 	checkpointTs := p.changefeed.Info.GetCheckpointTs(p.changefeed.Status)
 	captureAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr
 	p.sinkManager = sink.NewManager(stdCtx, s, errCh, checkpointTs, captureAddr, p.changefeedID)
@@ -1040,6 +1049,7 @@ func (p *processor) flushRedoLogMeta(ctx context.Context) error {
 }
 
 func (p *processor) Close() error {
+	log.Info("processor closing ...", zap.String("changefeed", p.changefeedID))
 	for _, tbl := range p.tables {
 		tbl.Cancel()
 	}
@@ -1061,9 +1071,18 @@ func (p *processor) Close() error {
 		// pass a canceled context is ok here, since we don't need to wait Close
 		ctx, cancel := context.WithCancel(context.Background())
 		cancel()
+		log.Info("processor try to close the sinkManager",
+			zap.String("changefeed", p.changefeedID))
+		start := time.Now()
 		if err := p.sinkManager.Close(ctx); err != nil {
+			log.Info("processor close sinkManager failed",
+				zap.String("changefeed", p.changefeedID),
+				zap.Duration("duration", time.Since(start)))
 			return errors.Trace(err)
 		}
+		log.Info("processor close sinkManager success",
+			zap.String("changefeed", p.changefeedID),
+			zap.Duration("duration", time.Since(start)))
 	}
 	if p.newSchedulerEnabled {
 		if p.agent == nil {
diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go
index 74b7168aa67..2e315c867be 100644
--- a/cdc/sink/manager.go
+++ b/cdc/sink/manager.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
@@ -85,7 +86,18 @@ func (m *Manager) Close(ctx context.Context) error {
 	defer m.tableSinksMu.Unlock()
 	tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID)
 	if m.bufSink != nil {
-		return m.bufSink.Close(ctx)
+		log.Info("sinkManager try close bufSink",
+			zap.String("changefeed", m.changefeedID))
+		start := time.Now()
+		if err := m.bufSink.Close(ctx); err != nil {
+			log.Info("close bufSink failed",
+				zap.String("changefeed", m.changefeedID),
+				zap.Duration("duration", time.Since(start)))
+			return err
+		}
+		log.Info("close bufSink success",
+			zap.String("changefeed", m.changefeedID),
+			zap.Duration("duration", time.Since(start)))
 	}
 	return nil
 }
diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go
index b17763263e7..e921fa60382 100644
--- a/cdc/sink/producer/kafka/kafka.go
+++ b/cdc/sink/producer/kafka/kafka.go
@@ -213,13 +213,19 @@ func (k *kafkaSaramaProducer) Close() error {
 	// In fact close sarama sync client doesn't return any error.
 	// But close async client returns error if error channel is not empty, we
 	// don't populate this error to the upper caller, just add a log here.
-	err1 := k.syncClient.Close()
-	err2 := k.asyncClient.Close()
-	if err1 != nil {
-		log.Error("close sync client with error", zap.Error(err1))
+	start := time.Now()
+	err := k.asyncClient.Close()
+	if err != nil {
+		log.Error("close async client with error", zap.Error(err), zap.Duration("duration", time.Since(start)))
+	} else {
+		log.Info("async client closed", zap.Duration("duration", time.Since(start)))
 	}
-	if err2 != nil {
-		log.Error("close async client with error", zap.Error(err2))
+	start = time.Now()
+	err = k.syncClient.Close()
+	if err != nil {
+		log.Error("close sync client with error", zap.Error(err), zap.Duration("duration", time.Since(start)))
+	} else {
+		log.Info("sync client closed", zap.Duration("duration", time.Since(start)))
 	}
 	return nil
 }
diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go
index 7e63e906861..b346ac6a3cb 100644
--- a/pkg/etcd/client.go
+++ b/pkg/etcd/client.go
@@ -43,10 +43,10 @@ const (
 	backoffBaseDelayInMs = 500
 	// in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second
 	backoffMaxDelayInMs = 60 * 1000
-	// If no msg comes from a etcd watchCh for etcdWatchChTimeoutDuration long,
+	// If no msg comes from an etcd watchCh for etcdWatchChTimeoutDuration long,
 	// we should cancel the watchCh and request a new watchCh from etcd client
 	etcdWatchChTimeoutDuration = 10 * time.Second
-	// If no msg comes from a etcd watchCh for etcdRequestProgressDuration long,
+	// If no msg comes from an etcd watchCh for etcdRequestProgressDuration long,
 	// we should call RequestProgress of etcd client
 	etcdRequestProgressDuration = 1 * time.Second
 	// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
@@ -176,17 +176,17 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ..
 }
 
 // Watch delegates request to clientv3.Watcher.Watch
-func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
+func (c *Client) Watch(ctx context.Context, key string, role string, opts ...clientv3.OpOption) clientv3.WatchChan {
 	watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
-	go c.WatchWithChan(ctx, watchCh, key, opts...)
+	go c.WatchWithChan(ctx, watchCh, key, role, opts...)
 	return watchCh
 }
 
 // WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh
-func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) {
+func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, role string, opts ...clientv3.OpOption) {
 	defer func() {
 		close(outCh)
-		log.Info("WatchWithChan exited")
+		log.Info("WatchWithChan exited", zap.String("role", role))
 	}()
 
 	// get initial revision from opts to avoid revision fall back
@@ -223,7 +223,9 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
 					break Loop
 				case <-ticker.C:
 					if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
-						log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
+						log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck",
+							zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)),
+							zap.String("role", role))
 					}
 				}
 			}
@@ -235,7 +237,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
 			}
 			if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
 				// cancel the last cancel func to reset it
-				log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
+				log.Warn("etcd client watchCh blocking too long, reset the watchCh",
+					zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)),
+					zap.Stack("stack"),
+					zap.String("role", role))
 				cancel()
 				watchCtx, cancel = context.WithCancel(ctx)
 				watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision))
diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go
index be35528d16d..efa03c6d795 100644
--- a/pkg/etcd/client_test.go
+++ b/pkg/etcd/client_test.go
@@ -155,7 +155,7 @@ func TestWatchChBlocked(t *testing.T) {
 	defer cancel()
 
 	go func() {
-		watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
+		watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision))
 	}()
 	receivedRes := make([]clientv3.WatchResponse, 0)
 	// wait for WatchWithChan set up
@@ -213,7 +213,7 @@ func TestOutChBlocked(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
 	defer cancel()
 	go func() {
-		watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
+		watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision))
 	}()
 	receivedRes := make([]clientv3.WatchResponse, 0)
 	// wait for WatchWithChan set up
@@ -263,7 +263,7 @@ func TestRevisionNotFallBack(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
 	defer cancel()
 	go func() {
-		watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
+		watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision))
 	}()
 	// wait for WatchWithChan set up
 	<-outCh
diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go
index 3c30cd602b0..f5d47146445 100644
--- a/pkg/orchestrator/etcd_worker.go
+++ b/pkg/orchestrator/etcd_worker.go
@@ -119,9 +119,8 @@ func (worker *EtcdWorker) initMetrics(captureAddr string) {
 // A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event.
 // If the specified etcd session is Done, this Run function will exit with cerrors.ErrEtcdSessionDone.
 // And the specified etcd session is nil-safety.
-func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string) error {
+func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string, role string) error {
 	defer worker.cleanUp()
-
 	worker.initMetrics(captureAddr)
 
 	err := worker.syncRawState(ctx)
@@ -134,7 +133,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 
 	watchCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
-	watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
+	watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), role, clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
 
 	var (
 		pendingPatches [][]DataPatch
@@ -190,7 +189,8 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 				log.Info("Stale Etcd event dropped",
 					zap.Int64("event-revision", response.Header.GetRevision()),
 					zap.Int64("previous-revision", worker.revision),
-					zap.Any("events", response.Events))
+					zap.Any("events", response.Events),
+					zap.String("role", role))
 				continue
 			}
 			worker.revision = response.Header.GetRevision()
@@ -239,7 +239,9 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 		nextState, err := worker.reactor.Tick(ctx, worker.state)
 		costTime := time.Since(startTime)
 		if costTime > etcdWorkerLogsWarnDuration {
-			log.Warn("EtcdWorker reactor tick took too long", zap.Duration("duration", costTime))
+			log.Warn("EtcdWorker reactor tick took too long",
+				zap.Duration("duration", costTime),
+				zap.String("role", role))
 		}
 		worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime.Seconds())
 		if err != nil {
diff --git a/pkg/orchestrator/etcd_worker_bank_test.go b/pkg/orchestrator/etcd_worker_bank_test.go
index 0fa2243815e..fbbcec988c5 100644
--- a/pkg/orchestrator/etcd_worker_bank_test.go
+++ b/pkg/orchestrator/etcd_worker_bank_test.go
@@ -157,7 +157,7 @@ func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) {
 					accountNumber: totalAccountNumber,
 				}, &bankReactorState{c: c, index: i, account: make([]int, totalAccountNumber)})
 				c.Assert(err, check.IsNil)
-				err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1")
+				err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1", "")
 				if err == nil || err.Error() == "etcdserver: request timed out" {
 					continue
 				}
diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go
index b85594ae0ac..e65b463c54e 100644
--- a/pkg/orchestrator/etcd_worker_test.go
+++ b/pkg/orchestrator/etcd_worker_test.go
@@ -268,7 +268,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
 				return errors.Trace(err)
 			}
 
-			return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1"))
+			return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", ""))
 		})
 	}
 
@@ -353,7 +353,7 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) {
 	c.Assert(err, check.IsNil)
 	errg := &errgroup.Group{}
 	errg.Go(func() error {
-		return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")
+		return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
 	})
 
 	time.Sleep(500 * time.Millisecond)
@@ -438,7 +438,8 @@ func (s *etcdWorkerSuite) TestFinished(c *check.C) {
 		state: make(map[string]string),
 	})
 	c.Assert(err, check.IsNil)
-	err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")
+
+	err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
 	c.Assert(err, check.IsNil)
 	resp, err := cli.Get(ctx, prefix+"/key1")
 	c.Assert(err, check.IsNil)
@@ -507,7 +508,8 @@ func (s *etcdWorkerSuite) TestCover(c *check.C) {
 		state: make(map[string]string),
 	})
 	c.Assert(err, check.IsNil)
-	err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")
+
+	err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
 	c.Assert(err, check.IsNil)
 	resp, err := cli.Get(ctx, prefix+"/key1")
 	c.Assert(err, check.IsNil)
@@ -586,7 +588,8 @@ func (s *etcdWorkerSuite) TestEmptyTxn(c *check.C) {
 		state: make(map[string]string),
 	})
 	c.Assert(err, check.IsNil)
-	err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")
+
+	err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
 	c.Assert(err, check.IsNil)
 	resp, err := cli.Get(ctx, prefix+"/key1")
 	c.Assert(err, check.IsNil)
@@ -653,7 +656,8 @@ func (s *etcdWorkerSuite) TestEmptyOrNil(c *check.C) {
 		state: make(map[string]string),
 	})
 	c.Assert(err, check.IsNil)
-	err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")
+
+	err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
 	c.Assert(err, check.IsNil)
 	resp, err := cli.Get(ctx, prefix+"/key1")
 	c.Assert(err, check.IsNil)
@@ -734,7 +738,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1")
+		err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1", "")
 		c.Assert(err, check.IsNil)
 	}()
 
@@ -749,7 +753,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) {
 	})
 	c.Assert(err, check.IsNil)
 
-	err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1")
+	err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1", "")
 	c.Assert(err, check.IsNil)
 
 	modifyReactor.waitOnCh <- struct{}{}
diff --git a/tests/utils/cdc_state_checker/cdc_monitor.go b/tests/utils/cdc_state_checker/cdc_monitor.go
index b5bc997860e..1b9375ccc63 100644
--- a/tests/utils/cdc_state_checker/cdc_monitor.go
+++ b/tests/utils/cdc_state_checker/cdc_monitor.go
@@ -89,7 +89,7 @@ func newCDCMonitor(ctx context.Context, pd string, credential *security.Credenti
 
 func (m *cdcMonitor) run(ctx context.Context) error {
 	log.Debug("start running cdcMonitor")
-	err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1")
+	err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1", "")
 	log.Error("etcdWorker exited: test-case-failed", zap.Error(err))
 	log.Info("CDC state", zap.Reflect("state", m.reactor.state))
 	return err