Merge branch 'master' into refine-engine-IT
D3Hunter authored Nov 21, 2022
2 parents dec82af + e6f9696 commit 4d74d2f
Showing 29 changed files with 515 additions and 332 deletions.
33 changes: 19 additions & 14 deletions cdc/processor/processor.go
@@ -77,8 +77,8 @@ type processor struct {
sinkV2Factory *factory.SinkFactory

// These fields are used to sink data in pull-based mode.
sourceManger *sourcemanager.SourceManager
sinkManager *sinkmanager.SinkManager
sourceManager *sourcemanager.SourceManager
sinkManager *sinkmanager.SinkManager

redoManager redo.LogManager

@@ -211,7 +211,7 @@ func (p *processor) AddTable(
}

if p.pullBasedSinking {
p.sourceManger.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs)
p.sourceManager.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs)
p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs)
if !isPrepare {
p.sinkManager.StartTable(tableID, startTs)
@@ -434,7 +434,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool
zap.Error(err))
return 0, false
}
p.sourceManger.RemoveTable(tableID)
p.sourceManager.RemoveTable(tableID)
p.sinkManager.RemoveTable(tableID)
log.Info("table removed",
zap.String("captureID", p.captureInfo.ID),
@@ -530,7 +530,7 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus {
}

func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats pipeline.Stats) tablepb.Stats {
pullerStats := p.sourceManger.GetTablePullerStats(tableID)
pullerStats := p.sourceManager.GetTablePullerStats(tableID)
now, _ := p.upstream.PDClock.CurrentTime()

stats := tablepb.Stats{
@@ -554,7 +554,7 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableI
}

// FIXME: add the stats of the sort engine.
//sortStats := p.sourceManger.GetTableSortStats(tableID)
//sortStats := p.sourceManager.GetTableSortStats(tableID)
//stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{
// CheckpointTs: sortStats.CheckpointTsIngress,
// ResolvedTs: sortStats.ResolvedTsIngress,
@@ -862,9 +862,11 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
p.sourceManger = sourcemanager.New(p.changefeedID, p.upstream, sortEngine, p.errCh)
p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, sortEngine, p.errCh)
sinkManager, err := sinkmanager.New(stdCtx, p.changefeedID, p.changefeed.Info, p.redoManager,
sortEngine, p.mg, p.errCh, p.metricsTableSinkTotalRows)
// Bind them so that sourceManager can notify sinkManager.
p.sourceManager.OnResolve(sinkManager.UpdateReceivedSorterResolvedTs)
if err != nil {
log.Info("Processor creates sink manager",
zap.String("namespace", p.changefeedID.Namespace),
@@ -1298,19 +1300,22 @@ func (p *processor) Close(ctx cdcContext.Context) error {
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID))
if p.pullBasedSinking {
if err := p.sourceManger.Close(); err != nil {
if err := p.sourceManager.Close(); err != nil {
log.Error("Failed to close source manager",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Error(err))
return errors.Trace(err)
}
if err := p.sinkManager.Close(); err != nil {
log.Error("Failed to close sink manager",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Error(err))
return errors.Trace(err)
if p.sinkManager != nil {
if err := p.sinkManager.Close(); err != nil {
log.Error("Failed to close sink manager",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Error(err))
return errors.Trace(err)
}
p.sinkManager = nil
}
engineFactory := ctx.GlobalVars().SortEngineFactory
if engineFactory != nil {
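The processor.go changes above wire the pull-based pipeline together: sourcemanager.New and sinkmanager.New build the two managers, and p.sourceManager.OnResolve(sinkManager.UpdateReceivedSorterResolvedTs) registers the sink manager as a listener for resolved-ts advances. Below is a minimal sketch of that callback-binding pattern; the types and the advance helper are hypothetical simplifications, not the real tiflow APIs.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for sourcemanager.SourceManager and
// sinkmanager.SinkManager; the real tiflow types carry far more state.
type SourceManager struct {
	onResolve []func(tableID int64, resolvedTs uint64)
}

// OnResolve registers a callback that fires whenever a table's resolved
// timestamp advances in the sort engine.
func (s *SourceManager) OnResolve(cb func(tableID int64, resolvedTs uint64)) {
	s.onResolve = append(s.onResolve, cb)
}

// advance simulates the sort engine pushing a new resolved ts (hypothetical helper).
func (s *SourceManager) advance(tableID int64, ts uint64) {
	for _, cb := range s.onResolve {
		cb(tableID, ts)
	}
}

type SinkManager struct{}

// UpdateReceivedSorterResolvedTs is the method the diff binds as a callback.
func (m *SinkManager) UpdateReceivedSorterResolvedTs(tableID int64, ts uint64) {
	fmt.Printf("table %d: sorter resolved ts is now %d\n", tableID, ts)
}

func main() {
	source := &SourceManager{}
	sink := &SinkManager{}
	// Mirrors p.sourceManager.OnResolve(sinkManager.UpdateReceivedSorterResolvedTs).
	source.OnResolve(sink.UpdateReceivedSorterResolvedTs)
	source.advance(42, 100)
}
```

The single registration call keeps the two managers loosely coupled: the source side only knows it has observers, and the sink side only exposes a method with the expected signature, which is presumably why the diff can bind them with one line in lazyInitImpl.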
cdc/processor/sinkmanager/manager.go
@@ -69,12 +69,12 @@ type SinkManager struct {
lastBarrierTs atomic.Uint64

// sinkWorkers used to pull data from source manager.
sinkWorkers []sinkWorker
sinkWorkers []*sinkWorker
// sinkTaskChan is used to send tasks to sinkWorkers.
sinkTaskChan chan *sinkTask

// redoWorkers used to pull data from source manager.
redoWorkers []redoWorker
redoWorkers []*redoWorker
// redoTaskChan is used to send tasks to redoWorkers.
redoTaskChan chan *redoTask

@@ -117,7 +117,7 @@ func New(
sortEngine: sortEngine,

sinkProgressHeap: newTableProgresses(),
sinkWorkers: make([]sinkWorker, 0, sinkWorkerNum),
sinkWorkers: make([]*sinkWorker, 0, sinkWorkerNum),
sinkTaskChan: make(chan *sinkTask),

metricsTableSinkTotalRows: metricsTableSinkTotalRows,
@@ -126,7 +126,7 @@
if redoManager != nil {
m.redoManager = redoManager
m.redoProgressHeap = newTableProgresses()
m.redoWorkers = make([]redoWorker, 0, redoWorkerNum)
m.redoWorkers = make([]*redoWorker, 0, redoWorkerNum)
m.redoTaskChan = make(chan *redoTask)
// Use at most 1/3 memory quota for redo event cache.
m.eventCache = newRedoEventCache(changefeedInfo.Config.MemoryQuota / 3)
@@ -336,6 +336,11 @@ func (m *SinkManager) generateSinkTasks() error {
return m.ctx.Err()
case m.sinkTaskChan <- t:
}

log.Debug("Generate sink task",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
}
}
}
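generateSinkTasks above (and generateRedoTasks below) hand work to the worker goroutines through an unbuffered channel, selecting against m.ctx.Done() so shutdown is never stuck behind a blocked send, and the new Debug log only fires once the send succeeds. A small sketch of that dispatch pattern follows; sinkTask here is a hypothetical trimmed-down stand-in for the real task type.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sinkTask is a hypothetical trimmed-down task; the real one carries bounds,
// a table sink wrapper, and a progress callback.
type sinkTask struct{ tableID int64 }

// dispatch sends one task to the worker channel, but gives up as soon as the
// context is cancelled, matching the select in generateSinkTasks.
func dispatch(ctx context.Context, taskChan chan<- *sinkTask, tableID int64) error {
	t := &sinkTask{tableID: tableID}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case taskChan <- t:
	}
	// Only reached after the send succeeds, like the Debug log in the diff.
	fmt.Printf("generated sink task for table %d\n", tableID)
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	taskChan := make(chan *sinkTask)
	done := make(chan struct{})
	go func() {
		defer close(done)
		task := <-taskChan // a single worker picks the task up
		fmt.Printf("worker handles table %d\n", task.tableID)
	}()

	if err := dispatch(ctx, taskChan, 1); err != nil {
		fmt.Println("dispatch aborted:", err)
	}
	<-done
}
```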
@@ -398,6 +403,11 @@ func (m *SinkManager) generateRedoTasks() error {
return m.ctx.Err()
case m.redoTaskChan <- t:
}

log.Debug("Generate redo task",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
}
}
}
@@ -441,6 +451,10 @@ func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs

// StartTable sets the table(TableSink) state to replicating.
func (m *SinkManager) StartTable(tableID model.TableID, startTs model.Ts) {
log.Info("Start table sink",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
tableSink, ok := m.tableSinks.Load(tableID)
if !ok {
log.Panic("Table sink not found when starting table stats",
@@ -561,7 +575,10 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, erro

// Close closes all workers.
func (m *SinkManager) Close() error {
m.cancel()
if m.cancel != nil {
m.cancel()
m.cancel = nil
}
m.memQuota.close()
err := m.sinkFactory.Close()
if err != nil {
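Both Close paths touched by this commit add nil guards (p.sinkManager in processor.Close, m.cancel in SinkManager.Close) so shutdown can safely run more than once. A minimal sketch of that idempotent-close idiom, using hypothetical names:

```go
package main

import (
	"context"
	"fmt"
)

// manager is a hypothetical stand-in illustrating the nil-guarded shutdown
// added to processor.Close and SinkManager.Close in this commit.
type manager struct {
	cancel context.CancelFunc
}

// Close is safe to call more than once: the first call cancels the context
// and clears the field, later calls see nil and do nothing.
func (m *manager) Close() {
	if m.cancel != nil {
		m.cancel()
		m.cancel = nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	m := &manager{cancel: cancel}

	m.Close()
	m.Close() // second call is a no-op instead of a nil-pointer panic
	fmt.Println(ctx.Err()) // context canceled
}
```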
File renamed without changes.
159 changes: 159 additions & 0 deletions cdc/processor/sinkmanager/redo_log_worker.go
@@ -0,0 +1,159 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sinkmanager

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/redo"
)

type redoWorker struct {
changefeedID model.ChangeFeedID
mg entry.MounterGroup
sortEngine engine.SortEngine
memQuota *memQuota
redoManager redo.LogManager
eventCache *redoEventCache
splitTxn bool
enableOldValue bool
}

func newRedoWorker(
changefeedID model.ChangeFeedID,
mg entry.MounterGroup,
sortEngine engine.SortEngine,
quota *memQuota,
redoManager redo.LogManager,
eventCache *redoEventCache,
splitTxn bool,
enableOldValue bool,
) *redoWorker {
return &redoWorker{
changefeedID: changefeedID,
mg: mg,
sortEngine: sortEngine,
memQuota: quota,
redoManager: redoManager,
eventCache: eventCache,
splitTxn: splitTxn,
enableOldValue: enableOldValue,
}
}

func (w *redoWorker) handleTasks(ctx context.Context, taskChan <-chan *redoTask) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case task := <-taskChan:
if err := w.handleTask(ctx, task); err != nil {
return errors.Trace(err)
}
}
}
}

func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) error {
rows := make([]*model.RowChangedEvent, 0, 1024)
cache := w.eventCache.getAppender(task.tableID)

// Events are pushed into redoEventCache if possible. Otherwise, their memory will
// be released after they are written into redo files. Then we need to release their
// memory quota, which can be calculated based on batchSize and cachedSize.
batchSize := uint64(0)
cachedSize := uint64(0)

memAllocated := true

var lastPos engine.Position
maybeEmitBatchEvents := func(allFinished, txnFinished bool) error {
if batchSize == 0 || (!allFinished && batchSize < requestMemSize) {
return nil
}

releaseMem := func() { w.memQuota.refund(batchSize - cachedSize) }
err := w.redoManager.EmitRowChangedEvents(ctx, task.tableID, releaseMem, rows...)
if err != nil {
return errors.Trace(err)
}
if lastPos.Valid() {
err = w.redoManager.UpdateResolvedTs(ctx, task.tableID, lastPos.CommitTs)
if err != nil {
return errors.Trace(err)
}
}

rows = rows[:0]
if cap(rows) > 1024 {
rows = make([]*model.RowChangedEvent, 0, 1024)
}
batchSize = 0
cachedSize = 0

if !allFinished {
if !txnFinished {
w.memQuota.forceAcquire(requestMemSize)
} else {
memAllocated = w.memQuota.tryAcquire(requestMemSize)
}
}
return nil
}

// lowerBound and upperBound are both closed intervals.
iter := engine.NewMountedEventIter(
w.sortEngine.FetchByTable(task.tableID, task.lowerBound, task.getUpperBound()),
w.mg, 256)
defer iter.Close()
for memAllocated {
e, pos, err := iter.Next(ctx)
if err != nil {
return errors.Trace(err)
}
if e == nil {
// There is no more data.
if err = maybeEmitBatchEvents(true, true); err != nil {
return errors.Trace(err)
}
return nil
}
if pos.Valid() {
lastPos = pos
}

x, size, err := convertRowChangedEvents(w.changefeedID, task.tableID, w.enableOldValue, e)
if err != nil {
return errors.Trace(err)
}

rows = append(rows, x...)
batchSize += size
if cache.pushBatch(x, size, pos.Valid()) {
cachedSize += size
} else {
cachedSize -= cache.cleanBrokenEvents()
}
if err = maybeEmitBatchEvents(false, pos.Valid()); err != nil {
return errors.Trace(err)
}
}
// Can't allocate memory.
task.callback(lastPos)
return nil
}
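In redo_log_worker.go above, handleTask accumulates rows until the batch reaches requestMemSize, emits them to the redo manager, and refunds only the memory that was not copied into the redo event cache (batchSize - cachedSize); the cached portion stays accounted for until the cache is drained elsewhere. A rough sketch of that quota bookkeeping, with a hypothetical simplified quota type:

```go
package main

import "fmt"

// quota is a hypothetical stand-in for the worker's memQuota: tryAcquire fails
// when the budget is exhausted, forceAcquire never fails, refund gives bytes back.
type quota struct{ used, total uint64 }

func (q *quota) tryAcquire(n uint64) bool {
	if q.used+n > q.total {
		return false
	}
	q.used += n
	return true
}

func (q *quota) forceAcquire(n uint64) { q.used += n }
func (q *quota) refund(n uint64)       { q.used -= n }

func main() {
	const requestMemSize = 64
	q := &quota{total: 256}

	// The worker acquires one request's worth of quota before reading a batch.
	q.forceAcquire(requestMemSize)

	// Suppose the batch held 64 bytes of events, 24 of which were also copied
	// into the redo event cache. After the redo manager flushes the batch, only
	// the non-cached remainder is refunded; the cached 24 bytes stay accounted
	// for until the cache itself is released.
	batchSize, cachedSize := uint64(64), uint64(24)
	releaseMem := func() { q.refund(batchSize - cachedSize) }
	releaseMem()

	fmt.Printf("quota still in use: %d bytes\n", q.used) // 24

	// On a transaction boundary the worker switches to tryAcquire and stops
	// reading if the quota is exhausted; mid-transaction it forceAcquires.
	if q.tryAcquire(requestMemSize) {
		fmt.Println("next batch may proceed")
	}
}
```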