capture(cdc): add owner info to help debug etcd_worker, and also some in sink. (#4325) #4365

Merged
14 changes: 10 additions & 4 deletions cdc/capture/capture.go
@@ -191,7 +191,7 @@ func (c *Capture) run(stdCtx context.Context) error {
// when the etcd worker of processor returns an error, it means that the processor throws an unrecoverable serious error
// (recoverable errors are intercepted in the processor tick)
// so we should also stop the owner and let capture restart or exit
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval)
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval, "processor")
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
go func() {
@@ -259,7 +259,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
log.Info("campaign owner successfully", zap.String("capture-id", c.info.ID))
owner := c.newOwner(c.pdClient)
c.setOwner(owner)
err = c.runEtcdWorker(ctx, owner, model.NewGlobalState(), ownerFlushInterval)
err = c.runEtcdWorker(ctx, owner, model.NewGlobalState(), ownerFlushInterval, "owner")
c.setOwner(nil)
log.Info("run owner exited", zap.Error(err))
// if owner exits, resign the owner key
@@ -275,13 +275,19 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
}
}

func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Reactor, reactorState orchestrator.ReactorState, timerInterval time.Duration) error {
func (c *Capture) runEtcdWorker(
ctx cdcContext.Context,
reactor orchestrator.Reactor,
reactorState orchestrator.ReactorState,
timerInterval time.Duration,
role string,
) error {
etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, kv.EtcdKeyBase, reactor, reactorState)
if err != nil {
return errors.Trace(err)
}
captureAddr := c.info.AdvertiseAddr
if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr); err != nil {
if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr, role); err != nil {
// We check the ttl of the lease instead of checking `session.Done`, because
// `session.Done` is only notified when the etcd client establishes a
// new keepalive request, there could be a time window as long as
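The net effect of the capture.go changes: each caller of runEtcdWorker now names its role ("owner" or "processor"), and that tag rides along into the etcd worker's logs, so a stuck worker can be attributed to the routine that started it. A minimal sketch of the pattern with hypothetical names, assuming only the standard zap API:

```go
package main

import "go.uber.org/zap"

// runWorker is a hypothetical stand-in for Capture.runEtcdWorker: the caller
// names its role once, and every downstream log line carries the tag.
func runWorker(logger *zap.Logger, role string) error {
	logger = logger.With(zap.String("role", role)) // attach the tag once
	logger.Info("etcd worker started")
	// ... run the reactor loop, passing `role` (or the tagged logger) down ...
	logger.Info("etcd worker exited")
	return nil
}

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()
	// Mirrors the two call sites in capture.go: one worker per role.
	_ = runWorker(logger, "owner")
	_ = runWorker(logger, "processor")
}
```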
4 changes: 3 additions & 1 deletion cdc/processor/manager.go
@@ -132,7 +132,9 @@ func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) {
if processor, exist := m.processors[changefeedID]; exist {
err := processor.Close()
if err != nil {
log.Warn("failed to close processor", zap.Error(err))
log.Warn("failed to close processor",
zap.String("changefeed", changefeedID),
zap.Error(err))
}
delete(m.processors, changefeedID)
}
23 changes: 22 additions & 1 deletion cdc/processor/processor.go
@@ -295,10 +295,19 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
}
opts[sink.OptChangefeedID] = p.changefeed.ID
opts[sink.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr
log.Info("processor try new sink", zap.String("changefeed", p.changefeed.ID))

start := time.Now()
s, err := sink.NewSink(stdCtx, p.changefeed.ID, p.changefeed.Info.SinkURI, p.filter, p.changefeed.Info.Config, opts, errCh)
if err != nil {
log.Info("processor new sink failed",
zap.String("changefeed", p.changefeed.ID),
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
log.Info("processor try new sink success",
zap.Duration("duration", time.Since(start)))

checkpointTs := p.changefeed.Info.GetCheckpointTs(p.changefeed.Status)
captureAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr
p.sinkManager = sink.NewManager(stdCtx, s, errCh, checkpointTs, captureAddr, p.changefeedID)
@@ -779,6 +788,7 @@ func (p *processor) doGCSchemaStorage() {
}

func (p *processor) Close() error {
log.Info("processor closing ...", zap.String("changefeed", p.changefeedID))
for _, tbl := range p.tables {
tbl.Cancel()
}
@@ -799,7 +809,18 @@ func (p *processor) Close() error {
// passing a canceled context is ok here, since we don't need to wait for Close
ctx, cancel := context.WithCancel(context.Background())
cancel()
return p.sinkManager.Close(ctx)
log.Info("processor try to close the sinkManager",
zap.String("changefeed", p.changefeedID))
start := time.Now()
if err := p.sinkManager.Close(ctx); err != nil {
log.Info("processor close sinkManager failed",
zap.String("changefeed", p.changefeedID),
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
log.Info("processor close sinkManager success",
zap.String("changefeed", p.changefeedID),
zap.Duration("duration", time.Since(start)))
}
return nil
}
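The processor hunks repeat one instrumentation idiom: record time.Now() before a potentially slow call (sink.NewSink, sinkManager.Close), then log the elapsed time on both the failure and the success path. A hedged sketch of that idiom factored into a helper; the helper itself is our illustration, not code from the PR:

```go
package main

import (
	"errors"
	"time"

	"go.uber.org/zap"
)

// timed runs fn and logs how long it took, on both outcomes, mirroring the
// instrumentation added around sink creation and close in this PR.
func timed(logger *zap.Logger, name string, fn func() error) error {
	start := time.Now()
	if err := fn(); err != nil {
		logger.Info(name+" failed",
			zap.Duration("duration", time.Since(start)),
			zap.Error(err))
		return err
	}
	logger.Info(name+" success", zap.Duration("duration", time.Since(start)))
	return nil
}

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()
	_ = timed(logger, "close sinkManager", func() error {
		time.Sleep(50 * time.Millisecond)      // stand-in for a slow close
		return errors.New("sink already gone") // forces the failure path
	})
}
```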
13 changes: 12 additions & 1 deletion cdc/sink/manager.go
@@ -89,7 +89,18 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts)
func (m *Manager) Close(ctx context.Context) error {
tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID)
if m.backendSink != nil {
return m.backendSink.Close(ctx)
log.Info("sinkManager try close bufSink",
zap.String("changefeed", m.changefeedID))
start := time.Now()
if err := m.backendSink.Close(ctx); err != nil {
log.Info("close bufSink failed",
zap.String("changefeed", m.changefeedID),
zap.Duration("duration", time.Since(start)))
return err
}
log.Info("close bufSink success",
zap.String("changefeed", m.changefeedID),
zap.Duration("duration", time.Since(start)))
}
return nil
}
18 changes: 12 additions & 6 deletions cdc/sink/producer/kafka/kafka.go
@@ -343,13 +343,19 @@ func (k *kafkaSaramaProducer) Close() error {
// In fact, closing the sarama sync client doesn't return any error.
// But closing the async client returns an error if its error channel is not
// empty; we don't propagate this error to the upper caller, just log it here.
err1 := k.syncClient.Close()
err2 := k.asyncClient.Close()
if err1 != nil {
log.Error("close sync client with error", zap.Error(err1))
start := time.Now()
err := k.asyncClient.Close()
if err != nil {
log.Error("close async client with error", zap.Error(err), zap.Duration("duration", time.Since(start)))
} else {
log.Info("async client closed", zap.Duration("duration", time.Since(start)))
}
if err2 != nil {
log.Error("close async client with error", zap.Error(err2))
start = time.Now()
err = k.syncClient.Close()
if err != nil {
log.Error("close sync client with error", zap.Error(err), zap.Duration("duration", time.Since(start)))
} else {
log.Info("sync client closed", zap.Duration("duration", time.Since(start)))
}
return nil
}
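Besides timing each close, this hunk reorders the shutdown: the async client is now closed before the sync one, and each close gets its own duration. A small sketch of a sequential, individually timed shutdown, assuming only that each client exposes a Close() error method (as sarama's producers do):

```go
package main

import (
	"fmt"
	"time"
)

// closer matches any client with a Close() error method.
type closer interface {
	Close() error
}

type namedCloser struct {
	name string
	c    closer
}

// closeAll closes clients in order, timing each close separately so a slow
// or failing client shows up in the logs on its own.
func closeAll(clients []namedCloser) {
	for _, nc := range clients {
		start := time.Now()
		if err := nc.c.Close(); err != nil {
			fmt.Printf("close %s failed after %v: %v\n", nc.name, time.Since(start), err)
			continue
		}
		fmt.Printf("%s closed in %v\n", nc.name, time.Since(start))
	}
}

type fakeClient struct{ delay time.Duration }

func (f fakeClient) Close() error { time.Sleep(f.delay); return nil }

func main() {
	// Async first, then sync, matching the order after this change.
	closeAll([]namedCloser{
		{"async client", fakeClient{10 * time.Millisecond}},
		{"sync client", fakeClient{20 * time.Millisecond}},
	})
}
```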
21 changes: 13 additions & 8 deletions pkg/etcd/client.go
@@ -43,10 +43,10 @@ const (
backoffBaseDelayInMs = 500
// in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second
backoffMaxDelayInMs = 60 * 1000
// If no msg comes from a etcd watchCh for etcdWatchChTimeoutDuration long,
// If no msg comes from an etcd watchCh for etcdWatchChTimeoutDuration long,
// we should cancel the watchCh and request a new watchCh from etcd client
etcdWatchChTimeoutDuration = 10 * time.Second
// If no msg comes from a etcd watchCh for etcdRequestProgressDuration long,
// If no msg comes from an etcd watchCh for etcdRequestProgressDuration long,
// we should call RequestProgress of etcd client
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
@@ -176,17 +176,17 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ..
}

// Watch delegates request to clientv3.Watcher.Watch
func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
func (c *Client) Watch(ctx context.Context, key string, role string, opts ...clientv3.OpOption) clientv3.WatchChan {
watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
go c.WatchWithChan(ctx, watchCh, key, opts...)
go c.WatchWithChan(ctx, watchCh, key, role, opts...)
return watchCh
}

// WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh
func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) {
func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, role string, opts ...clientv3.OpOption) {
defer func() {
close(outCh)
log.Info("WatchWithChan exited")
log.Info("WatchWithChan exited", zap.String("role", role))
}()
var lastRevision int64
watchCtx, cancel := context.WithCancel(ctx)
@@ -220,7 +220,9 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
break Loop
case <-ticker.C:
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck",
zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)),
zap.String("role", role))
}
}
}
@@ -230,7 +232,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
}
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
// cancel the last cancel func to reset it
log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
log.Warn("etcd client watchCh blocking too long, reset the watchCh",
zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)),
zap.Stack("stack"),
zap.String("role", role))
cancel()
watchCtx, cancel = context.WithCancel(ctx)
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1))
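The surrounding loop (partially visible above) is a watchdog: a ticker measures how long it has been since the last watch response, and once the silence exceeds etcdWatchChTimeoutDuration, the current watch is cancelled and recreated from the last seen revision. A simplified sketch of that reset pattern, with hypothetical names, shortened timeouts, and no real etcd connection:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const watchTimeout = 2 * time.Second // the PR uses 10s; shortened for the sketch

// watchLoop consumes responses, remembers the last revision, and rebuilds
// the watch whenever it goes silent for too long.
func watchLoop(ctx context.Context, open func(ctx context.Context, rev int64) <-chan int64) {
	var lastRevision int64
	watchCtx, cancel := context.WithCancel(ctx)
	defer func() { cancel() }() // closure, so the *latest* cancel runs
	ch := open(watchCtx, lastRevision+1)
	lastResponse := time.Now()
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case rev, ok := <-ch:
			if !ok {
				return
			}
			lastRevision, lastResponse = rev, time.Now()
		case <-ticker.C:
			if time.Since(lastResponse) >= watchTimeout {
				fmt.Println("watchCh blocked too long, resetting")
				cancel() // drop the stale watch
				watchCtx, cancel = context.WithCancel(ctx)
				ch = open(watchCtx, lastRevision+1) // resume after the last revision
				lastResponse = time.Now()
			}
		}
	}
}

func main() {
	// A fake opener that never delivers, to exercise the reset path.
	open := func(ctx context.Context, rev int64) <-chan int64 { return make(chan int64) }
	ctx, cancelAll := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelAll()
	watchLoop(ctx, open)
}
```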
4 changes: 2 additions & 2 deletions pkg/etcd/client_test.go
@@ -146,7 +146,7 @@ func (s *etcdSuite) TestWatchChBlocked(c *check.C) {
defer cancel()

go func() {
watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision))
}()
receivedRes := make([]clientv3.WatchResponse, 0)
// wait for WatchWithChan set up
@@ -203,7 +203,7 @@ func (s *etcdSuite) TestOutChBlocked(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
go func() {
watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision))
}()
receivedRes := make([]clientv3.WatchResponse, 0)
// wait for WatchWithChan set up
12 changes: 7 additions & 5 deletions pkg/orchestrator/etcd_worker.go
@@ -119,9 +119,8 @@ func (worker *EtcdWorker) initMetrics(captureAddr string) {
// A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event.
// If the specified etcd session is Done, this Run function will exit with cerrors.ErrEtcdSessionDone.
// And the specified etcd session is nil-safe (a nil session is allowed).
func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string) error {
func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string, role string) error {
defer worker.cleanUp()

worker.initMetrics(captureAddr)

err := worker.syncRawState(ctx)
Expand All @@ -134,7 +133,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,

watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), role, clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))

var (
pendingPatches [][]DataPatch
@@ -190,7 +189,8 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
log.Info("Stale Etcd event dropped",
zap.Int64("event-revision", response.Header.GetRevision()),
zap.Int64("previous-revision", worker.revision),
zap.Any("events", response.Events))
zap.Any("events", response.Events),
zap.String("role", role))
continue
}
worker.revision = response.Header.GetRevision()
@@ -239,7 +239,9 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
nextState, err := worker.reactor.Tick(ctx, worker.state)
costTime := time.Since(startTime)
if costTime > etcdWorkerLogsWarnDuration {
log.Warn("EtcdWorker reactor tick took too long", zap.Duration("duration", costTime))
log.Warn("EtcdWorker reactor tick took too long",
zap.Duration("duration", costTime),
zap.String("role", role))
}
worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime.Seconds())
if err != nil {
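One detail worth noting in the Run loop: each watch response's header revision is compared against the worker's last applied revision, and stale responses are logged and dropped rather than applied twice (the exact comparison operator isn't visible in this hunk). A small illustrative guard in that spirit:

```go
package main

import "fmt"

type response struct {
	revision int64
	events   []string
}

// applyFresh drops replayed or duplicated responses, in the spirit of the
// revision guard in EtcdWorker.Run; stale events must not be applied twice.
func applyFresh(lastRevision *int64, resp response) bool {
	if resp.revision <= *lastRevision {
		fmt.Printf("stale event dropped: revision=%d previous=%d events=%v\n",
			resp.revision, *lastRevision, resp.events)
		return false
	}
	*lastRevision = resp.revision
	// ... apply resp.events to the reactor state here ...
	return true
}

func main() {
	last := int64(5)
	applyFresh(&last, response{revision: 5, events: []string{"put key1"}}) // dropped
	applyFresh(&last, response{revision: 6, events: []string{"put key2"}}) // applied
}
```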
2 changes: 1 addition & 1 deletion pkg/orchestrator/etcd_worker_bank_test.go
@@ -157,7 +157,7 @@ func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) {
accountNumber: totalAccountNumber,
}, &bankReactorState{c: c, index: i, account: make([]int, totalAccountNumber)})
c.Assert(err, check.IsNil)
err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1")
err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1", "")
if err == nil || err.Error() == "etcdserver: request timed out" {
continue
}
20 changes: 12 additions & 8 deletions pkg/orchestrator/etcd_worker_test.go
@@ -267,7 +267,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
return errors.Trace(err)
}

return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1"))
return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", ""))
})
}

@@ -352,7 +352,7 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) {
c.Assert(err, check.IsNil)
errg := &errgroup.Group{}
errg.Go(func() error {
return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")
return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
})

time.Sleep(500 * time.Millisecond)
@@ -437,7 +437,8 @@ func (s *etcdWorkerSuite) TestFinished(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
@@ -506,7 +507,8 @@ func (s *etcdWorkerSuite) TestCover(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
@@ -585,7 +587,8 @@ func (s *etcdWorkerSuite) TestEmptyTxn(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
@@ -652,7 +655,8 @@ func (s *etcdWorkerSuite) TestEmptyOrNil(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
@@ -733,7 +737,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) {
wg.Add(1)
go func() {
defer wg.Done()
err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1")
err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1", "")
c.Assert(err, check.IsNil)
}()

@@ -748,7 +752,7 @@
})
c.Assert(err, check.IsNil)

err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1")
err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1", "")
c.Assert(err, check.IsNil)

modifyReactor.waitOnCh <- struct{}{}
2 changes: 1 addition & 1 deletion testing_utils/cdc_state_checker/cdc_monitor.go
@@ -92,7 +92,7 @@ func newCDCMonitor(ctx context.Context, pd string, credential *security.Credenti

func (m *cdcMonitor) run(ctx context.Context) error {
log.Debug("start running cdcMonitor")
err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1")
err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1", "")
log.Error("etcdWorker exited: test-case-failed", zap.Error(err))
log.Info("CDC state", zap.Reflect("state", m.reactor.state))
return err