capture(cdc): add owner info to help debug etcd_worker, and also some… in sink. (pingcap#4325)

close pingcap#4331
3AceShowHand authored and buchuitoudegou committed Jan 18, 2022
1 parent 401cf71 commit d5d5ab9
Showing 12 changed files with 98 additions and 40 deletions.
14 changes: 10 additions & 4 deletions cdc/capture/capture.go
@@ -313,7 +313,7 @@ func (c *Capture) run(stdCtx context.Context) error {
// when the etcd worker of processor returns an error, it means that the processor has thrown an unrecoverable, serious error
// (recoverable errors are intercepted in the processor tick)
// so we should also stop the processor and let capture restart or exit
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval)
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, "processor")
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
wg.Add(1)
@@ -425,7 +425,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
})
}

err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval)
err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, "owner")
c.setOwner(nil)
log.Info("run owner exited", zap.Error(err))
// if owner exits, resign the owner key
@@ -441,13 +441,19 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
}
}

func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Reactor, reactorState orchestrator.ReactorState, timerInterval time.Duration) error {
func (c *Capture) runEtcdWorker(
ctx cdcContext.Context,
reactor orchestrator.Reactor,
reactorState orchestrator.ReactorState,
timerInterval time.Duration,
role string,
) error {
etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, etcd.EtcdKeyBase, reactor, reactorState)
if err != nil {
return errors.Trace(err)
}
captureAddr := c.info.AdvertiseAddr
if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr); err != nil {
if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr, role); err != nil {
// We check the ttl of the lease instead of checking `session.Done`, because
// `session.Done` is only notified when the etcd client establishes a
// new keepalive request, so there could be a time window as long as
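Reduced to a standalone sketch, the capture.go change threads the caller's role ("owner" or "processor") down to the etcd worker so every log line can say which loop it came from. The names below (runWorker, the logger wiring) are illustrative stand-ins, not the actual tiflow signatures:

package main

import (
    "time"

    "go.uber.org/zap"
)

// runWorker is a hypothetical stand-in for Capture.runEtcdWorker: the role
// string rides along as a log field so owner and processor output can be
// told apart when both loops run inside the same capture process.
func runWorker(role string, timerInterval time.Duration) {
    logger := zap.L().With(zap.String("role", role))
    start := time.Now()
    // ... drive the etcd worker loop here, ticking every timerInterval ...
    logger.Info("etcd worker exited",
        zap.Duration("duration", time.Since(start)),
        zap.Duration("timerInterval", timerInterval))
}

func main() {
    logger, _ := zap.NewDevelopment()
    zap.ReplaceGlobals(logger)
    defer logger.Sync()

    runWorker("owner", 50*time.Millisecond)
    runWorker("processor", 50*time.Millisecond)
}

In the actual commit the role is simply forwarded as a string argument into EtcdWorker.Run and attached to individual log calls, rather than baked into a child logger.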
6 changes: 4 additions & 2 deletions cdc/owner/ddl_sink.go
@@ -121,11 +121,13 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m

start := time.Now()
if err := s.sinkInitHandler(ctx, s, id, info); err != nil {
log.Warn("ddl sink initialize failed", zap.Duration("duration", time.Since(start)))
log.Warn("ddl sink initialize failed",
zap.Duration("duration", time.Since(start)))
ctx.Throw(err)
return
}
log.Info("ddl sink initialized, start processing...", zap.Duration("duration", time.Since(start)))
log.Info("ddl sink initialized, start processing...",
zap.Duration("duration", time.Since(start)))

// TODO make the tick duration configurable
ticker := time.NewTicker(time.Second)
4 changes: 3 additions & 1 deletion cdc/processor/manager.go
@@ -131,7 +131,9 @@ func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) {
if processor, exist := m.processors[changefeedID]; exist {
err := processor.Close()
if err != nil {
log.Warn("failed to close processor", zap.Error(err))
log.Warn("failed to close processor",
zap.String("changefeed", changefeedID),
zap.Error(err))
}
delete(m.processors, changefeedID)
}
19 changes: 19 additions & 0 deletions cdc/processor/processor.go
@@ -473,10 +473,19 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
}
opts[sink.OptChangefeedID] = p.changefeed.ID
opts[sink.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr
log.Info("processor try new sink", zap.String("changefeed", p.changefeed.ID))

start := time.Now()
s, err := sink.New(stdCtx, p.changefeed.ID, p.changefeed.Info.SinkURI, p.filter, p.changefeed.Info.Config, opts, errCh)
if err != nil {
log.Info("processor new sink failed",
zap.String("changefeed", p.changefeed.ID),
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
log.Info("processor try new sink success",
zap.Duration("duration", time.Since(start)))

checkpointTs := p.changefeed.Info.GetCheckpointTs(p.changefeed.Status)
captureAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr
p.sinkManager = sink.NewManager(stdCtx, s, errCh, checkpointTs, captureAddr, p.changefeedID)
@@ -1040,6 +1049,7 @@ func (p *processor) flushRedoLogMeta(ctx context.Context) error {
}

func (p *processor) Close() error {
log.Info("processor closing ...", zap.String("changefeed", p.changefeedID))
for _, tbl := range p.tables {
tbl.Cancel()
}
@@ -1061,9 +1071,18 @@ func (p *processor) Close() error {
// passing a canceled context is ok here, since we don't need to wait for Close
ctx, cancel := context.WithCancel(context.Background())
cancel()
log.Info("processor try to close the sinkManager",
zap.String("changefeed", p.changefeedID))
start := time.Now()
if err := p.sinkManager.Close(ctx); err != nil {
log.Info("processor close sinkManager failed",
zap.String("changefeed", p.changefeedID),
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
log.Info("processor close sinkManager success",
zap.String("changefeed", p.changefeedID),
zap.Duration("duration", time.Since(start)))
}
if p.newSchedulerEnabled {
if p.agent == nil {
14 changes: 13 additions & 1 deletion cdc/sink/manager.go
@@ -17,6 +17,7 @@ import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
@@ -85,7 +86,18 @@ func (m *Manager) Close(ctx context.Context) error {
defer m.tableSinksMu.Unlock()
tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID)
if m.bufSink != nil {
return m.bufSink.Close(ctx)
log.Info("sinkManager try close bufSink",
zap.String("changefeed", m.changefeedID))
start := time.Now()
if err := m.bufSink.Close(ctx); err != nil {
log.Info("close bufSink failed",
zap.String("changefeed", m.changefeedID),
zap.Duration("duration", time.Since(start)))
return err
}
log.Info("close bufSink success",
zap.String("changefeed", m.changefeedID),
zap.Duration("duration", time.Since(start)))
}
return nil
}
18 changes: 12 additions & 6 deletions cdc/sink/producer/kafka/kafka.go
@@ -213,13 +213,19 @@ func (k *kafkaSaramaProducer) Close() error {
// In fact, closing the sarama sync client doesn't return any error.
// But closing the async client returns an error if its error channel is not
// empty; we don't propagate this error to the upper caller, just add a log here.
err1 := k.syncClient.Close()
err2 := k.asyncClient.Close()
if err1 != nil {
log.Error("close sync client with error", zap.Error(err1))
start := time.Now()
err := k.asyncClient.Close()
if err != nil {
log.Error("close async client with error", zap.Error(err), zap.Duration("duration", time.Since(start)))
} else {
log.Info("async client closed", zap.Duration("duration", time.Since(start)))
}
if err2 != nil {
log.Error("close async client with error", zap.Error(err2))
start = time.Now()
err = k.syncClient.Close()
if err != nil {
log.Error("close sync client with error", zap.Error(err), zap.Duration("duration", time.Since(start)))
} else {
log.Info("sync client closed", zap.Duration("duration", time.Since(start)))
}
return nil
}
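The Close rewrite above applies the same pattern twice: time the shutdown of a sub-client and log either outcome together with the elapsed duration. A generic sketch of that pattern — closeTimed and fakeClient are illustrative helpers, not part of the producer code:

package main

import (
    "errors"
    "io"
    "time"

    "go.uber.org/zap"
)

// closeTimed closes c, measures how long the call took, and logs either the
// failure or the success together with the duration, mirroring how the
// producer now closes its async and sync clients one after the other.
func closeTimed(log *zap.Logger, name string, c io.Closer) {
    start := time.Now()
    if err := c.Close(); err != nil {
        log.Error("close "+name+" with error",
            zap.Error(err), zap.Duration("duration", time.Since(start)))
        return
    }
    log.Info(name+" closed", zap.Duration("duration", time.Since(start)))
}

// fakeClient stands in for the sarama clients so the sketch runs on its own.
type fakeClient struct{ fail bool }

func (f fakeClient) Close() error {
    if f.fail {
        return errors.New("error channel not empty")
    }
    return nil
}

func main() {
    log, _ := zap.NewDevelopment()
    defer log.Sync()

    closeTimed(log, "async client", fakeClient{fail: true})
    closeTimed(log, "sync client", fakeClient{})
}

The commit inlines this logic at both call sites instead of introducing a helper, and it also swaps the order so the async client is closed before the sync client.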
21 changes: 13 additions & 8 deletions pkg/etcd/client.go
@@ -43,10 +43,10 @@ const (
backoffBaseDelayInMs = 500
// in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second
backoffMaxDelayInMs = 60 * 1000
// If no msg comes from a etcd watchCh for etcdWatchChTimeoutDuration long,
// If no msg comes from an etcd watchCh for etcdWatchChTimeoutDuration long,
// we should cancel the watchCh and request a new watchCh from etcd client
etcdWatchChTimeoutDuration = 10 * time.Second
// If no msg comes from a etcd watchCh for etcdRequestProgressDuration long,
// If no msg comes from an etcd watchCh for etcdRequestProgressDuration long,
// we should call RequestProgress of etcd client
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
@@ -176,17 +176,17 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ..
}

// Watch delegates request to clientv3.Watcher.Watch
func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
func (c *Client) Watch(ctx context.Context, key string, role string, opts ...clientv3.OpOption) clientv3.WatchChan {
watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
go c.WatchWithChan(ctx, watchCh, key, opts...)
go c.WatchWithChan(ctx, watchCh, key, role, opts...)
return watchCh
}

// WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh
func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) {
func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, role string, opts ...clientv3.OpOption) {
defer func() {
close(outCh)
log.Info("WatchWithChan exited")
log.Info("WatchWithChan exited", zap.String("role", role))
}()

// get initial revision from opts to avoid revision fall back
@@ -223,7 +223,9 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
break Loop
case <-ticker.C:
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck",
zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)),
zap.String("role", role))
}
}
}
@@ -235,7 +237,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
}
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
// cancel the last cancel func to reset it
log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
log.Warn("etcd client watchCh blocking too long, reset the watchCh",
zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)),
zap.Stack("stack"),
zap.String("role", role))
cancel()
watchCtx, cancel = context.WithCancel(ctx)
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision))
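The role tag matters most in WatchWithChan's stall detection, where a quiet channel triggers warnings and eventually a watch reset. The sketch below keeps only the detection half over a plain channel; watchLoop, the string events, and the timing values are illustrative, and the real code resets the etcd watch rather than just warning:

package main

import (
    "time"

    "go.uber.org/zap"
)

// watchLoop drains events from ch and, like the reworked WatchWithChan,
// warns (tagged with the caller's role) when nothing has arrived for longer
// than timeout — a sign that the watch may need to be reset.
func watchLoop(log *zap.Logger, role string, ch <-chan string, timeout time.Duration) {
    lastReceived := time.Now()
    ticker := time.NewTicker(timeout / 2)
    defer ticker.Stop()
    for {
        select {
        case ev, ok := <-ch:
            if !ok {
                log.Info("watch channel closed", zap.String("role", role))
                return
            }
            lastReceived = time.Now()
            log.Info("event received", zap.String("role", role), zap.String("event", ev))
        case <-ticker.C:
            if since := time.Since(lastReceived); since >= timeout {
                log.Warn("watchCh blocking too long, consider resetting it",
                    zap.Duration("duration", since), zap.String("role", role))
            }
        }
    }
}

func main() {
    log, _ := zap.NewDevelopment()
    defer log.Sync()

    ch := make(chan string, 1)
    go func() {
        ch <- "put /tidb/cdc/changefeed/info"
        time.Sleep(300 * time.Millisecond) // stay silent long enough to trigger the warning
        close(ch)
    }()
    watchLoop(log, "owner", ch, 100*time.Millisecond)
}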
6 changes: 3 additions & 3 deletions pkg/etcd/client_test.go
@@ -155,7 +155,7 @@ func TestWatchChBlocked(t *testing.T) {
defer cancel()

go func() {
watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision))
}()
receivedRes := make([]clientv3.WatchResponse, 0)
// wait for WatchWithChan set up
@@ -213,7 +213,7 @@ func TestOutChBlocked(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
go func() {
watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision))
}()
receivedRes := make([]clientv3.WatchResponse, 0)
// wait for WatchWithChan set up
@@ -263,7 +263,7 @@ func TestRevisionNotFallBack(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
go func() {
watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision))
}()
// wait for WatchWithChan set up
<-outCh
12 changes: 7 additions & 5 deletions pkg/orchestrator/etcd_worker.go
@@ -119,9 +119,8 @@ func (worker *EtcdWorker) initMetrics(captureAddr string) {
// A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event.
// If the specified etcd session is Done, this Run function will exit with cerrors.ErrEtcdSessionDone.
// And the specified etcd session is nil-safe.
func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string) error {
func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string, role string) error {
defer worker.cleanUp()

worker.initMetrics(captureAddr)

err := worker.syncRawState(ctx)
@@ -134,7 +133,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,

watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), role, clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))

var (
pendingPatches [][]DataPatch
@@ -190,7 +189,8 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
log.Info("Stale Etcd event dropped",
zap.Int64("event-revision", response.Header.GetRevision()),
zap.Int64("previous-revision", worker.revision),
zap.Any("events", response.Events))
zap.Any("events", response.Events),
zap.String("role", role))
continue
}
worker.revision = response.Header.GetRevision()
@@ -239,7 +239,9 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
nextState, err := worker.reactor.Tick(ctx, worker.state)
costTime := time.Since(startTime)
if costTime > etcdWorkerLogsWarnDuration {
log.Warn("EtcdWorker reactor tick took too long", zap.Duration("duration", costTime))
log.Warn("EtcdWorker reactor tick took too long",
zap.Duration("duration", costTime),
zap.String("role", role))
}
worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime.Seconds())
if err != nil {
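The same role string also lands on the slow-tick warning: Run times each reactor Tick and complains when it exceeds etcdWorkerLogsWarnDuration. A reduced sketch of that check, with an illustrative Reactor interface and threshold rather than the real orchestrator types:

package main

import (
    "context"
    "time"

    "go.uber.org/zap"
)

// Reactor mirrors the shape of orchestrator.Reactor just enough for the sketch.
type Reactor interface {
    Tick(ctx context.Context) error
}

// slowReactor simulates a reactor whose tick takes longer than the threshold.
type slowReactor struct{ delay time.Duration }

func (r slowReactor) Tick(ctx context.Context) error {
    time.Sleep(r.delay)
    return nil
}

// tickOnce runs one reactor tick and, like EtcdWorker.Run, warns with the
// caller's role when the tick exceeds tickThreshold.
func tickOnce(ctx context.Context, log *zap.Logger, role string, r Reactor, tickThreshold time.Duration) error {
    start := time.Now()
    err := r.Tick(ctx)
    if cost := time.Since(start); cost > tickThreshold {
        log.Warn("reactor tick took too long",
            zap.Duration("duration", cost), zap.String("role", role))
    }
    return err
}

func main() {
    log, _ := zap.NewDevelopment()
    defer log.Sync()

    _ = tickOnce(context.Background(), log, "processor",
        slowReactor{delay: 50 * time.Millisecond}, 10*time.Millisecond)
}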
2 changes: 1 addition & 1 deletion pkg/orchestrator/etcd_worker_bank_test.go
@@ -157,7 +157,7 @@ func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) {
accountNumber: totalAccountNumber,
}, &bankReactorState{c: c, index: i, account: make([]int, totalAccountNumber)})
c.Assert(err, check.IsNil)
err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1")
err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1", "")
if err == nil || err.Error() == "etcdserver: request timed out" {
continue
}
20 changes: 12 additions & 8 deletions pkg/orchestrator/etcd_worker_test.go
@@ -268,7 +268,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
return errors.Trace(err)
}

return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1"))
return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", ""))
})
}

@@ -353,7 +353,7 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) {
c.Assert(err, check.IsNil)
errg := &errgroup.Group{}
errg.Go(func() error {
return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")
return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
})

time.Sleep(500 * time.Millisecond)
@@ -438,7 +438,8 @@ func (s *etcdWorkerSuite) TestFinished(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
@@ -507,7 +508,8 @@ func (s *etcdWorkerSuite) TestCover(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
@@ -586,7 +588,8 @@ func (s *etcdWorkerSuite) TestEmptyTxn(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
@@ -653,7 +656,8 @@ func (s *etcdWorkerSuite) TestEmptyOrNil(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
@@ -734,7 +738,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) {
wg.Add(1)
go func() {
defer wg.Done()
err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1")
err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1", "")
c.Assert(err, check.IsNil)
}()

@@ -749,7 +753,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) {
})
c.Assert(err, check.IsNil)

err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1")
err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1", "")
c.Assert(err, check.IsNil)

modifyReactor.waitOnCh <- struct{}{}
2 changes: 1 addition & 1 deletion tests/utils/cdc_state_checker/cdc_monitor.go
@@ -89,7 +89,7 @@ func newCDCMonitor(ctx context.Context, pd string, credential *security.Credenti

func (m *cdcMonitor) run(ctx context.Context) error {
log.Debug("start running cdcMonitor")
err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1")
err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1", "")
log.Error("etcdWorker exited: test-case-failed", zap.Error(err))
log.Info("CDC state", zap.Reflect("state", m.reactor.state))
return err
