Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc/sink: buffer sink manage checkpoint ts per table #3625

Merged
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
ba2c2fd
refine Sink interface
sdojjy Nov 25, 2021
26070d9
fix ut
sdojjy Nov 25, 2021
f388718
fix lint
sdojjy Nov 25, 2021
51b754a
fix lint
sdojjy Nov 25, 2021
112b4e0
sink manager manage checkpointTs per table
sdojjy Nov 25, 2021
bf10cc1
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 25, 2021
ce8598f
buffer sink manage checkpointTs per table
sdojjy Nov 26, 2021
aaa2a9d
buffer sink manage checkpointTs per table
sdojjy Nov 26, 2021
6bccf2f
Merge remote-tracking branch 'origin/sink-manager-manage-checkpoint-p…
sdojjy Nov 26, 2021
dbd09d3
sink manager manage checkpointTs per table
sdojjy Nov 26, 2021
1fd207d
Merge branch 'sink-manager-manage-checkpoint-per-table' into buffer_s…
sdojjy Nov 26, 2021
e189647
buffer sink manage checkpointTs per table
sdojjy Nov 26, 2021
fc0d687
sink manager manage checkpointTs per table
sdojjy Nov 26, 2021
4008fce
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 26, 2021
e938ba9
Merge branch 'sink-manager-manage-checkpoint-per-table' into buffer_s…
sdojjy Nov 26, 2021
3421986
fix lint
sdojjy Nov 29, 2021
594593e
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 30, 2021
78f2628
address comment
sdojjy Nov 30, 2021
d87f6e7
address comment
sdojjy Nov 30, 2021
6e34487
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 30, 2021
8bfacd1
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
sdojjy Nov 30, 2021
d3444c1
fix ut
sdojjy Nov 30, 2021
6cd2eaa
fix ut
sdojjy Nov 30, 2021
2227e75
fix ut
sdojjy Nov 30, 2021
e75d9d0
Merge remote-tracking branch 'upstream/master' into sink-manager-mana…
sdojjy Nov 30, 2021
fefb6bf
Merge branch 'sink-manager-manage-checkpoint-per-table' into buffer_s…
sdojjy Nov 30, 2021
9af09b3
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
sdojjy Nov 30, 2021
e243794
update buffer sink global checkpointTs
sdojjy Dec 1, 2021
07d0c99
fix sink_hang integration test
sdojjy Dec 2, 2021
853dd30
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
sdojjy Dec 2, 2021
a9f8020
Merge branch 'buffer_sink_manage_checkpointTs_per_table' of github.co…
sdojjy Dec 2, 2021
b554602
address comment
sdojjy Dec 3, 2021
06ad1d8
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
sdojjy Dec 4, 2021
0b67675
add ut cases
sdojjy Dec 4, 2021
e7f25b4
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
ti-chi-bot Dec 6, 2021
e6f2e91
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
ti-chi-bot Dec 6, 2021
8a80558
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
sdojjy Dec 6, 2021
0675979
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
ti-chi-bot Dec 6, 2021
baf3105
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
ti-chi-bot Dec 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions cdc/sink/buffer_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (

type bufferSink struct {
Sink
checkpointTs uint64
buffer map[model.TableID][]*model.RowChangedEvent
bufferMu sync.Mutex
flushTsChan chan uint64
drawbackChan chan drawbackMsg
changeFeedCheckpointTs uint64
tableCheckpointTsMap sync.Map
buffer map[model.TableID][]*model.RowChangedEvent
bufferMu sync.Mutex
flushTsChan chan flushMsg
drawbackChan chan drawbackMsg
}

func newBufferSink(
Expand All @@ -42,14 +43,14 @@ func newBufferSink(
errCh chan error,
checkpointTs model.Ts,
drawbackChan chan drawbackMsg,
) Sink {
) *bufferSink {
sink := &bufferSink{
Sink: backendSink,
// buffer shares the same flow control with table sink
buffer: make(map[model.TableID][]*model.RowChangedEvent),
checkpointTs: checkpointTs,
flushTsChan: make(chan uint64, 128),
drawbackChan: drawbackChan,
buffer: make(map[model.TableID][]*model.RowChangedEvent),
changeFeedCheckpointTs: checkpointTs,
flushTsChan: make(chan flushMsg, 128),
drawbackChan: drawbackChan,
}
go sink.run(ctx, errCh)
return sink
Expand Down Expand Up @@ -81,8 +82,9 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
delete(b.buffer, drawback.tableID)
b.bufferMu.Unlock()
close(drawback.callback)
case resolvedTs := <-b.flushTsChan:
case flushEvent := <-b.flushTsChan:
b.bufferMu.Lock()
resolvedTs := flushEvent.resolvedTs
// find all rows before resolvedTs and emit to backend sink
for tableID, rows := range b.buffer {
i := sort.Search(len(rows), func(i int) bool {
Expand All @@ -109,15 +111,15 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
b.bufferMu.Unlock()

start := time.Now()
// todo: use real table ID
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, 0, resolvedTs)
tableID := flushEvent.tableID
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, flushEvent.tableID, resolvedTs)
if err != nil {
if errors.Cause(err) != context.Canceled {
errCh <- err
}
return
}
atomic.StoreUint64(&b.checkpointTs, checkpointTs)
b.tableCheckpointTsMap.Store(tableID, checkpointTs)

dur := time.Since(start)
metricFlushDuration.Observe(dur.Seconds())
Expand Down Expand Up @@ -150,8 +152,29 @@ func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Ro
func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
select {
case <-ctx.Done():
return atomic.LoadUint64(&b.checkpointTs), ctx.Err()
case b.flushTsChan <- resolvedTs:
return b.getTableCheckpointTs(tableID), ctx.Err()
case b.flushTsChan <- flushMsg{
tableID: tableID,
resolvedTs: resolvedTs,
}:
}
return atomic.LoadUint64(&b.checkpointTs), nil
return b.getTableCheckpointTs(tableID), nil
}

type flushMsg struct {
tableID model.TableID
resolvedTs uint64
}

func (b *bufferSink) getTableCheckpointTs(tableID model.TableID) uint64 {
checkPoints, ok := b.tableCheckpointTsMap.Load(tableID)
if ok {
return checkPoints.(uint64)
}
return atomic.LoadUint64(&b.changeFeedCheckpointTs)
}

// UpdateChangeFeedCheckpointTs update the changeFeedCheckpointTs every processor tick
func (b *bufferSink) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
atomic.StoreUint64(&b.changeFeedCheckpointTs, checkpointTs)
}
91 changes: 91 additions & 0 deletions cdc/sink/buffer_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
"context"
"testing"
"time"

"github.com/pingcap/ticdc/cdc/model"
"github.com/stretchr/testify/require"
)

func TestTableIsNotFlushed(t *testing.T) {
b := bufferSink{changeFeedCheckpointTs: 1}
require.Equal(t, uint64(1), b.getTableCheckpointTs(2))
b.UpdateChangeFeedCheckpointTs(3)
require.Equal(t, uint64(3), b.getTableCheckpointTs(2))
}

func TestFlushTable(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer func() {
cancel()
}()
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
require.Nil(t, b.EmitRowChangedEvents(ctx))
tbl1 := &model.TableName{TableID: 1}
tbl2 := &model.TableName{TableID: 2}
tbl3 := &model.TableName{TableID: 3}
tbl4 := &model.TableName{TableID: 4}
require.Nil(t, b.EmitRowChangedEvents(ctx, []*model.RowChangedEvent{
{CommitTs: 6, Table: tbl1},
{CommitTs: 6, Table: tbl2},
{CommitTs: 6, Table: tbl3},
{CommitTs: 6, Table: tbl4},
{CommitTs: 10, Table: tbl1},
{CommitTs: 10, Table: tbl2},
{CommitTs: 10, Table: tbl3},
{CommitTs: 10, Table: tbl4},
}...))
checkpoint, err := b.FlushRowChangedEvents(ctx, 1, 7)
require.True(t, checkpoint <= 7)
require.Nil(t, err)
checkpoint, err = b.FlushRowChangedEvents(ctx, 2, 6)
require.True(t, checkpoint <= 6)
require.Nil(t, err)
checkpoint, err = b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(5), b.getTableCheckpointTs(4))
b.UpdateChangeFeedCheckpointTs(6)
require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(6), b.getTableCheckpointTs(4))
}

func TestFlushFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
cancel()
checkpoint, _ = b.FlushRowChangedEvents(ctx, 3, 18)
require.Equal(t, uint64(8), checkpoint)
checkpoint, _ = b.FlushRowChangedEvents(ctx, 1, 18)
require.Equal(t, uint64(5), checkpoint)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(5), b.getTableCheckpointTs(1))
}
5 changes: 4 additions & 1 deletion cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (

// Manager manages table sinks, maintains the relationship between table sinks and backendSink.
type Manager struct {
backendSink Sink
backendSink *bufferSink
tableCheckpointTsMap sync.Map
tableSinks map[model.TableID]*tableSink
tableSinksMu sync.Mutex
Expand Down Expand Up @@ -161,6 +161,9 @@ func (m *Manager) getCheckpointTs(tableID model.TableID) uint64 {

func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
atomic.StoreUint64(&m.changeFeedCheckpointTs, checkpointTs)
if m.backendSink != nil {
m.backendSink.UpdateChangeFeedCheckpointTs(checkpointTs)
}
}

type drawbackMsg struct {
Expand Down