Skip to content

Commit

Permalink
redo(ticdc): replace tableID with Span (#7916)
Browse files Browse the repository at this point in the history
ref #7720
  • Loading branch information
overvenus authored Dec 26, 2022
1 parent a98f447 commit 2a6f44f
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 117 deletions.
30 changes: 15 additions & 15 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ const (
)

type sinkNode struct {
sinkV1 sinkv1.Sink
sinkV2 sinkv2.TableSink
state *tablepb.TableState
tableID model.TableID
sinkV1 sinkv1.Sink
sinkV2 sinkv2.TableSink
state *tablepb.TableState
span tablepb.Span

// atomic operations for model.ResolvedTs
resolvedTs atomic.Value
Expand All @@ -60,7 +60,7 @@ type sinkNode struct {
}

func newSinkNode(
tableID model.TableID,
span tablepb.Span,
sinkV1 sinkv1.Sink,
sinkV2 sinkv2.TableSink,
startTs model.Ts, targetTs model.Ts,
Expand All @@ -74,7 +74,7 @@ func newSinkNode(
sn := &sinkNode{
sinkV1: sinkV1,
sinkV2: sinkV2,
tableID: tableID,
span: span,
state: state,
targetTs: targetTs,
barrierTs: startTs,
Expand Down Expand Up @@ -138,7 +138,7 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
// redo log do not support batch resolve mode, hence we
// use `ResolvedMark` to restore a normal resolved ts
resolved = model.NewResolvedTs(resolved.ResolvedMark())
err = n.redoManager.UpdateResolvedTs(ctx, n.tableID, resolved.Ts)
err = n.redoManager.UpdateResolvedTs(ctx, n.span, resolved.Ts)
}

// Flush sink with barrierTs, which is broadcast by owner.
Expand All @@ -147,15 +147,15 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
resolved = model.NewResolvedTs(barrierTs)
}
if n.redoManager != nil && n.redoManager.Enabled() {
redoFlushed := n.redoManager.GetResolvedTs(n.tableID)
redoFlushed := n.redoManager.GetResolvedTs(n.span)
if barrierTs > redoFlushed {
// NOTE: How can barrierTs be greater than redoFlushed?
// When scheduler moves a table from one place to another place, the table
// start position will be checkpointTs instead of resolvedTs, which means
// redoTs can be less than barrierTs.
if n.logLimiter.Allow() {
log.Info("redo flushedTs is less than current barrierTs",
zap.Int64("tableID", n.tableID),
zap.Stringer("span", &n.span),
zap.Uint64("barrierTs", barrierTs),
zap.Uint64("tableRedoFlushed", redoFlushed))
}
Expand All @@ -172,7 +172,7 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er

var checkpoint model.ResolvedTs
if n.sinkV1 != nil {
checkpoint, err = n.sinkV1.FlushRowChangedEvents(ctx, n.tableID, resolved)
checkpoint, err = n.sinkV1.FlushRowChangedEvents(ctx, n.span.TableID, resolved)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -208,7 +208,7 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv

emitRows := func(rows ...*model.RowChangedEvent) error {
if n.redoManager != nil && n.redoManager.Enabled() {
err := n.redoManager.EmitRowChangedEvents(ctx, n.tableID, nil, rows...)
err := n.redoManager.EmitRowChangedEvents(ctx, n.span, nil, rows...)
if err != nil {
return err
}
Expand All @@ -222,7 +222,7 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv

if event == nil || event.Row == nil {
log.Warn("skip emit nil event",
zap.Int64("tableID", n.tableID),
zap.Stringer("span", &n.span),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID),
zap.Any("event", event))
Expand All @@ -236,7 +236,7 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
// Just ignore these row changed events.
if colLen == 0 && preColLen == 0 {
log.Warn("skip emit empty row event",
zap.Int64("tableID", n.tableID),
zap.Stringer("span", &n.span),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID),
zap.Any("event", event))
Expand Down Expand Up @@ -385,15 +385,15 @@ func (n *sinkNode) closeTableSink(ctx context.Context) (err error) {
return
}
log.Info("sinkV1 is closed",
zap.Int64("tableID", n.tableID),
zap.Stringer("span", &n.span),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID))
err = cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
return
}
n.sinkV2.Close(ctx)
log.Info("sinkV2 is closed",
zap.Int64("tableID", n.tableID),
zap.Stringer("span", &n.span),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID))
err = cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
Expand Down
30 changes: 19 additions & 11 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
mocksink "github.com/pingcap/tiflow/cdc/sink/mock"
cerrors "github.com/pingcap/tiflow/pkg/errors"
pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -58,7 +59,7 @@ func TestState(t *testing.T) {
state := tablepb.TableStatePrepared
// test stop at targetTs
targetTs := model.Ts(10)
node := newSinkNode(1, mocksink.NewNormalMockSink(), nil,
node := newSinkNode(spanz.TableIDToComparableSpan(1), mocksink.NewNormalMockSink(), nil,
0, targetTs, &mockFlowController{}, redo.NewDisabledManager(),
&state, model.DefaultChangeFeedID("changefeed-id-test-status"), true, true)
require.Equal(t, tablepb.TableStatePrepared, node.State())
Expand Down Expand Up @@ -128,7 +129,7 @@ func TestState(t *testing.T) {

// test the stop at ts command
state = tablepb.TableStatePrepared
node = newSinkNode(1, mocksink.NewNormalMockSink(), nil,
node = newSinkNode(spanz.TableIDToComparableSpan(1), mocksink.NewNormalMockSink(), nil,
0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, model.DefaultChangeFeedID("changefeed-id-test-status"), true, false)
require.Equal(t, tablepb.TableStatePrepared, node.State())
Expand Down Expand Up @@ -168,7 +169,7 @@ func TestState(t *testing.T) {

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
state = tablepb.TableStatePrepared
node = newSinkNode(1, mocksink.NewNormalMockSink(), nil,
node = newSinkNode(spanz.TableIDToComparableSpan(1), mocksink.NewNormalMockSink(), nil,
0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, model.DefaultChangeFeedID("changefeed-id-test-status"), true, false)
require.Equal(t, tablepb.TableStatePrepared, node.State())
Expand Down Expand Up @@ -213,7 +214,7 @@ func TestStopStatus(t *testing.T) {
defer cancel()
state := tablepb.TableStatePrepared
closeCh := make(chan interface{}, 1)
node := newSinkNode(1,
node := newSinkNode(spanz.TableIDToComparableSpan(1),
mocksink.NewMockCloseControlSink(closeCh),
nil, 0, 100,
&mockFlowController{}, redo.NewDisabledManager(), &state,
Expand Down Expand Up @@ -250,7 +251,8 @@ func TestManyTs(t *testing.T) {
defer cancel()
state := tablepb.TableStatePrepared
sink := mocksink.NewNormalMockSink()
node := newSinkNode(1, sink, nil, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
span := spanz.TableIDToComparableSpan(1)
node := newSinkNode(span, sink, nil, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, model.DefaultChangeFeedID("changefeed-id-test"), true, false)
require.Equal(t, tablepb.TableStatePrepared, node.State())

Expand Down Expand Up @@ -409,7 +411,8 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
defer cancel()
state := tablepb.TableStatePreparing
sink := mocksink.NewNormalMockSink()
node := newSinkNode(1, sink, nil, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
span := spanz.TableIDToComparableSpan(1)
node := newSinkNode(span, sink, nil, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, model.DefaultChangeFeedID("changefeed-id-test"), true, false)

// empty row, no Columns and PreColumns.
Expand All @@ -428,7 +431,8 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
defer cancel()
state := tablepb.TableStatePreparing
sink := mocksink.NewNormalMockSink()
node := newSinkNode(1, sink, nil, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
span := spanz.TableIDToComparableSpan(1)
node := newSinkNode(span, sink, nil, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, model.DefaultChangeFeedID("changefeed-id-test"), true, false)

// nil row.
Expand Down Expand Up @@ -483,7 +487,8 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
state := tablepb.TableStatePreparing
sink := mocksink.NewNormalMockSink()
enableOldValue := false
node := newSinkNode(1, sink, nil, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
span := spanz.TableIDToComparableSpan(1)
node := newSinkNode(span, sink, nil, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, model.DefaultChangeFeedID("changefeed-id-test"), enableOldValue, false)

// nil row.
Expand Down Expand Up @@ -615,7 +620,8 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
flowController := &flushFlowController{}
sink := mocksink.NewMockFlushSink()
// sNode is a sinkNode
sNode := newSinkNode(1, sink, nil, 0, 10, flowController, redo.NewDisabledManager(),
span := spanz.TableIDToComparableSpan(1)
sNode := newSinkNode(span, sink, nil, 0, 10, flowController, redo.NewDisabledManager(),
&state, model.DefaultChangeFeedID("changefeed-id-test"), true, false)
sNode.barrierTs = 10

Expand All @@ -637,7 +643,8 @@ func TestSplitTxn(t *testing.T) {
flowController := &flushFlowController{}
sink := mocksink.NewMockFlushSink()
// sNode is a sinkNode
sNode := newSinkNode(1, sink, nil, 0, 10, flowController, redo.NewDisabledManager(),
span := spanz.TableIDToComparableSpan(1)
sNode := newSinkNode(span, sink, nil, 0, 10, flowController, redo.NewDisabledManager(),
&state, model.DefaultChangeFeedID("changefeed-id-test"), true, false)
msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 1,
Expand Down Expand Up @@ -684,7 +691,8 @@ func TestSinkStatsRace(t *testing.T) {
flowController := &flushFlowController{}
sink := mocksink.NewMockFlushSink()
// sNode is a sinkNode
sNode := newSinkNode(1, sink, nil, 0, 10, flowController, redo.NewDisabledManager(),
span := spanz.TableIDToComparableSpan(1)
sNode := newSinkNode(span, sink, nil, 0, 10, flowController, redo.NewDisabledManager(),
&state, model.DefaultChangeFeedID("changefeed-id-test"), true, false)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
}

actorSinkNode := newSinkNode(
t.span.TableID,
t.span,
t.tableSinkV1,
t.tableSinkV2,
t.replicaInfo.StartTs, t.targetTs, flowController, t.redoManager,
Expand Down Expand Up @@ -414,7 +414,7 @@ func (t *tableActor) ResolvedTs() model.Ts {
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
if t.redoManager.Enabled() {
return t.redoManager.GetResolvedTs(t.span.TableID)
return t.redoManager.GetResolvedTs(t.span)
}
return t.sortNode.ResolvedTs()
}
Expand Down
6 changes: 3 additions & 3 deletions cdc/processor/pipeline/table_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestAsyncStopFailed(t *testing.T) {
state: tablepb.TableStatePreparing,
upstream: upstream.NewUpstream4Test(&mockPD{}),
}
tbl.sinkNode = newSinkNode(1, mocksink.NewNormalMockSink(), nil,
tbl.sinkNode = newSinkNode(spanz.TableIDToComparableSpan(1), mocksink.NewNormalMockSink(), nil,
0, 0, &mockFlowController{}, tbl.redoManager,
&tbl.state, model.DefaultChangeFeedID("changefeed-test"), true, false)
require.True(t, tbl.AsyncStop())
Expand Down Expand Up @@ -101,9 +101,9 @@ func TestTableActorInterface(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
table.redoManager, _ = redo.NewMockManager(ctx)
table.redoManager.AddTable(table.span.TableID, 0)
table.redoManager.AddTable(table.span, 0)
require.Equal(t, model.Ts(0), table.ResolvedTs())
table.redoManager.UpdateResolvedTs(ctx, table.span.TableID, model.Ts(6))
table.redoManager.UpdateResolvedTs(ctx, table.span, model.Ts(6))
require.Eventually(t, func() bool { return table.ResolvedTs() == model.Ts(6) },
time.Second*5, time.Millisecond*500)
table.redoManager.Cleanup(ctx)
Expand Down
8 changes: 4 additions & 4 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (p *processor) AddTableSpan(
p.sinkManager.AddTable(
span.TableID, startTs, p.changefeed.Info.TargetTs)
if p.redoManager.Enabled() {
p.redoManager.AddTable(span.TableID, startTs)
p.redoManager.AddTable(span, startTs)
}
p.sourceManager.AddTable(
ctx.(cdcContext.Context), span.TableID, p.getTableName(ctx, span.TableID), startTs)
Expand Down Expand Up @@ -409,7 +409,7 @@ func (p *processor) IsRemoveTableSpanFinished(span tablepb.Span) (model.Ts, bool
if p.pullBasedSinking {
stats := p.sinkManager.GetTableStats(span.TableID)
if p.redoManager.Enabled() {
p.redoManager.RemoveTable(span.TableID)
p.redoManager.RemoveTable(span)
}
p.sinkManager.RemoveTable(span.TableID)
p.sourceManager.RemoveTable(span.TableID)
Expand Down Expand Up @@ -1167,7 +1167,7 @@ func (p *processor) createTablePipelineImpl(

if p.redoManager.Enabled() {
// FIXME: make span-level replication compatible with redo log.
p.redoManager.AddTable(span.TableID, replicaInfo.StartTs)
p.redoManager.AddTable(span, replicaInfo.StartTs)
}

tableName := p.getTableName(ctx, span.TableID)
Expand Down Expand Up @@ -1214,7 +1214,7 @@ func (p *processor) createTablePipelineImpl(

func (p *processor) removeTable(table tablepb.TablePipeline, span tablepb.Span) {
if p.redoManager.Enabled() {
p.redoManager.RemoveTable(span.TableID)
p.redoManager.RemoveTable(span)
}
if p.pullBasedSinking {
p.sinkManager.RemoveTable(span.TableID)
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
var resolvedTs model.Ts
// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
if m.redoManager != nil {
resolvedTs = m.redoManager.GetResolvedTs(tableID)
resolvedTs = m.redoManager.GetResolvedTs(spanz.TableIDToComparableSpan(tableID))
} else {
resolvedTs = tableSink.getReceivedSorterResolvedTs()
}
Expand Down
7 changes: 5 additions & 2 deletions cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/pkg/spanz"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -115,7 +116,8 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
zap.Uint64("memory", refundMem))
}
}
err := w.redoManager.EmitRowChangedEvents(ctx, task.tableID, releaseMem, rows...)
err := w.redoManager.EmitRowChangedEvents(
ctx, spanz.TableIDToComparableSpan(task.tableID), releaseMem, rows...)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -127,7 +129,8 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
}
}
if lastTxnCommitTs > emitedCommitTs {
if err := w.redoManager.UpdateResolvedTs(ctx, task.tableID, lastTxnCommitTs); err != nil {
if err := w.redoManager.UpdateResolvedTs(
ctx, spanz.TableIDToComparableSpan(task.tableID), lastTxnCommitTs); err != nil {
return errors.Trace(err)
}
log.Debug("update resolved ts to redo",
Expand Down
Loading

0 comments on commit 2a6f44f

Please sign in to comment.