diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index d081435d363..baea8d0d137 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -77,8 +77,8 @@ type processor struct { sinkV2Factory *factory.SinkFactory // These fields are used to sinking data in pull-based mode. - sourceManger *sourcemanager.SourceManager - sinkManager *sinkmanager.SinkManager + sourceManager *sourcemanager.SourceManager + sinkManager *sinkmanager.SinkManager redoManager redo.LogManager @@ -211,7 +211,7 @@ func (p *processor) AddTable( } if p.pullBasedSinking { - p.sourceManger.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs) + p.sourceManager.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs) p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs) if !isPrepare { p.sinkManager.StartTable(tableID, startTs) @@ -434,7 +434,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool zap.Error(err)) return 0, false } - p.sourceManger.RemoveTable(tableID) + p.sourceManager.RemoveTable(tableID) p.sinkManager.RemoveTable(tableID) log.Info("table removed", zap.String("captureID", p.captureInfo.ID), @@ -530,7 +530,7 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus { } func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats pipeline.Stats) tablepb.Stats { - pullerStats := p.sourceManger.GetTablePullerStats(tableID) + pullerStats := p.sourceManager.GetTablePullerStats(tableID) now, _ := p.upstream.PDClock.CurrentTime() stats := tablepb.Stats{ @@ -554,7 +554,7 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableI } // FIXME: add the stats of the sort engine. - //sortStats := p.sourceManger.GetTableSortStats(tableID) + //sortStats := p.sourceManager.GetTableSortStats(tableID) //stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{ // CheckpointTs: sortStats.CheckpointTsIngress, // ResolvedTs: sortStats.ResolvedTsIngress, @@ -862,9 +862,11 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { zap.Duration("duration", time.Since(start))) return errors.Trace(err) } - p.sourceManger = sourcemanager.New(p.changefeedID, p.upstream, sortEngine, p.errCh) + p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, sortEngine, p.errCh) sinkManager, err := sinkmanager.New(stdCtx, p.changefeedID, p.changefeed.Info, p.redoManager, sortEngine, p.mg, p.errCh, p.metricsTableSinkTotalRows) + // Bind them so that sourceManager can notify sinkManager. 
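+ // Whenever the sort engine advances a table's resolved ts, this callback lets + // the sink manager record it as the received sorter resolved ts.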
+ p.sourceManager.OnResolve(sinkManager.UpdateReceivedSorterResolvedTs) if err != nil { log.Info("Processor creates sink manager", zap.String("namespace", p.changefeedID.Namespace), @@ -1298,19 +1300,22 @@ func (p *processor) Close(ctx cdcContext.Context) error { zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID)) if p.pullBasedSinking { - if err := p.sourceManger.Close(); err != nil { + if err := p.sourceManager.Close(); err != nil { log.Error("Failed to close source manager", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), zap.Error(err)) return errors.Trace(err) } - if err := p.sinkManager.Close(); err != nil { - log.Error("Failed to close sink manager", - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.Error(err)) - return errors.Trace(err) + if p.sinkManager != nil { + if err := p.sinkManager.Close(); err != nil { + log.Error("Failed to close sink manager", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.Error(err)) + return errors.Trace(err) + } + p.sinkManager = nil } engineFactory := ctx.GlobalVars().SortEngineFactory if engineFactory != nil { diff --git a/cdc/processor/sinkmanager/manager_impl.go b/cdc/processor/sinkmanager/manager.go similarity index 96% rename from cdc/processor/sinkmanager/manager_impl.go rename to cdc/processor/sinkmanager/manager.go index b0a4876e394..2b35f0286ec 100644 --- a/cdc/processor/sinkmanager/manager_impl.go +++ b/cdc/processor/sinkmanager/manager.go @@ -69,12 +69,12 @@ type SinkManager struct { lastBarrierTs atomic.Uint64 // sinkWorkers used to pull data from source manager. - sinkWorkers []sinkWorker + sinkWorkers []*sinkWorker // sinkTaskChan is used to send tasks to sinkWorkers. sinkTaskChan chan *sinkTask // redoWorkers used to pull data from source manager. - redoWorkers []redoWorker + redoWorkers []*redoWorker // redoTaskChan is used to send tasks to redoWorkers. redoTaskChan chan *redoTask @@ -117,7 +117,7 @@ func New( sortEngine: sortEngine, sinkProgressHeap: newTableProgresses(), - sinkWorkers: make([]sinkWorker, 0, sinkWorkerNum), + sinkWorkers: make([]*sinkWorker, 0, sinkWorkerNum), sinkTaskChan: make(chan *sinkTask), metricsTableSinkTotalRows: metricsTableSinkTotalRows, @@ -126,7 +126,7 @@ func New( if redoManager != nil { m.redoManager = redoManager m.redoProgressHeap = newTableProgresses() - m.redoWorkers = make([]redoWorker, 0, redoWorkerNum) + m.redoWorkers = make([]*redoWorker, 0, redoWorkerNum) m.redoTaskChan = make(chan *redoTask) // Use at most 1/3 memory quota for redo event cache. m.eventCache = newRedoEventCache(changefeedInfo.Config.MemoryQuota / 3) @@ -336,6 +336,11 @@ func (m *SinkManager) generateSinkTasks() error { return m.ctx.Err() case m.sinkTaskChan <- t: } + + log.Debug("Generate sink task", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Int64("tableID", tableID)) } } } @@ -398,6 +403,11 @@ func (m *SinkManager) generateRedoTasks() error { return m.ctx.Err() case m.redoTaskChan <- t: } + + log.Debug("Generate redo task", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Int64("tableID", tableID)) } } } @@ -441,6 +451,10 @@ func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs // StartTable sets the table(TableSink) state to replicating. 
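+// A table sink added via AddTable is only registered; it does not replicate +// anything downstream until StartTable is called.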
func (m *SinkManager) StartTable(tableID model.TableID, startTs model.Ts) { + log.Info("Start table sink", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Int64("tableID", tableID)) tableSink, ok := m.tableSinks.Load(tableID) if !ok { log.Panic("Table sink not found when starting table stats", @@ -561,7 +575,10 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, erro // Close closes all workers. func (m *SinkManager) Close() error { - m.cancel() + if m.cancel != nil { + m.cancel() + m.cancel = nil + } m.memQuota.close() err := m.sinkFactory.Close() if err != nil { diff --git a/cdc/processor/sinkmanager/manager_impl_test.go b/cdc/processor/sinkmanager/manager_test.go similarity index 100% rename from cdc/processor/sinkmanager/manager_impl_test.go rename to cdc/processor/sinkmanager/manager_test.go diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go new file mode 100644 index 00000000000..d8d510e6ab0 --- /dev/null +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -0,0 +1,159 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sinkmanager + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" + "github.com/pingcap/tiflow/cdc/redo" +) + +type redoWorker struct { + changefeedID model.ChangeFeedID + mg entry.MounterGroup + sortEngine engine.SortEngine + memQuota *memQuota + redoManager redo.LogManager + eventCache *redoEventCache + splitTxn bool + enableOldValue bool +} + +func newRedoWorker( + changefeedID model.ChangeFeedID, + mg entry.MounterGroup, + sortEngine engine.SortEngine, + quota *memQuota, + redoManager redo.LogManager, + eventCache *redoEventCache, + splitTxn bool, + enableOldValue bool, +) *redoWorker { + return &redoWorker{ + changefeedID: changefeedID, + mg: mg, + sortEngine: sortEngine, + memQuota: quota, + redoManager: redoManager, + eventCache: eventCache, + splitTxn: splitTxn, + enableOldValue: enableOldValue, + } +} + +func (w *redoWorker) handleTasks(ctx context.Context, taskChan <-chan *redoTask) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case task := <-taskChan: + if err := w.handleTask(ctx, task); err != nil { + return errors.Trace(err) + } + } + } +} + +func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) error { + rows := make([]*model.RowChangedEvent, 0, 1024) + cache := w.eventCache.getAppender(task.tableID) + + // Events are pushed into redoEventCache if possible. Otherwise, their memory will + // be released after they are written into redo files. Then we need to release their + // memory quota, which can be calculated based on batchSize and cachedSize. 
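+ // For example (illustrative numbers only): if 30MB of events were appended in + // this batch (batchSize) and 20MB of them made it into the cache (cachedSize), + // the remaining 10MB is refunded to the memory quota once the redo writer is done.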
+ batchSize := uint64(0) + cachedSize := uint64(0) + + memAllocated := true + + var lastPos engine.Position + maybeEmitBatchEvents := func(allFinished, txnFinished bool) error { + if batchSize == 0 || (!allFinished && batchSize < requestMemSize) { + return nil + } + + releaseMem := func() { w.memQuota.refund(batchSize - cachedSize) } + err := w.redoManager.EmitRowChangedEvents(ctx, task.tableID, releaseMem, rows...) + if err != nil { + return errors.Trace(err) + } + if lastPos.Valid() { + err = w.redoManager.UpdateResolvedTs(ctx, task.tableID, lastPos.CommitTs) + if err != nil { + return errors.Trace(err) + } + } + + rows = rows[:0] + if cap(rows) > 1024 { + rows = make([]*model.RowChangedEvent, 0, 1024) + } + batchSize = 0 + cachedSize = 0 + + if !allFinished { + if !txnFinished { + w.memQuota.forceAcquire(requestMemSize) + } else { + memAllocated = w.memQuota.tryAcquire(requestMemSize) + } + } + return nil + } + + // lowerBound and upperBound are both closed intervals. + iter := engine.NewMountedEventIter( + w.sortEngine.FetchByTable(task.tableID, task.lowerBound, task.getUpperBound()), + w.mg, 256) + defer iter.Close() + for memAllocated { + e, pos, err := iter.Next(ctx) + if err != nil { + return errors.Trace(err) + } + if e == nil { + // There is no more data. + if err = maybeEmitBatchEvents(true, true); err != nil { + return errors.Trace(err) + } + return nil + } + if pos.Valid() { + lastPos = pos + } + + x, size, err := convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) + if err != nil { + return errors.Trace(err) + } + + rows = append(rows, x...) + batchSize += size + if cache.pushBatch(x, size, pos.Valid()) { + cachedSize += size + } else { + cachedSize -= cache.cleanBrokenEvents() + } + if err = maybeEmitBatchEvents(false, pos.Valid()); err != nil { + return errors.Trace(err) + } + } + // Can't allocate memory. + task.callback(lastPos) + return nil +} diff --git a/cdc/processor/sinkmanager/worker_impl.go b/cdc/processor/sinkmanager/table_sink_worker.go similarity index 67% rename from cdc/processor/sinkmanager/worker_impl.go rename to cdc/processor/sinkmanager/table_sink_worker.go index e426beecee5..84ac253ad06 100644 --- a/cdc/processor/sinkmanager/worker_impl.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -21,33 +21,10 @@ import ( "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" - "github.com/pingcap/tiflow/cdc/redo" "go.uber.org/zap" ) -const ( - // defaultRequestMemSize is the default memory usage for a request. - defaultRequestMemSize = uint64(10 * 1024 * 1024) // 10MB - // Avoid update resolved ts too frequently, if there are too many small transactions. - defaultMaxUpdateIntervalSize = uint64(1024 * 256) // 256KB - // Limit the maximum size of a group of one batch, if there is a big translation. - defaultMaxBigTxnBatchSize = defaultMaxUpdateIntervalSize * 20 // 5MB -) - -// Make these values be variables, so that we can mock them in unit tests. -var ( - requestMemSize = defaultRequestMemSize - maxUpdateIntervalSize = defaultMaxUpdateIntervalSize - maxBigTxnBatchSize = defaultMaxBigTxnBatchSize -) - -// Some type assertions.
-var ( - _ sinkWorker = (*sinkWorkerImpl)(nil) - _ redoWorker = (*redoWorkerImpl)(nil) -) - -type sinkWorkerImpl struct { +type sinkWorker struct { changefeedID model.ChangeFeedID mg entry.MounterGroup sortEngine engine.SortEngine @@ -69,8 +46,8 @@ func newSinkWorker( eventCache *redoEventCache, splitTxn bool, enableOldValue bool, -) sinkWorker { - return &sinkWorkerImpl{ +) *sinkWorker { + return &sinkWorker{ changefeedID: changefeedID, mg: mg, sortEngine: sortEngine, @@ -81,7 +58,7 @@ func newSinkWorker( } } -func (w *sinkWorkerImpl) handleTasks(ctx context.Context, taskChan <-chan *sinkTask) (err error) { +func (w *sinkWorker) handleTasks(ctx context.Context, taskChan <-chan *sinkTask) (err error) { for { select { case <-ctx.Done(): @@ -131,7 +108,7 @@ func (w *sinkWorkerImpl) handleTasks(ctx context.Context, taskChan <-chan *sinkT // lowerBound and upperBound are both closed intervals. iter := engine.NewMountedEventIter( w.sortEngine.FetchByTable(task.tableID, lowerBound, upperBound), - w.mg, requestMemSize, 256) + w.mg, 256) for !task.isCanceled() { e, pos, err := iter.Next(ctx) if err != nil { @@ -258,6 +235,14 @@ func (w *sinkWorkerImpl) handleTasks(ctx context.Context, taskChan <-chan *sinkT ) } else { + if lastCommitTs == 0 { + lastCommitTs = upperBound.CommitTs + err := advanceTableSinkAndResetCurrentSize() + if err != nil { + return errors.Trace(err) + } + lastPos = upperBound + } // This means that we append all the events to the table sink. // But we have not updated the resolved ts. // Because we do not reach the maxUpdateIntervalSize. @@ -277,7 +262,7 @@ func (w *sinkWorkerImpl) handleTasks(ctx context.Context, taskChan <-chan *sinkT } } -func (w *sinkWorkerImpl) fetchFromCache( +func (w *sinkWorker) fetchFromCache( task *sinkTask, // task is read-only here. lowerBound engine.Position, upperBound engine.Position, @@ -299,7 +284,7 @@ func (w *sinkWorkerImpl) fetchFromCache( return lowerBound, nil } -func (w *sinkWorkerImpl) appendEventsToTableSink(t *sinkTask, events []*model.PolymorphicEvent) (uint64, error) { +func (w *sinkWorker) appendEventsToTableSink(t *sinkTask, events []*model.PolymorphicEvent) (uint64, error) { rowChangedEvents, size, err := convertRowChangedEvents(w.changefeedID, t.tableID, w.enableOldValue, events...) 
if err != nil { return 0, err @@ -308,147 +293,20 @@ func (w *sinkWorkerImpl) appendEventsToTableSink(t *sinkTask, events []*model.Po return size, nil } -func (w *sinkWorkerImpl) advanceTableSink(t *sinkTask, commitTs model.Ts, size uint64, batchID uint64) error { +func (w *sinkWorker) advanceTableSink(t *sinkTask, commitTs model.Ts, size uint64, batchID uint64) error { + log.Debug("Advance table sink", + zap.String("namespace", w.changefeedID.Namespace), + zap.String("changefeed", w.changefeedID.ID), + zap.Int64("tableID", t.tableID), + zap.Uint64("commitTs", commitTs)) + resolvedTs := model.NewResolvedTs(commitTs) if w.splitTxn { resolvedTs.Mode = model.BatchResolvedMode resolvedTs.BatchID = batchID } - w.memQuota.record(t.tableID, resolvedTs, size) - return t.tableSink.updateResolvedTs(resolvedTs) -} - -type redoWorkerImpl struct { - changefeedID model.ChangeFeedID - mg entry.MounterGroup - sortEngine engine.SortEngine - memQuota *memQuota - redoManager redo.LogManager - eventCache *redoEventCache - splitTxn bool - enableOldValue bool -} - -func newRedoWorker( - changefeedID model.ChangeFeedID, - mg entry.MounterGroup, - sortEngine engine.SortEngine, - quota *memQuota, - redoManager redo.LogManager, - eventCache *redoEventCache, - splitTxn bool, - enableOldValue bool, -) redoWorker { - return &redoWorkerImpl{ - changefeedID: changefeedID, - mg: mg, - sortEngine: sortEngine, - memQuota: quota, - redoManager: redoManager, - eventCache: eventCache, - splitTxn: splitTxn, - enableOldValue: enableOldValue, + if size > 0 { + w.memQuota.record(t.tableID, resolvedTs, size) } -} - -func (w *redoWorkerImpl) handleTasks(ctx context.Context, taskChan <-chan *redoTask) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case task := <-taskChan: - if err := w.handleTask(ctx, task); err != nil { - return errors.Trace(err) - } - } - } -} - -func (w *redoWorkerImpl) handleTask(ctx context.Context, task *redoTask) error { - rows := make([]*model.RowChangedEvent, 0, 1024) - cache := w.eventCache.getAppender(task.tableID) - - // Events are pushed into redoEventCache if possible. Otherwise, their memory will - // be released after they are written into redo files. Then we need to release their - // memory quota, which can be calculated based on batchSize and cachedSize. - batchSize := uint64(0) - cachedSize := uint64(0) - - memAllocated := true - - var lastPos engine.Position - maybeEmitBatchEvents := func(allFinished, txnFinished bool) error { - if batchSize == 0 || (!allFinished && batchSize < requestMemSize) { - return nil - } - - releaseMem := func() { w.memQuota.refund(batchSize - cachedSize) } - err := w.redoManager.EmitRowChangedEvents(ctx, task.tableID, releaseMem, rows...) - if err != nil { - return errors.Trace(err) - } - if lastPos.Valid() { - err = w.redoManager.UpdateResolvedTs(ctx, task.tableID, lastPos.CommitTs) - if err != nil { - return errors.Trace(err) - } - } - - rows = rows[0:] - if cap(rows) > 1024 { - rows = make([]*model.RowChangedEvent, 0, 1024) - } - batchSize = 0 - cachedSize = 0 - - if !allFinished { - if !txnFinished { - w.memQuota.forceAcquire(requestMemSize) - } else { - memAllocated = w.memQuota.tryAcquire(requestMemSize) - } - } - return nil - } - - // lowerBound and upperBound are both closed intervals. 
- iter := engine.NewMountedEventIter( - w.sortEngine.FetchByTable(task.tableID, task.lowerBound, task.getUpperBound()), - w.mg, requestMemSize, 256) - defer iter.Close() - for memAllocated { - e, pos, err := iter.Next(ctx) - if err != nil { - return errors.Trace(err) - } - if e == nil { - // There is no more data. - if err = maybeEmitBatchEvents(true, true); e != nil { - return errors.Trace(err) - } - return nil - } - if pos.Valid() { - lastPos = pos - } - - x, size, err := convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e) - if err != nil { - return errors.Trace(err) - } - - rows = append(rows, x...) - batchSize += size - if cache.pushBatch(x, size, pos.Valid()) { - cachedSize += size - } else { - cachedSize -= cache.cleanBrokenEvents() - } - if err = maybeEmitBatchEvents(false, pos.Valid()); err != nil { - return errors.Trace(err) - } - } - // Can't allocate memory. - task.callback(lastPos) - return nil + return t.tableSink.updateResolvedTs(resolvedTs) } diff --git a/cdc/processor/sinkmanager/worker_impl_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go similarity index 97% rename from cdc/processor/sinkmanager/worker_impl_test.go rename to cdc/processor/sinkmanager/table_sink_worker_test.go index abe53028706..8bea78a742b 100644 --- a/cdc/processor/sinkmanager/worker_impl_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/suite" ) -func createWorker(changefeedID model.ChangeFeedID, memQuota uint64, splitTxn bool) sinkWorker { +func createWorker(changefeedID model.ChangeFeedID, memQuota uint64, splitTxn bool) *sinkWorker { sortEngine := memory.New(context.Background()) quota := newMemQuota(changefeedID, memQuota) return newSinkWorker(changefeedID, &entry.MockMountGroup{}, sortEngine, quota, nil, splitTxn, false) @@ -144,7 +144,7 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithSplitTxnAndAbortWhenNoMemA } w := createWorker(changefeedID, eventSize, true) - addEventsToSortEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) + addEventsToSortEngine(suite.T(), events, w.sortEngine, tableID) taskChan := make(chan *sinkTask) var wg sync.WaitGroup @@ -259,7 +259,7 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithSplitTxnAndAbortWhenNoMemA }, } w := createWorker(changefeedID, eventSize, true) - addEventsToSortEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) + addEventsToSortEngine(suite.T(), events, w.sortEngine, tableID) taskChan := make(chan *sinkTask) var wg sync.WaitGroup @@ -301,7 +301,7 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithSplitTxnAndAbortWhenNoMemA isCanceled: func() bool { return false }, } // Abort the task when no memory quota and blocked. 
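+ // Closing the memory quota should wake the worker currently blocked on acquiring it.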
- w.(*sinkWorkerImpl).memQuota.close() + w.memQuota.close() wg.Wait() require.Len(suite.T(), sink.events, 1, "Only one txn should be sent to sink before abort") } @@ -386,7 +386,7 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithSplitTxnAndOnlyAdvanceTabl }, } w := createWorker(changefeedID, eventSize, true) - addEventsToSortEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) + addEventsToSortEngine(suite.T(), events, w.sortEngine, tableID) taskChan := make(chan *sinkTask) var wg sync.WaitGroup @@ -512,7 +512,7 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithoutSplitTxnAndAbortWhenNoM }, } w := createWorker(changefeedID, eventSize, false) - addEventsToSortEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) + addEventsToSortEngine(suite.T(), events, w.sortEngine, tableID) taskChan := make(chan *sinkTask) var wg sync.WaitGroup @@ -637,7 +637,7 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithoutSplitTxnOnlyAdvanceTabl }, } w := createWorker(changefeedID, eventSize, false) - addEventsToSortEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) + addEventsToSortEngine(suite.T(), events, w.sortEngine, tableID) taskChan := make(chan *sinkTask) var wg sync.WaitGroup diff --git a/cdc/processor/sinkmanager/worker.go b/cdc/processor/sinkmanager/tasks.go similarity index 70% rename from cdc/processor/sinkmanager/worker.go rename to cdc/processor/sinkmanager/tasks.go index 9708d8d5293..1ca3b0a435a 100644 --- a/cdc/processor/sinkmanager/worker.go +++ b/cdc/processor/sinkmanager/tasks.go @@ -14,12 +14,26 @@ package sinkmanager import ( - "context" - "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" ) +const ( + // defaultRequestMemSize is the default memory usage for a request. + defaultRequestMemSize = uint64(10 * 1024 * 1024) // 10MB + // Avoid updating resolved ts too frequently, if there are too many small transactions. + defaultMaxUpdateIntervalSize = uint64(1024 * 256) // 256KB + // Limit the maximum size of a group of one batch, if there is a big transaction. + defaultMaxBigTxnBatchSize = defaultMaxUpdateIntervalSize * 20 // 5MB +) + +// Make these values variables, so that we can mock them in unit tests. +var ( + requestMemSize = defaultRequestMemSize + maxUpdateIntervalSize = defaultMaxUpdateIntervalSize + maxBigTxnBatchSize = defaultMaxBigTxnBatchSize +) + // Used to record the progress of the table. type writeSuccessCallback func(lastWrittenPos engine.Position) @@ -45,12 +59,7 @@ type sinkTask struct { isCanceled isCanceled } -type sinkWorker interface { - // Pull data from source manager for the table sink. - // We suppose that the worker only handle one task at a time. - handleTasks(ctx context.Context, taskChan <-chan *sinkTask) error -} - +// redoTask is a task for the redo log. type redoTask struct { tableID model.TableID lowerBound engine.Position @@ -58,9 +67,3 @@ type redoTask struct { tableSink *tableSinkWrapper callback writeSuccessCallback } - -type redoWorker interface { - // Pull data from source manager for the table sink. - // We suppose that the worker only handle one task at a time.
- handleTasks(ctx context.Context, taskChan <-chan *redoTask) error -} diff --git a/cdc/processor/sourcemanager/engine/factory/factory.go b/cdc/processor/sourcemanager/engine/factory/factory.go index e2044b3178b..1cc4159e3b9 100644 --- a/cdc/processor/sourcemanager/engine/factory/factory.go +++ b/cdc/processor/sourcemanager/engine/factory/factory.go @@ -116,7 +116,10 @@ func (f *SortEngineFactory) Close() (err error) { func NewForPebble(dir string, memQuotaInBytes uint64, cfg *config.DBConfig) *SortEngineFactory { manager := &SortEngineFactory{ engineType: pebbleEngine, + dir: dir, memQuotaInBytes: memQuotaInBytes, + engines: make(map[model.ChangeFeedID]engine.SortEngine), + closed: make(chan struct{}), pebbleConfig: cfg, } diff --git a/cdc/processor/sourcemanager/engine/mounted_iter.go b/cdc/processor/sourcemanager/engine/mounted_iter.go index 814878efd6c..2612c2cf14d 100644 --- a/cdc/processor/sourcemanager/engine/mounted_iter.go +++ b/cdc/processor/sourcemanager/engine/mounted_iter.go @@ -24,11 +24,9 @@ import ( type MountedEventIter struct { iter EventIterator mg entry.MounterGroup - maxMemUsage uint64 maxBatchSize int rawEvents []rawEvent - totalMemUsage uint64 nextToMount int nextToEmit int savedIterError error @@ -38,13 +36,11 @@ func NewMountedEventIter( iter EventIterator, mg entry.MounterGroup, - maxMemUsage uint64, maxBatchSize int, ) *MountedEventIter { return &MountedEventIter{ iter: iter, mg: mg, - maxMemUsage: maxMemUsage, maxBatchSize: maxBatchSize, } } @@ -56,14 +52,13 @@ func (i *MountedEventIter) Next(ctx context.Context) (event *model.PolymorphicEv if err = i.rawEvents[idx].event.WaitFinished(ctx); err == nil { event = i.rawEvents[idx].event txnFinished = i.rawEvents[idx].txnFinished - i.totalMemUsage -= i.rawEvents[idx].size i.nextToEmit += 1 } return } - // There are no events in mounting. Fetch more events and mounting them. The batch - // size is determined by `maxMemUsage` and `maxBatchSize`. + // There are no events in mounting. Fetch more events and mount them. + // The batch size is determined by `maxBatchSize`.
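+ // (In this patch the sink and redo workers both pass 256 as maxBatchSize.)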
if i.mg != nil && i.iter != nil { i.nextToMount = 0 i.nextToEmit = 0 @@ -73,7 +68,7 @@ func (i *MountedEventIter) Next(ctx context.Context) (event *model.PolymorphicEv i.rawEvents = i.rawEvents[:0] } - for i.totalMemUsage < i.maxMemUsage && len(i.rawEvents) < cap(i.rawEvents) { + for len(i.rawEvents) < cap(i.rawEvents) { event, txnFinished, err = i.iter.Next() if err != nil { return @@ -82,11 +77,8 @@ func (i *MountedEventIter) Next(ctx context.Context) (event *model.PolymorphicEv i.savedIterError = i.iter.Close() i.iter = nil break } - size := uint64(event.Row.ApproximateBytes()) - i.totalMemUsage += size - i.rawEvents = append(i.rawEvents, rawEvent{event, txnFinished, size}) + i.rawEvents = append(i.rawEvents, rawEvent{event, txnFinished}) } for idx := i.nextToMount; idx < len(i.rawEvents); idx++ { i.rawEvents[idx].event.SetUpFinishedCh() @@ -119,5 +111,4 @@ func (i *MountedEventIter) Close() error { type rawEvent struct { event *model.PolymorphicEvent txnFinished Position - size uint64 } diff --git a/cdc/processor/sourcemanager/engine/mounted_iter_test.go b/cdc/processor/sourcemanager/engine/mounted_iter_test.go index a29776f782e..3811d095981 100644 --- a/cdc/processor/sourcemanager/engine/mounted_iter_test.go +++ b/cdc/processor/sourcemanager/engine/mounted_iter_test.go @@ -47,16 +47,14 @@ func TestMountedEventIter(t *testing.T) { } }, } - itemSize := uint64(rawIter.repeatItem().Row.ApproximateBytes()) mg := &entry.MockMountGroup{} - iter := NewMountedEventIter(rawIter, mg, itemSize*3, 8) + iter := NewMountedEventIter(rawIter, mg, 3) for i := 0; i < 3; i++ { event, _, err := iter.Next(context.Background()) require.NotNil(t, event) require.Nil(t, err) - require.Equal(t, itemSize*uint64(2-i), iter.totalMemUsage) } require.Equal(t, iter.nextToMount, 3) require.Equal(t, iter.nextToEmit, 3) diff --git a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go index 7447ff026b9..3d6e7ed90e3 100644 --- a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go @@ -49,6 +49,7 @@ type EventSorter struct { // Following fields are protected by mu. mu sync.RWMutex + isClosed bool onResolves []func(model.TableID, model.Ts) tables map[model.TableID]*tableState } @@ -221,6 +222,14 @@ func (s *EventSorter) CleanAllTables(upperBound engine.Position) error { // Close implements sorter.EventSortEngine. func (s *EventSorter) Close() error { + s.mu.Lock() + if s.isClosed { + s.mu.Unlock() + return nil + } + s.isClosed = true + s.mu.Unlock() + close(s.closed) s.wg.Wait() for _, ch := range s.channs { @@ -399,5 +408,5 @@ func getDB(tableID model.TableID, dbCount int) int { b := [8]byte{} binary.LittleEndian.PutUint64(b[:], uint64(tableID)) h.Write(b[:]) - return int(h.Sum64()) % dbCount + return int(h.Sum64() % uint64(dbCount)) } diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go index 9a9e0b00baf..827e09817d4 100644 --- a/cdc/processor/sourcemanager/puller/puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go @@ -38,7 +38,7 @@ type Wrapper struct { // cancel is used to cancel the puller when remove or close the table. cancel context.CancelFunc // wg is used to wait the puller to exit. - wg *sync.WaitGroup + wg sync.WaitGroup } // NewPullerWrapper creates a new puller wrapper.
diff --git a/dm/loader/lightning.go b/dm/loader/lightning.go index 3b234803d1b..9dfea9c4647 100644 --- a/dm/loader/lightning.go +++ b/dm/loader/lightning.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" lcfg "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/dumpling/export" tidbpromutil "github.com/pingcap/tidb/util/promutil" "github.com/pingcap/tiflow/dm/config" @@ -471,7 +472,17 @@ func (l *LightningLoader) Update(ctx context.Context, cfg *config.SubTaskConfig) func (l *LightningLoader) status() *pb.LoadStatus { finished, total := l.core.Status() progress := percent(finished, total, l.finish.Load()) - currentSpeed := int64(l.speedRecorder.GetSpeed(float64(finished))) + finishedForSpeed := finished + // We need finished bytes to calculate the speed. For the tidb backend, BytesStateRestored in metrics is the source file size + // that has been written to the downstream DB. For the local backend, we need to wait for TiKV to finish ingesting the SST + // files, so we use the value from Status() instead. + if l.cfg.LoaderConfig.ImportMode == config.LoadModeLogical { + m := l.core.Metrics() + if m != nil { + finishedForSpeed = int64(metric.ReadCounter(m.BytesCounter.WithLabelValues(metric.BytesStateRestored))) + } + } + currentSpeed := int64(l.speedRecorder.GetSpeed(float64(finishedForSpeed))) l.logger.Info("progress status of lightning", zap.Int64("finished_bytes", finished), diff --git a/engine/executor/dm/unitholder.go b/engine/executor/dm/unitholder.go index 6febd57db07..534704fdfc1 100644 --- a/engine/executor/dm/unitholder.go +++ b/engine/executor/dm/unitholder.go @@ -60,9 +60,10 @@ type unitHolderImpl struct { cfg *dmconfig.SubTaskConfig unit unit.Unit - upstreamDB *conn.BaseDB - sourceStatus *binlog.SourceStatus - sourceStatusMu sync.RWMutex + upstreamDB *conn.BaseDB + sourceStatus *binlog.SourceStatus + sourceStatusMu sync.RWMutex + sourceStatusCheckTime time.Time logger log.Logger // use to access process(init/close/pause/resume) @@ -73,6 +74,9 @@ type unitHolderImpl struct { runCtx context.Context runCancel context.CancelFunc result *pb.ProcessResult // TODO: check if framework can persist result + + // used to run background task + bgWg sync.WaitGroup } var _ unitHolder = &unitHolderImpl{} @@ -151,6 +155,7 @@ func (u *unitHolderImpl) Pause(ctx context.Context) error { u.fieldMu.Lock() u.runCancel() u.fieldMu.Unlock() + u.bgWg.Wait() u.processWg.Wait() // TODO: refactor unit.Syncer // unit needs to manage its own life cycle @@ -196,6 +201,7 @@ func (u *unitHolderImpl) Close(ctx context.Context) error { } u.fieldMu.Unlock() + u.bgWg.Wait() u.processWg.Wait() if u.unit != nil { u.unit.Close() @@ -266,10 +272,15 @@ func (u *unitHolderImpl) setSourceStatus(in *binlog.SourceStatus) { // CheckAndUpdateStatus implement UnitHolder.CheckAndUpdateStatus.
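+// The refresh now runs in a background goroutine tracked by bgWg, so a slow +// upstream query no longer blocks the caller.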
func (u *unitHolderImpl) CheckAndUpdateStatus(ctx context.Context) { - sourceStatus := u.getSourceStatus() - - if sourceStatus == nil || time.Since(sourceStatus.UpdateTime) > sourceStatusRefreshInterval { - u.updateSourceStatus(ctx) + u.fieldMu.Lock() + defer u.fieldMu.Unlock() + if time.Since(u.sourceStatusCheckTime) > sourceStatusRefreshInterval { + u.sourceStatusCheckTime = time.Now() + u.bgWg.Add(1) + go func() { + defer u.bgWg.Done() + u.updateSourceStatus(ctx) + }() } } diff --git a/engine/executor/dm/unitholder_test.go b/engine/executor/dm/unitholder_test.go index cc944e3f0aa..e2215dfdbc7 100644 --- a/engine/executor/dm/unitholder_test.go +++ b/engine/executor/dm/unitholder_test.go @@ -187,6 +187,7 @@ func TestUnitHolderCheckAndUpdateStatus(t *testing.T) { sqlmock.NewRows([]string{"File", "Position"}).AddRow("mysql-bin.000001", "2345"), ) unitHolder.CheckAndUpdateStatus(ctx) + unitHolder.bgWg.Wait() u.AssertExpectations(t) require.NotNil(t, unitHolder.sourceStatus) require.NoError(t, mock.ExpectationsWereMet()) @@ -194,6 +195,7 @@ func TestUnitHolderCheckAndUpdateStatus(t *testing.T) { // the second time CheckAndUpdateStatus, will not query upstreamDB unitHolder.upstreamDB = nil unitHolder.CheckAndUpdateStatus(ctx) + unitHolder.bgWg.Wait() u.AssertExpectations(t) // imitate pass refresh interval @@ -214,6 +216,7 @@ func TestUnitHolderCheckAndUpdateStatus(t *testing.T) { ) u.On("Status").Return(&pb.DumpStatus{}) unitHolder.CheckAndUpdateStatus(ctx) + unitHolder.bgWg.Wait() u.AssertExpectations(t) require.NoError(t, mock.ExpectationsWereMet()) require.NotEqual(t, lastUpdateTime, unitHolder.sourceStatus.UpdateTime) diff --git a/engine/executor/dm/worker.go b/engine/executor/dm/worker.go index f42039610e6..7c815f024c3 100644 --- a/engine/executor/dm/worker.go +++ b/engine/executor/dm/worker.go @@ -146,6 +146,7 @@ func (w *dmWorker) InitImpl(ctx context.Context) error { } // Tick implements lib.WorkerImpl.Tick +// Do not do heavy work in Tick, it will block the message processing. 
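+// (For instance, the source status refresh is now pushed to a background +// goroutine rather than being done inline; see CheckAndUpdateStatus above.)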
func (w *dmWorker) Tick(ctx context.Context) error { if err := w.checkAndAutoResume(ctx); err != nil { return err diff --git a/engine/executor/openapi.go b/engine/executor/openapi.go index 27eb8e29a39..e03868980e0 100644 --- a/engine/executor/openapi.go +++ b/engine/executor/openapi.go @@ -21,12 +21,12 @@ import ( "github.com/gin-gonic/gin" engineModel "github.com/pingcap/tiflow/engine/model" + "github.com/pingcap/tiflow/engine/pkg/openapi" + "github.com/pingcap/tiflow/pkg/errors" ) -const jobAPIPrefix = "/api/v1/jobs/" - func jobAPIBasePath(jobID engineModel.JobID) string { - return jobAPIPrefix + jobID + "/" + return openapi.JobAPIPrefix + jobID + "/" } type jobAPIServer struct { @@ -41,27 +41,32 @@ func newJobAPIServer() *jobAPIServer { } func (s *jobAPIServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if h, ok := s.match(r.URL.Path); ok { - h.ServeHTTP(w, r) - } else { - http.NotFound(w, r) + h, err := s.matchHandler(r.URL.Path) + if err != nil { + openapi.WriteHTTPError(w, err) + return } + h.ServeHTTP(w, r) } -func (s *jobAPIServer) match(path string) (http.Handler, bool) { - if !strings.HasPrefix(path, jobAPIPrefix) { - return nil, false +func (s *jobAPIServer) matchHandler(path string) (http.Handler, error) { + if !strings.HasPrefix(path, openapi.JobAPIPrefix) { + return nil, errors.ErrInvalidArgument.GenWithStack("invalid job api path: %s", path) } - path = strings.TrimPrefix(path, jobAPIPrefix) - fields := strings.SplitN(path, "/", 2) - if len(fields) != 2 { - return nil, false + path = strings.TrimPrefix(path, openapi.JobAPIPrefix) + parts := strings.SplitN(path, "/", 2) + if len(parts) != 2 { + return nil, errors.ErrInvalidArgument.GenWithStack("invalid job api path: %s", path) } - JobID := fields[0] + JobID := parts[0] s.rwm.RLock() h, ok := s.handlers[JobID] s.rwm.RUnlock() - return h, ok + if !ok { + // We can't tell whether the job exists or not, but at least the job is not running. 
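+ // (Its handler may simply not be registered yet, or it may already have been + // removed after the job stopped.)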
+ return nil, errors.ErrJobNotRunning.GenWithStackByArgs(JobID) + } + return h, nil } func (s *jobAPIServer) initialize(jobID engineModel.JobID, f func(apiGroup *gin.RouterGroup)) { diff --git a/engine/executor/openapi_test.go b/engine/executor/openapi_test.go index 5ebc11dfa34..9af73a89559 100644 --- a/engine/executor/openapi_test.go +++ b/engine/executor/openapi_test.go @@ -15,6 +15,7 @@ package executor import ( "context" + "encoding/json" "net/http" "net/http/httptest" "sync" @@ -23,6 +24,8 @@ import ( "github.com/gin-gonic/gin" engineModel "github.com/pingcap/tiflow/engine/model" + "github.com/pingcap/tiflow/engine/pkg/openapi" + "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" ) @@ -40,6 +43,13 @@ func TestJobAPIServer(t *testing.T) { }) }) + ensureNotRunning := func(w *httptest.ResponseRecorder) { + require.Equal(t, errors.HTTPStatusCode(errors.ErrJobNotRunning), w.Code) + var httpErr openapi.HTTPError + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &httpErr)) + require.Equal(t, string(errors.ErrJobNotRunning.RFCCode()), httpErr.Code) + } + // test job1 { w := httptest.NewRecorder() @@ -56,12 +66,12 @@ func TestJobAPIServer(t *testing.T) { require.Equal(t, http.StatusOK, w.Code) require.Equal(t, "job2 status", w.Body.String()) } - // test not found + // test not running job { w := httptest.NewRecorder() r := httptest.NewRequest("GET", "/api/v1/jobs/job3/status", nil) jobAPISrv.ServeHTTP(w, r) - require.Equal(t, http.StatusNotFound, w.Code) + ensureNotRunning(w) } wg := sync.WaitGroup{} @@ -78,7 +88,11 @@ func TestJobAPIServer(t *testing.T) { w := httptest.NewRecorder() r := httptest.NewRequest("GET", "/api/v1/jobs/job1/status", nil) jobAPISrv.ServeHTTP(w, r) - return w.Code == http.StatusNotFound + if w.Code/100 != 2 { + ensureNotRunning(w) + return true + } + return false }, time.Second, time.Millisecond*100) stoppedJobs <- "job2" @@ -86,7 +100,11 @@ func TestJobAPIServer(t *testing.T) { w := httptest.NewRecorder() r := httptest.NewRequest("GET", "/api/v1/jobs/job2/status", nil) jobAPISrv.ServeHTTP(w, r) - return w.Code == http.StatusNotFound + if w.Code/100 != 2 { + ensureNotRunning(w) + return true + } + return false }, time.Second, time.Millisecond*100) close(stoppedJobs) diff --git a/engine/executor/server.go b/engine/executor/server.go index c39c345bbfc..40c76a36b8a 100644 --- a/engine/executor/server.go +++ b/engine/executor/server.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/deps" "github.com/pingcap/tiflow/engine/pkg/externalresource/broker" metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model" + "github.com/pingcap/tiflow/engine/pkg/openapi" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" "github.com/pingcap/tiflow/engine/pkg/p2p" "github.com/pingcap/tiflow/engine/pkg/promutil" @@ -578,7 +579,7 @@ func (s *Server) startTCPService(ctx context.Context, wg *errgroup.Group) error mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) mux.Handle("/metrics", promutil.HTTPHandlerForMetric()) - mux.Handle(jobAPIPrefix, s.jobAPISrv) + mux.Handle(openapi.JobAPIPrefix, s.jobAPISrv) httpSrv := &http.Server{ Handler: mux, diff --git a/engine/framework/internal/master/worker_creator.go b/engine/framework/internal/master/worker_creator.go index 1fe993a0319..299b58c40ac 100644 --- a/engine/framework/internal/master/worker_creator.go +++ b/engine/framework/internal/master/worker_creator.go @@ -114,7 +114,7 @@ func (c *WorkerCreator) CreateWorker( c.logger.Warn("ScheduleTask 
returned error", zap.Error(err)) return err } - c.logger.Debug("ScheduleTask succeeded", zap.Any("response", resp)) + c.logger.Info("ScheduleTask succeeded", zap.Any("response", resp)) executorID := model.ExecutorID(resp.ExecutorId) executorClient, err := c.executorGroup.GetExecutorClientB(ctx, executorID) diff --git a/engine/jobmaster/dm/dm_jobmaster.go b/engine/jobmaster/dm/dm_jobmaster.go index 731fbae21e0..04b398c294b 100644 --- a/engine/jobmaster/dm/dm_jobmaster.go +++ b/engine/jobmaster/dm/dm_jobmaster.go @@ -158,6 +158,7 @@ func (jm *JobMaster) OnMasterRecovered(ctx context.Context) error { } // Tick implements JobMasterImpl.Tick +// Do not do heavy work in Tick, it will block the message processing. func (jm *JobMaster) Tick(ctx context.Context) error { jm.workerManager.Tick(ctx) jm.taskManager.Tick(ctx) diff --git a/engine/jobmaster/dm/openapi.go b/engine/jobmaster/dm/openapi.go index c0f09c0a216..de77a9ea9ba 100644 --- a/engine/jobmaster/dm/openapi.go +++ b/engine/jobmaster/dm/openapi.go @@ -33,16 +33,16 @@ import ( const errCodePrefix = "DM:Err" func (jm *JobMaster) initOpenAPI(router *gin.RouterGroup) { + router.Use(httpErrorHandler()) + router.Use(func(c *gin.Context) { if !jm.initialized.Load() { - c.AbortWithStatus(http.StatusServiceUnavailable) + _ = c.Error(errors.ErrJobNotRunning.GenWithStackByArgs(jm.ID())) + c.Abort() return } - c.Next() }) - router.Use(httpErrorHandler()) - wrapper := openapi.ServerInterfaceWrapper{ Handler: jm, } diff --git a/engine/jobmaster/dm/openapi_test.go b/engine/jobmaster/dm/openapi_test.go index 27ff3edad39..c581ebb16f5 100644 --- a/engine/jobmaster/dm/openapi_test.go +++ b/engine/jobmaster/dm/openapi_test.go @@ -465,7 +465,10 @@ func (t *testDMOpenAPISuite) TestJobMasterNotInitialized() { w := httptest.NewRecorder() r := httptest.NewRequest("GET", baseURL+"config", nil) t.engine.ServeHTTP(w, r) - require.Equal(t.T(), http.StatusServiceUnavailable, w.Code) + require.Equal(t.T(), errors.HTTPStatusCode(errors.ErrJobNotRunning), w.Code) + var httpErr engineOpenAPI.HTTPError + require.NoError(t.T(), json.Unmarshal(w.Body.Bytes(), &httpErr)) + require.Equal(t.T(), string(errors.ErrJobNotRunning.RFCCode()), httpErr.Code) } func equalError(t *testing.T, expected string, body *bytes.Buffer) { diff --git a/engine/servermaster/jobmanager.go b/engine/servermaster/jobmanager.go index d8f3301a36c..0807712006c 100644 --- a/engine/servermaster/jobmanager.go +++ b/engine/servermaster/jobmanager.go @@ -302,10 +302,7 @@ func (jm *JobManagerImpl) CreateJob(ctx context.Context, req *pb.CreateJobReques // CreateWorker here is to create job master actually // TODO: use correct worker cost - workerID, err := jm.BaseMaster.CreateWorker( - meta.Type, meta, - framework.CreateWorkerWithCost(defaultJobMasterCost), - framework.CreateWorkerWithSelectors(selectors...)) + workerID, err := jm.frameworkCreateWorker(meta) if err != nil { err2 := metadata.DeleteMasterMeta(ctx, jm.frameMetaClient, meta.ID) if err2 != nil { @@ -620,8 +617,7 @@ func (jm *JobManagerImpl) Tick(ctx context.Context) error { if !jm.JobBackoffMgr.Allow(job.ID) { return "", errors.ErrMasterCreateWorkerBackoff.FastGenByArgs() } - return jm.BaseMaster.CreateWorker( - job.Type, job, framework.CreateWorkerWithCost(defaultJobMasterCost)) + return jm.frameworkCreateWorker(job) }) if _, err = filterQuotaError(err); err != nil { return err @@ -647,8 +643,7 @@ func (jm *JobManagerImpl) Tick(ctx context.Context) error { } err = jm.JobFsm.IterWaitAckJobs( func(job *frameModel.MasterMeta) (string, error) { - return 
jm.BaseMaster.CreateWorker( - job.Type, job, framework.CreateWorkerWithCost(defaultJobMasterCost)) + return jm.frameworkCreateWorker(job) }) exceedQuota, err := filterQuotaError(err) if err != nil { @@ -840,6 +835,13 @@ func (jm *JobManagerImpl) bgJobOperatorLoop(ctx context.Context) { }) } +func (jm *JobManagerImpl) frameworkCreateWorker(job *frameModel.MasterMeta) (string, error) { + return jm.BaseMaster.CreateWorker( + job.Type, job, framework.CreateWorkerWithCost(defaultJobMasterCost), + framework.CreateWorkerWithSelectors(job.Ext.Selectors...), + ) +} + func (jm *JobManagerImpl) terminateJob( ctx context.Context, errMsg string, jobID string, state frameModel.MasterState, ) error { diff --git a/engine/servermaster/jobmanager_test.go b/engine/servermaster/jobmanager_test.go index 6fd0320d4a1..cfb1e5b11b9 100644 --- a/engine/servermaster/jobmanager_test.go +++ b/engine/servermaster/jobmanager_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tiflow/engine/servermaster/jobop" jobopMock "github.com/pingcap/tiflow/engine/servermaster/jobop/mock" "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/label" "github.com/pingcap/tiflow/pkg/notify" "github.com/pingcap/tiflow/pkg/uuid" "github.com/stretchr/testify/mock" @@ -666,6 +667,20 @@ func TestJobOperatorBgLoop(t *testing.T) { require.NoError(t, mgr.wg.Wait()) } +// TODO: refine the interface of JobManager and use mock JobManager in test +func dispatchJobAndMeetError( + ctx context.Context, t *testing.T, mgr *JobManagerImpl, meta *frameModel.MasterMeta, +) { + err := mgr.frameMetaClient.UpsertJob(ctx, meta) + require.NoError(t, err) + + // dispatch job, meet error and move it to pending job list + mgr.JobFsm.JobDispatched(&frameModel.MasterMeta{ID: meta.ID}, false) + require.NotNil(t, mgr.QueryJob(meta.ID)) + mockHandle := &framework.MockHandle{WorkerID: meta.ID} + mgr.JobFsm.JobOffline(mockHandle, true /* needFailover */) +} + func TestJobManagerIterPendingJobs(t *testing.T) { t.Parallel() @@ -693,20 +708,11 @@ func TestJobManagerIterPendingJobs(t *testing.T) { err := mockMaster.Init(ctx) require.NoError(t, err) - dispatchJobAndMeetError := func(jobID string) { - // save job master meta - meta := &frameModel.MasterMeta{ + newMasterMeta := func(jobID string) *frameModel.MasterMeta { + return &frameModel.MasterMeta{ ID: jobID, State: frameModel.MasterStateInit, } - err = mgr.frameMetaClient.UpsertJob(ctx, meta) - require.NoError(t, err) - - // dispatch job, meet error and move it to pending job list - mgr.JobFsm.JobDispatched(&frameModel.MasterMeta{ID: jobID}, false) - require.NotNil(t, mgr.QueryJob(jobID)) - mockHandle := &framework.MockHandle{WorkerID: jobID} - mgr.JobFsm.JobOffline(mockHandle, true /* needFailover */) } jobMgrTickAndCheckJobState := func(jobID string, state frameModel.MasterState) { @@ -719,7 +725,7 @@ func TestJobManagerIterPendingJobs(t *testing.T) { { jobID := "job-backoff-test-1" - dispatchJobAndMeetError(jobID) + dispatchJobAndMeetError(ctx, t, mgr, newMasterMeta(jobID)) // job is being backoff mockJobOperator.EXPECT().IsJobCanceling(ctx, jobID).Times(1).Return(false) @@ -736,7 +742,7 @@ func TestJobManagerIterPendingJobs(t *testing.T) { { jobID := "job-backoff-test-2" - dispatchJobAndMeetError(jobID) + dispatchJobAndMeetError(ctx, t, mgr, newMasterMeta(jobID)) // job will be terminated because it is canceled mockJobOperator.EXPECT().IsJobCanceling(ctx, jobID).Times(1).Return(true) @@ -744,6 +750,73 @@ func TestJobManagerIterPendingJobs(t *testing.T) { } } +func 
TestFailoverWithCreateWorkerOpt(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + selectors := []*label.Selector{ + {Key: "name", Target: "executor.*", Op: label.OpRegex}, + {Key: "region", Target: "us-west-2", Op: label.OpEq}, + } + checkOptsFn := func(opts ...framework.CreateWorkerOpt) { + // CreateWorkerOpt: 1 for cost opt, 1 for label selectors + require.Len(t, opts, 2) + } + + masterImpl := framework.NewMockMasterImpl(t, "", "iter-pending-jobs-test") + framework.MockMasterPrepareMeta(ctx, t, masterImpl) + mockMaster := &mockBaseMasterCheckCreateOpts{ + MockMasterImpl: masterImpl, + checkOptsFn: checkOptsFn, + } + ctrl := gomock.NewController(t) + mockBackoffMgr := jobopMock.NewMockBackoffManager(ctrl) + mockJobOperator := jobopMock.NewMockJobOperator(ctrl) + mgr := &JobManagerImpl{ + BaseMaster: mockMaster, + JobFsm: NewJobFsm(), + uuidGen: uuid.NewGenerator(), + frameMetaClient: mockMaster.GetFrameMetaClient(), + jobHTTPClient: jobMock.NewMockNilReturnJobHTTPClient(), + JobBackoffMgr: mockBackoffMgr, + jobOperator: mockJobOperator, + } + mockMaster.Impl = mgr + err := mockMaster.Init(ctx) + require.NoError(t, err) + + { + job := &frameModel.MasterMeta{ + ID: "failover-job-with-label", + State: frameModel.MasterStateInit, + Ext: frameModel.MasterMetaExt{Selectors: selectors}, + } + dispatchJobAndMeetError(ctx, t, mgr, job) + + mockJobOperator.EXPECT().IsJobCanceling(ctx, job.ID).Times(1).Return(false) + mockBackoffMgr.EXPECT().Terminate(job.ID).Times(1).Return(false) + mockBackoffMgr.EXPECT().Allow(job.ID).Times(1).Return(true) + err := mgr.Tick(ctx) + require.NoError(t, err) + } +} + +type mockBaseMasterCheckCreateOpts struct { + *framework.MockMasterImpl + checkOptsFn func(opts ...framework.CreateWorkerOpt) +} + +func (m *mockBaseMasterCheckCreateOpts) CreateWorker( + workerType framework.WorkerType, + config framework.WorkerConfig, + opts ...framework.CreateWorkerOpt, +) (frameModel.WorkerID, error) { + m.checkOptsFn(opts...) + return uuid.NewGenerator().NewString(), nil +} + func TestIsJobTerminated(t *testing.T) { require.False(t, isJobTerminated(frameModel.MasterStateUninit)) require.False(t, isJobTerminated(frameModel.MasterStateInit)) diff --git a/engine/servermaster/jobop/config.go b/engine/servermaster/jobop/config.go index f4c86663588..77fdcf2ea29 100644 --- a/engine/servermaster/jobop/config.go +++ b/engine/servermaster/jobop/config.go @@ -16,14 +16,16 @@ package jobop import "time" const ( - defaultBackoffInitInterval = 10 * time.Second + defaultBackoffInitInterval = 5 * time.Second defaultBackoffMaxInterval = 5 * time.Minute - defaultBackoffMultiplier = 2.0 + defaultBackoffMultiplier = 1.2 // If a job can keep running for more than 10 minutes, it won't be backoff // If a job keeps failing, the max back interval is 5 minutes, and 10 minutes // can keep at least one failed record. defaultBackoffResetInterval = 2 * defaultBackoffMaxInterval - defaultBackoffMaxTryTime = 8 + // backoff for 24 times, from 5s to 300s at most, with 1.2 as multiplier will + // cost approximately 32 minutes. 
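+ // Rough arithmetic (illustrative): 5s * 1.2^n stays under the 300s cap for + // n = 0..22 and sums to about 1631s; the final capped 300s interval brings + // the total to roughly 1931s, i.e. about 32 minutes.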
+ defaultBackoffMaxTryTime = 24 ) // BackoffConfig is used to configure job backoff diff --git a/go.mod b/go.mod index 45b3212d8e1..451929e81c5 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/pingcap/tiflow go 1.19 require ( - github.com/BurntSushi/toml v1.2.0 + github.com/BurntSushi/toml v1.2.1 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/Shopify/sarama v1.36.0 github.com/VividCortex/mysqlerr v1.0.0 @@ -35,7 +35,7 @@ require ( github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 github.com/google/btree v1.1.2 - github.com/google/go-cmp v0.5.8 + github.com/google/go-cmp v0.5.9 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/google/uuid v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 @@ -56,17 +56,17 @@ require ( github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278 github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/kvproto v0.0.0-20221103025916-e7e21f0e9cd9 - github.com/pingcap/log v1.1.1-0.20221110065318-21a4942860b3 - github.com/pingcap/tidb v1.1.0-beta.0.20221114102155-bdcb85031e07 + github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c + github.com/pingcap/tidb v1.1.0-beta.0.20221118024355-777cb5fbc6ef github.com/pingcap/tidb-tools v6.1.1-0.20220715000306-1d2f00da8c3e+incompatible - github.com/pingcap/tidb/parser v0.0.0-20221114102155-bdcb85031e07 + github.com/pingcap/tidb/parser v0.0.0-20221118024355-777cb5fbc6ef github.com/prometheus/client_golang v1.13.0 github.com/prometheus/client_model v0.2.0 github.com/r3labs/diff v1.1.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/shopspring/decimal v1.3.0 github.com/soheilhy/cmux v0.1.5 - github.com/spf13/cobra v1.5.0 + github.com/spf13/cobra v1.6.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.0 github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14 @@ -80,9 +80,9 @@ require ( github.com/uber-go/atomic v1.4.0 github.com/vmihailenco/msgpack/v5 v5.3.5 github.com/xdg/scram v1.0.3 - go.etcd.io/etcd/api/v3 v3.5.2 - go.etcd.io/etcd/client/pkg/v3 v3.5.2 - go.etcd.io/etcd/client/v3 v3.5.2 + go.etcd.io/etcd/api/v3 v3.5.4 + go.etcd.io/etcd/client/pkg/v3 v3.5.4 + go.etcd.io/etcd/client/v3 v3.5.4 go.etcd.io/etcd/pkg/v3 v3.5.2 go.etcd.io/etcd/raft/v3 v3.5.2 go.etcd.io/etcd/server/v3 v3.5.2 @@ -93,12 +93,12 @@ require ( go.uber.org/multierr v1.8.0 go.uber.org/ratelimit v0.2.0 go.uber.org/zap v1.23.0 - golang.org/x/exp v0.0.0-20220927162542-c76eaa363f9d + golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e golang.org/x/net v0.1.0 - golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 + golang.org/x/sync v0.1.0 golang.org/x/sys v0.1.0 golang.org/x/text v0.4.0 - golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 + golang.org/x/time v0.1.0 google.golang.org/genproto v0.0.0-20220719170305-83ca9fad585f google.golang.org/grpc v1.48.0 google.golang.org/protobuf v1.28.1 @@ -150,7 +150,6 @@ require ( github.com/eapache/queue v1.1.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect - github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect @@ -175,7 +174,7 @@ require ( github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/iancoleman/strcase v0.2.0 // indirect github.com/improbable-eng/grpc-web v0.12.0 // indirect - github.com/inconshreveable/mousetrap v1.0.0 // indirect + 
diff --git a/go.mod b/go.mod
index 45b3212d8e1..451929e81c5 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/pingcap/tiflow
 go 1.19

 require (
-	github.com/BurntSushi/toml v1.2.0
+	github.com/BurntSushi/toml v1.2.1
 	github.com/DATA-DOG/go-sqlmock v1.5.0
 	github.com/Shopify/sarama v1.36.0
 	github.com/VividCortex/mysqlerr v1.0.0
@@ -35,7 +35,7 @@ require (
 	github.com/golang/mock v1.6.0
 	github.com/golang/protobuf v1.5.2
 	github.com/google/btree v1.1.2
-	github.com/google/go-cmp v0.5.8
+	github.com/google/go-cmp v0.5.9
 	github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
 	github.com/google/uuid v1.3.0
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
@@ -56,17 +56,17 @@ require (
 	github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
 	github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
 	github.com/pingcap/kvproto v0.0.0-20221103025916-e7e21f0e9cd9
-	github.com/pingcap/log v1.1.1-0.20221110065318-21a4942860b3
-	github.com/pingcap/tidb v1.1.0-beta.0.20221114102155-bdcb85031e07
+	github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
+	github.com/pingcap/tidb v1.1.0-beta.0.20221118024355-777cb5fbc6ef
 	github.com/pingcap/tidb-tools v6.1.1-0.20220715000306-1d2f00da8c3e+incompatible
-	github.com/pingcap/tidb/parser v0.0.0-20221114102155-bdcb85031e07
+	github.com/pingcap/tidb/parser v0.0.0-20221118024355-777cb5fbc6ef
 	github.com/prometheus/client_golang v1.13.0
 	github.com/prometheus/client_model v0.2.0
 	github.com/r3labs/diff v1.1.0
 	github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
 	github.com/shopspring/decimal v1.3.0
 	github.com/soheilhy/cmux v0.1.5
-	github.com/spf13/cobra v1.5.0
+	github.com/spf13/cobra v1.6.1
 	github.com/spf13/pflag v1.0.5
 	github.com/stretchr/testify v1.8.0
 	github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14
@@ -80,9 +80,9 @@ require (
 	github.com/uber-go/atomic v1.4.0
 	github.com/vmihailenco/msgpack/v5 v5.3.5
 	github.com/xdg/scram v1.0.3
-	go.etcd.io/etcd/api/v3 v3.5.2
-	go.etcd.io/etcd/client/pkg/v3 v3.5.2
-	go.etcd.io/etcd/client/v3 v3.5.2
+	go.etcd.io/etcd/api/v3 v3.5.4
+	go.etcd.io/etcd/client/pkg/v3 v3.5.4
+	go.etcd.io/etcd/client/v3 v3.5.4
 	go.etcd.io/etcd/pkg/v3 v3.5.2
 	go.etcd.io/etcd/raft/v3 v3.5.2
 	go.etcd.io/etcd/server/v3 v3.5.2
@@ -93,12 +93,12 @@ require (
 	go.uber.org/multierr v1.8.0
 	go.uber.org/ratelimit v0.2.0
 	go.uber.org/zap v1.23.0
-	golang.org/x/exp v0.0.0-20220927162542-c76eaa363f9d
+	golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e
 	golang.org/x/net v0.1.0
-	golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
+	golang.org/x/sync v0.1.0
 	golang.org/x/sys v0.1.0
 	golang.org/x/text v0.4.0
-	golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
+	golang.org/x/time v0.1.0
 	google.golang.org/genproto v0.0.0-20220719170305-83ca9fad585f
 	google.golang.org/grpc v1.48.0
 	google.golang.org/protobuf v1.28.1
@@ -150,7 +150,6 @@ require (
 	github.com/eapache/queue v1.1.0 // indirect
 	github.com/emirpasic/gods v1.18.1 // indirect
 	github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
-	github.com/fsnotify/fsnotify v1.5.4 // indirect
 	github.com/ghodss/yaml v1.0.0 // indirect
 	github.com/gin-contrib/sse v0.1.0 // indirect
 	github.com/go-ole/go-ole v1.2.6 // indirect
@@ -175,7 +174,7 @@ require (
 	github.com/hashicorp/go-uuid v1.0.3 // indirect
 	github.com/iancoleman/strcase v0.2.0 // indirect
 	github.com/improbable-eng/grpc-web v0.12.0 // indirect
-	github.com/inconshreveable/mousetrap v1.0.0 // indirect
+	github.com/inconshreveable/mousetrap v1.0.1 // indirect
 	github.com/jcmturner/aescts/v2 v2.0.0 // indirect
 	github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
 	github.com/jcmturner/gofork v1.7.6 // indirect
@@ -228,9 +227,9 @@ require (
 	github.com/prometheus/procfs v0.8.0 // indirect
 	github.com/remyoudompheng/bigfft v0.0.0-20220927061507-ef77025ab5aa // indirect
 	github.com/rivo/uniseg v0.4.2 // indirect
-	github.com/rogpeppe/go-internal v1.8.0 // indirect
+	github.com/rogpeppe/go-internal v1.9.0 // indirect
 	github.com/rs/cors v1.7.0 // indirect
-	github.com/shirou/gopsutil/v3 v3.22.7 // indirect
+	github.com/shirou/gopsutil/v3 v3.22.9 // indirect
 	github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
 	github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
 	github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 // indirect
@@ -257,7 +256,7 @@ require (
 	github.com/xitongsys/parquet-go v1.6.0 // indirect
 	github.com/yusufpapurcu/wmi v1.2.2 // indirect
 	go.etcd.io/bbolt v1.3.6 // indirect
-	go.etcd.io/etcd/client/v2 v2.305.2 // indirect
+	go.etcd.io/etcd/client/v2 v2.305.4 // indirect
 	go.opencensus.io v0.23.0 // indirect
 	go.opentelemetry.io/contrib v0.20.0 // indirect
 	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 // indirect
@@ -272,7 +271,7 @@ require (
 	golang.org/x/crypto v0.1.0 // indirect
 	golang.org/x/oauth2 v0.0.0-20220718184931-c8730f7fcb92 // indirect
 	golang.org/x/term v0.1.0 // indirect
-	golang.org/x/tools v0.1.12 // indirect
+	golang.org/x/tools v0.2.0 // indirect
 	golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
 	google.golang.org/api v0.84.0 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
diff --git a/go.sum b/go.sum
index feb7f0fe2b0..9c71334e738 100644
--- a/go.sum
+++ b/go.sum
@@ -75,8 +75,8 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0 h1:62Ew5xXg5UCGIXDOM
 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0/go.mod h1:eHWhQKXc1Gv1DvWH//UzgWjWFEo0Pp4pH2vBzjBw8Fc=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
-github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0=
-github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
+github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak=
+github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EFZQ978U7x8IRnstaskI3IysnWY5Ao3QgZUKOXlsAdw=
 github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w=
@@ -337,9 +337,8 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu
 github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
 github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
-github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
-github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
 github.com/fsouza/fake-gcs-server v1.19.0 h1:XyaGOlqo+R5sjT03x2ymk0xepaQlgwhRLTT2IopW0zA=
 github.com/fsouza/fake-gcs-server v1.19.0/go.mod h1:JtXHY/QzHhtyIxsNfIuQ+XgHtRb5B/w8nqbL5O8zqo0=
 github.com/fzipp/gocyclo v0.3.1/go.mod h1:DJHO6AUmbdqj2ET4Z9iArSuwWgYDRryYt2wASxc7x3E=
@@ -528,8 +527,9 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
-github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
 github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
@@ -644,8 +644,9 @@ github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:
 github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA=
 github.com/improbable-eng/grpc-web v0.12.0 h1:GlCS+lMZzIkfouf7CNqY+qqpowdKuJLSLLcKVfM1oLc=
 github.com/improbable-eng/grpc-web v0.12.0/go.mod h1:6hRR09jOEG81ADP5wCQju1z71g6OL4eEvELdran/3cs=
-github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc=
+github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
 github.com/integralist/go-findroot v0.0.0-20160518114804-ac90681525dc h1:4IZpk3M4m6ypx0IlRoEyEyY1gAdicWLMQ0NcG/gBnnA=
 github.com/integralist/go-findroot v0.0.0-20160518114804-ac90681525dc/go.mod h1:UlaC6ndby46IJz9m/03cZPKKkR9ykeIVBBDE3UDBdJk=
 github.com/iris-contrib/blackfriday v2.0.0+incompatible/go.mod h1:UzZ2bDEoaSGPbkg6SAB4att1aAwTmVIx/5gCVqeyUdI=
@@ -1006,22 +1007,22 @@ github.com/pingcap/log v0.0.0-20210906054005-afc726e70354/go.mod h1:DWQW5jICDR7U
 github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
 github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
-github.com/pingcap/log v1.1.1-0.20221110065318-21a4942860b3 h1:T7e5Low0BU2ZazI2dz2mh3W1qv+w8wtvq1YR8DneA0c=
-github.com/pingcap/log v1.1.1-0.20221110065318-21a4942860b3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
+github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c h1:crhkw6DD+07Bg1wYhW5Piw+kYNKZqFQqfC2puUf6gMI=
+github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
 github.com/pingcap/parser v0.0.0-20210415081931-48e7f467fd74/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw=
 github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d/go.mod h1:7j18ezaWTao2LHOyMlsc2Dg1vW+mDY9dEbPzVyOlaeM=
 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM=
 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops=
 github.com/pingcap/tidb v1.1.0-beta.0.20220511160835-98c31070d958/go.mod h1:luW4sIZoLHY3bCWuKqyqk2QgMvF+/M7nWOXf/me0+fY=
-github.com/pingcap/tidb v1.1.0-beta.0.20221114102155-bdcb85031e07 h1:sr5oQuyGfx7NkqjjGicwIV1Eg/Xj8GSmDbLV4inBZPI=
-github.com/pingcap/tidb v1.1.0-beta.0.20221114102155-bdcb85031e07/go.mod h1:krwRE9b7/TfAcpt8QCbjZkRpJnoxU/hYfZAkApIxY4I=
+github.com/pingcap/tidb v1.1.0-beta.0.20221118024355-777cb5fbc6ef h1:1nKkPO3lsvIRTMcrgIhMZnRWesXuk0kmbOVBH/lZxMI=
+github.com/pingcap/tidb v1.1.0-beta.0.20221118024355-777cb5fbc6ef/go.mod h1:Qjq/AVThVYEYZYvEl6O2Fckl1tri2yjqCs40mH3AsRI=
 github.com/pingcap/tidb-dashboard v0.0.0-20220117082709-e8076b5c79ba/go.mod h1:4hk/3owVGWdvI9Kx6yCqqvM1T5PVgwyQNyMQxD3rwfc=
 github.com/pingcap/tidb-tools v6.1.1-0.20220715000306-1d2f00da8c3e+incompatible h1:ftmrSd7avCEdTOkWx3O0UkS4yTBrlKQweRF8uqz9+No=
 github.com/pingcap/tidb-tools v6.1.1-0.20220715000306-1d2f00da8c3e+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e/go.mod h1:e1MGCA9Sg3T8jid8PKAEq5eYVuMMCq4n8gJ+Kqp4Plg=
 github.com/pingcap/tidb/parser v0.0.0-20220511160835-98c31070d958/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI=
-github.com/pingcap/tidb/parser v0.0.0-20221114102155-bdcb85031e07 h1:U/f29nLoTYARg2gwQyYzWRGpz6DXUENgI2Mr0eOQAgU=
-github.com/pingcap/tidb/parser v0.0.0-20221114102155-bdcb85031e07/go.mod h1:wjvp+T3/T9XYt0nKqGX3Kc1AKuyUcfno6LTc6b2A6ew=
+github.com/pingcap/tidb/parser v0.0.0-20221118024355-777cb5fbc6ef h1:1sr0q2bVM1wFS7klWf9Hvu/aZvtj4XDQglsV/CbfI7g=
+github.com/pingcap/tidb/parser v0.0.0-20221118024355-777cb5fbc6ef/go.mod h1:wjvp+T3/T9XYt0nKqGX3Kc1AKuyUcfno6LTc6b2A6ew=
 github.com/pingcap/tipb v0.0.0-20220215045658-d12dec7a7609/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
 github.com/pingcap/tipb v0.0.0-20221020071514-cd933387bcb5 h1:Yoo8j5xQGxjlsC3yt0ndsiAz0WZXED9rzsKmEN0U0DY=
 github.com/pingcap/tipb v0.0.0-20221020071514-cd933387bcb5/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
@@ -1046,6 +1047,7 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn
 github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQP1xR9D75/vuwEF3g=
 github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
 github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
+github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
 github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
 github.com/prometheus/client_golang v1.13.0 h1:b71QUfeo5M8gq2+evJdTPfZhYMAU0uKPkyPJ7TPsloU=
 github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ=
@@ -1093,8 +1095,9 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
-github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
 github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
+github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
+github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
 github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
 github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
 github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
@@ -1115,8 +1118,8 @@ github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
 github.com/shhdgit/testfixtures/v3 v3.6.2-0.20211219171712-c4f264d673d3/go.mod h1:Z0OLtuFJ7Y4yLsVijHK8uq95NjGFlYJy+I00ElAEtUQ=
 github.com/shirou/gopsutil v3.21.3+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
 github.com/shirou/gopsutil/v3 v3.21.12/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA=
-github.com/shirou/gopsutil/v3 v3.22.7 h1:flKnuCMfUUrO+oAvwAd6GKZgnPzr098VA/UJ14nhJd4=
-github.com/shirou/gopsutil/v3 v3.22.7/go.mod h1:s648gW4IywYzUfE/KjXxUsqrqx/T2xO5VqOXxONeRfI=
+github.com/shirou/gopsutil/v3 v3.22.9 h1:yibtJhIVEMcdw+tCTbOPiF1VcsuDeTE4utJ8Dm4c5eA=
+github.com/shirou/gopsutil/v3 v3.22.9/go.mod h1:bBYl1kjgEJpWpxeHmLI+dVHWtyAwfcmSBLDsp2TNT8A=
 github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
 github.com/shopspring/decimal v1.3.0 h1:KK3gWIXskZ2O1U/JNTisNcvH+jveJxZYrjbTsrbbnh8=
 github.com/shopspring/decimal v1.3.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
@@ -1153,8 +1156,8 @@ github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tL
 github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
 github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo=
 github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
-github.com/spf13/cobra v1.5.0 h1:X+jTBEBqF0bHN+9cSMgmfuvv2VHJ9ezmFNf9Y/XstYU=
-github.com/spf13/cobra v1.5.0/go.mod h1:dWXEIy2H428czQCjInthrTRUg7yKbok+2Qi/yBIJoUM=
+github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA=
+github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY=
 github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
 github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
 github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
@@ -1303,14 +1306,18 @@ go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
 go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
 go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
 go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
-go.etcd.io/etcd/api/v3 v3.5.2 h1:tXok5yLlKyuQ/SXSjtqHc4uzNaMqZi2XsoSPr/LlJXI=
 go.etcd.io/etcd/api/v3 v3.5.2/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
-go.etcd.io/etcd/client/pkg/v3 v3.5.2 h1:4hzqQ6hIb3blLyQ8usCU4h3NghkqcsohEQ3o3VetYxE=
+go.etcd.io/etcd/api/v3 v3.5.4 h1:OHVyt3TopwtUQ2GKdd5wu3PmmipR4FTwCqoEjSyRdIc=
+go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
 go.etcd.io/etcd/client/pkg/v3 v3.5.2/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
-go.etcd.io/etcd/client/v2 v2.305.2 h1:ymrVwTkefuqA/rPkSW7/B4ApijbPVefRumkY+stNfS0=
+go.etcd.io/etcd/client/pkg/v3 v3.5.4 h1:lrneYvz923dvC14R54XcA7FXoZ3mlGZAgmwhfm7HqOg=
+go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
 go.etcd.io/etcd/client/v2 v2.305.2/go.mod h1:2D7ZejHVMIfog1221iLSYlQRzrtECw3kz4I4VAQm3qI=
-go.etcd.io/etcd/client/v3 v3.5.2 h1:WdnejrUtQC4nCxK0/dLTMqKOB+U5TP/2Ya0BJL+1otA=
+go.etcd.io/etcd/client/v2 v2.305.4 h1:Dcx3/MYyfKcPNLpR4VVQUP5KgYrBeJtktBwEKkw08Ao=
+go.etcd.io/etcd/client/v2 v2.305.4/go.mod h1:Ud+VUwIi9/uQHOMA+4ekToJ12lTxlv0zB/+DHwTGEbU=
 go.etcd.io/etcd/client/v3 v3.5.2/go.mod h1:kOOaWFFgHygyT0WlSmL8TJiXmMysO/nNUlEsSsN6W4o=
+go.etcd.io/etcd/client/v3 v3.5.4 h1:p83BUL3tAYS0OT/r0qglgc3M1JjhM0diV8DSWAhVXv4=
+go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY=
 go.etcd.io/etcd/etcdutl/v3 v3.5.2/go.mod h1:f+KEUNxRzqQGq1Y/SsaDN5cmlOGRWgfE3lXEDi5F1Ys=
 go.etcd.io/etcd/pkg/v3 v3.5.2 h1:YZUojdoPhOyl5QILYnR8LTUbbNefu/sV4ma+ZMr2tto=
 go.etcd.io/etcd/pkg/v3 v3.5.2/go.mod h1:zsXz+9D/kijzRiG/UnFGDTyHKcVp0orwiO8iMLAi+k0=
@@ -1434,8 +1441,8 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH
 golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
 golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw=
 golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
-golang.org/x/exp v0.0.0-20220927162542-c76eaa363f9d h1:3wgmvnqHUJ8SxiNWwea5NCzTwAVfhTtuV+0ClVFlClc=
-golang.org/x/exp v0.0.0-20220927162542-c76eaa363f9d/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
+golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e h1:SkwG94eNiiYJhbeDE018Grw09HIN/KB9NlRmZsrzfWs=
+golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
 golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
 golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
 golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
@@ -1467,7 +1474,7 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
 golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
 golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
-golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
+golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I=
 golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -1573,8 +1580,9 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180816055513-1c9583448a9c/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -1709,8 +1717,9 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
-golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs=
 golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
+golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -1797,8 +1806,8 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
 golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
 golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
-golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
-golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE=
+golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
 golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go
index ed32d0b2783..ba2cbf9f076 100644
--- a/pkg/config/server_config.go
+++ b/pkg/config/server_config.go
@@ -149,8 +149,9 @@ var defaultServerConfig = &ServerConfig{
 		},
 		Messages: defaultMessageConfig.Clone(),

-		Scheduler:     NewDefaultSchedulerConfig(),
-		EnableNewSink: true,
+		Scheduler:           NewDefaultSchedulerConfig(),
+		EnableNewSink:       true,
+		EnablePullBasedSink: false,
 	},
 	ClusterID: "default",
 }
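Editor's note: the new EnablePullBasedSink option defaults to false above, so the pull-based sink path introduced by this change stays opt-in until a user enables it explicitly. A minimal, self-contained sketch of that gating follows; the serverConfig type here is a pared-down illustrative stand-in, not the full tiflow ServerConfig:

    package main

    import "fmt"

    // serverConfig reproduces only the two sink toggles touched by this diff.
    type serverConfig struct {
        EnableNewSink       bool
        EnablePullBasedSink bool
    }

    func main() {
        // Mirrors the defaults set in pkg/config/server_config.go above.
        cfg := serverConfig{EnableNewSink: true, EnablePullBasedSink: false}
        if cfg.EnablePullBasedSink {
            fmt.Println("pull-based sink enabled")
        } else {
            fmt.Println("pull-based sink disabled (default)")
        }
    }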