Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: remove unused code and add some logs #10458

Merged
merged 2 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"unsafe"

Expand Down Expand Up @@ -272,8 +271,8 @@
}

// TrySplitAndSortUpdateEvent redo log do nothing
func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error {
return nil

Check warning on line 275 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L274-L275

Added lines #L274 - L275 were not covered by tests
}

// RedoRowChangedEvent represents the DML event used in RedoLog
Expand Down Expand Up @@ -380,8 +379,8 @@
}

// TrySplitAndSortUpdateEvent do nothing
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string) error {
return nil

Check warning on line 383 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L382-L383

Added lines #L382 - L383 were not covered by tests
}

// IsDelete returns true if the row is a delete event
Expand Down Expand Up @@ -420,23 +419,23 @@
}

// GetHandleKeyColumnValues returns all handle key's column values
func (r *RowChangedEvent) GetHandleKeyColumnValues() []string {
var result []string

var cols []*Column
if r.IsDelete() {
cols = r.PreColumns
} else {
cols = r.Columns
}

Check warning on line 430 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L422-L430

Added lines #L422 - L430 were not covered by tests

result = make([]string, 0)
for _, col := range cols {
if col != nil && col.Flag.IsHandleKey() {
result = append(result, ColumnValueString(col.Value))
}

Check warning on line 436 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L432-L436

Added lines #L432 - L436 were not covered by tests
}
return result

Check warning on line 438 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L438

Added line #L438 was not covered by tests
}

// HandleKeyColInfos returns the column(s) and colInfo(s) corresponding to the handle key(s)
Expand Down Expand Up @@ -610,20 +609,20 @@
indexInfo.Unique = true
}

isPrimary := true

Check warning on line 612 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L612

Added line #L612 was not covered by tests
for _, offset := range colOffsets {
col := columns[offset]
// When only all columns in the index are primary key, then the index is primary key.
if col == nil || !col.Flag.IsPrimaryKey() {
isPrimary = false
}

Check warning on line 618 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L614-L618

Added lines #L614 - L618 were not covered by tests

tiCol := ret.Columns[offset]

Check warning on line 620 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L620

Added line #L620 was not covered by tests
indexCol := &model.IndexColumn{}
indexCol.Name = tiCol.Name

Check warning on line 622 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L622

Added line #L622 was not covered by tests
indexCol.Offset = offset
indexInfo.Columns = append(indexInfo.Columns, indexCol)
indexInfo.Primary = isPrimary

Check warning on line 625 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L625

Added line #L625 was not covered by tests
}

// TODO: revert the "all column set index related flag" to "only the
Expand Down Expand Up @@ -754,13 +753,13 @@
// We set Bootstrap DDL event's startTs and commitTs to 0.
// Because it is generated by the TiCDC, not from the upstream TiDB.
// And they ere useless for a bootstrap DDL event.
func NewBootstrapDDLEvent(tableInfo *TableInfo) *DDLEvent {
return &DDLEvent{
StartTs: 0,
CommitTs: 0,
TableInfo: tableInfo,
IsBootstrap: true,
}

Check warning on line 762 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L756-L762

Added lines #L756 - L762 were not covered by tests
}

// SingleTableTxn represents a transaction which includes many row events in a single table
Expand All @@ -778,11 +777,6 @@
StartTs uint64
CommitTs uint64
Rows []*RowChangedEvent

// control fields of SingleTableTxn
// FinishWg is a barrier txn, after this txn is received, the worker must
// flush cached txns and call FinishWg.Done() to mark txns have been flushed.
FinishWg *sync.WaitGroup
}

// GetCommitTs returns the commit timestamp of the transaction.
Expand All @@ -797,8 +791,8 @@
}
newRows, err := trySplitAndSortUpdateEvent(t.Rows)
if err != nil {
return errors.Trace(err)
}

Check warning on line 795 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L794-L795

Added lines #L794 - L795 were not covered by tests
t.Rows = newRows
return nil
}
Expand Down Expand Up @@ -849,8 +843,8 @@
if e.IsUpdate() && shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if err != nil {
return nil, errors.Trace(err)
}

Check warning on line 847 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L846-L847

Added lines #L846 - L847 were not covered by tests
split = true
rowChangedEvents = append(rowChangedEvents, deleteEvent, insertEvent)
} else {
Expand All @@ -870,8 +864,8 @@
func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
}

Check warning on line 868 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L867-L868

Added lines #L867 - L868 were not covered by tests

for i := range updateEvent.Columns {
col := updateEvent.Columns[i]
Expand All @@ -894,8 +888,8 @@
updateEvent *RowChangedEvent,
) (*RowChangedEvent, *RowChangedEvent, error) {
if updateEvent == nil {
return nil, nil, errors.New("nil event cannot be split")
}

Check warning on line 892 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L891-L892

Added lines #L891 - L892 were not covered by tests

// If there is an update to handle key columns,
// we need to split the event into two events to be compatible with the old format.
Expand Down Expand Up @@ -924,11 +918,6 @@
t.Rows = append(t.Rows, row)
}

// ToWaitFlush indicates whether to wait flushing after the txn is processed or not.
func (t *SingleTableTxn) ToWaitFlush() bool {
return t.FinishWg != nil
}

// TopicPartitionKey contains the topic and partition key of the message.
type TopicPartitionKey struct {
Topic string
Expand Down
11 changes: 0 additions & 11 deletions cdc/processor/sinkmanager/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,6 @@ import (
)

var (
// MemoryQuota indicates memory usage of a changefeed.
MemoryQuota = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sinkmanager",
Name: "memory_quota",
Help: "memory quota of the changefeed",
},
// type includes total, used, component includes sink and redo.
[]string{"namespace", "changefeed", "type", "component"})

// RedoEventCache indicates redo event memory usage of a changefeed.
RedoEventCache = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func NewMySQLBackends(
func (s *mysqlBackend) OnTxnEvent(event *dmlsink.TxnCallbackableEvent) (needFlush bool) {
s.events = append(s.events, event)
s.rows += len(event.Event.Rows)
return event.Event.ToWaitFlush() || s.rows >= s.cfg.MaxTxnRow
return s.rows >= s.cfg.MaxTxnRow
}

// Flush implements interface backend.
Expand Down
5 changes: 2 additions & 3 deletions cdc/sink/dmlsink/txn/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (w *worker) close() {
w.txnCh.CloseAndDrain()
}

// Run a loop.
// Continuously get events from txnCh and call backend flush based on conditions.
func (w *worker) runLoop() error {
defer func() {
if err := w.backend.Close(); err != nil {
Expand Down Expand Up @@ -148,7 +148,7 @@ func (w *worker) runLoop() error {
}

// onEvent is called when a new event is received.
// It returns true if the event is sent to backend.
// It returns true if it needs flush immediately.
func (w *worker) onEvent(txn txnWithNotifier) bool {
w.hasPending = true

Expand All @@ -169,7 +169,6 @@ func (w *worker) onEvent(txn txnWithNotifier) bool {
}

// doFlush flushes the backend.
// It returns true only if it can no longer be flushed.
func (w *worker) doFlush(flushTimeSlice *time.Duration) error {
if w.hasPending {
start := time.Now()
Expand Down
Loading