Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): refine sink interface and add init method (#5196) #5226

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

// Init is a no-op for the mock sink: tableID is ignored and nil is always
// returned.
func (s *mockSink) Init(tableID model.TableID) error {
return nil
}

func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
Expand Down
6 changes: 5 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,11 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
tableNameStr = tableName.QuoteString()
}

sink := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs, p.redoManager)
sink, err := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs, p.redoManager)
if err != nil {
return nil, errors.Trace(err)
}

table := tablepipeline.NewTablePipeline(
ctx,
p.mounter,
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type blackHoleSink struct {
lastAccumulated uint64
}

// Init does nothing for the black hole sink: tableID is ignored and nil is
// always returned.
func (b *blackHoleSink) Init(tableID model.TableID) error {
return nil
}

func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
log.Debug("BlockHoleSink: EmitRowChangedEvents", zap.Any("row", row))
Expand Down
32 changes: 24 additions & 8 deletions cdc/sink/buffer_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,20 @@ type bufferSink struct {
buffer map[model.TableID][]*model.RowChangedEvent
bufferMu sync.Mutex
flushTsChan chan flushMsg
drawbackChan chan drawbackMsg
}

func newBufferSink(
ctx context.Context,
backendSink Sink,
errCh chan error,
checkpointTs model.Ts,
drawbackChan chan drawbackMsg,
) *bufferSink {
sink := &bufferSink{
Sink: backendSink,
// buffer shares the same flow control with table sink
buffer: make(map[model.TableID][]*model.RowChangedEvent),
changeFeedCheckpointTs: checkpointTs,
flushTsChan: make(chan flushMsg, 128),
drawbackChan: drawbackChan,
}
go sink.run(ctx, errCh)
return sink
Expand All @@ -77,11 +74,6 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
errCh <- err
}
return
case drawback := <-b.drawbackChan:
b.bufferMu.Lock()
delete(b.buffer, drawback.tableID)
b.bufferMu.Unlock()
close(drawback.callback)
case flushEvent := <-b.flushTsChan:
b.bufferMu.Lock()
resolvedTs := flushEvent.resolvedTs
Expand Down Expand Up @@ -133,6 +125,30 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
}
}

// Init prepares the sink for tableID. It first clears whatever this buffer
// sink still holds for the table (see clearBufferedTableData) and then calls
// Init on the wrapped Sink, returning that call's error.
func (b *bufferSink) Init(tableID model.TableID) error {
b.clearBufferedTableData(tableID)
return b.Sink.Init(tableID)
}

// Barrier clears the data this buffer sink holds for tableID (see
// clearBufferedTableData) and then forwards the barrier to the wrapped Sink,
// returning that call's error.
func (b *bufferSink) Barrier(ctx context.Context, tableID model.TableID) error {
b.clearBufferedTableData(tableID)
return b.Sink.Barrier(ctx, tableID)
}

// clearBufferedTableData removes the buffer entry and the recorded checkpoint
// ts of tableID while holding bufferMu. When a checkpoint entry was present,
// its value is logged before returning.
func (b *bufferSink) clearBufferedTableData(tableID model.TableID) {
	b.bufferMu.Lock()
	defer b.bufferMu.Unlock()

	delete(b.buffer, tableID)

	removedTs, found := b.tableCheckpointTsMap.LoadAndDelete(tableID)
	if !found {
		// Nothing was recorded for this table, so there is nothing to report.
		return
	}
	// NOTE(review): the assertion assumes checkpoints are stored as uint64.
	log.Info("clean up table checkpoint ts in buffer sink",
		zap.Int64("tableID", tableID),
		zap.Uint64("checkpointTs", removedTs.(uint64)))
}

func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
select {
case <-ctx.Done():
Expand Down
19 changes: 17 additions & 2 deletions cdc/sink/buffer_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestTableIsNotFlushed(t *testing.T) {
func TestFlushTable(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5, make(chan drawbackMsg))
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5)

require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
require.Nil(t, b.EmitRowChangedEvents(ctx))
Expand Down Expand Up @@ -73,7 +73,7 @@ func TestFlushTable(t *testing.T) {

func TestFlushFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5, make(chan drawbackMsg))
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5)

checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
Expand All @@ -89,3 +89,18 @@ func TestFlushFailed(t *testing.T) {
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(5), b.getTableCheckpointTs(1))
}

// TestCleanBufferedData checks that calling Init on a buffer sink removes the
// buffer entry that exists for the table being initialized.
func TestCleanBufferedData(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	tableID := model.TableID(1)
	bs := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5)

	// Seed an empty entry so there is something for Init to remove.
	bs.buffer[tableID] = []*model.RowChangedEvent{}
	_, present := bs.buffer[tableID]
	require.True(t, present)

	require.Nil(t, bs.Init(tableID))

	// The entry must be gone once Init has run.
	_, present = bs.buffer[tableID]
	require.False(t, present)
}
4 changes: 4 additions & 0 deletions cdc/sink/cdclog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ type fileSink struct {
ddlEncoder codec.EventBatchEncoder
}

// Init is a no-op for the file sink: the table ID is ignored and nil is
// always returned.
func (f *fileSink) Init(_ model.TableID) error {
return nil
}

func (f *fileSink) flushLogMeta() error {
data, err := f.logMeta.Marshal()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cdc/sink/cdclog/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ type s3Sink struct {
ddlEncoder codec.EventBatchEncoder
}

// Init is a no-op for the S3 sink: the table ID is ignored and nil is always
// returned.
func (s *s3Sink) Init(_ model.TableID) error {
return nil
}

// EmitRowChangedEvents hands the given rows to emitRowChangedEvents, passing
// newTableBuffer along, and returns whatever error that call reports.
func (s *s3Sink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
return s.emitRowChangedEvents(ctx, newTableBuffer, rows...)
}
Expand Down
7 changes: 7 additions & 0 deletions cdc/sink/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ func NewUnresolvedTxnCache() *UnresolvedTxnCache {
}
}

// RemoveTableTxn drops the cache entry stored under tableID. The deletion is
// performed while holding unresolvedTxnsMu.
func (c *UnresolvedTxnCache) RemoveTableTxn(tableID model.TableID) {
	c.unresolvedTxnsMu.Lock()
	delete(c.unresolvedTxns, tableID)
	c.unresolvedTxnsMu.Unlock()
}

// Append adds unresolved rows to cache
// the rows passed into this function will go through the following handling logic
// 1. group by tableID from one input stream
Expand Down
44 changes: 16 additions & 28 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ type Manager struct {
flushMu sync.Mutex
flushing int64

drawbackChan chan drawbackMsg

captureAddr string
changefeedID model.ChangeFeedID
metricsTableSinkTotalRows prometheus.Counter
Expand All @@ -55,34 +53,40 @@ func NewManager(
ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts,
captureAddr string, changefeedID model.ChangeFeedID,
) *Manager {
drawbackChan := make(chan drawbackMsg, 16)
return &Manager{
backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs, drawbackChan),
backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs),
changeFeedCheckpointTs: checkpointTs,
tableSinks: make(map[model.TableID]*tableSink),
drawbackChan: drawbackChan,
captureAddr: captureAddr,
changefeedID: changefeedID,
metricsTableSinkTotalRows: tableSinkTotalRowsCountCounter.WithLabelValues(captureAddr, changefeedID),
}
}

// CreateTableSink creates a table sink
func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts, redoManager redo.LogManager) Sink {
m.tableSinksMu.Lock()
defer m.tableSinksMu.Unlock()
if _, exist := m.tableSinks[tableID]; exist {
log.Panic("the table sink already exists", zap.Uint64("tableID", uint64(tableID)))
}
func (m *Manager) CreateTableSink(
tableID model.TableID,
checkpointTs model.Ts,
redoManager redo.LogManager,
) (Sink, error) {
sink := &tableSink{
tableID: tableID,
manager: m,
buffer: make([]*model.RowChangedEvent, 0, 128),
emittedTs: checkpointTs,
redoManager: redoManager,
}

m.tableSinksMu.Lock()
defer m.tableSinksMu.Unlock()
if _, exist := m.tableSinks[tableID]; exist {
log.Panic("the table sink already exists", zap.Uint64("tableID", uint64(tableID)))
}
if err := sink.Init(tableID); err != nil {
return nil, errors.Trace(err)
}
m.tableSinks[tableID] = sink
return sink
return sink, nil
}

// Close closes the Sink manager and backend Sink, this method can be reentrantly called
Expand Down Expand Up @@ -149,17 +153,6 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e
m.tableSinksMu.Lock()
delete(m.tableSinks, tableID)
m.tableSinksMu.Unlock()
callback := make(chan struct{})
select {
case <-ctx.Done():
return ctx.Err()
case m.drawbackChan <- drawbackMsg{tableID: tableID, callback: callback}:
}
select {
case <-ctx.Done():
return ctx.Err()
case <-callback:
}
return m.backendSink.Barrier(ctx, tableID)
}

Expand All @@ -180,8 +173,3 @@ func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
m.backendSink.UpdateChangeFeedCheckpointTs(checkpointTs)
}
}

type drawbackMsg struct {
tableID model.TableID
callback chan struct{}
}
35 changes: 27 additions & 8 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func newCheckSink(c *check.C) *checkSink {
}
}

// Init is a no-op for the check sink: tableID is ignored and nil is always
// returned.
func (c *checkSink) Init(tableID model.TableID) error {
return nil
}

func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
Expand Down Expand Up @@ -110,7 +115,10 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
wg.Add(1)
go func() {
defer wg.Done()
tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0, redo.NewDisabledManager())
var err error
tableSinks[i], err = manager.CreateTableSink(model.TableID(i),
0, redo.NewDisabledManager())
c.Assert(err, check.IsNil)
}()
}
wg.Wait()
Expand Down Expand Up @@ -203,7 +211,8 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
for i := 0; i < goroutineNum; i++ {
if i%4 != 3 {
// add table
table := manager.CreateTableSink(model.TableID(i), maxResolvedTs, redoManager)
table, err := manager.CreateTableSink(model.TableID(i), maxResolvedTs, redoManager)
c.Assert(err, check.IsNil)
ctx, cancel := context.WithCancel(ctx)
tableCancels = append(tableCancels, cancel)
tableSinks = append(tableSinks, table)
Expand Down Expand Up @@ -245,9 +254,11 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
defer manager.Close(ctx)

tableID := int64(49)
tableSink := manager.CreateTableSink(tableID, 100, redo.NewDisabledManager())
err := tableSink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{
Table: &model.TableName{TableID: tableID},
table := &model.TableName{TableID: int64(49)}
tableSink, err := manager.CreateTableSink(table.TableID, 100, redo.NewDisabledManager())
c.Assert(err, check.IsNil)
err = tableSink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{
Table: table,
CommitTs: uint64(110),
})
c.Assert(err, check.IsNil)
Expand All @@ -274,7 +285,10 @@ func BenchmarkManagerFlushing(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0, redo.NewDisabledManager())
var err error
tableSinks[i], err = manager.CreateTableSink(model.TableID(i),
	0, redo.NewDisabledManager())
// Abort only on a real failure: an unconditional panic(err) would also fire
// when err is nil and crash the benchmark on every successful creation.
if err != nil {
	panic(err)
}
}()
}
wg.Wait()
Expand Down Expand Up @@ -340,6 +354,10 @@ type errorSink struct {
*check.C
}

// Init never fails for the error sink: the table ID is ignored and nil is
// always returned.
func (e *errorSink) Init(_ model.TableID) error {
return nil
}

// EmitRowChangedEvents always fails: regardless of ctx and rows it returns a
// newly created error with a fixed message.
func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
return errors.New("error in emit row changed events")
}
Expand Down Expand Up @@ -371,8 +389,9 @@ func (s *managerSuite) TestManagerError(c *check.C) {
errCh := make(chan error, 16)
manager := NewManager(ctx, &errorSink{C: c}, errCh, 0, "", "")
defer manager.Close(ctx)
sink := manager.CreateTableSink(1, 0, redo.NewDisabledManager())
err := sink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{
sink, err := manager.CreateTableSink(1, 0, redo.NewDisabledManager())
c.Assert(err, check.IsNil)
err = sink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{TableID: 1},
})
Expand Down
Loading