Skip to content

Commit

Permalink
scheduler(ticdc): optimize owner memory allocation (#8010)
Browse files Browse the repository at this point in the history
ref #7720
  • Loading branch information
overvenus authored Jan 6, 2023
1 parent 5b31ba1 commit 857e7cf
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 104 deletions.
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (a *agent) handleMessageDispatchTableRequest(
zap.String("capture", a.CaptureID),
zap.String("namespace", a.ChangeFeedID.Namespace),
zap.String("changefeed", a.ChangeFeedID.ID),
zap.Stringer("span", &span),
zap.String("span", span.String()),
zap.Any("request", request))
return
}
Expand Down
56 changes: 28 additions & 28 deletions cdc/scheduler/internal/v3/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,67 +972,67 @@ func newMockTableExecutor() *MockTableExecutor {

// AddTableSpan adds a table span to the executor.
func (e *MockTableExecutor) AddTableSpan(
ctx context.Context, tableID tablepb.Span, startTs model.Ts, isPrepare bool,
ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool,
) (bool, error) {
log.Info("AddTableSpan",
zap.Stringer("span", &tableID),
zap.String("span", span.String()),
zap.Any("startTs", startTs),
zap.Bool("isPrepare", isPrepare))

state, ok := e.tables.Get(tableID)
state, ok := e.tables.Get(span)
if ok {
switch state {
case tablepb.TableStatePreparing:
return true, nil
case tablepb.TableStatePrepared:
if !isPrepare {
e.tables.ReplaceOrInsert(tableID, tablepb.TableStateReplicating)
e.tables.ReplaceOrInsert(span, tablepb.TableStateReplicating)
}
return true, nil
case tablepb.TableStateReplicating:
return true, nil
case tablepb.TableStateStopped:
e.tables.Delete(tableID)
e.tables.Delete(span)
}
}
args := e.Called(ctx, tableID, startTs, isPrepare)
args := e.Called(ctx, span, startTs, isPrepare)
if args.Bool(0) {
e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePreparing)
e.tables.ReplaceOrInsert(span, tablepb.TableStatePreparing)
}
return args.Bool(0), args.Error(1)
}

// IsAddTableSpanFinished determines if the table span has been added.
func (e *MockTableExecutor) IsAddTableSpanFinished(tableID tablepb.Span, isPrepare bool) bool {
_, ok := e.tables.Get(tableID)
func (e *MockTableExecutor) IsAddTableSpanFinished(span tablepb.Span, isPrepare bool) bool {
_, ok := e.tables.Get(span)
if !ok {
log.Panic("table which was added is not found",
zap.Stringer("span", &tableID),
zap.String("span", span.String()),
zap.Bool("isPrepare", isPrepare))
}

args := e.Called(tableID, isPrepare)
args := e.Called(span, isPrepare)
if args.Bool(0) {
e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePrepared)
e.tables.ReplaceOrInsert(span, tablepb.TableStatePrepared)
if !isPrepare {
e.tables.ReplaceOrInsert(tableID, tablepb.TableStateReplicating)
e.tables.ReplaceOrInsert(span, tablepb.TableStateReplicating)
}
return true
}

e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePreparing)
e.tables.ReplaceOrInsert(span, tablepb.TableStatePreparing)
if !isPrepare {
e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePrepared)
e.tables.ReplaceOrInsert(span, tablepb.TableStatePrepared)
}

return false
}

// RemoveTableSpan removes a table span from the executor.
func (e *MockTableExecutor) RemoveTableSpan(tableID tablepb.Span) bool {
state, ok := e.tables.Get(tableID)
func (e *MockTableExecutor) RemoveTableSpan(span tablepb.Span) bool {
state, ok := e.tables.Get(span)
if !ok {
log.Warn("table to be remove is not found", zap.Stringer("span", &tableID))
log.Warn("table to be remove is not found", zap.String("span", span.String()))
return true
}
switch state {
Expand All @@ -1042,33 +1042,33 @@ func (e *MockTableExecutor) RemoveTableSpan(tableID tablepb.Span) bool {
default:
}
// the current `processor` implementation does not consider the table's state
log.Info("RemoveTableSpan", zap.Stringer("span", &tableID), zap.Any("state", state))
log.Info("RemoveTableSpan", zap.String("span", span.String()), zap.Any("state", state))

args := e.Called(tableID)
args := e.Called(span)
if args.Bool(0) {
e.tables.ReplaceOrInsert(tableID, tablepb.TableStateStopped)
e.tables.ReplaceOrInsert(span, tablepb.TableStateStopped)
}
return args.Bool(0)
}

// IsRemoveTableSpanFinished determines if the table span has been removed.
func (e *MockTableExecutor) IsRemoveTableSpanFinished(tableID tablepb.Span) (model.Ts, bool) {
state, ok := e.tables.Get(tableID)
func (e *MockTableExecutor) IsRemoveTableSpanFinished(span tablepb.Span) (model.Ts, bool) {
state, ok := e.tables.Get(span)
if !ok {
// the real `table executor` processor would panic in such a case.
log.Warn("table to be removed is not found",
zap.Stringer("span", &tableID))
zap.String("span", span.String()))
return 0, true
}
args := e.Called(tableID)
args := e.Called(span)
if args.Bool(1) {
log.Info("remove table finished, remove it from the executor",
zap.Stringer("span", &tableID), zap.Any("state", state))
e.tables.Delete(tableID)
zap.String("span", span.String()), zap.Any("state", state))
e.tables.Delete(span)
} else {
// revert the state back to old state, assume it's `replicating`,
// but `preparing` / `prepared` can also be removed.
e.tables.ReplaceOrInsert(tableID, tablepb.TableStateReplicating)
e.tables.ReplaceOrInsert(span, tablepb.TableStateReplicating)
}

return model.Ts(args.Int(0)), args.Bool(1)
Expand Down
6 changes: 3 additions & 3 deletions cdc/scheduler/internal/v3/agent/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,22 +348,22 @@ func (tm *tableSpanManager) dropTableSpan(span tablepb.Span) {
log.Warn("schedulerv3: tableManager drop table not found",
zap.String("namespace", tm.changefeedID.Namespace),
zap.String("changefeed", tm.changefeedID.ID),
zap.Stringer("span", &span))
zap.String("span", span.String()))
return
}
state, _ := table.getAndUpdateTableSpanState()
if state != tablepb.TableStateAbsent {
log.Panic("schedulerv3: tableManager drop table undesired",
zap.String("namespace", tm.changefeedID.Namespace),
zap.String("changefeed", tm.changefeedID.ID),
zap.Stringer("span", &span),
zap.String("span", span.String()),
zap.Stringer("state", table.state))
}

log.Debug("schedulerv3: tableManager drop table",
zap.String("namespace", tm.changefeedID.Namespace),
zap.String("changefeed", tm.changefeedID.ID),
zap.Stringer("span", &span))
zap.String("span", span.String()))
tm.tables.Delete(span)
}

Expand Down
8 changes: 5 additions & 3 deletions cdc/scheduler/internal/v3/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type coordinator struct {
reconciler *keyspan.Reconciler
compat *compat.Compat
pdClock pdutil.Clock
tableRanges replication.TableRanges

lastCollectTime time.Time
changefeedID model.ChangeFeedID
Expand Down Expand Up @@ -289,10 +290,11 @@ func (c *coordinator) poll(
}
}

c.tableRanges.UpdateTables(currentTables)
if !c.captureM.CheckAllCaptureInitialized() {
// Skip generating schedule tasks for the replication manager,
// as not all captures are initialized.
newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(currentTables, pdTime)
newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime)
return newCheckpointTs, newResolvedTs, c.sendMsgs(ctx, msgBuf)
}

Expand All @@ -309,7 +311,7 @@ func (c *coordinator) poll(
// Generate schedule tasks based on the current status.
replications := c.replicationM.ReplicationSets()
runningTasks := c.replicationM.RunningTasks()
currentSpans := c.reconciler.Reconcile(ctx, currentTables, replications, c.compat)
currentSpans := c.reconciler.Reconcile(ctx, &c.tableRanges, replications, c.compat)
allTasks := c.schedulerM.Schedule(
checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks)

Expand All @@ -327,7 +329,7 @@ func (c *coordinator) poll(
}

// Checkpoint calculation
newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(currentTables, pdTime)
newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime)
return newCheckpointTs, newResolvedTs, nil
}

Expand Down
51 changes: 29 additions & 22 deletions cdc/scheduler/internal/v3/keyspan/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,29 @@ func NewReconciler(
// 6. Some captures fail, does NOT affect spans.
func (m *Reconciler) Reconcile(
ctx context.Context,
currentTables []model.TableID,
currentTables *replication.TableRanges,
replications *spanz.Map[*replication.ReplicationSet],
compat *compat.Compat,
) []tablepb.Span {
tablesLenEqual := len(currentTables) == len(m.tableSpans)
tablesLenEqual := currentTables.Len() == len(m.tableSpans)
allTablesFound := true
updateCache := false
for _, tableID := range currentTables {
currentTables.Iter(func(tableID model.TableID, tableStart, tableEnd tablepb.Span) bool {
if _, ok := m.tableSpans[tableID]; !ok {
// Find a new table.
allTablesFound = false
updateCache = true
}

// Reconcile spans from current replications.
tableStart, tableEnd := spanz.TableIDToComparableRange(tableID)
coveredSpans, holes := replications.FindHoles(tableStart, tableEnd)
if len(coveredSpans) == 0 {
// No such spans in replications.
if _, ok := m.tableSpans[tableID]; ok {
// And we have seen such spans before, which means these spans have
// not yet been scheduled due to the basic scheduler's batch add task
// rate limit.
continue
return true
}
// And we have not seen such spans before, maybe:
// 1. it's a table being added when starting a changefeed
Expand All @@ -110,37 +109,44 @@ func (m *Reconciler) Reconcile(
if spans, ok := m.tableSpans[tableID]; ok && spans.byAddTable {
// These spans are split by reconciler add table. They may
// still be in progress because of the basic scheduler's rate limit.
continue
return true
}
// 3. owner switch after some captures failed.
log.Info("schedulerv3: detect owner switch after captures fail",
zap.String("changefeed", m.changefeedID.ID),
zap.String("namespace", m.changefeedID.Namespace),
zap.Int64("tableID", tableID),
zap.Int("holes", len(holes)),
zap.Stringer("spanStart", &tableStart),
zap.Stringer("spanEnd", &tableEnd),
zap.Stringer("foundStart", &coveredSpans[0]),
zap.Stringer("foundEnd", &coveredSpans[len(coveredSpans)-1]))
for i := range holes {
holes[i].TableID = tableID
coveredSpans = append(coveredSpans, holes[i])
zap.String("spanStart", tableStart.String()),
zap.String("spanEnd", tableEnd.String()),
zap.String("foundStart", coveredSpans[0].String()),
zap.String("foundEnd", coveredSpans[len(coveredSpans)-1].String()))
spans := make([]tablepb.Span, 0, len(coveredSpans)+len(holes))
spans = append(spans, coveredSpans...)
for _, s := range holes {
spans = append(spans, tablepb.Span{
TableID: tableID,
StartKey: s.StartKey,
EndKey: s.EndKey,
})
// TODO: maybe we should split holes too.
}
m.tableSpans[tableID] = splitSpans{
byAddTable: false,
spans: coveredSpans,
spans: spans,
}
updateCache = true
} else {
// Found and no hole, maybe:
// 2. owner switch and no capture fails.
m.tableSpans[tableID] = splitSpans{
byAddTable: false,
spans: coveredSpans,
}
ss := m.tableSpans[tableID]
ss.byAddTable = false
ss.spans = ss.spans[:0]
ss.spans = append(ss.spans, coveredSpans...)
m.tableSpans[tableID] = ss
}
}
return true
})

// 4. Drop table by DDL.
// For most of the time, remove tables are unlikely to happen.
Expand All @@ -151,10 +157,11 @@ func (m *Reconciler) Reconcile(
if !tablesLenEqual || !allTablesFound {
// The two sets are not identical. We need to find removed tables.
// Build a tableID hash set to improve performance.
currentTableSet := make(map[model.TableID]struct{}, len(currentTables))
for _, tableID := range currentTables {
currentTableSet := make(map[model.TableID]struct{}, currentTables.Len())
currentTables.Iter(func(tableID model.TableID, _, _ tablepb.Span) bool {
currentTableSet[tableID] = struct{}{}
}
return true
})
for tableID := range m.tableSpans {
_, ok := currentTableSet[tableID]
if !ok {
Expand Down
Loading

0 comments on commit 857e7cf

Please sign in to comment.