Skip to content

Commit

Permalink
cdc/sink: buffer sink manage checkpoint ts per table (pingcap#3625)
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy authored and okJiang committed Dec 8, 2021
1 parent 784ff88 commit ffe3b15
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 18 deletions.
57 changes: 40 additions & 17 deletions cdc/sink/buffer_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (

type bufferSink struct {
Sink
checkpointTs uint64
buffer map[model.TableID][]*model.RowChangedEvent
bufferMu sync.Mutex
flushTsChan chan uint64
drawbackChan chan drawbackMsg
changeFeedCheckpointTs uint64
tableCheckpointTsMap sync.Map
buffer map[model.TableID][]*model.RowChangedEvent
bufferMu sync.Mutex
flushTsChan chan flushMsg
drawbackChan chan drawbackMsg
}

func newBufferSink(
Expand All @@ -42,14 +43,14 @@ func newBufferSink(
errCh chan error,
checkpointTs model.Ts,
drawbackChan chan drawbackMsg,
) Sink {
) *bufferSink {
sink := &bufferSink{
Sink: backendSink,
// buffer shares the same flow control with table sink
buffer: make(map[model.TableID][]*model.RowChangedEvent),
checkpointTs: checkpointTs,
flushTsChan: make(chan uint64, 128),
drawbackChan: drawbackChan,
buffer: make(map[model.TableID][]*model.RowChangedEvent),
changeFeedCheckpointTs: checkpointTs,
flushTsChan: make(chan flushMsg, 128),
drawbackChan: drawbackChan,
}
go sink.run(ctx, errCh)
return sink
Expand Down Expand Up @@ -81,8 +82,9 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
delete(b.buffer, drawback.tableID)
b.bufferMu.Unlock()
close(drawback.callback)
case resolvedTs := <-b.flushTsChan:
case flushEvent := <-b.flushTsChan:
b.bufferMu.Lock()
resolvedTs := flushEvent.resolvedTs
// find all rows before resolvedTs and emit to backend sink
for tableID, rows := range b.buffer {
i := sort.Search(len(rows), func(i int) bool {
Expand All @@ -109,15 +111,15 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
b.bufferMu.Unlock()

start := time.Now()
// todo: use real table ID
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, 0, resolvedTs)
tableID := flushEvent.tableID
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, flushEvent.tableID, resolvedTs)
if err != nil {
if errors.Cause(err) != context.Canceled {
errCh <- err
}
return
}
atomic.StoreUint64(&b.checkpointTs, checkpointTs)
b.tableCheckpointTsMap.Store(tableID, checkpointTs)

dur := time.Since(start)
metricFlushDuration.Observe(dur.Seconds())
Expand Down Expand Up @@ -150,8 +152,29 @@ func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Ro
func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
select {
case <-ctx.Done():
return atomic.LoadUint64(&b.checkpointTs), ctx.Err()
case b.flushTsChan <- resolvedTs:
return b.getTableCheckpointTs(tableID), ctx.Err()
case b.flushTsChan <- flushMsg{
tableID: tableID,
resolvedTs: resolvedTs,
}:
}
return atomic.LoadUint64(&b.checkpointTs), nil
return b.getTableCheckpointTs(tableID), nil
}

type flushMsg struct {
tableID model.TableID
resolvedTs uint64
}

func (b *bufferSink) getTableCheckpointTs(tableID model.TableID) uint64 {
checkPoints, ok := b.tableCheckpointTsMap.Load(tableID)
if ok {
return checkPoints.(uint64)
}
return atomic.LoadUint64(&b.changeFeedCheckpointTs)
}

// UpdateChangeFeedCheckpointTs update the changeFeedCheckpointTs every processor tick
func (b *bufferSink) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
atomic.StoreUint64(&b.changeFeedCheckpointTs, checkpointTs)
}
91 changes: 91 additions & 0 deletions cdc/sink/buffer_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
"context"
"testing"
"time"

"github.com/pingcap/ticdc/cdc/model"
"github.com/stretchr/testify/require"
)

func TestTableIsNotFlushed(t *testing.T) {
b := bufferSink{changeFeedCheckpointTs: 1}
require.Equal(t, uint64(1), b.getTableCheckpointTs(2))
b.UpdateChangeFeedCheckpointTs(3)
require.Equal(t, uint64(3), b.getTableCheckpointTs(2))
}

func TestFlushTable(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer func() {
cancel()
}()
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
require.Nil(t, b.EmitRowChangedEvents(ctx))
tbl1 := &model.TableName{TableID: 1}
tbl2 := &model.TableName{TableID: 2}
tbl3 := &model.TableName{TableID: 3}
tbl4 := &model.TableName{TableID: 4}
require.Nil(t, b.EmitRowChangedEvents(ctx, []*model.RowChangedEvent{
{CommitTs: 6, Table: tbl1},
{CommitTs: 6, Table: tbl2},
{CommitTs: 6, Table: tbl3},
{CommitTs: 6, Table: tbl4},
{CommitTs: 10, Table: tbl1},
{CommitTs: 10, Table: tbl2},
{CommitTs: 10, Table: tbl3},
{CommitTs: 10, Table: tbl4},
}...))
checkpoint, err := b.FlushRowChangedEvents(ctx, 1, 7)
require.True(t, checkpoint <= 7)
require.Nil(t, err)
checkpoint, err = b.FlushRowChangedEvents(ctx, 2, 6)
require.True(t, checkpoint <= 6)
require.Nil(t, err)
checkpoint, err = b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(5), b.getTableCheckpointTs(4))
b.UpdateChangeFeedCheckpointTs(6)
require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(6), b.getTableCheckpointTs(4))
}

func TestFlushFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
cancel()
checkpoint, _ = b.FlushRowChangedEvents(ctx, 3, 18)
require.Equal(t, uint64(8), checkpoint)
checkpoint, _ = b.FlushRowChangedEvents(ctx, 1, 18)
require.Equal(t, uint64(5), checkpoint)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(5), b.getTableCheckpointTs(1))
}
5 changes: 4 additions & 1 deletion cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (

// Manager manages table sinks, maintains the relationship between table sinks and backendSink.
type Manager struct {
backendSink Sink
backendSink *bufferSink
tableCheckpointTsMap sync.Map
tableSinks map[model.TableID]*tableSink
tableSinksMu sync.Mutex
Expand Down Expand Up @@ -161,6 +161,9 @@ func (m *Manager) getCheckpointTs(tableID model.TableID) uint64 {

func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
atomic.StoreUint64(&m.changeFeedCheckpointTs, checkpointTs)
if m.backendSink != nil {
m.backendSink.UpdateChangeFeedCheckpointTs(checkpointTs)
}
}

type drawbackMsg struct {
Expand Down

0 comments on commit ffe3b15

Please sign in to comment.