sink(cdc): close table sinks when sink factory fails (pingcap#9449) (p…
ti-chi-bot authored and CharlesCheung96 committed Aug 11, 2023
1 parent 1e2f277 commit 2caa14c
Showing 6 changed files with 74 additions and 82 deletions.
69 changes: 29 additions & 40 deletions cdc/processor/sinkmanager/manager.go
@@ -273,6 +273,20 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
zap.Error(err))
m.clearSinkFactory()
sinkFactoryErrors = make(chan error, 16)

start := time.Now()
log.Info("Sink manager is closing all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
m.tableSinks.Range(func(key, value interface{}) bool {
value.(*tableSinkWrapper).closeTableSink()
m.sinkMemQuota.ClearTable(key.(model.TableID))
return true
})
log.Info("Sink manager has closed all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))
}

if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
@@ -413,22 +427,17 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
}()
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
// Task upperbound is limited by barrierTs and schemaResolvedTs.
// But receivedSorterResolvedTs can be less than barrierTs, in which case
// the table is just scheduled to this node.
getUpperBound := func(
tableSinkUpperBoundTs model.Ts,
) engine.Position {
schemaTs := m.schemaStorage.ResolvedTs()
if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
// schemaTs == math.MaxUint64 means it's in tests.
tableSinkUpperBoundTs = schemaTs + 1
}
return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
func (m *SinkManager) getUpperBound(tableSinkUpperBoundTs model.Ts) engine.Position {
schemaTs := m.schemaStorage.ResolvedTs()
if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
// schemaTs == math.MaxUint64 means it's in tests.
tableSinkUpperBoundTs = schemaTs + 1
}
return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
dispatchTasks := func() error {
tables := make([]*tableSinkWrapper, 0, sinkWorkerNum)
progs := make([]*progress, 0, sinkWorkerNum)
@@ -476,7 +485,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink.getUpperBoundTs())
upperBound := m.getUpperBound(tableSink.getUpperBoundTs())
// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
m.sinkProgressHeap.push(slowestTableProgress)
@@ -502,7 +511,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
t := &sinkTask{
tableID: tableSink.tableID,
lowerBound: lowerBound,
getUpperBound: getUpperBound,
getUpperBound: m.getUpperBound,
tableSink: tableSink,
callback: func(lastWrittenPos engine.Position) {
p := &progress{
Expand Down Expand Up @@ -566,18 +575,6 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
}

func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
// We use the table's resolved ts as the upper bound to fetch events.
getUpperBound := func(tableSinkUpperBoundTs model.Ts) engine.Position {
// If a task carries events after schemaResolvedTs, mounter group threads
// can be blocked on waiting schemaResolvedTs get advanced.
schemaTs := m.schemaStorage.ResolvedTs()
if tableSinkUpperBoundTs > schemaTs+1 {
tableSinkUpperBoundTs = schemaTs + 1
}

return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
}

dispatchTasks := func() error {
tables := make([]*tableSinkWrapper, 0, redoWorkerNum)
progs := make([]*progress, 0, redoWorkerNum)
@@ -624,7 +621,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink.getReceivedSorterResolvedTs())
upperBound := m.getUpperBound(tableSink.getReceivedSorterResolvedTs())

// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
@@ -646,7 +643,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
t := &redoTask{
tableID: tableSink.tableID,
lowerBound: lowerBound,
getUpperBound: getUpperBound,
getUpperBound: m.getUpperBound,
tableSink: tableSink,
callback: func(lastWrittenPos engine.Position) {
p := &progress{
Expand Down Expand Up @@ -840,7 +837,7 @@ func (m *SinkManager) AsyncStopTable(tableID model.TableID) bool {
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
}
if tableSink.(*tableSinkWrapper).asyncClose() {
if tableSink.(*tableSinkWrapper).asyncStop() {
cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
@@ -910,7 +907,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,
// again or not if it returns false. So we must retry `tableSink.asyncStop` here
// if necessary. It's better to remove the dirty logic in the future.
tableSink := wrapper.(*tableSinkWrapper)
if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncClose() {
if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncStop() {
cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
Expand Down Expand Up @@ -982,14 +979,6 @@ func (m *SinkManager) Close() {
zap.String("changefeed", m.changefeedID.ID))
start := time.Now()
m.waitSubroutines()
m.tableSinks.Range(func(_, value interface{}) bool {
sink := value.(*tableSinkWrapper)
sink.close()
if m.eventCache != nil {
m.eventCache.removeTable(sink.tableID)
}
return true
})
m.clearSinkFactory()

log.Info("Closed sink manager",
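The core of this file's change is the new failure path in SinkManager.run: when the shared sink factory reports an error, every per-table sink is closed and its memory quota released before the factory is rebuilt, so no table keeps buffering against a dead backend. A minimal, self-contained Go sketch of that pattern follows; every identifier in it (tableSink, memQuota, manager, onFactoryError) is a hypothetical stand-in, not the real tiflow API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// tableSink stands in for tiflow's tableSinkWrapper.
type tableSink struct{ id int64 }

func (t *tableSink) close() { fmt.Printf("table sink %d closed\n", t.id) }

// memQuota stands in for the per-table memory quota tracker.
type memQuota struct {
	mu   sync.Mutex
	used map[int64]uint64
}

func (q *memQuota) clearTable(id int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.used, id)
}

type manager struct {
	tableSinks sync.Map // table ID (int64) -> *tableSink
	quota      memQuota
}

// onFactoryError mirrors the block added to SinkManager.run: walk the
// table-sink map, close each sink, release its quota, and report how
// long the sweep took.
func (m *manager) onFactoryError() {
	start := time.Now()
	m.tableSinks.Range(func(key, value interface{}) bool {
		value.(*tableSink).close()
		m.quota.clearTable(key.(int64))
		return true
	})
	fmt.Printf("closed all table sinks in %v\n", time.Since(start))
}

func main() {
	m := &manager{quota: memQuota{used: map[int64]uint64{}}}
	for id := int64(1); id <= 3; id++ {
		m.tableSinks.Store(id, &tableSink{id: id})
		m.quota.used[id] = 1024
	}
	m.onFactoryError()
}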
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
@@ -312,7 +312,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// events have been reported. Then we can continue the table
// at the checkpoint position.
case tablesink.SinkInternalError:
task.tableSink.clearTableSink()
task.tableSink.closeAndClearTableSink()
// After the table sink is cleared, all pending events are sent out or dropped.
// So we can re-add the table into sinkMemQuota.
w.sinkMemQuota.ClearTable(task.tableSink.tableID)
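For context, the renamed call sits in the worker's handling of tablesink.SinkInternalError: closeAndClearTableSink closes the underlying sink (flushing or dropping whatever is still pending) before detaching it, and only then is the table's memory quota returned. A small runnable sketch of that ordering, using hypothetical stand-in types:

package main

import "fmt"

// wrapper stands in for tableSinkWrapper.
type wrapper struct{ hasSink bool }

// closeAndClear models closeAndClearTableSink: close first, then detach.
func (w *wrapper) closeAndClear() {
	if w.hasSink {
		fmt.Println("sink closed: pending events flushed or dropped")
		w.hasSink = false
	}
}

// quota stands in for the memory quota tracker.
type quota map[int64]uint64

func (q quota) clearTable(id int64) { delete(q, id) }

func main() {
	w := &wrapper{hasSink: true}
	q := quota{7: 4096}
	// On a sink internal error the worker proceeds in order:
	w.closeAndClear() // 1. close and detach the broken sink
	q.clearTable(7)   // 2. only then release the table's memory quota
	fmt.Println("quota entries left:", len(q))
}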
49 changes: 31 additions & 18 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -50,7 +50,7 @@ type tableSinkWrapper struct {
// tableSink is the underlying sink.
tableSink tablesink.TableSink
tableSinkCheckpointTs model.ResolvedTs
tableSinkMu sync.Mutex
tableSinkMu sync.RWMutex

// state used to control the lifecycle of the table.
state *tablepb.TableState
@@ -282,18 +282,18 @@ func (t *tableSinkWrapper) markAsClosed() {
}
}

func (t *tableSinkWrapper) asyncClose() bool {
func (t *tableSinkWrapper) asyncStop() bool {
t.markAsClosing()
if t.asyncClearTableSink() {
if t.asyncCloseAndClearTableSink() {
t.markAsClosed()
return true
}
return false
}

func (t *tableSinkWrapper) close() {
func (t *tableSinkWrapper) stop() {
t.markAsClosing()
t.clearTableSink()
t.closeAndClearTableSink()
t.markAsClosed()
}

@@ -308,23 +308,36 @@ func (t *tableSinkWrapper) initTableSink() bool {
return true
}

func (t *tableSinkWrapper) asyncClearTableSink() bool {
t.tableSinkMu.Lock()
defer t.tableSinkMu.Unlock()
if t.tableSink != nil {
if !t.tableSink.AsyncClose() {
return false
}
checkpointTs := t.tableSink.GetCheckpointTs()
if t.tableSinkCheckpointTs.Less(checkpointTs) {
t.tableSinkCheckpointTs = checkpointTs
}
t.tableSink = nil
func (t *tableSinkWrapper) asyncCloseTableSink() bool {
t.tableSinkMu.RLock()
defer t.tableSinkMu.RUnlock()
if t.tableSink == nil {
return true
}
return t.tableSink.AsyncClose()
}

func (t *tableSinkWrapper) closeTableSink() {
t.tableSinkMu.RLock()
defer t.tableSinkMu.RUnlock()
if t.tableSink == nil {
return
}
t.tableSink.Close()
}

func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
	if !t.asyncCloseTableSink() {
		return false
	}
	t.doTableSinkClear()
	return true
}

func (t *tableSinkWrapper) clearTableSink() {
func (t *tableSinkWrapper) closeAndClearTableSink() {
t.closeTableSink()
t.doTableSinkClear()
}

func (t *tableSinkWrapper) doTableSinkClear() {
t.tableSinkMu.Lock()
defer t.tableSinkMu.Unlock()
if t.tableSink != nil {
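The move from sync.Mutex to sync.RWMutex supports the new split between closing and clearing: closing only reads the sink pointer, so it can run under a read lock (letting the factory-failure sweep in manager.go and a concurrent stop request avoid serializing), while nil-ing the pointer still takes the write lock. A minimal sketch of that locking discipline, with hypothetical types rather than the real wrapper:

package main

import (
	"fmt"
	"sync"
)

type sink struct{ closed bool }

func (s *sink) Close() { s.closed = true }

type wrapper struct {
	mu   sync.RWMutex
	sink *sink
}

// closeSink only dereferences the pointer, so a read lock suffices;
// several goroutines may close concurrently without blocking each other.
func (w *wrapper) closeSink() {
	w.mu.RLock()
	defer w.mu.RUnlock()
	if w.sink == nil {
		return
	}
	w.sink.Close()
}

// clearSink mutates the pointer, so it takes the exclusive lock.
func (w *wrapper) clearSink() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.sink = nil
}

func main() {
	w := &wrapper{sink: &sink{}}
	w.closeSink() // close first: pending events are flushed or dropped
	w.clearSink() // then detach so a rebuilt factory can install a new sink
	fmt.Println("cleared:", w.sink == nil)
}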
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -89,7 +89,7 @@ func TestTableSinkWrapperClose(t *testing.T) {

wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
require.Equal(t, tablepb.TableStatePreparing, wrapper.getState())
wrapper.close()
wrapper.stop()
require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped")
}

4 changes: 3 additions & 1 deletion cdc/sinkv2/tablesink/table_sink_impl.go
@@ -130,7 +130,9 @@ func (e *EventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) error
// GetCheckpointTs returns the checkpoint ts of the table sink.
func (e *EventTableSink[E]) GetCheckpointTs() model.ResolvedTs {
if e.state.Load() == state.TableSinkStopping {
e.progressTracker.checkClosed(e.backendSink.Dead())
if e.progressTracker.checkClosed(e.backendSink.Dead()) {
e.markAsClosed()
}
}
return e.progressTracker.advance()
}
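The two added lines make GetCheckpointTs the place where a stopping sink notices that its progress tracker has drained and marks itself stopped; that is what lets the simplified test below call Close synchronously and then observe a frozen checkpoint. A rough, runnable sketch of the idea with simplified stand-in types (the real checkClosed also watches the backend's Dead channel):

package main

import "fmt"

type sinkState int

const (
	stateStopping sinkState = iota
	stateStopped
)

// tracker stands in for progressTracker.
type tracker struct{ pending int }

func (t *tracker) checkClosed() bool { return t.pending == 0 }

type eventSink struct {
	state   sinkState
	tracker tracker
	ckpt    uint64
}

// getCheckpointTs models the patched GetCheckpointTs: while stopping, it
// promotes the sink to stopped as soon as the tracker reports drained.
func (e *eventSink) getCheckpointTs() uint64 {
	if e.state == stateStopping && e.tracker.checkClosed() {
		e.state = stateStopped
	}
	return e.ckpt
}

func main() {
	s := &eventSink{state: stateStopping, ckpt: 100}
	_ = s.getCheckpointTs()
	fmt.Println("stopped:", s.state == stateStopped) // true: checkpoint is now frozen
}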
30 changes: 9 additions & 21 deletions cdc/sinkv2/tablesink/table_sink_impl_test.go
@@ -366,26 +366,14 @@ func TestCheckpointTsFrozenWhenStopping(t *testing.T) {
require.Nil(t, err)
require.Len(t, sink.events, 7, "all events should be flushed")

go func() {
time.Sleep(time.Millisecond * 10)
sink.Close()
}()
// Table sink close should return even if callbacks are not called,
// because the backend sink is closed.
sink.Close()
tb.Close()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
tb.Close()
}()
require.Eventually(t, func() bool {
return state.TableSinkStopping == tb.state.Load()
}, time.Second, time.Microsecond, "table should be stopping")
wg.Add(1)
go func() {
defer wg.Done()
currentTs := tb.GetCheckpointTs()
sink.acknowledge(105)
require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated")
}()
wg.Wait()
require.Equal(t, state.TableSinkStopped, tb.state.Load())

currentTs := tb.GetCheckpointTs()
sink.acknowledge(105)
require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated")
}
