Skip to content

Commit

Permalink
Merge branch 'release-4.0' into cherry-pick-4502-to-release-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Feb 22, 2022
2 parents 46d7b72 + 0f8e1b4 commit a0d93f7
Show file tree
Hide file tree
Showing 17 changed files with 452 additions and 41 deletions.
14 changes: 10 additions & 4 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *Capture) run(stdCtx context.Context) error {
// when the etcd worker of processor returns an error, it means that the processor has thrown an unrecoverable serious error
// (recoverable errors are intercepted in the processor tick)
// so we should also stop the owner and let capture restart or exit
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval)
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval, "processor")
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
go func() {
Expand Down Expand Up @@ -259,7 +259,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
log.Info("campaign owner successfully", zap.String("capture-id", c.info.ID))
owner := c.newOwner(c.pdClient)
c.setOwner(owner)
err = c.runEtcdWorker(ctx, owner, model.NewGlobalState(), ownerFlushInterval)
err = c.runEtcdWorker(ctx, owner, model.NewGlobalState(), ownerFlushInterval, "owner")
c.setOwner(nil)
log.Info("run owner exited", zap.Error(err))
// if owner exits, resign the owner key
Expand All @@ -275,13 +275,19 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
}
}

func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Reactor, reactorState orchestrator.ReactorState, timerInterval time.Duration) error {
func (c *Capture) runEtcdWorker(
ctx cdcContext.Context,
reactor orchestrator.Reactor,
reactorState orchestrator.ReactorState,
timerInterval time.Duration,
role string,
) error {
etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, kv.EtcdKeyBase, reactor, reactorState)
if err != nil {
return errors.Trace(err)
}
captureAddr := c.info.AdvertiseAddr
if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr); err != nil {
if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr, role); err != nil {
// We check the TTL of the lease instead of checking `session.Done`, because
// `session.Done` is only notified when the etcd client establishes a
// new keepalive request; there could be a time window as long as
Expand Down
4 changes: 3 additions & 1 deletion cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) {
if processor, exist := m.processors[changefeedID]; exist {
err := processor.Close()
if err != nil {
log.Warn("failed to close processor", zap.Error(err))
log.Warn("failed to close processor",
zap.String("changefeed", changefeedID),
zap.Error(err))
}
delete(m.processors, changefeedID)
}
Expand Down
23 changes: 22 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,19 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
}
opts[sink.OptChangefeedID] = p.changefeed.ID
opts[sink.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr
log.Info("processor try new sink", zap.String("changefeed", p.changefeed.ID))

start := time.Now()
s, err := sink.NewSink(stdCtx, p.changefeed.ID, p.changefeed.Info.SinkURI, p.filter, p.changefeed.Info.Config, opts, errCh)
if err != nil {
log.Info("processor new sink failed",
zap.String("changefeed", p.changefeed.ID),
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
log.Info("processor try new sink success",
zap.Duration("duration", time.Since(start)))

checkpointTs := p.changefeed.Info.GetCheckpointTs(p.changefeed.Status)
captureAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr
p.sinkManager = sink.NewManager(stdCtx, s, errCh, checkpointTs, captureAddr, p.changefeedID)
Expand Down Expand Up @@ -779,6 +788,7 @@ func (p *processor) doGCSchemaStorage() {
}

func (p *processor) Close() error {
log.Info("processor closing ...", zap.String("changefeed", p.changefeedID))
for _, tbl := range p.tables {
tbl.Cancel()
}
Expand All @@ -799,7 +809,18 @@ func (p *processor) Close() error {
// passing a canceled context is ok here, since we don't need to wait for Close
ctx, cancel := context.WithCancel(context.Background())
cancel()
return p.sinkManager.Close(ctx)
log.Info("processor try to close the sinkManager",
zap.String("changefeed", p.changefeedID))
start := time.Now()
if err := p.sinkManager.Close(ctx); err != nil {
log.Info("processor close sinkManager failed",
zap.String("changefeed", p.changefeedID),
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
log.Info("processor close sinkManager success",
zap.String("changefeed", p.changefeedID),
zap.Duration("duration", time.Since(start)))
}
return nil
}
Expand Down
13 changes: 12 additions & 1 deletion cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,18 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts)
func (m *Manager) Close(ctx context.Context) error {
tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID)
if m.backendSink != nil {
return m.backendSink.Close(ctx)
log.Info("sinkManager try close bufSink",
zap.String("changefeed", m.changefeedID))
start := time.Now()
if err := m.backendSink.Close(ctx); err != nil {
log.Info("close bufSink failed",
zap.String("changefeed", m.changefeedID),
zap.Duration("duration", time.Since(start)))
return err
}
log.Info("close bufSink success",
zap.String("changefeed", m.changefeedID),
zap.Duration("duration", time.Since(start)))
}
return nil
}
Expand Down
18 changes: 12 additions & 6 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,19 @@ func (k *kafkaSaramaProducer) Close() error {
// In fact, closing the sarama sync client doesn't return any error.
// But closing the async client returns an error if the error channel is not empty; we
// don't propagate this error to the upper caller, just add a log here.
err1 := k.syncClient.Close()
err2 := k.asyncClient.Close()
if err1 != nil {
log.Error("close sync client with error", zap.Error(err1))
start := time.Now()
err := k.asyncClient.Close()
if err != nil {
log.Error("close async client with error", zap.Error(err), zap.Duration("duration", time.Since(start)))
} else {
log.Info("async client closed", zap.Duration("duration", time.Since(start)))
}
if err2 != nil {
log.Error("close async client with error", zap.Error(err2))
start = time.Now()
err = k.syncClient.Close()
if err != nil {
log.Error("close sync client with error", zap.Error(err), zap.Duration("duration", time.Since(start)))
} else {
log.Info("sync client closed", zap.Duration("duration", time.Since(start)))
}
return nil
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ const (
backoffBaseDelayInMs = 500
// in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second
backoffMaxDelayInMs = 60 * 1000
// If no msg comes from a etcd watchCh for etcdWatchChTimeoutDuration long,
// If no msg comes from an etcd watchCh for etcdWatchChTimeoutDuration long,
// we should cancel the watchCh and request a new watchCh from etcd client
etcdWatchChTimeoutDuration = 10 * time.Second
// If no msg comes from a etcd watchCh for etcdRequestProgressDuration long,
// If no msg comes from an etcd watchCh for etcdRequestProgressDuration long,
// we should call RequestProgress of etcd client
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
Expand Down Expand Up @@ -176,17 +176,17 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ..
}

// Watch delegates request to clientv3.Watcher.Watch
func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
func (c *Client) Watch(ctx context.Context, key string, role string, opts ...clientv3.OpOption) clientv3.WatchChan {
watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
go c.WatchWithChan(ctx, watchCh, key, opts...)
go c.WatchWithChan(ctx, watchCh, key, role, opts...)
return watchCh
}

// WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh
func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) {
func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, role string, opts ...clientv3.OpOption) {
defer func() {
close(outCh)
log.Info("WatchWithChan exited")
log.Info("WatchWithChan exited", zap.String("role", role))
}()
var lastRevision int64
watchCtx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -220,7 +220,9 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
break Loop
case <-ticker.C:
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck",
zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)),
zap.String("role", role))
}
}
}
Expand All @@ -230,7 +232,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
}
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
// cancel the last cancel func to reset it
log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
log.Warn("etcd client watchCh blocking too long, reset the watchCh",
zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)),
zap.Stack("stack"),
zap.String("role", role))
cancel()
watchCtx, cancel = context.WithCancel(ctx)
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1))
Expand Down
4 changes: 2 additions & 2 deletions pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *etcdSuite) TestWatchChBlocked(c *check.C) {
defer cancel()

go func() {
watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision))
}()
receivedRes := make([]clientv3.WatchResponse, 0)
// wait for WatchWithChan set up
Expand Down Expand Up @@ -203,7 +203,7 @@ func (s *etcdSuite) TestOutChBlocked(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
go func() {
watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
watchCli.WatchWithChan(ctx, outCh, key, "", clientv3.WithPrefix(), clientv3.WithRev(revision))
}()
receivedRes := make([]clientv3.WatchResponse, 0)
// wait for WatchWithChan set up
Expand Down
12 changes: 7 additions & 5 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,8 @@ func (worker *EtcdWorker) initMetrics(captureAddr string) {
// A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event.
// If the specified etcd session is Done, this Run function will exit with cerrors.ErrEtcdSessionDone.
// And the specified etcd session is nil-safe.
func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string) error {
func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string, role string) error {
defer worker.cleanUp()

worker.initMetrics(captureAddr)

err := worker.syncRawState(ctx)
Expand All @@ -134,7 +133,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,

watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), role, clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))

var (
pendingPatches [][]DataPatch
Expand Down Expand Up @@ -190,7 +189,8 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
log.Info("Stale Etcd event dropped",
zap.Int64("event-revision", response.Header.GetRevision()),
zap.Int64("previous-revision", worker.revision),
zap.Any("events", response.Events))
zap.Any("events", response.Events),
zap.String("role", role))
continue
}
worker.revision = response.Header.GetRevision()
Expand Down Expand Up @@ -239,7 +239,9 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
nextState, err := worker.reactor.Tick(ctx, worker.state)
costTime := time.Since(startTime)
if costTime > etcdWorkerLogsWarnDuration {
log.Warn("EtcdWorker reactor tick took too long", zap.Duration("duration", costTime))
log.Warn("EtcdWorker reactor tick took too long",
zap.Duration("duration", costTime),
zap.String("role", role))
}
worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime.Seconds())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/orchestrator/etcd_worker_bank_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) {
accountNumber: totalAccountNumber,
}, &bankReactorState{c: c, index: i, account: make([]int, totalAccountNumber)})
c.Assert(err, check.IsNil)
err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1")
err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1", "")
if err == nil || err.Error() == "etcdserver: request timed out" {
continue
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/orchestrator/etcd_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
return errors.Trace(err)
}

return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1"))
return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", ""))
})
}

Expand Down Expand Up @@ -352,7 +352,7 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) {
c.Assert(err, check.IsNil)
errg := &errgroup.Group{}
errg.Go(func() error {
return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")
return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
})

time.Sleep(500 * time.Millisecond)
Expand Down Expand Up @@ -437,7 +437,8 @@ func (s *etcdWorkerSuite) TestFinished(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -506,7 +507,8 @@ func (s *etcdWorkerSuite) TestCover(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -585,7 +587,8 @@ func (s *etcdWorkerSuite) TestEmptyTxn(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -652,7 +655,8 @@ func (s *etcdWorkerSuite) TestEmptyOrNil(c *check.C) {
state: make(map[string]string),
})
c.Assert(err, check.IsNil)
err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")

err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1", "")
c.Assert(err, check.IsNil)
resp, err := cli.Get(ctx, prefix+"/key1")
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -733,7 +737,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) {
wg.Add(1)
go func() {
defer wg.Done()
err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1")
err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1", "")
c.Assert(err, check.IsNil)
}()

Expand All @@ -748,7 +752,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) {
})
c.Assert(err, check.IsNil)

err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1")
err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1", "")
c.Assert(err, check.IsNil)

modifyReactor.waitOnCh <- struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion testing_utils/cdc_state_checker/cdc_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func newCDCMonitor(ctx context.Context, pd string, credential *security.Credenti

func (m *cdcMonitor) run(ctx context.Context) error {
log.Debug("start running cdcMonitor")
err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1")
err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1", "")
log.Error("etcdWorker exited: test-case-failed", zap.Error(err))
log.Info("CDC state", zap.Reflect("state", m.reactor.state))
return err
Expand Down
Loading

0 comments on commit a0d93f7

Please sign in to comment.