Skip to content

Commit

Permalink
refine logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Jan 7, 2022
1 parent d24b122 commit 4c54774
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
6 changes: 5 additions & 1 deletion cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
ctx, cancel := cdcContext.WithCancel(ctx)
s.cancel = cancel

lastEmitCheckpointTs := time.Now()
s.wg.Add(1)
go func() {
defer s.wg.Done()
Expand Down Expand Up @@ -147,8 +148,11 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
ctx.Throw(errors.Trace(err))
return
}
log.Info("ddl sink send checkpointTs", zap.Uint64("checkpointTs", checkpointTs))
log.Info("ddl sink send checkpointTs",
zap.Uint64("checkpointTs", checkpointTs),
zap.Duration("elapsed", time.Since(lastEmitCheckpointTs)))
lastCheckpointTs = checkpointTs
lastEmitCheckpointTs = time.Now()
case ddl := <-s.ddlCh:
err := s.sink.EmitDDLEvent(ctx, ddl)
failpoint.Inject("InjectChangefeedDDLError", func() {
Expand Down
1 change: 0 additions & 1 deletion cdc/sink/buffer_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func (b *bufferSink) runOnce(ctx context.Context, state *runState) (bool, error)
if err != nil {
return false, errors.Trace(err)
}
log.Info("runOnce", zap.Uint64("checkpointTs", checkpointTs))
b.tableCheckpointTsMap.Store(tableID, checkpointTs)
}
now := time.Now()
Expand Down

0 comments on commit 4c54774

Please sign in to comment.