diff --git a/cdc/owner/async_sink_test.go b/cdc/owner/async_sink_test.go index 5a91821c9d1..833c9bfb603 100644 --- a/cdc/owner/async_sink_test.go +++ b/cdc/owner/async_sink_test.go @@ -66,7 +66,7 @@ func (m *mockSink) Close(ctx context.Context) error { return nil } -func (m *mockSink) Barrier(ctx context.Context) error { +func (m *mockSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index b436df2a050..5c09a6736cf 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -64,8 +64,9 @@ func (s *TableStatus) Store(new TableStatus) { } type sinkNode struct { - sink sink.Sink - status TableStatus + sink sink.Sink + status TableStatus + tableID model.TableID resolvedTs model.Ts checkpointTs model.Ts @@ -78,8 +79,9 @@ type sinkNode struct { flowController tableFlowController } -func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { +func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { return &sinkNode{ + tableID: tableID, sink: sink, status: TableStatusInitializing, targetTs: targetTs, @@ -136,7 +138,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err if err := n.emitRow2Sink(ctx); err != nil { return errors.Trace(err) } - checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, resolvedTs) + checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs) if err != nil { return errors.Trace(err) } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 2e6b25cce0e..dc87961ca1e 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -76,7 +76,7 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error panic("unreachable") } -func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (s *mockSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { s.received = append(s.received, struct { resolvedTs model.Ts row *model.RowChangedEvent @@ -92,7 +92,7 @@ func (s *mockSink) Close(ctx context.Context) error { return nil } -func (s *mockSink) Barrier(ctx context.Context) error { +func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } @@ -137,7 +137,7 @@ func (s *outputSuite) TestStatus(c *check.C) { }) // test stop at targetTs - node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) + node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -163,7 +163,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.CheckpointTs(), check.Equals, uint64(10)) // test the stop at ts command - node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -186,7 +186,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.CheckpointTs(), check.Equals, uint64(2)) // test the stop at ts command is after then resolvedTs and checkpointTs is greater than 
stop ts - node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -223,7 +223,7 @@ func (s *outputSuite) TestStopStatus(c *check.C) { }) closeCh := make(chan interface{}, 1) - node := newSinkNode(&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}) + node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, @@ -258,7 +258,7 @@ func (s *outputSuite) TestManyTs(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -379,7 +379,7 @@ func (s *outputSuite) TestIgnoreEmptyRowChangeEvent(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) // empty row, no Columns and PreColumns. @@ -399,7 +399,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) // nil row. @@ -458,7 +458,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) // nil row. 
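
Reading note on the interface change threaded through the hunks above and below: every flush and barrier is now scoped to a single table, so the sink node stores its tableID at construction and forwards it on each call. The sketch below is only an illustration of that call shape, not code from this patch; the package and type name (sinkexample, tableScopedNode) are invented, and it assumes the post-patch sink.Sink signatures shown in this diff.

package sinkexample

import (
	"context"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/sink"
)

// tableScopedNode is an invented illustration of the sinkNode change above:
// the table ID is captured once at construction and forwarded on every flush,
// so the backend sink can keep one checkpoint per table instead of a single
// changefeed-wide value.
type tableScopedNode struct {
	tableID model.TableID
	backend sink.Sink
}

// flush mirrors sinkNode.flushSink after this patch: the node's own tableID is
// passed straight through to FlushRowChangedEvents, and the returned value is
// that table's checkpoint.
func (n *tableScopedNode) flush(ctx context.Context, resolvedTs uint64) (uint64, error) {
	return n.backend.FlushRowChangedEvents(ctx, n.tableID, resolvedTs)
}
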
diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 09f8823a3b6..296051b37f1 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -194,7 +194,7 @@ func NewTablePipeline(ctx cdcContext.Context, if cyclicEnabled { p.AppendNode(ctx, "cyclic", newCyclicMarkNode(replicaInfo.MarkTableID)) } - tablePipeline.sinkNode = newSinkNode(sink, replicaInfo.StartTs, targetTs, flowController) + tablePipeline.sinkNode = newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController) p.AppendNode(ctx, "sink", tablePipeline.sinkNode) tablePipeline.p = p return tablePipeline diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index fd50fa844a2..b20dc9121d2 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -163,6 +163,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS if err := p.lazyInit(ctx); err != nil { return nil, errors.Trace(err) } + // sink manager will return this checkpointTs to sink node if sink node resolvedTs flush failed + p.sinkManager.UpdateChangeFeedCheckpointTs(state.Info.GetCheckpointTs(state.Status)) if err := p.handleTableOperation(ctx); err != nil { return nil, errors.Trace(err) } diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 0b2d17681b7..f03cc0b17b5 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline" + "github.com/pingcap/tiflow/cdc/sink" cdcContext "github.com/pingcap/tiflow/pkg/context" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" @@ -47,6 +48,7 @@ func newProcessor4Test( ) *processor { p := newProcessor(ctx) p.lazyInit = func(ctx cdcContext.Context) error { return nil } + p.sinkManager = &sink.Manager{} p.createTablePipeline = createTablePipeline p.schemaStorage = &mockSchemaStorage{c: c} return p diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 42e623422f7..051da83bf05 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -31,7 +31,6 @@ func newBlackHoleSink(ctx context.Context, opts map[string]string) *blackHoleSin type blackHoleSink struct { statistics *Statistics - checkpointTs uint64 accumulated uint64 lastAccumulated uint64 } @@ -46,7 +45,7 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model return nil } -func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs)) err := b.statistics.RecordBatchExecution(func() (int, error) { // TODO: add some random replication latency @@ -56,7 +55,6 @@ func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs ui return int(batchSize), nil }) b.statistics.PrintStatus(ctx) - atomic.StoreUint64(&b.checkpointTs, resolvedTs) return resolvedTs, err } @@ -79,6 +77,6 @@ func (b *blackHoleSink) Close(ctx context.Context) error { return nil } -func (b *blackHoleSink) Barrier(ctx context.Context) error { +func (b *blackHoleSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } diff --git a/cdc/sink/buffer_sink_test.go b/cdc/sink/buffer_sink_test.go new file mode 100644 index 
00000000000..e1fe467a0a5
--- /dev/null
+++ b/cdc/sink/buffer_sink_test.go
@@ -0,0 +1,91 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sink
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/stretchr/testify/require"
+)
+
+func TestTableIsNotFlushed(t *testing.T) {
+	b := bufferSink{changeFeedCheckpointTs: 1}
+	require.Equal(t, uint64(1), b.getTableCheckpointTs(2))
+	b.UpdateChangeFeedCheckpointTs(3)
+	require.Equal(t, uint64(3), b.getTableCheckpointTs(2))
+}
+
+func TestFlushTable(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer func() {
+		cancel()
+	}()
+	b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
+	require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
+	require.Nil(t, b.EmitRowChangedEvents(ctx))
+	tbl1 := &model.TableName{TableID: 1}
+	tbl2 := &model.TableName{TableID: 2}
+	tbl3 := &model.TableName{TableID: 3}
+	tbl4 := &model.TableName{TableID: 4}
+	require.Nil(t, b.EmitRowChangedEvents(ctx, []*model.RowChangedEvent{
+		{CommitTs: 6, Table: tbl1},
+		{CommitTs: 6, Table: tbl2},
+		{CommitTs: 6, Table: tbl3},
+		{CommitTs: 6, Table: tbl4},
+		{CommitTs: 10, Table: tbl1},
+		{CommitTs: 10, Table: tbl2},
+		{CommitTs: 10, Table: tbl3},
+		{CommitTs: 10, Table: tbl4},
+	}...))
+	checkpoint, err := b.FlushRowChangedEvents(ctx, 1, 7)
+	require.True(t, checkpoint <= 7)
+	require.Nil(t, err)
+	checkpoint, err = b.FlushRowChangedEvents(ctx, 2, 6)
+	require.True(t, checkpoint <= 6)
+	require.Nil(t, err)
+	checkpoint, err = b.FlushRowChangedEvents(ctx, 3, 8)
+	require.True(t, checkpoint <= 8)
+	require.Nil(t, err)
+	time.Sleep(200 * time.Millisecond)
+	require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
+	require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
+	require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
+	require.Equal(t, uint64(5), b.getTableCheckpointTs(4))
+	b.UpdateChangeFeedCheckpointTs(6)
+	require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
+	require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
+	require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
+	require.Equal(t, uint64(6), b.getTableCheckpointTs(4))
+}
+
+func TestFlushFailed(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.TODO())
+	b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
+	checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
+	require.True(t, checkpoint <= 8)
+	require.Nil(t, err)
+	time.Sleep(200 * time.Millisecond)
+	require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
+	cancel()
+	checkpoint, _ = b.FlushRowChangedEvents(ctx, 3, 18)
+	require.Equal(t, uint64(8), checkpoint)
+	checkpoint, _ = b.FlushRowChangedEvents(ctx, 1, 18)
+	require.Equal(t, uint64(5), checkpoint)
+	time.Sleep(200 * time.Millisecond)
+	require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
+	require.Equal(t, uint64(5), b.getTableCheckpointTs(1))
+}
diff --git a/cdc/sink/cdclog/file.go b/cdc/sink/cdclog/file.go
index
0a76d1a5fc2..1e0df2b2242 100644 --- a/cdc/sink/cdclog/file.go +++ b/cdc/sink/cdclog/file.go @@ -225,7 +225,7 @@ func (f *fileSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowC return f.emitRowChangedEvents(ctx, newTableStream, rows...) } -func (f *fileSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (f *fileSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { log.Debug("[FlushRowChangedEvents] enter", zap.Uint64("ts", resolvedTs)) return f.flushRowChangedEvents(ctx, resolvedTs) } @@ -349,7 +349,7 @@ func (f *fileSink) Close(ctx context.Context) error { return nil } -func (f *fileSink) Barrier(ctx context.Context) error { +func (f *fileSink) Barrier(ctx context.Context, tableID model.TableID) error { // Barrier does nothing because FlushRowChangedEvents in file sink has flushed // all buffered events forcedlly. return nil diff --git a/cdc/sink/cdclog/s3.go b/cdc/sink/cdclog/s3.go index 2249628358f..dc5a6cf2833 100644 --- a/cdc/sink/cdclog/s3.go +++ b/cdc/sink/cdclog/s3.go @@ -222,7 +222,7 @@ func (s *s3Sink) flushLogMeta(ctx context.Context) error { return cerror.WrapError(cerror.ErrS3SinkWriteStorage, s.storage.WriteFile(ctx, logMetaFile, data)) } -func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { // we should flush all events before resolvedTs, there are two kind of flush policy // 1. flush row events to a s3 chunk: if the event size is not enough, // TODO: when cdc crashed, we should repair these chunks to a complete file @@ -347,7 +347,7 @@ func (s *s3Sink) Close(ctx context.Context) error { return nil } -func (s *s3Sink) Barrier(ctx context.Context) error { +func (s *s3Sink) Barrier(ctx context.Context, tableID model.TableID) error { // Barrier does nothing because FlushRowChangedEvents in s3 sink has flushed // all buffered events forcedlly. return nil diff --git a/cdc/sink/common/common.go b/cdc/sink/common/common.go index cf018a7c8b5..7fff1ec0082 100644 --- a/cdc/sink/common/common.go +++ b/cdc/sink/common/common.go @@ -16,7 +16,6 @@ package common import ( "sort" "sync" - "sync/atomic" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -55,7 +54,6 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) { type UnresolvedTxnCache struct { unresolvedTxnsMu sync.Mutex unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs - checkpointTs uint64 } // NewUnresolvedTxnCache returns a new UnresolvedTxnCache @@ -103,32 +101,27 @@ func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha // Resolved returns resolved txns according to resolvedTs // The returned map contains many txns grouped by tableID. 
for each table, the each commitTs of txn in txns slice is strictly increasing -func (c *UnresolvedTxnCache) Resolved(resolvedTs uint64) map[model.TableID][]*model.SingleTableTxn { - if resolvedTs <= atomic.LoadUint64(&c.checkpointTs) { - return nil - } - +func (c *UnresolvedTxnCache) Resolved(resolvedTsMap *sync.Map) (map[model.TableID]uint64, map[model.TableID][]*model.SingleTableTxn) { c.unresolvedTxnsMu.Lock() defer c.unresolvedTxnsMu.Unlock() if len(c.unresolvedTxns) == 0 { - return nil + return nil, nil } - _, resolvedTxnsMap := splitResolvedTxn(resolvedTs, c.unresolvedTxns) - return resolvedTxnsMap -} - -// UpdateCheckpoint updates the checkpoint ts -func (c *UnresolvedTxnCache) UpdateCheckpoint(checkpointTs uint64) { - atomic.StoreUint64(&c.checkpointTs, checkpointTs) + return splitResolvedTxn(resolvedTsMap, c.unresolvedTxns) } func splitResolvedTxn( - resolvedTs uint64, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs, -) (minTs uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) { + resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs, +) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) { resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns)) - minTs = resolvedTs + flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns)) for tableID, txns := range unresolvedTxns { + v, ok := resolvedTsMap.Load(tableID) + if !ok { + continue + } + resolvedTs := v.(uint64) i := sort.Search(len(txns), func(i int) bool { return txns[i].commitTs > resolvedTs }) @@ -154,9 +147,7 @@ func splitResolvedTxn( } } resolvedRowsMap[tableID] = resolvedTxns - if len(resolvedTxnsWithTheSameCommitTs) > 0 && resolvedTxnsWithTheSameCommitTs[0].commitTs < minTs { - minTs = resolvedTxnsWithTheSameCommitTs[0].commitTs - } + flushedResolvedTsMap[tableID] = resolvedTs } return } diff --git a/cdc/sink/common/common_test.go b/cdc/sink/common/common_test.go index 4cadba85e56..fb8b7f26e6d 100644 --- a/cdc/sink/common/common_test.go +++ b/cdc/sink/common/common_test.go @@ -15,26 +15,22 @@ package common import ( "sort" + "sync" "testing" "github.com/google/go-cmp/cmp" - "github.com/pingcap/check" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/util/testleak" + "github.com/stretchr/testify/require" ) -type SinkCommonSuite struct{} +func TestSplitResolvedTxn(test *testing.T) { + defer testleak.AfterTestT(test)() -func Test(t *testing.T) { check.TestingT(t) } - -var _ = check.Suite(&SinkCommonSuite{}) - -func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) { - defer testleak.AfterTest(c)() testCases := [][]struct { - input []*model.RowChangedEvent - resolvedTs model.Ts - expected map[model.TableID][]*model.SingleTableTxn + input []*model.RowChangedEvent + resolvedTsMap map[model.TableID]uint64 + expected map[model.TableID][]*model.SingleTableTxn }{{{ // Testing basic transaction collocation, no txns with the same committs input: []*model.RowChangedEvent{ {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, @@ -45,7 +41,10 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) { {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}}, }, - resolvedTs: 6, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(6), + 2: uint64(6), + }, expected: map[model.TableID][]*model.SingleTableTxn{ 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, 
Rows: []*model.RowChangedEvent{ {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, @@ -59,7 +58,11 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) { input: []*model.RowChangedEvent{ {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}}, }, - resolvedTs: 13, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(13), + 2: uint64(13), + 3: uint64(13), + }, expected: map[model.TableID][]*model.SingleTableTxn{ 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{ {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, @@ -76,17 +79,24 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) { }}}, }, }}, {{ // Testing the short circuit path - input: []*model.RowChangedEvent{}, - resolvedTs: 6, - expected: nil, + input: []*model.RowChangedEvent{}, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(13), + 2: uint64(13), + 3: uint64(13), + }, + expected: nil, }, { input: []*model.RowChangedEvent{ {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 1}}, {StartTs: 1, CommitTs: 13, Table: &model.TableName{TableID: 2}}, }, - resolvedTs: 6, - expected: map[model.TableID][]*model.SingleTableTxn{}, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(6), + 2: uint64(6), + }, + expected: map[model.TableID][]*model.SingleTableTxn{}, }}, {{ // Testing the txns with the same commitTs input: []*model.RowChangedEvent{ {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, @@ -99,7 +109,10 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) { {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, }, - resolvedTs: 6, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(6), + 2: uint64(6), + }, expected: map[model.TableID][]*model.SingleTableTxn{ 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, Rows: []*model.RowChangedEvent{ {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, @@ -119,7 +132,10 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) { {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, {StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}}, }, - resolvedTs: 13, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(13), + 2: uint64(13), + }, expected: map[model.TableID][]*model.SingleTableTxn{ 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{ {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, @@ -144,7 +160,11 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) { cache := NewUnresolvedTxnCache() for _, t := range tc { cache.Append(nil, t.input...) 
- resolved := cache.Resolved(t.resolvedTs) + resolvedTsMap := sync.Map{} + for tableID, ts := range t.resolvedTsMap { + resolvedTsMap.Store(tableID, ts) + } + _, resolved := cache.Resolved(&resolvedTsMap) for tableID, txns := range resolved { sort.Slice(txns, func(i, j int) bool { if txns[i].CommitTs != txns[j].CommitTs { @@ -154,8 +174,7 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) { }) resolved[tableID] = txns } - c.Assert(resolved, check.DeepEquals, t.expected, - check.Commentf("%s", cmp.Diff(resolved, t.expected))) + require.Equal(test, t.expected, resolved, cmp.Diff(resolved, t.expected)) } } } diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 56cf9a8e59e..6fae483b2d0 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -35,10 +35,11 @@ const ( // Manager manages table sinks, maintains the relationship between table sinks and backendSink type Manager struct { - backendSink Sink - checkpointTs model.Ts - tableSinks map[model.TableID]*tableSink - tableSinksMu sync.Mutex + backendSink *bufferSink + tableCheckpointTsMap sync.Map + tableSinks map[model.TableID]*tableSink + tableSinksMu sync.Mutex + changeFeedCheckpointTs uint64 flushMu sync.Mutex flushing int64 @@ -58,7 +59,7 @@ func NewManager( drawbackChan := make(chan drawbackMsg, 16) return &Manager{ backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs, drawbackChan), - checkpointTs: checkpointTs, + changeFeedCheckpointTs: checkpointTs, tableSinks: make(map[model.TableID]*tableSink), drawbackChan: drawbackChan, captureAddr: captureAddr, @@ -87,26 +88,29 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) // Close closes the Sink manager and backend Sink, this method can be reentrantly called func (m *Manager) Close(ctx context.Context) error { tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID) - return m.backendSink.Close(ctx) + if m.backendSink != nil { + return m.backendSink.Close(ctx) + } + return nil } -func (m *Manager) getMinEmittedTs() model.Ts { +func (m *Manager) getMinEmittedTs(tableID model.TableID) model.Ts { m.tableSinksMu.Lock() defer m.tableSinksMu.Unlock() if len(m.tableSinks) == 0 { - return m.getCheckpointTs() + return m.getCheckpointTs(tableID) } minTs := model.Ts(math.MaxUint64) - for _, tableSink := range m.tableSinks { - emittedTs := tableSink.getEmittedTs() - if minTs > emittedTs { - minTs = emittedTs + for _, tblSink := range m.tableSinks { + resolvedTs := tblSink.getResolvedTs() + if minTs > resolvedTs { + minTs = resolvedTs } } return minTs } -func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) { +func (m *Manager) flushBackendSink(ctx context.Context, tableID model.TableID) (model.Ts, error) { // NOTICE: Because all table sinks will try to flush backend sink, // which will cause a lot of lock contention and blocking in high concurrency cases. // So here we use flushing as a lightweight lock to improve the lock competition problem. @@ -114,19 +118,19 @@ func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) { // Do not skip flushing for resolving #3503. // TODO uncomment the following return. 
// if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) { - // return m.getCheckpointTs(), nil + // return m.getCheckpointTs(tableID), nil // } m.flushMu.Lock() defer func() { m.flushMu.Unlock() atomic.StoreInt64(&m.flushing, 0) }() - minEmittedTs := m.getMinEmittedTs() - checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, minEmittedTs) + minEmittedTs := m.getMinEmittedTs(tableID) + checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, tableID, minEmittedTs) if err != nil { - return m.getCheckpointTs(), errors.Trace(err) + return m.getCheckpointTs(tableID), errors.Trace(err) } - atomic.StoreUint64(&m.checkpointTs, checkpointTs) + m.tableCheckpointTsMap.Store(tableID, checkpointTs) return checkpointTs, nil } @@ -145,11 +149,24 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e return ctx.Err() case <-callback: } - return m.backendSink.Barrier(ctx) + return m.backendSink.Barrier(ctx, tableID) +} + +func (m *Manager) getCheckpointTs(tableID model.TableID) uint64 { + checkPoints, ok := m.tableCheckpointTsMap.Load(tableID) + if ok { + return checkPoints.(uint64) + } + // cannot find table level checkpointTs because of no table level resolvedTs flush task finished successfully, + // for example: first time to flush resolvedTs but cannot get the flush lock, return changefeed level checkpointTs is safe + return atomic.LoadUint64(&m.changeFeedCheckpointTs) } -func (m *Manager) getCheckpointTs() uint64 { - return atomic.LoadUint64(&m.checkpointTs) +func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) { + atomic.StoreUint64(&m.changeFeedCheckpointTs, checkpointTs) + if m.backendSink != nil { + m.backendSink.UpdateChangeFeedCheckpointTs(checkpointTs) + } } type tableSink struct { @@ -176,7 +193,10 @@ func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error return nil } -func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +// FlushRowChangedEvents flushes sorted rows to sink manager, note the resolvedTs +// is required to be no more than global resolvedTs, table barrierTs and table +// redo log watermarkTs. +func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { // Log abnormal checkpoint that is large than resolved ts. logAbnormalCheckpoint := func(ckpt uint64) { if ckpt > resolvedTs { @@ -192,7 +212,7 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64 }) if i == 0 { atomic.StoreUint64(&t.emittedTs, resolvedTs) - ckpt, err := t.manager.flushBackendSink(ctx) + ckpt, err := t.manager.flushBackendSink(ctx, tableID) if err != nil { return ckpt, err } @@ -204,10 +224,10 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64 err := t.manager.backendSink.EmitRowChangedEvents(ctx, resolvedRows...) 
if err != nil { - return t.manager.getCheckpointTs(), errors.Trace(err) + return t.manager.getCheckpointTs(tableID), errors.Trace(err) } atomic.StoreUint64(&t.emittedTs, resolvedTs) - ckpt, err := t.manager.flushBackendSink(ctx) + ckpt, err := t.manager.flushBackendSink(ctx, tableID) if err != nil { return ckpt, err } @@ -229,8 +249,15 @@ func (t *tableSink) Close(ctx context.Context) error { return t.manager.destroyTableSink(ctx, t.tableID) } +// getResolvedTs returns resolved ts, which means all events before resolved ts +// have been sent to sink manager +func (t *tableSink) getResolvedTs() uint64 { + ts := atomic.LoadUint64(&t.emittedTs) + return ts +} + // Barrier is not used in table sink -func (t *tableSink) Barrier(ctx context.Context) error { +func (t *tableSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } @@ -241,11 +268,12 @@ type drawbackMsg struct { type bufferSink struct { Sink - checkpointTs uint64 - buffer map[model.TableID][]*model.RowChangedEvent - bufferMu sync.Mutex - flushTsChan chan uint64 - drawbackChan chan drawbackMsg + changeFeedCheckpointTs uint64 + tableCheckpointTsMap sync.Map + buffer map[model.TableID][]*model.RowChangedEvent + bufferMu sync.Mutex + flushTsChan chan flushMsg + drawbackChan chan drawbackMsg } func newBufferSink( @@ -254,14 +282,14 @@ func newBufferSink( errCh chan error, checkpointTs model.Ts, drawbackChan chan drawbackMsg, -) Sink { +) *bufferSink { sink := &bufferSink{ Sink: backendSink, // buffer shares the same flow control with table sink - buffer: make(map[model.TableID][]*model.RowChangedEvent), - checkpointTs: checkpointTs, - flushTsChan: make(chan uint64, 128), - drawbackChan: drawbackChan, + buffer: make(map[model.TableID][]*model.RowChangedEvent), + changeFeedCheckpointTs: checkpointTs, + flushTsChan: make(chan flushMsg, 128), + drawbackChan: drawbackChan, } go sink.run(ctx, errCh) return sink @@ -293,8 +321,9 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) { delete(b.buffer, drawback.tableID) b.bufferMu.Unlock() close(drawback.callback) - case resolvedTs := <-b.flushTsChan: + case flushEvent := <-b.flushTsChan: b.bufferMu.Lock() + resolvedTs := flushEvent.resolvedTs // find all rows before resolvedTs and emit to backend sink for tableID, rows := range b.buffer { i := sort.Search(len(rows), func(i int) bool { @@ -321,14 +350,15 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) { b.bufferMu.Unlock() start := time.Now() - checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, resolvedTs) + tableID := flushEvent.tableID + checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, flushEvent.tableID, resolvedTs) if err != nil { if errors.Cause(err) != context.Canceled { errCh <- err } return } - atomic.StoreUint64(&b.checkpointTs, checkpointTs) + b.tableCheckpointTsMap.Store(tableID, checkpointTs) dur := time.Since(start) metricFlushDuration.Observe(dur.Seconds()) @@ -358,11 +388,32 @@ func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Ro return nil } -func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { select { case <-ctx.Done(): - return atomic.LoadUint64(&b.checkpointTs), ctx.Err() - case b.flushTsChan <- resolvedTs: + return b.getTableCheckpointTs(tableID), ctx.Err() + case b.flushTsChan <- flushMsg{ + tableID: tableID, + resolvedTs: resolvedTs, + }: + } + return 
b.getTableCheckpointTs(tableID), nil +} + +type flushMsg struct { + tableID model.TableID + resolvedTs uint64 +} + +func (b *bufferSink) getTableCheckpointTs(tableID model.TableID) uint64 { + checkPoints, ok := b.tableCheckpointTsMap.Load(tableID) + if ok { + return checkPoints.(uint64) } - return atomic.LoadUint64(&b.checkpointTs), nil + return atomic.LoadUint64(&b.changeFeedCheckpointTs) +} + +// UpdateChangeFeedCheckpointTs update the changeFeedCheckpointTs every processor tick +func (b *bufferSink) UpdateChangeFeedCheckpointTs(checkpointTs uint64) { + atomic.StoreUint64(&b.changeFeedCheckpointTs, checkpointTs) } diff --git a/cdc/sink/manager_test.go b/cdc/sink/manager_test.go index 002b476a452..b9ba6a545f2 100644 --- a/cdc/sink/manager_test.go +++ b/cdc/sink/manager_test.go @@ -35,9 +35,17 @@ var _ = check.Suite(&managerSuite{}) type checkSink struct { *check.C - rows []*model.RowChangedEvent + rows map[model.TableID][]*model.RowChangedEvent rowsMu sync.Mutex - lastResolvedTs uint64 + lastResolvedTs map[model.TableID]uint64 +} + +func newCheckSink(c *check.C) *checkSink { + return &checkSink{ + C: c, + rows: make(map[model.TableID][]*model.RowChangedEvent), + lastResolvedTs: make(map[model.TableID]uint64), + } } func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { @@ -47,7 +55,9 @@ func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { c.rowsMu.Lock() defer c.rowsMu.Unlock() - c.rows = append(c.rows, rows...) + for _, row := range rows { + c.rows[row.Table.TableID] = append(c.rows[row.Table.TableID], row) + } return nil } @@ -55,24 +65,25 @@ func (c *checkSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error panic("unreachable") } -func (c *checkSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (c *checkSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { c.rowsMu.Lock() defer c.rowsMu.Unlock() var newRows []*model.RowChangedEvent - for _, row := range c.rows { - if row.CommitTs <= c.lastResolvedTs { - return c.lastResolvedTs, errors.Errorf("commit-ts(%d) is not greater than lastResolvedTs(%d)", row.CommitTs, c.lastResolvedTs) + rows := c.rows[tableID] + for _, row := range rows { + if row.CommitTs <= c.lastResolvedTs[tableID] { + return c.lastResolvedTs[tableID], errors.Errorf("commit-ts(%d) is not greater than lastResolvedTs(%d)", row.CommitTs, c.lastResolvedTs) } if row.CommitTs > resolvedTs { newRows = append(newRows, row) } } - c.Assert(c.lastResolvedTs, check.LessEqual, resolvedTs) - c.lastResolvedTs = resolvedTs - c.rows = newRows + c.Assert(c.lastResolvedTs[tableID], check.LessEqual, resolvedTs) + c.lastResolvedTs[tableID] = resolvedTs + c.rows[tableID] = newRows - return c.lastResolvedTs, nil + return c.lastResolvedTs[tableID], nil } func (c *checkSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { @@ -83,7 +94,7 @@ func (c *checkSink) Close(ctx context.Context) error { return nil } -func (c *checkSink) Barrier(ctx context.Context) error { +func (c *checkSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } @@ -92,7 +103,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() errCh := make(chan error, 16) - manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "") + manager := 
NewManager(ctx, newCheckSink(c), errCh, 0, "", "") defer manager.Close(ctx) goroutineNum := 10 rowNum := 100 @@ -118,7 +129,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) { for j := 1; j < rowNum; j++ { if rand.Intn(10) == 0 { resolvedTs := lastResolvedTs + uint64(rand.Intn(j-int(lastResolvedTs))) - _, err := tableSink.FlushRowChangedEvents(ctx, resolvedTs) + _, err := tableSink.FlushRowChangedEvents(ctx, model.TableID(i), resolvedTs) c.Assert(err, check.IsNil) lastResolvedTs = resolvedTs } else { @@ -129,7 +140,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) { c.Assert(err, check.IsNil) } } - _, err := tableSink.FlushRowChangedEvents(ctx, uint64(rowNum)) + _, err := tableSink.FlushRowChangedEvents(ctx, model.TableID(i), uint64(rowNum)) c.Assert(err, check.IsNil) }() } @@ -147,7 +158,7 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() errCh := make(chan error, 16) - manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "") + manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "") defer manager.Close(ctx) goroutineNum := 200 var wg sync.WaitGroup @@ -180,7 +191,7 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) { }) c.Assert(err, check.IsNil) } - _, err := sink.FlushRowChangedEvents(ctx, resolvedTs) + _, err := sink.FlushRowChangedEvents(ctx, sink.(*tableSink).tableID, resolvedTs) if err != nil { c.Assert(errors.Cause(err), check.Equals, context.Canceled) } @@ -232,7 +243,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) { defer cancel() errCh := make(chan error, 16) - manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "") + manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "") defer manager.Close(ctx) tableID := int64(49) @@ -242,7 +253,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) { CommitTs: uint64(110), }) c.Assert(err, check.IsNil) - _, err = tableSink.FlushRowChangedEvents(ctx, 110) + _, err = tableSink.FlushRowChangedEvents(ctx, tableID, 110) c.Assert(err, check.IsNil) err = manager.destroyTableSink(ctx, tableID) c.Assert(err, check.IsNil) @@ -253,7 +264,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) { func BenchmarkManagerFlushing(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) errCh := make(chan error, 16) - manager := NewManager(ctx, &checkSink{}, errCh, 0, "", "") + manager := NewManager(ctx, newCheckSink(nil), errCh, 0, "", "") // Init table sinks. goroutineNum := 2000 @@ -293,11 +304,11 @@ func BenchmarkManagerFlushing(b *testing.B) { // All tables are flushed concurrently, except table 0. for i := 1; i < goroutineNum; i++ { i := i - tableSink := tableSinks[i] + tblSink := tableSinks[i] go func() { for j := 1; j < rowNum; j++ { if j%2 == 0 { - _, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(j)) + _, err := tblSink.FlushRowChangedEvents(context.Background(), tblSink.(*tableSink).tableID, uint64(j)) if err != nil { b.Error(err) } @@ -308,9 +319,9 @@ func BenchmarkManagerFlushing(b *testing.B) { b.ResetTimer() // Table 0 flush. 
- tableSink := tableSinks[0] + tblSink := tableSinks[0] for i := 0; i < b.N; i++ { - _, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(rowNum)) + _, err := tblSink.FlushRowChangedEvents(context.Background(), tblSink.(*tableSink).tableID, uint64(rowNum)) if err != nil { b.Error(err) } @@ -343,7 +354,7 @@ func (e *errorSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error panic("unreachable") } -func (e *errorSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (e *errorSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { return 0, errors.New("error in flush row changed events") } @@ -355,7 +366,7 @@ func (e *errorSink) Close(ctx context.Context) error { return nil } -func (e *errorSink) Barrier(ctx context.Context) error { +func (e *errorSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } @@ -372,7 +383,7 @@ func (s *managerSuite) TestManagerError(c *check.C) { Table: &model.TableName{TableID: 1}, }) c.Assert(err, check.IsNil) - _, err = sink.FlushRowChangedEvents(ctx, 2) + _, err = sink.FlushRowChangedEvents(ctx, 1, 2) c.Assert(err, check.IsNil) err = <-errCh c.Assert(err.Error(), check.Equals, "error in emit row changed events") diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 83b09b5ff88..d9633399982 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -52,7 +52,7 @@ type mqSink struct { resolvedTs uint64 } partitionResolvedTs []uint64 - checkpointTs uint64 + tableCheckpointTs map[model.TableID]uint64 resolvedNotifier *notify.Notifier resolvedReceiver *notify.Receiver @@ -143,6 +143,7 @@ func newMqSink( partitionNum: partitionNum, partitionInput: partitionInput, partitionResolvedTs: make([]uint64, partitionNum), + tableCheckpointTs: make(map[model.TableID]uint64), resolvedNotifier: notifier, resolvedReceiver: resolvedReceiver, @@ -185,9 +186,9 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha return nil } -func (k *mqSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { - if resolvedTs <= k.checkpointTs { - return k.checkpointTs, nil +func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { + if checkpointTs, ok := k.tableCheckpointTs[tableID]; ok && resolvedTs <= checkpointTs { + return checkpointTs, nil } for i := 0; i < int(k.partitionNum); i++ { @@ -220,9 +221,9 @@ flushLoop: if err != nil { return 0, errors.Trace(err) } - k.checkpointTs = resolvedTs + k.tableCheckpointTs[tableID] = resolvedTs k.statistics.PrintStatus(ctx) - return k.checkpointTs, nil + return resolvedTs, nil } func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { @@ -275,7 +276,7 @@ func (k *mqSink) Close(ctx context.Context) error { return errors.Trace(err) } -func (k *mqSink) Barrier(cxt context.Context) error { +func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error { // Barrier does nothing because FlushRowChangedEvents in mq sink has flushed // all buffered events forcedlly. 
return nil diff --git a/cdc/sink/mq_test.go b/cdc/sink/mq_test.go index 31c43fd6f86..c60e3cb229d 100644 --- a/cdc/sink/mq_test.go +++ b/cdc/sink/mq_test.go @@ -72,10 +72,12 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { // mock kafka broker processes 1 row changed event leader.Returns(prodSuccess) + tableID := model.TableID(1) row := &model.RowChangedEvent{ Table: &model.TableName{ - Schema: "test", - Table: "t1", + Schema: "test", + Table: "t1", + TableID: tableID, }, StartTs: 100, CommitTs: 120, @@ -83,11 +85,11 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { } err = sink.EmitRowChangedEvents(ctx, row) c.Assert(err, check.IsNil) - checkpointTs, err := sink.FlushRowChangedEvents(ctx, uint64(120)) + checkpointTs, err := sink.FlushRowChangedEvents(ctx, tableID, uint64(120)) c.Assert(err, check.IsNil) c.Assert(checkpointTs, check.Equals, uint64(120)) // flush older resolved ts - checkpointTs, err = sink.FlushRowChangedEvents(ctx, uint64(110)) + checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, uint64(110)) c.Assert(err, check.IsNil) c.Assert(checkpointTs, check.Equals, uint64(120)) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index fee490bea30..15e8125887b 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -22,7 +22,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" dmysql "github.com/go-sql-driver/mysql" @@ -92,10 +91,10 @@ type mysqlSink struct { filter *filter.Filter cyclic *cyclic.Cyclic - txnCache *common.UnresolvedTxnCache - workers []*mysqlSinkWorker - resolvedTs uint64 - maxResolvedTs uint64 + txnCache *common.UnresolvedTxnCache + workers []*mysqlSinkWorker + tableCheckpointTs sync.Map + tableMaxResolvedTs sync.Map execWaitNotifier *notify.Notifier resolvedNotifier *notify.Notifier @@ -120,13 +119,11 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row // FlushRowChangedEvents will flushes all received events, we don't allow mysql // sink to receive events before resolving -func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { - if atomic.LoadUint64(&s.maxResolvedTs) < resolvedTs { - atomic.StoreUint64(&s.maxResolvedTs, resolvedTs) +func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { + v, ok := s.tableMaxResolvedTs.Load(tableID) + if !ok || v.(uint64) < resolvedTs { + s.tableMaxResolvedTs.Store(tableID, resolvedTs) } - // resolvedTs can be fallen back, such as a new table is added into this sink - // with a smaller start-ts - atomic.StoreUint64(&s.resolvedTs, resolvedTs) s.resolvedNotifier.Notify() // check and throw error @@ -136,13 +133,7 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64 default: } - checkpointTs := resolvedTs - for _, worker := range s.workers { - workerCheckpointTs := atomic.LoadUint64(&worker.checkpointTs) - if workerCheckpointTs < checkpointTs { - checkpointTs = workerCheckpointTs - } - } + checkpointTs := s.getTableCheckpointTs(tableID) s.statistics.PrintStatus(ctx) return checkpointTs, nil } @@ -159,20 +150,18 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify. 
return case <-receiver.C: } - resolvedTs := atomic.LoadUint64(&s.resolvedTs) - resolvedTxnsMap := s.txnCache.Resolved(resolvedTs) + flushedResolvedTsMap, resolvedTxnsMap := s.txnCache.Resolved(&s.tableMaxResolvedTs) if len(resolvedTxnsMap) == 0 { - for _, worker := range s.workers { - atomic.StoreUint64(&worker.checkpointTs, resolvedTs) - } - s.txnCache.UpdateCheckpoint(resolvedTs) + s.tableMaxResolvedTs.Range(func(key, value interface{}) bool { + s.tableCheckpointTs.Store(key, value) + return true + }) continue } s.dispatchAndExecTxns(ctx, resolvedTxnsMap) - for _, worker := range s.workers { - atomic.StoreUint64(&worker.checkpointTs, resolvedTs) + for tableID, resolvedTs := range flushedResolvedTsMap { + s.tableCheckpointTs.Store(tableID, resolvedTs) } - s.txnCache.UpdateCheckpoint(resolvedTs) } } @@ -724,7 +713,6 @@ type mysqlSinkWorker struct { execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error metricBucketSize prometheus.Counter receiver *notify.Receiver - checkpointTs uint64 closedCh chan struct{} } @@ -767,10 +755,9 @@ func (w *mysqlSinkWorker) appendFinishTxn(wg *sync.WaitGroup) { func (w *mysqlSinkWorker) run(ctx context.Context) (err error) { var ( - toExecRows []*model.RowChangedEvent - replicaID uint64 - txnNum int - lastCommitTs uint64 + toExecRows []*model.RowChangedEvent + replicaID uint64 + txnNum int ) // mark FinishWg before worker exits, all data txns can be omitted. @@ -808,7 +795,6 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) { txnNum = 0 return err } - atomic.StoreUint64(&w.checkpointTs, lastCommitTs) toExecRows = toExecRows[:0] w.metricBucketSize.Add(float64(txnNum)) txnNum = 0 @@ -838,7 +824,6 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) { } replicaID = txn.ReplicaID toExecRows = append(toExecRows, txn.Rows...) 
- lastCommitTs = txn.CommitTs txnNum++ case <-w.receiver.C: if err := flushRows(); err != nil { @@ -875,7 +860,7 @@ func (s *mysqlSink) Close(ctx context.Context) error { return cerror.WrapError(cerror.ErrMySQLConnectionError, err) } -func (s *mysqlSink) Barrier(ctx context.Context) error { +func (s *mysqlSink) Barrier(ctx context.Context, tableID model.TableID) error { warnDuration := 3 * time.Minute ticker := time.NewTicker(warnDuration) defer ticker.Stop() @@ -884,15 +869,23 @@ func (s *mysqlSink) Barrier(ctx context.Context) error { case <-ctx.Done(): return errors.Trace(ctx.Err()) case <-ticker.C: + maxResolvedTs, ok := s.tableMaxResolvedTs.Load(tableID) log.Warn("Barrier doesn't return in time, may be stuck", - zap.Uint64("resolved-ts", atomic.LoadUint64(&s.maxResolvedTs)), - zap.Uint64("checkpoint-ts", s.checkpointTs())) + zap.Int64("tableID", tableID), + zap.Bool("has resolvedTs", ok), + zap.Any("resolvedTs", maxResolvedTs), + zap.Uint64("checkpointTs", s.getTableCheckpointTs(tableID))) default: - maxResolvedTs := atomic.LoadUint64(&s.maxResolvedTs) - if s.checkpointTs() >= maxResolvedTs { + v, ok := s.tableMaxResolvedTs.Load(tableID) + if !ok { + log.Info("No table resolvedTs is found", zap.Int64("table-id", tableID)) return nil } - checkpointTs, err := s.FlushRowChangedEvents(ctx, maxResolvedTs) + maxResolvedTs := v.(uint64) + if s.getTableCheckpointTs(tableID) >= maxResolvedTs { + return nil + } + checkpointTs, err := s.FlushRowChangedEvents(ctx, tableID, maxResolvedTs) if err != nil { return err } @@ -905,15 +898,12 @@ func (s *mysqlSink) Barrier(ctx context.Context) error { } } -func (s *mysqlSink) checkpointTs() uint64 { - checkpointTs := atomic.LoadUint64(&s.resolvedTs) - for _, worker := range s.workers { - workerCheckpointTs := atomic.LoadUint64(&worker.checkpointTs) - if workerCheckpointTs < checkpointTs { - checkpointTs = workerCheckpointTs - } +func (s *mysqlSink) getTableCheckpointTs(tableID model.TableID) uint64 { + v, ok := s.tableCheckpointTs.Load(tableID) + if ok { + return v.(uint64) } - return checkpointTs + return uint64(0) } func logDMLTxnErr(err error) error { diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index 29602302f49..3cd3bcb89a4 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -27,9 +27,7 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" - "github.com/davecgh/go-spew/spew" dmysql "github.com/go-sql-driver/mysql" - "github.com/pingcap/check" "github.com/pingcap/errors" timodel "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -40,21 +38,13 @@ import ( "github.com/pingcap/tiflow/pkg/cyclic/mark" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" - "github.com/pingcap/tiflow/pkg/notify" "github.com/pingcap/tiflow/pkg/retry" - "github.com/pingcap/tiflow/pkg/util/testleak" - "golang.org/x/sync/errgroup" + "github.com/stretchr/testify/require" ) -type MySQLSinkSuite struct{} - -func Test(t *testing.T) { check.TestingT(t) } - -var _ = check.Suite(&MySQLSinkSuite{}) - -func newMySQLSink4Test(ctx context.Context, c *check.C) *mysqlSink { +func newMySQLSink4Test(ctx context.Context, t *testing.T) *mysqlSink { f, err := filter.NewFilter(config.GetDefaultReplicaConfig()) - c.Assert(err, check.IsNil) + require.Nil(t, err) params := defaultParams.Clone() params.batchReplaceEnabled = false return &mysqlSink{ @@ -65,301 +55,7 @@ func newMySQLSink4Test(ctx context.Context, c *check.C) *mysqlSink { } } -func (s MySQLSinkSuite) TestMysqlSinkWorker(c *check.C) { - defer 
testleak.AfterTest(c)() - testCases := []struct { - txns []*model.SingleTableTxn - expectedOutputRows [][]*model.RowChangedEvent - exportedOutputReplicaIDs []uint64 - maxTxnRow int - }{ - { - txns: []*model.SingleTableTxn{}, - maxTxnRow: 4, - }, { - txns: []*model.SingleTableTxn{ - { - CommitTs: 1, - Rows: []*model.RowChangedEvent{{CommitTs: 1}}, - ReplicaID: 1, - }, - }, - expectedOutputRows: [][]*model.RowChangedEvent{{{CommitTs: 1}}}, - exportedOutputReplicaIDs: []uint64{1}, - maxTxnRow: 2, - }, { - txns: []*model.SingleTableTxn{ - { - CommitTs: 1, - Rows: []*model.RowChangedEvent{{CommitTs: 1}, {CommitTs: 1}, {CommitTs: 1}}, - ReplicaID: 1, - }, - }, - expectedOutputRows: [][]*model.RowChangedEvent{ - {{CommitTs: 1}, {CommitTs: 1}, {CommitTs: 1}}, - }, - exportedOutputReplicaIDs: []uint64{1}, - maxTxnRow: 2, - }, { - txns: []*model.SingleTableTxn{ - { - CommitTs: 1, - Rows: []*model.RowChangedEvent{{CommitTs: 1}, {CommitTs: 1}}, - ReplicaID: 1, - }, - { - CommitTs: 2, - Rows: []*model.RowChangedEvent{{CommitTs: 2}}, - ReplicaID: 1, - }, - { - CommitTs: 3, - Rows: []*model.RowChangedEvent{{CommitTs: 3}, {CommitTs: 3}}, - ReplicaID: 1, - }, - }, - expectedOutputRows: [][]*model.RowChangedEvent{ - {{CommitTs: 1}, {CommitTs: 1}, {CommitTs: 2}}, - {{CommitTs: 3}, {CommitTs: 3}}, - }, - exportedOutputReplicaIDs: []uint64{1, 1}, - maxTxnRow: 4, - }, { - txns: []*model.SingleTableTxn{ - { - CommitTs: 1, - Rows: []*model.RowChangedEvent{{CommitTs: 1}}, - ReplicaID: 1, - }, - { - CommitTs: 2, - Rows: []*model.RowChangedEvent{{CommitTs: 2}}, - ReplicaID: 2, - }, - { - CommitTs: 3, - Rows: []*model.RowChangedEvent{{CommitTs: 3}}, - ReplicaID: 3, - }, - }, - expectedOutputRows: [][]*model.RowChangedEvent{ - {{CommitTs: 1}}, - {{CommitTs: 2}}, - {{CommitTs: 3}}, - }, - exportedOutputReplicaIDs: []uint64{1, 2, 3}, - maxTxnRow: 4, - }, { - txns: []*model.SingleTableTxn{ - { - CommitTs: 1, - Rows: []*model.RowChangedEvent{{CommitTs: 1}}, - ReplicaID: 1, - }, - { - CommitTs: 2, - Rows: []*model.RowChangedEvent{{CommitTs: 2}, {CommitTs: 2}, {CommitTs: 2}}, - ReplicaID: 1, - }, - { - CommitTs: 3, - Rows: []*model.RowChangedEvent{{CommitTs: 3}}, - ReplicaID: 1, - }, - { - CommitTs: 4, - Rows: []*model.RowChangedEvent{{CommitTs: 4}}, - ReplicaID: 1, - }, - }, - expectedOutputRows: [][]*model.RowChangedEvent{ - {{CommitTs: 1}}, - {{CommitTs: 2}, {CommitTs: 2}, {CommitTs: 2}}, - {{CommitTs: 3}, {CommitTs: 4}}, - }, - exportedOutputReplicaIDs: []uint64{1, 1, 1}, - maxTxnRow: 2, - }, - } - ctx := context.Background() - - notifier := new(notify.Notifier) - for i, tc := range testCases { - cctx, cancel := context.WithCancel(ctx) - var outputRows [][]*model.RowChangedEvent - var outputReplicaIDs []uint64 - receiver, err := notifier.NewReceiver(-1) - c.Assert(err, check.IsNil) - w := newMySQLSinkWorker(tc.maxTxnRow, 1, - bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"), - receiver, - func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error { - outputRows = append(outputRows, events) - outputReplicaIDs = append(outputReplicaIDs, replicaID) - return nil - }) - errg, cctx := errgroup.WithContext(cctx) - errg.Go(func() error { - return w.run(cctx) - }) - for _, txn := range tc.txns { - w.appendTxn(cctx, txn) - } - var wg sync.WaitGroup - w.appendFinishTxn(&wg) - // ensure all txns are fetched from txn channel in sink worker - time.Sleep(time.Millisecond * 100) - notifier.Notify() - wg.Wait() - cancel() - c.Assert(errors.Cause(errg.Wait()), check.Equals, 
context.Canceled) - c.Assert(outputRows, check.DeepEquals, tc.expectedOutputRows, - check.Commentf("case %v, %s, %s", i, spew.Sdump(outputRows), spew.Sdump(tc.expectedOutputRows))) - c.Assert(outputReplicaIDs, check.DeepEquals, tc.exportedOutputReplicaIDs, - check.Commentf("case %v, %s, %s", i, spew.Sdump(outputReplicaIDs), spew.Sdump(tc.exportedOutputReplicaIDs))) - } -} - -func (s MySQLSinkSuite) TestMySQLSinkWorkerExitWithError(c *check.C) { - defer testleak.AfterTest(c)() - txns1 := []*model.SingleTableTxn{ - { - CommitTs: 1, - Rows: []*model.RowChangedEvent{{CommitTs: 1}}, - }, - { - CommitTs: 2, - Rows: []*model.RowChangedEvent{{CommitTs: 2}}, - }, - { - CommitTs: 3, - Rows: []*model.RowChangedEvent{{CommitTs: 3}}, - }, - { - CommitTs: 4, - Rows: []*model.RowChangedEvent{{CommitTs: 4}}, - }, - } - txns2 := []*model.SingleTableTxn{ - { - CommitTs: 5, - Rows: []*model.RowChangedEvent{{CommitTs: 5}}, - }, - { - CommitTs: 6, - Rows: []*model.RowChangedEvent{{CommitTs: 6}}, - }, - } - maxTxnRow := 1 - ctx := context.Background() - - errExecFailed := errors.New("sink worker exec failed") - notifier := new(notify.Notifier) - cctx, cancel := context.WithCancel(ctx) - receiver, err := notifier.NewReceiver(-1) - c.Assert(err, check.IsNil) - w := newMySQLSinkWorker(maxTxnRow, 1, /*bucket*/ - bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"), - receiver, - func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error { - return errExecFailed - }) - errg, cctx := errgroup.WithContext(cctx) - errg.Go(func() error { - return w.run(cctx) - }) - // txn in txns1 will be sent to worker txnCh - for _, txn := range txns1 { - w.appendTxn(cctx, txn) - } - - // simulate notify sink worker to flush existing txns - var wg sync.WaitGroup - w.appendFinishTxn(&wg) - time.Sleep(time.Millisecond * 100) - // txn in txn2 will be blocked since the worker has exited - for _, txn := range txns2 { - w.appendTxn(cctx, txn) - } - notifier.Notify() - - // simulate sink shutdown and send closed singal to sink worker - w.closedCh <- struct{}{} - w.cleanup() - - // the flush notification wait group should be done - wg.Wait() - - cancel() - c.Assert(errg.Wait(), check.Equals, errExecFailed) -} - -func (s MySQLSinkSuite) TestMySQLSinkWorkerExitCleanup(c *check.C) { - defer testleak.AfterTest(c)() - txns1 := []*model.SingleTableTxn{ - { - CommitTs: 1, - Rows: []*model.RowChangedEvent{{CommitTs: 1}}, - }, - { - CommitTs: 2, - Rows: []*model.RowChangedEvent{{CommitTs: 2}}, - }, - } - txns2 := []*model.SingleTableTxn{ - { - CommitTs: 5, - Rows: []*model.RowChangedEvent{{CommitTs: 5}}, - }, - } - - maxTxnRow := 1 - ctx := context.Background() - - errExecFailed := errors.New("sink worker exec failed") - notifier := new(notify.Notifier) - cctx, cancel := context.WithCancel(ctx) - receiver, err := notifier.NewReceiver(-1) - c.Assert(err, check.IsNil) - w := newMySQLSinkWorker(maxTxnRow, 1, /*bucket*/ - bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"), - receiver, - func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error { - return errExecFailed - }) - errg, cctx := errgroup.WithContext(cctx) - errg.Go(func() error { - err := w.run(cctx) - return err - }) - for _, txn := range txns1 { - w.appendTxn(cctx, txn) - } - - // sleep to let txns flushed by tick - time.Sleep(time.Millisecond * 100) - - // simulate more txns are sent to txnCh after the sink worker run has exited - for _, txn := range txns2 { - w.appendTxn(cctx, txn) - } - var wg 
sync.WaitGroup - w.appendFinishTxn(&wg) - notifier.Notify() - - // simulate sink shutdown and send closed singal to sink worker - w.closedCh <- struct{}{} - w.cleanup() - - // the flush notification wait group should be done - wg.Wait() - - cancel() - c.Assert(errg.Wait(), check.Equals, errExecFailed) -} - -func (s MySQLSinkSuite) TestPrepareDML(c *check.C) { - defer testleak.AfterTest(c)() +func TestPrepareDML(t *testing.T) { testCases := []struct { input []*model.RowChangedEvent expected *preparedDMLs @@ -394,15 +90,14 @@ func (s MySQLSinkSuite) TestPrepareDML(c *check.C) { }} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ms := newMySQLSink4Test(ctx, c) + ms := newMySQLSink4Test(ctx, t) for i, tc := range testCases { dmls := ms.prepareDMLs(tc.input, 0, 0) - c.Assert(dmls, check.DeepEquals, tc.expected, check.Commentf("%d", i)) + require.Equal(t, tc.expected, dmls, tc.expected, fmt.Sprintf("%d", i)) } } -func (s MySQLSinkSuite) TestPrepareUpdate(c *check.C) { - defer testleak.AfterTest(c)() +func TestPrepareUpdate(t *testing.T) { testCases := []struct { quoteTable string preCols []*model.Column @@ -448,13 +143,12 @@ func (s MySQLSinkSuite) TestPrepareUpdate(c *check.C) { } for _, tc := range testCases { query, args := prepareUpdate(tc.quoteTable, tc.preCols, tc.cols, false) - c.Assert(query, check.Equals, tc.expectedSQL) - c.Assert(args, check.DeepEquals, tc.expectedArgs) + require.Equal(t, tc.expectedSQL, query) + require.Equal(t, tc.expectedArgs, args) } } -func (s MySQLSinkSuite) TestPrepareDelete(c *check.C) { - defer testleak.AfterTest(c)() +func TestPrepareDelete(t *testing.T) { testCases := []struct { quoteTable string preCols []*model.Column @@ -489,13 +183,12 @@ func (s MySQLSinkSuite) TestPrepareDelete(c *check.C) { } for _, tc := range testCases { query, args := prepareDelete(tc.quoteTable, tc.preCols, false) - c.Assert(query, check.Equals, tc.expectedSQL) - c.Assert(args, check.DeepEquals, tc.expectedArgs) + require.Equal(t, tc.expectedSQL, query) + require.Equal(t, tc.expectedArgs, args) } } -func (s MySQLSinkSuite) TestWhereSlice(c *check.C) { - defer testleak.AfterTest(c)() +func TestWhereSlice(t *testing.T) { testCases := []struct { cols []*model.Column forceReplicate bool @@ -574,13 +267,12 @@ func (s MySQLSinkSuite) TestWhereSlice(c *check.C) { } for _, tc := range testCases { colNames, args := whereSlice(tc.cols, tc.forceReplicate) - c.Assert(colNames, check.DeepEquals, tc.expectedColNames) - c.Assert(args, check.DeepEquals, tc.expectedArgs) + require.Equal(t, tc.expectedColNames, colNames) + require.Equal(t, tc.expectedArgs, args) } } -func (s MySQLSinkSuite) TestMapReplace(c *check.C) { - defer testleak.AfterTest(c)() +func TestMapReplace(t *testing.T) { testCases := []struct { quoteTable string cols []*model.Column @@ -614,8 +306,8 @@ func (s MySQLSinkSuite) TestMapReplace(c *check.C) { // multiple times to verify the stability of column sequence in query string for i := 0; i < 10; i++ { query, args := prepareReplace(tc.quoteTable, tc.cols, false, false) - c.Assert(query, check.Equals, tc.expectedQuery) - c.Assert(args, check.DeepEquals, tc.expectedArgs) + require.Equal(t, tc.expectedQuery, query) + require.Equal(t, tc.expectedArgs, args) } } } @@ -626,8 +318,7 @@ func (a sqlArgs) Len() int { return len(a) } func (a sqlArgs) Less(i, j int) bool { return fmt.Sprintf("%s", a[i]) < fmt.Sprintf("%s", a[j]) } func (a sqlArgs) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (s MySQLSinkSuite) TestReduceReplace(c *check.C) { - defer 
testleak.AfterTest(c)() +func TestReduceReplace(t *testing.T) { testCases := []struct { replaces map[string][][]interface{} batchSize int @@ -732,19 +423,18 @@ func (s MySQLSinkSuite) TestReduceReplace(c *check.C) { sort.Strings(sqls) sort.Sort(sqlArgs(args)) } - c.Assert(sqls, check.DeepEquals, tc.expectSQLs) - c.Assert(args, check.DeepEquals, tc.expectArgs) + require.Equal(t, tc.expectSQLs, sqls) + require.Equal(t, tc.expectArgs, args) } } -func (s MySQLSinkSuite) TestSinkParamsClone(c *check.C) { - defer testleak.AfterTest(c)() +func TestSinkParamsClone(t *testing.T) { param1 := defaultParams.Clone() param2 := param1.Clone() param2.changefeedID = "123" param2.batchReplaceEnabled = false param2.maxTxnRow = 1 - c.Assert(param1, check.DeepEquals, &sinkParams{ + require.Equal(t, &sinkParams{ workerCount: defaultWorkerCount, maxTxnRow: defaultMaxTxnRow, tidbTxnMode: defaultTiDBTxnMode, @@ -754,8 +444,8 @@ func (s MySQLSinkSuite) TestSinkParamsClone(c *check.C) { writeTimeout: defaultWriteTimeout, dialTimeout: defaultDialTimeout, safeMode: defaultSafeMode, - }) - c.Assert(param2, check.DeepEquals, &sinkParams{ + }, param1) + require.Equal(t, &sinkParams{ changefeedID: "123", workerCount: defaultWorkerCount, maxTxnRow: 1, @@ -766,22 +456,20 @@ func (s MySQLSinkSuite) TestSinkParamsClone(c *check.C) { writeTimeout: defaultWriteTimeout, dialTimeout: defaultDialTimeout, safeMode: defaultSafeMode, - }) + }, param2) } -func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) { - defer testleak.AfterTest(c)() - +func TestConfigureSinkURI(t *testing.T) { testDefaultParams := func() { db, err := mockTestDB() - c.Assert(err, check.IsNil) + require.Nil(t, err) defer db.Close() dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") - c.Assert(err, check.IsNil) + require.Nil(t, err) params := defaultParams.Clone() dsnStr, err := configureSinkURI(context.TODO(), dsn, params, db) - c.Assert(err, check.IsNil) + require.Nil(t, err) expectedParams := []string{ "tidb_txn_mode=optimistic", "readTimeout=2m", @@ -789,45 +477,45 @@ func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) { "allow_auto_random_explicit_insert=1", } for _, param := range expectedParams { - c.Assert(strings.Contains(dsnStr, param), check.IsTrue) + require.True(t, strings.Contains(dsnStr, param)) } - c.Assert(strings.Contains(dsnStr, "time_zone"), check.IsFalse) + require.False(t, strings.Contains(dsnStr, "time_zone")) } testTimezoneParam := func() { db, err := mockTestDB() - c.Assert(err, check.IsNil) + require.Nil(t, err) defer db.Close() dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") - c.Assert(err, check.IsNil) + require.Nil(t, err) params := defaultParams.Clone() params.timezone = `"UTC"` dsnStr, err := configureSinkURI(context.TODO(), dsn, params, db) - c.Assert(err, check.IsNil) - c.Assert(strings.Contains(dsnStr, "time_zone=%22UTC%22"), check.IsTrue) + require.Nil(t, err) + require.True(t, strings.Contains(dsnStr, "time_zone=%22UTC%22")) } testTimeoutParams := func() { db, err := mockTestDB() - c.Assert(err, check.IsNil) + require.Nil(t, err) defer db.Close() dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") - c.Assert(err, check.IsNil) + require.Nil(t, err) uri, err := url.Parse("mysql://127.0.0.1:3306/?read-timeout=4m&write-timeout=5m&timeout=3m") - c.Assert(err, check.IsNil) + require.Nil(t, err) params, err := parseSinkURI(context.TODO(), uri, map[string]string{}) - c.Assert(err, check.IsNil) + require.Nil(t, err) dsnStr, err := configureSinkURI(context.TODO(), dsn, params, db) - 
c.Assert(err, check.IsNil) + require.Nil(t, err) expectedParams := []string{ "readTimeout=4m", "writeTimeout=5m", "timeout=3m", } for _, param := range expectedParams { - c.Assert(strings.Contains(dsnStr, param), check.IsTrue) + require.True(t, strings.Contains(dsnStr, param)) } } @@ -836,8 +524,7 @@ func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) { testTimeoutParams() } -func (s MySQLSinkSuite) TestParseSinkURI(c *check.C) { - defer testleak.AfterTest(c)() +func TestParseSinkURI(t *testing.T) { expected := defaultParams.Clone() expected.workerCount = 64 expected.maxTxnRow = 20 @@ -856,14 +543,13 @@ func (s MySQLSinkSuite) TestParseSinkURI(c *check.C) { OptCaptureAddr: expected.captureAddr, } uri, err := url.Parse(uriStr) - c.Assert(err, check.IsNil) + require.Nil(t, err) params, err := parseSinkURI(context.TODO(), uri, opts) - c.Assert(err, check.IsNil) - c.Assert(params, check.DeepEquals, expected) + require.Nil(t, err) + require.Equal(t, expected, params) } -func (s MySQLSinkSuite) TestParseSinkURITimezone(c *check.C) { - defer testleak.AfterTest(c)() +func TestParseSinkURITimezone(t *testing.T) { uris := []string{ "mysql://127.0.0.1:3306/?time-zone=Asia/Shanghai&worker-count=32", "mysql://127.0.0.1:3306/?time-zone=&worker-count=32", @@ -878,15 +564,14 @@ func (s MySQLSinkSuite) TestParseSinkURITimezone(c *check.C) { opts := map[string]string{} for i, uriStr := range uris { uri, err := url.Parse(uriStr) - c.Assert(err, check.IsNil) + require.Nil(t, err) params, err := parseSinkURI(ctx, uri, opts) - c.Assert(err, check.IsNil) - c.Assert(params.timezone, check.Equals, expected[i]) + require.Nil(t, err) + require.Equal(t, expected[i], params.timezone) } } -func (s MySQLSinkSuite) TestParseSinkURIBadQueryString(c *check.C) { - defer testleak.AfterTest(c)() +func TestParseSinkURIBadQueryString(t *testing.T) { uris := []string{ "", "postgre://127.0.0.1:3306", @@ -904,39 +589,15 @@ func (s MySQLSinkSuite) TestParseSinkURIBadQueryString(c *check.C) { for _, uriStr := range uris { if uriStr != "" { uri, err = url.Parse(uriStr) - c.Assert(err, check.IsNil) + require.Nil(t, err) } else { uri = nil } _, err = parseSinkURI(ctx, uri, opts) - c.Assert(err, check.NotNil) + require.NotNil(t, err) } } -func (s MySQLSinkSuite) TestCheckTiDBVariable(c *check.C) { - defer testleak.AfterTest(c)() - db, mock, err := sqlmock.New() - c.Assert(err, check.IsNil) - defer db.Close() //nolint:errcheck - columns := []string{"Variable_name", "Value"} - - mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( - sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"), - ) - val, err := checkTiDBVariable(context.TODO(), db, "allow_auto_random_explicit_insert", "1") - c.Assert(err, check.IsNil) - c.Assert(val, check.Equals, "1") - - mock.ExpectQuery("show session variables like 'no_exist_variable';").WillReturnError(sql.ErrNoRows) - val, err = checkTiDBVariable(context.TODO(), db, "no_exist_variable", "0") - c.Assert(err, check.IsNil) - c.Assert(val, check.Equals, "") - - mock.ExpectQuery("show session variables like 'version';").WillReturnError(sql.ErrConnDone) - _, err = checkTiDBVariable(context.TODO(), db, "version", "5.7.25-TiDB-v4.0.0") - c.Assert(err, check.ErrorMatches, ".*"+sql.ErrConnDone.Error()) -} - func mockTestDB() (*sql.DB, error) { // mock for test db, which is used querying TiDB session variable db, mock, err := sqlmock.New() @@ -954,9 +615,7 @@ func mockTestDB() (*sql.DB, error) { return db, nil } -func (s MySQLSinkSuite) 
TestAdjustSQLMode(c *check.C) { - defer testleak.AfterTest(c)() - +func TestAdjustSQLMode(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -968,12 +627,12 @@ func (s MySQLSinkSuite) TestAdjustSQLMode(c *check.C) { if dbIndex == 0 { // test db db, err := mockTestDB() - c.Assert(err, check.IsNil) + require.Nil(t, err) return db, nil } // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - c.Assert(err, check.IsNil) + require.Nil(t, err) mock.ExpectQuery("SELECT @@SESSION.sql_mode;"). WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}). AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE")) @@ -990,7 +649,7 @@ func (s MySQLSinkSuite) TestAdjustSQLMode(c *check.C) { changefeed := "test-changefeed" sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=4") - c.Assert(err, check.IsNil) + require.Nil(t, err) rc := config.GetDefaultReplicaConfig() rc.Cyclic = &config.CyclicConfig{ Enable: true, @@ -998,17 +658,17 @@ func (s MySQLSinkSuite) TestAdjustSQLMode(c *check.C) { FilterReplicaID: []uint64{2}, } f, err := filter.NewFilter(rc) - c.Assert(err, check.IsNil) + require.Nil(t, err) cyclicConfig, err := rc.Cyclic.Marshal() - c.Assert(err, check.IsNil) + require.Nil(t, err) opts := map[string]string{ mark.OptCyclicConfig: cyclicConfig, } sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, opts) - c.Assert(err, check.IsNil) + require.Nil(t, err) err = sink.Close(ctx) - c.Assert(err, check.IsNil) + require.Nil(t, err) } type mockUnavailableMySQL struct { @@ -1017,19 +677,19 @@ type mockUnavailableMySQL struct { wg sync.WaitGroup } -func newMockUnavailableMySQL(addr string, c *check.C) *mockUnavailableMySQL { +func newMockUnavailableMySQL(addr string, t *testing.T) *mockUnavailableMySQL { s := &mockUnavailableMySQL{ quit: make(chan interface{}), } l, err := net.Listen("tcp", addr) - c.Assert(err, check.IsNil) + require.Nil(t, err) s.listener = l s.wg.Add(1) - go s.serve(c) + go s.serve(t) return s } -func (s *mockUnavailableMySQL) serve(c *check.C) { +func (s *mockUnavailableMySQL) serve(t *testing.T) { defer s.wg.Done() for { @@ -1039,7 +699,7 @@ func (s *mockUnavailableMySQL) serve(c *check.C) { case <-s.quit: return default: - c.Error(err) + require.Error(t, err) } } else { s.wg.Add(1) @@ -1058,28 +718,24 @@ func (s *mockUnavailableMySQL) Stop() { s.wg.Wait() } -func (s MySQLSinkSuite) TestNewMySQLTimeout(c *check.C) { - defer testleak.AfterTest(c)() - +func TestNewMySQLTimeout(t *testing.T) { addr := "127.0.0.1:33333" - mockMySQL := newMockUnavailableMySQL(addr, c) + mockMySQL := newMockUnavailableMySQL(addr, t) defer mockMySQL.Stop() ctx, cancel := context.WithCancel(context.Background()) defer cancel() changefeed := "test-changefeed" sinkURI, err := url.Parse(fmt.Sprintf("mysql://%s/?read-timeout=2s&timeout=2s", addr)) - c.Assert(err, check.IsNil) + require.Nil(t, err) rc := config.GetDefaultReplicaConfig() f, err := filter.NewFilter(rc) - c.Assert(err, check.IsNil) + require.Nil(t, err) _, err = newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) - c.Assert(errors.Cause(err), check.Equals, driver.ErrBadConn) + require.Equal(t, driver.ErrBadConn, errors.Cause(err)) } -func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { - defer testleak.AfterTest(c)() - +func TestNewMySQLSinkExecDML(t *testing.T) { dbIndex := 0 mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { defer func() {
@@ -1088,12 +744,12 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { if dbIndex == 0 { // test db db, err := mockTestDB() - c.Assert(err, check.IsNil) + require.Nil(t, err) return db, nil } // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - c.Assert(err, check.IsNil) + require.Nil(t, err) mock.ExpectBegin() mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)"). WithArgs(1, "test", 2, "test"). @@ -1117,12 +773,12 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { defer cancel() changefeed := "test-changefeed" sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=4") - c.Assert(err, check.IsNil) + require.Nil(t, err) rc := config.GetDefaultReplicaConfig() f, err := filter.NewFilter(rc) - c.Assert(err, check.IsNil) + require.Nil(t, err) sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) - c.Assert(err, check.IsNil) + require.Nil(t, err) rows := []*model.RowChangedEvent{ { @@ -1173,39 +829,38 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { } err = sink.EmitRowChangedEvents(ctx, rows...) - c.Assert(err, check.IsNil) + require.Nil(t, err) + // retry to make sure event is flushed err = retry.Do(context.Background(), func() error { - ts, err := sink.FlushRowChangedEvents(ctx, uint64(2)) - c.Assert(err, check.IsNil) + ts, err := sink.FlushRowChangedEvents(ctx, 1, uint64(2)) + require.Nil(t, err) if ts < uint64(2) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 2) } return nil }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError)) - c.Assert(err, check.IsNil) + require.Nil(t, err) err = retry.Do(context.Background(), func() error { - ts, err := sink.FlushRowChangedEvents(ctx, uint64(4)) - c.Assert(err, check.IsNil) + ts, err := sink.FlushRowChangedEvents(ctx, 2, uint64(4)) + require.Nil(t, err) if ts < uint64(4) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 4) } return nil }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError)) - c.Assert(err, check.IsNil) + require.Nil(t, err) - err = sink.Barrier(ctx) - c.Assert(err, check.IsNil) + err = sink.Barrier(ctx, 2) + require.Nil(t, err) err = sink.Close(ctx) - c.Assert(err, check.IsNil) + require.Nil(t, err) } -func (s MySQLSinkSuite) TestExecDMLRollbackErrDatabaseNotExists(c *check.C) { - defer testleak.AfterTest(c)() - +func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) { rows := []*model.RowChangedEvent{ { Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, @@ -1233,12 +888,12 @@ func (s MySQLSinkSuite) TestExecDMLRollbackErrDatabaseNotExists(c *check.C) { if dbIndex == 0 { // test db db, err := mockTestDB() - c.Assert(err, check.IsNil) + require.Nil(t, err) return db, nil } // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - c.Assert(err, check.IsNil) + require.Nil(t, err) mock.ExpectBegin() mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). WithArgs(1, 2). 
@@ -1257,23 +912,21 @@ func (s MySQLSinkSuite) TestExecDMLRollbackErrDatabaseNotExists(c *check.C) { defer cancel() changefeed := "test-changefeed" sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1") - c.Assert(err, check.IsNil) + require.Nil(t, err) rc := config.GetDefaultReplicaConfig() f, err := filter.NewFilter(rc) - c.Assert(err, check.IsNil) + require.Nil(t, err) sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) - c.Assert(err, check.IsNil) + require.Nil(t, err) err = sink.(*mysqlSink).execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */) - c.Assert(errors.Cause(err), check.Equals, errDatabaseNotExists) + require.Equal(t, errDatabaseNotExists, errors.Cause(err)) err = sink.Close(ctx) - c.Assert(err, check.IsNil) + require.Nil(t, err) } -func (s MySQLSinkSuite) TestExecDMLRollbackErrTableNotExists(c *check.C) { - defer testleak.AfterTest(c)() - +func TestExecDMLRollbackErrTableNotExists(t *testing.T) { rows := []*model.RowChangedEvent{ { Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, @@ -1301,12 +954,12 @@ func (s MySQLSinkSuite) TestExecDMLRollbackErrTableNotExists(c *check.C) { if dbIndex == 0 { // test db db, err := mockTestDB() - c.Assert(err, check.IsNil) + require.Nil(t, err) return db, nil } // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - c.Assert(err, check.IsNil) + require.Nil(t, err) mock.ExpectBegin() mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). WithArgs(1, 2). @@ -1325,23 +978,21 @@ func (s MySQLSinkSuite) TestExecDMLRollbackErrTableNotExists(c *check.C) { defer cancel() changefeed := "test-changefeed" sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1") - c.Assert(err, check.IsNil) + require.Nil(t, err) rc := config.GetDefaultReplicaConfig() f, err := filter.NewFilter(rc) - c.Assert(err, check.IsNil) + require.Nil(t, err) sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) - c.Assert(err, check.IsNil) + require.Nil(t, err) err = sink.(*mysqlSink).execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */) - c.Assert(errors.Cause(err), check.Equals, errTableNotExists) + require.Equal(t, errTableNotExists, errors.Cause(err)) err = sink.Close(ctx) - c.Assert(err, check.IsNil) + require.Nil(t, err) } -func (s MySQLSinkSuite) TestExecDMLRollbackErrRetryable(c *check.C) { - defer testleak.AfterTest(c)() - +func TestExecDMLRollbackErrRetryable(t *testing.T) { rows := []*model.RowChangedEvent{ { Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, @@ -1369,12 +1020,12 @@ func (s MySQLSinkSuite) TestExecDMLRollbackErrRetryable(c *check.C) { if dbIndex == 0 { // test db db, err := mockTestDB() - c.Assert(err, check.IsNil) + require.Nil(t, err) return db, nil } // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - c.Assert(err, check.IsNil) + require.Nil(t, err) for i := 0; i < defaultDMLMaxRetryTime; i++ { mock.ExpectBegin() mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). 
@@ -1395,23 +1046,21 @@ func (s MySQLSinkSuite) TestExecDMLRollbackErrRetryable(c *check.C) { defer cancel() changefeed := "test-changefeed" sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1") - c.Assert(err, check.IsNil) + require.Nil(t, err) rc := config.GetDefaultReplicaConfig() f, err := filter.NewFilter(rc) - c.Assert(err, check.IsNil) + require.Nil(t, err) sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) - c.Assert(err, check.IsNil) + require.Nil(t, err) err = sink.(*mysqlSink).execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */) - c.Assert(errors.Cause(err), check.Equals, errLockDeadlock) + require.Equal(t, errLockDeadlock, errors.Cause(err)) err = sink.Close(ctx) - c.Assert(err, check.IsNil) + require.Nil(t, err) } -func (s MySQLSinkSuite) TestNewMySQLSinkExecDDL(c *check.C) { - defer testleak.AfterTest(c)() - +func TestNewMySQLSinkExecDDL(t *testing.T) { dbIndex := 0 mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { defer func() { @@ -1420,12 +1069,12 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDDL(c *check.C) { if dbIndex == 0 { // test db db, err := mockTestDB() - c.Assert(err, check.IsNil) + require.Nil(t, err) return db, nil } // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - c.Assert(err, check.IsNil) + require.Nil(t, err) mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").WillReturnResult(sqlmock.NewResult(1, 1)) @@ -1450,15 +1099,15 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDDL(c *check.C) { defer cancel() changefeed := "test-changefeed" sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=4") - c.Assert(err, check.IsNil) + require.Nil(t, err) rc := config.GetDefaultReplicaConfig() rc.Filter = &config.FilterConfig{ Rules: []string{"test.t1"}, } f, err := filter.NewFilter(rc) - c.Assert(err, check.IsNil) + require.Nil(t, err) sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) - c.Assert(err, check.IsNil) + require.Nil(t, err) ddl1 := &model.DDLEvent{ StartTs: 1000, @@ -1482,20 +1131,18 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDDL(c *check.C) { } err = sink.EmitDDLEvent(ctx, ddl1) - c.Assert(err, check.IsNil) + require.Nil(t, err) err = sink.EmitDDLEvent(ctx, ddl2) - c.Assert(cerror.ErrDDLEventIgnored.Equal(err), check.IsTrue) + require.True(t, cerror.ErrDDLEventIgnored.Equal(err)) // DDL execute failed, but error can be ignored err = sink.EmitDDLEvent(ctx, ddl1) - c.Assert(err, check.IsNil) + require.Nil(t, err) err = sink.Close(ctx) - c.Assert(err, check.IsNil) + require.Nil(t, err) } -func (s MySQLSinkSuite) TestNewMySQLSink(c *check.C) { - defer testleak.AfterTest(c)() - +func TestNewMySQLSink(t *testing.T) { dbIndex := 0 mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { defer func() { @@ -1504,13 +1151,13 @@ func (s MySQLSinkSuite) TestNewMySQLSink(c *check.C) { if dbIndex == 0 { // test db db, err := mockTestDB() - c.Assert(err, check.IsNil) + require.Nil(t, err) return db, nil } // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) mock.ExpectClose() - c.Assert(err, check.IsNil) + require.Nil(t, err) return db, nil } backupGetDBConn := getDBConnImpl @@ -1524,19 +1171,17 @@ func (s MySQLSinkSuite) TestNewMySQLSink(c *check.C) { changefeed := "test-changefeed" sinkURI, err := 
url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=4") - c.Assert(err, check.IsNil) + require.Nil(t, err) rc := config.GetDefaultReplicaConfig() f, err := filter.NewFilter(rc) - c.Assert(err, check.IsNil) + require.Nil(t, err) sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) - c.Assert(err, check.IsNil) + require.Nil(t, err) err = sink.Close(ctx) - c.Assert(err, check.IsNil) + require.Nil(t, err) } -func (s MySQLSinkSuite) TestMySQLSinkClose(c *check.C) { - defer testleak.AfterTest(c)() - +func TestMySQLSinkClose(t *testing.T) { dbIndex := 0 mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { defer func() { @@ -1545,13 +1190,13 @@ func (s MySQLSinkSuite) TestMySQLSinkClose(c *check.C) { if dbIndex == 0 { // test db db, err := mockTestDB() - c.Assert(err, check.IsNil) + require.Nil(t, err) return db, nil } // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) mock.ExpectClose() - c.Assert(err, check.IsNil) + require.Nil(t, err) return db, nil } backupGetDBConn := getDBConnImpl @@ -1564,14 +1209,100 @@ func (s MySQLSinkSuite) TestMySQLSinkClose(c *check.C) { changefeed := "test-changefeed" sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=4") - c.Assert(err, check.IsNil) + require.Nil(t, err) rc := config.GetDefaultReplicaConfig() f, err := filter.NewFilter(rc) - c.Assert(err, check.IsNil) + require.Nil(t, err) // test sink.Close will work correctly even if the ctx pass in has not been cancel sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) - c.Assert(err, check.IsNil) + require.Nil(t, err) + err = sink.Close(ctx) + require.Nil(t, err) +} + +func TestMySQLSinkFlushResovledTs(t *testing.T) { + dbIndex := 0 + mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { + defer func() { + dbIndex++ + }() + if dbIndex == 0 { + // test db + db, err := mockTestDB() + require.Nil(t, err) + return db, nil + } + // normal db + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + mock.ExpectBegin() + mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?)"). + WithArgs(1). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + mock.ExpectBegin() + mock.ExpectExec("REPLACE INTO `s1`.`t2`(`a`) VALUES (?)"). + WithArgs(1). 
+ WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + mock.ExpectClose() + require.Nil(t, err) + return db, nil + } + backupGetDBConn := getDBConnImpl + getDBConnImpl = mockGetDBConn + defer func() { + getDBConnImpl = backupGetDBConn + }() + + ctx := context.Background() + + changefeed := "test-changefeed" + sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=4") + require.Nil(t, err) + rc := config.GetDefaultReplicaConfig() + f, err := filter.NewFilter(rc) + require.Nil(t, err) + + // test sink.Close will work correctly even if the ctx pass in has not been cancel + si, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) + sink := si.(*mysqlSink) + require.Nil(t, err) + checkpoint, err := sink.FlushRowChangedEvents(ctx, model.TableID(1), 1) + require.Nil(t, err) + require.Equal(t, uint64(0), checkpoint) + rows := []*model.RowChangedEvent{ + { + Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, + CommitTs: 5, + Columns: []*model.Column{ + {Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 1}, + }, + }, + } + err = sink.EmitRowChangedEvents(ctx, rows...) + require.Nil(t, err) + checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(1), 6) + require.True(t, checkpoint <= 5) + time.Sleep(500 * time.Millisecond) + require.Nil(t, err) + require.Equal(t, uint64(6), sink.getTableCheckpointTs(model.TableID(1))) + rows = []*model.RowChangedEvent{ + { + Table: &model.TableName{Schema: "s1", Table: "t2", TableID: 2}, + CommitTs: 4, + Columns: []*model.Column{ + {Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 1}, + }, + }, + } + err = sink.EmitRowChangedEvents(ctx, rows...) + require.Nil(t, err) + checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), 5) + require.True(t, checkpoint <= 5) + time.Sleep(500 * time.Millisecond) + require.Nil(t, err) + require.Equal(t, uint64(5), sink.getTableCheckpointTs(model.TableID(2))) err = sink.Close(ctx) - c.Assert(err, check.IsNil) + require.Nil(t, err) } diff --git a/cdc/sink/mysql_worker_test.go b/cdc/sink/mysql_worker_test.go new file mode 100644 index 00000000000..03b6b794a0b --- /dev/null +++ b/cdc/sink/mysql_worker_test.go @@ -0,0 +1,362 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
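The new TestMySQLSinkFlushResovledTs case above exercises the per-table flush path: FlushRowChangedEvents now takes a model.TableID, and the sink records a checkpoint per table that getTableCheckpointTs reads back. The stand-alone sketch below illustrates only the shape of that bookkeeping; it assumes a mutex-guarded map and a synchronous flush, with TableID standing in for model.TableID, and is not the mysqlSink implementation itself, which flushes asynchronously through its workers.

// Sketch (not part of the diff): simplified per-table checkpoint bookkeeping.
package main

import (
	"context"
	"fmt"
	"sync"
)

// TableID stands in for model.TableID (an int64 in TiCDC).
type TableID = int64

// tableCheckpoints keeps, for every table, the highest resolved ts whose rows
// are known to be flushed. A mutex-guarded map keeps the sketch simple; the
// real sink may use sync.Map or atomics instead.
type tableCheckpoints struct {
	mu          sync.Mutex
	checkpoints map[TableID]uint64
}

// FlushRowChangedEvents mirrors the new per-table signature: flush the rows of
// one table up to resolvedTs (elided here) and record that table's checkpoint.
func (c *tableCheckpoints) FlushRowChangedEvents(_ context.Context, tableID TableID, resolvedTs uint64) (uint64, error) {
	// ... write rows with commitTs <= resolvedTs for tableID downstream ...
	c.mu.Lock()
	defer c.mu.Unlock()
	if resolvedTs > c.checkpoints[tableID] {
		c.checkpoints[tableID] = resolvedTs
	}
	return c.checkpoints[tableID], nil
}

// getTableCheckpointTs returns the last recorded checkpoint for a table, or
// zero if the table has never been flushed.
func (c *tableCheckpoints) getTableCheckpointTs(tableID TableID) uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.checkpoints[tableID]
}

func main() {
	cp := &tableCheckpoints{checkpoints: make(map[TableID]uint64)}
	_, _ = cp.FlushRowChangedEvents(context.Background(), 1, 6)
	_, _ = cp.FlushRowChangedEvents(context.Background(), 2, 5)
	fmt.Println(cp.getTableCheckpointTs(1), cp.getTableCheckpointTs(2)) // 6 5
}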
+ +package sink + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/pingcap/errors" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/notify" + "github.com/pingcap/tiflow/pkg/util/testleak" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func TestMysqlSinkWorker(t *testing.T) { + defer testleak.AfterTestT(t)() + tbl := &model.TableName{ + Schema: "test", + Table: "user", + TableID: 1, + IsPartition: false, + } + testCases := []struct { + txns []*model.SingleTableTxn + expectedOutputRows [][]*model.RowChangedEvent + exportedOutputReplicaIDs []uint64 + maxTxnRow int + }{ + { + txns: []*model.SingleTableTxn{}, + maxTxnRow: 4, + }, { + txns: []*model.SingleTableTxn{ + { + Table: tbl, + CommitTs: 1, + Rows: []*model.RowChangedEvent{{CommitTs: 1}}, + ReplicaID: 1, + }, + }, + expectedOutputRows: [][]*model.RowChangedEvent{{{CommitTs: 1}}}, + exportedOutputReplicaIDs: []uint64{1}, + maxTxnRow: 2, + }, { + txns: []*model.SingleTableTxn{ + { + Table: tbl, + CommitTs: 1, + Rows: []*model.RowChangedEvent{{CommitTs: 1}, {CommitTs: 1}, {CommitTs: 1}}, + ReplicaID: 1, + }, + }, + expectedOutputRows: [][]*model.RowChangedEvent{ + {{CommitTs: 1}, {CommitTs: 1}, {CommitTs: 1}}, + }, + exportedOutputReplicaIDs: []uint64{1}, + maxTxnRow: 2, + }, { + txns: []*model.SingleTableTxn{ + { + Table: tbl, + CommitTs: 1, + Rows: []*model.RowChangedEvent{{CommitTs: 1}, {CommitTs: 1}}, + ReplicaID: 1, + }, + { + Table: tbl, + CommitTs: 2, + Rows: []*model.RowChangedEvent{{CommitTs: 2}}, + ReplicaID: 1, + }, + { + Table: tbl, + CommitTs: 3, + Rows: []*model.RowChangedEvent{{CommitTs: 3}, {CommitTs: 3}}, + ReplicaID: 1, + }, + }, + expectedOutputRows: [][]*model.RowChangedEvent{ + {{CommitTs: 1}, {CommitTs: 1}, {CommitTs: 2}}, + {{CommitTs: 3}, {CommitTs: 3}}, + }, + exportedOutputReplicaIDs: []uint64{1, 1}, + maxTxnRow: 4, + }, { + txns: []*model.SingleTableTxn{ + { + Table: tbl, + CommitTs: 1, + Rows: []*model.RowChangedEvent{{CommitTs: 1}}, + ReplicaID: 1, + }, + { + Table: tbl, + CommitTs: 2, + Rows: []*model.RowChangedEvent{{CommitTs: 2}}, + ReplicaID: 2, + }, + { + Table: tbl, + CommitTs: 3, + Rows: []*model.RowChangedEvent{{CommitTs: 3}}, + ReplicaID: 3, + }, + }, + expectedOutputRows: [][]*model.RowChangedEvent{ + {{CommitTs: 1}}, + {{CommitTs: 2}}, + {{CommitTs: 3}}, + }, + exportedOutputReplicaIDs: []uint64{1, 2, 3}, + maxTxnRow: 4, + }, { + txns: []*model.SingleTableTxn{ + { + Table: tbl, + CommitTs: 1, + Rows: []*model.RowChangedEvent{{CommitTs: 1}}, + ReplicaID: 1, + }, + { + Table: tbl, + CommitTs: 2, + Rows: []*model.RowChangedEvent{{CommitTs: 2}, {CommitTs: 2}, {CommitTs: 2}}, + ReplicaID: 1, + }, + { + Table: tbl, + CommitTs: 3, + Rows: []*model.RowChangedEvent{{CommitTs: 3}}, + ReplicaID: 1, + }, + { + Table: tbl, + CommitTs: 4, + Rows: []*model.RowChangedEvent{{CommitTs: 4}}, + ReplicaID: 1, + }, + }, + expectedOutputRows: [][]*model.RowChangedEvent{ + {{CommitTs: 1}}, + {{CommitTs: 2}, {CommitTs: 2}, {CommitTs: 2}}, + {{CommitTs: 3}, {CommitTs: 4}}, + }, + exportedOutputReplicaIDs: []uint64{1, 1, 1}, + maxTxnRow: 2, + }, + } + ctx := context.Background() + + notifier := new(notify.Notifier) + for i, tc := range testCases { + cctx, cancel := context.WithCancel(ctx) + var outputRows [][]*model.RowChangedEvent + var outputReplicaIDs []uint64 + receiver, err := notifier.NewReceiver(-1) + require.Nil(t, err) + w := newMySQLSinkWorker(tc.maxTxnRow, 1, + 
bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"), + receiver, + func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error { + outputRows = append(outputRows, events) + outputReplicaIDs = append(outputReplicaIDs, replicaID) + return nil + }) + errg, cctx := errgroup.WithContext(cctx) + errg.Go(func() error { + return w.run(cctx) + }) + for _, txn := range tc.txns { + w.appendTxn(cctx, txn) + } + var wg sync.WaitGroup + w.appendFinishTxn(&wg) + // ensure all txns are fetched from txn channel in sink worker + time.Sleep(time.Millisecond * 100) + notifier.Notify() + wg.Wait() + cancel() + require.Equal(t, context.Canceled, errors.Cause(errg.Wait())) + require.Equal(t, tc.expectedOutputRows, outputRows, + fmt.Sprintf("case %v, %s, %s", i, spew.Sdump(outputRows), spew.Sdump(tc.expectedOutputRows))) + require.Equal(t, tc.exportedOutputReplicaIDs, outputReplicaIDs, + fmt.Sprintf("case %v, %s, %s", i, spew.Sdump(outputReplicaIDs), spew.Sdump(tc.exportedOutputReplicaIDs))) + } +} + +func TestMySQLSinkWorkerExitWithError(t *testing.T) { + defer testleak.AfterTestT(t)() + tbl := &model.TableName{ + Schema: "test", + Table: "user", + TableID: 1, + IsPartition: false, + } + txns1 := []*model.SingleTableTxn{ + { + Table: tbl, + CommitTs: 1, + Rows: []*model.RowChangedEvent{{CommitTs: 1}}, + }, + { + Table: tbl, + CommitTs: 2, + Rows: []*model.RowChangedEvent{{CommitTs: 2}}, + }, + { + Table: tbl, + CommitTs: 3, + Rows: []*model.RowChangedEvent{{CommitTs: 3}}, + }, + { + Table: tbl, + CommitTs: 4, + Rows: []*model.RowChangedEvent{{CommitTs: 4}}, + }, + } + txns2 := []*model.SingleTableTxn{ + { + Table: tbl, + CommitTs: 5, + Rows: []*model.RowChangedEvent{{CommitTs: 5}}, + }, + { + Table: tbl, + CommitTs: 6, + Rows: []*model.RowChangedEvent{{CommitTs: 6}}, + }, + } + maxTxnRow := 1 + ctx := context.Background() + + errExecFailed := errors.New("sink worker exec failed") + notifier := new(notify.Notifier) + cctx, cancel := context.WithCancel(ctx) + receiver, err := notifier.NewReceiver(-1) + require.Nil(t, err) + w := newMySQLSinkWorker(maxTxnRow, 1, /*bucket*/ + bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"), + receiver, + func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error { + return errExecFailed + }) + errg, cctx := errgroup.WithContext(cctx) + errg.Go(func() error { + return w.run(cctx) + }) + // txn in txns1 will be sent to worker txnCh + for _, txn := range txns1 { + w.appendTxn(cctx, txn) + } + + // simulate notify sink worker to flush existing txns + var wg sync.WaitGroup + w.appendFinishTxn(&wg) + time.Sleep(time.Millisecond * 100) + // txn in txns2 will be blocked since the worker has exited + for _, txn := range txns2 { + w.appendTxn(cctx, txn) + } + notifier.Notify() + + // simulate sink shutdown and send closed signal to sink worker + w.closedCh <- struct{}{} + w.cleanup() + + // the flush notification wait group should be done + wg.Wait() + + cancel() + require.Equal(t, errExecFailed, errg.Wait()) +} + +func TestMySQLSinkWorkerExitCleanup(t *testing.T) { + defer testleak.AfterTestT(t)() + tbl := &model.TableName{ + Schema: "test", + Table: "user", + TableID: 1, + IsPartition: false, + } + txns1 := []*model.SingleTableTxn{ + { + Table: tbl, + CommitTs: 1, + Rows: []*model.RowChangedEvent{{CommitTs: 1}}, + }, + { + Table: tbl, + CommitTs: 2, + Rows: []*model.RowChangedEvent{{CommitTs: 2}}, + }, + } + txns2 := []*model.SingleTableTxn{ + { + Table: tbl, +
CommitTs: 5, + Rows: []*model.RowChangedEvent{{CommitTs: 5}}, + }, + } + + maxTxnRow := 1 + ctx := context.Background() + + errExecFailed := errors.New("sink worker exec failed") + notifier := new(notify.Notifier) + cctx, cancel := context.WithCancel(ctx) + receiver, err := notifier.NewReceiver(-1) + require.Nil(t, err) + w := newMySQLSinkWorker(maxTxnRow, 1, /*bucket*/ + bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"), + receiver, + func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error { + return errExecFailed + }) + errg, cctx := errgroup.WithContext(cctx) + errg.Go(func() error { + err := w.run(cctx) + return err + }) + for _, txn := range txns1 { + w.appendTxn(cctx, txn) + } + + // sleep to let txns flushed by tick + time.Sleep(time.Millisecond * 100) + + // simulate more txns are sent to txnCh after the sink worker run has exited + for _, txn := range txns2 { + w.appendTxn(cctx, txn) + } + var wg sync.WaitGroup + w.appendFinishTxn(&wg) + notifier.Notify() + + // simulate sink shutdown and send closed singal to sink worker + w.closedCh <- struct{}{} + w.cleanup() + + // the flush notification wait group should be done + wg.Wait() + + cancel() + require.Equal(t, errExecFailed, errg.Wait()) +} diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go index 4bfd5abf7ae..3eec6494158 100644 --- a/cdc/sink/simple_mysql_tester.go +++ b/cdc/sink/simple_mysql_tester.go @@ -185,7 +185,7 @@ func (s *simpleMySQLSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) // FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream. // TiCDC guarantees that all of Event which of commitTs less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` -func (s *simpleMySQLSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (s *simpleMySQLSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { s.rowsBufferLock.Lock() defer s.rowsBufferLock.Unlock() newBuffer := make([]*model.RowChangedEvent, 0, len(s.rowsBuffer)) @@ -215,7 +215,7 @@ func (s *simpleMySQLSink) Close(ctx context.Context) error { return s.db.Close() } -func (s *simpleMySQLSink) Barrier(ctx context.Context) error { +func (s *simpleMySQLSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index fc1aaa374dc..c1cf6ecda3c 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -45,7 +45,7 @@ type Sink interface { // FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream. // TiCDC guarantees that all of Event which of commitTs less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` - FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) + FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) // EmitCheckpointTs sends CheckpointTs to Sink // TiCDC guarantees that all Events **in the cluster** which of commitTs less than or equal `checkpointTs` are sent to downstream successfully. @@ -56,7 +56,7 @@ type Sink interface { // Barrier is a synchronous function to wait all events to be flushed in underlying sink // Note once Barrier is called, the resolved ts won't be pushed until the Barrier call returns. 
- Barrier(ctx context.Context) error + Barrier(ctx context.Context, tableID model.TableID) error } var sinkIniterMap = make(map[string]sinkInitFunc) diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 1518d636b65..e1d954572a0 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -290,6 +290,13 @@ func main() { } } +type partitionSink struct { + sink.Sink + resolvedTs uint64 + partitionNo int + tablesMap sync.Map +} + // Consumer represents a Sarama consumer group consumer type Consumer struct { ready chan bool @@ -298,10 +305,7 @@ type Consumer struct { maxDDLReceivedTs uint64 ddlListMu sync.Mutex - sinks []*struct { - sink.Sink - resolvedTs uint64 - } + sinks []*partitionSink sinksMu sync.Mutex ddlSink sink.Sink @@ -326,10 +330,7 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { c.fakeTableIDGenerator = &fakeTableIDGenerator{ tableIDs: make(map[string]int64), } - c.sinks = make([]*struct { - sink.Sink - resolvedTs uint64 - }, kafkaPartitionNum) + c.sinks = make([]*partitionSink, kafkaPartitionNum) ctx, cancel := context.WithCancel(ctx) errCh := make(chan error, 1) opts := map[string]string{} @@ -339,10 +340,7 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { cancel() return nil, errors.Trace(err) } - c.sinks[i] = &struct { - sink.Sink - resolvedTs uint64 - }{Sink: s} + c.sinks[i] = &partitionSink{Sink: s, partitionNo: i} } sink, err := sink.NewSink(ctx, "kafka-consumer", downstreamURIStr, filter, config.GetDefaultReplicaConfig(), opts, errCh) if err != nil { @@ -443,6 +441,10 @@ ClaimMessages: if err != nil { log.Fatal("emit row changed event failed", zap.Error(err)) } + lastCommitTs, ok := sink.tablesMap.Load(row.Table.TableID) + if !ok || lastCommitTs.(uint64) < row.CommitTs { + sink.tablesMap.Store(row.Table.TableID, row.CommitTs) + } case model.MqMessageTypeResolved: ts, err := batchDecoder.NextResolvedEvent() if err != nil { @@ -503,10 +505,7 @@ func (c *Consumer) popDDL() *model.DDLEvent { return nil } -func (c *Consumer) forEachSink(fn func(sink *struct { - sink.Sink - resolvedTs uint64 -}) error) error { +func (c *Consumer) forEachSink(fn func(sink *partitionSink) error) error { c.sinksMu.Lock() defer c.sinksMu.Unlock() for _, sink := range c.sinks { @@ -529,10 +528,7 @@ func (c *Consumer) Run(ctx context.Context) error { time.Sleep(100 * time.Millisecond) // handle ddl globalResolvedTs := uint64(math.MaxUint64) - err := c.forEachSink(func(sink *struct { - sink.Sink - resolvedTs uint64 - }) error { + err := c.forEachSink(func(sink *partitionSink) error { resolvedTs := atomic.LoadUint64(&sink.resolvedTs) if resolvedTs < globalResolvedTs { globalResolvedTs = resolvedTs @@ -545,10 +541,7 @@ func (c *Consumer) Run(ctx context.Context) error { todoDDL := c.getFrontDDL() if todoDDL != nil && globalResolvedTs >= todoDDL.CommitTs { // flush DMLs - err := c.forEachSink(func(sink *struct { - sink.Sink - resolvedTs uint64 - }) error { + err := c.forEachSink(func(sink *partitionSink) error { return syncFlushRowChangedEvents(ctx, sink, todoDDL.CommitTs) }) if err != nil { @@ -574,10 +567,7 @@ func (c *Consumer) Run(ctx context.Context) error { atomic.StoreUint64(&c.globalResolvedTs, globalResolvedTs) log.Info("update globalResolvedTs", zap.Uint64("ts", globalResolvedTs)) - err = c.forEachSink(func(sink *struct { - sink.Sink - resolvedTs uint64 - }) error { + err = c.forEachSink(func(sink *partitionSink) error { return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs) }) if err != nil { @@ -586,18 +576,34 @@ func (c 
*Consumer) Run(ctx context.Context) error { } } -func syncFlushRowChangedEvents(ctx context.Context, sink sink.Sink, resolvedTs uint64) error { +func syncFlushRowChangedEvents(ctx context.Context, sink *partitionSink, resolvedTs uint64) error { for { select { case <-ctx.Done(): return ctx.Err() default: } - checkpointTs, err := sink.FlushRowChangedEvents(ctx, resolvedTs) + // tables are flushed + var ( + err error + checkpointTs uint64 + ) + flushedResolvedTs := true + sink.tablesMap.Range(func(key, value interface{}) bool { + tableID := key.(int64) + checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, resolvedTs) + if err != nil { + return false + } + if checkpointTs < resolvedTs { + flushedResolvedTs = false + } + return true + }) if err != nil { return err } - if checkpointTs >= resolvedTs { + if flushedResolvedTs { return nil } } diff --git a/tests/integration_tests/sink_hang/run.sh b/tests/integration_tests/sink_hang/run.sh index 83d78c9c050..a75689e101d 100644 --- a/tests/integration_tests/sink_hang/run.sh +++ b/tests/integration_tests/sink_hang/run.sh @@ -42,7 +42,7 @@ function run() { *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/MySQLSinkHangLongTime=1*return(true);github.com/pingcap/tiflow/cdc/sink/MySQLSinkExecDMLError=9*return(true)' + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/MySQLSinkExecDMLError=2*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') if [ "$SINK_TYPE" == "kafka" ]; then @@ -54,8 +54,6 @@ function run() { run_sql "CREATE table sink_hang.t2(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "BEGIN; INSERT INTO sink_hang.t1 VALUES (),(),(); INSERT INTO sink_hang.t2 VALUES (),(),(); COMMIT" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "error" - cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "normal" check_table_exists "sink_hang.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
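With the per-table API, flushing a partition up to a resolved ts becomes a loop over the tables recorded in partitionSink.tablesMap: flush each table and retry until every returned checkpoint has caught up, which is what the rewritten syncFlushRowChangedEvents above does. The sketch below captures only that shape; flushAllTables and flushFn are hypothetical stand-ins (flushFn plays the role of Sink.FlushRowChangedEvents), not APIs from the repository.

// Sketch (not part of the diff): flush every tracked table until its checkpoint reaches resolvedTs.
package main

import (
	"context"
	"fmt"
	"sync"
)

// flushAllTables flushes each table recorded in tables (tableID -> max emitted
// commit ts) once and reports whether every checkpoint has reached resolvedTs;
// callers retry until it returns true, mirroring the consumer helper above.
func flushAllTables(
	ctx context.Context,
	tables *sync.Map,
	resolvedTs uint64,
	flushFn func(ctx context.Context, tableID int64, resolvedTs uint64) (uint64, error),
) (bool, error) {
	caughtUp := true
	var flushErr error
	tables.Range(func(key, _ interface{}) bool {
		checkpointTs, err := flushFn(ctx, key.(int64), resolvedTs)
		if err != nil {
			flushErr = err
			return false // stop iterating on the first error
		}
		if checkpointTs < resolvedTs {
			caughtUp = false // this table still has rows below resolvedTs to flush
		}
		return true
	})
	return caughtUp, flushErr
}

func main() {
	var tables sync.Map
	tables.Store(int64(1), uint64(5))
	done, err := flushAllTables(context.Background(), &tables, 5,
		func(_ context.Context, _ int64, resolvedTs uint64) (uint64, error) {
			return resolvedTs, nil // pretend the flush completed immediately
		})
	fmt.Println(done, err) // true <nil>
}

Stopping the Range on the first error and aggregating a single caught-up flag keeps the retry loop simple; a caller that needs a global checkpoint could instead track the minimum checkpoint across all tables.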