From 4c547744a1471cd2edfcac3b4a434e1d3bef4c51 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 7 Jan 2022 23:07:00 +0800 Subject: [PATCH] refine logs. --- cdc/owner/ddl_sink.go | 6 +++++- cdc/sink/buffer_sink.go | 1 - 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index 38af6bfdbf3..70c971d8d59 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -115,6 +115,7 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m ctx, cancel := cdcContext.WithCancel(ctx) s.cancel = cancel + lastEmitCheckpointTs := time.Now() s.wg.Add(1) go func() { defer s.wg.Done() @@ -147,8 +148,11 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m ctx.Throw(errors.Trace(err)) return } - log.Info("ddl sink send checkpointTs", zap.Uint64("checkpointTs", checkpointTs)) + log.Info("ddl sink send checkpointTs", + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("elapsed", time.Since(lastEmitCheckpointTs))) lastCheckpointTs = checkpointTs + lastEmitCheckpointTs = time.Now() case ddl := <-s.ddlCh: err := s.sink.EmitDDLEvent(ctx, ddl) failpoint.Inject("InjectChangefeedDDLError", func() { diff --git a/cdc/sink/buffer_sink.go b/cdc/sink/buffer_sink.go index 8cb2a1fd075..085c1e0da66 100644 --- a/cdc/sink/buffer_sink.go +++ b/cdc/sink/buffer_sink.go @@ -153,7 +153,6 @@ func (b *bufferSink) runOnce(ctx context.Context, state *runState) (bool, error) if err != nil { return false, errors.Trace(err) } - log.Info("runOnce", zap.Uint64("checkpointTs", checkpointTs)) b.tableCheckpointTsMap.Store(tableID, checkpointTs) } now := time.Now()