Skip to content

Commit

Permalink
fix filter skip checkpoint problem
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored and ti-chi-bot committed Oct 11, 2021
1 parent 8d1aa3c commit 20ddd85
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 25 deletions.
58 changes: 40 additions & 18 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb-binlog/pkg/util"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -290,14 +291,27 @@ func (s *Syncer) run() error {
return err
}

var lastDDLSchemaVersion int64
var b *binlogItem
var (
lastDDLSchemaVersion int64
b *binlogItem
fakeBinlog *pb.Binlog
pushFakeBinlog chan<- *pb.Binlog
lastAddCommitTS int64
lastFakeCommitTime time.Time
)
dsyncError := s.dsyncer.Error()

var fakeBinlog *pb.Binlog
var pushFakeBinlog chan<- *pb.Binlog
appendFakeBinlogIfNeeded := func(binlog *pb.Binlog, commitTS int64) {
if fakeCommitTime := oracle.GetTimeFromTS(uint64(commitTS)); fakeCommitTime.Sub(lastFakeCommitTime) > 3*time.Second {
lastFakeCommitTime = fakeCommitTime
if binlog == nil {
binlog = util.GenFakeBinlog(commitTS)
}
fakeBinlogs = append(fakeBinlogs, binlog)
fakeBinlogPreAddTS = append(fakeBinlogPreAddTS, lastAddCommitTS)
}
}

var lastAddComitTS int64
dsyncError := s.dsyncer.Error()
ForLoop:
for {
// check if we can safely push a fake binlog
Expand Down Expand Up @@ -335,8 +349,7 @@ ForLoop:
}

if startTS == commitTS {
fakeBinlogs = append(fakeBinlogs, binlog)
fakeBinlogPreAddTS = append(fakeBinlogPreAddTS, lastAddComitTS)
appendFakeBinlogIfNeeded(binlog, commitTS)
} else if jobID == 0 {
preWriteValue := binlog.GetPrewriteValue()
preWrite := &pb.PrewriteValue{}
Expand All @@ -363,39 +376,45 @@ ForLoop:
break ForLoop
}

var isFilterTransaction = false
var err1 error
var (
isFilterTransaction = false
ignore = false
err1 error
)
if s.loopbackSync != nil && s.loopbackSync.LoopbackControl {
isFilterTransaction, err1 = loopBackStatus(binlog, preWrite, s.schema, s.loopbackSync)
if err1 != nil {
err = errors.Annotate(err1, "analyze transaction failed")
break ForLoop
}
}

var ignore bool
ignore, err = filterTable(preWrite, s.filter, s.schema)
if err != nil {
err = errors.Annotate(err, "filterTable failed")
break ForLoop
if !isFilterTransaction {
ignore, err = filterTable(preWrite, s.filter, s.schema)
if err != nil {
err = errors.Annotate(err, "filterTable failed")
break ForLoop
}
}

if !ignore && !isFilterTransaction {
s.addDMLEventMetrics(preWrite.GetMutations())
beginTime := time.Now()
lastAddComitTS = binlog.GetCommitTs()
lastAddCommitTS = binlog.GetCommitTs()
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite, SchemaVersion: preWrite.SchemaVersion})
if err != nil {
err = errors.Annotatef(err, "failed to add item")
break ForLoop
}
executeHistogram.Observe(time.Since(beginTime).Seconds())
} else {
appendFakeBinlogIfNeeded(nil, commitTS)
}
} else if jobID > 0 {
log.Debug("get ddl binlog job", zap.Stringer("job", b.job))

if skipUnsupportedDDLJob(b.job) {
log.Info("skip unsupported DDL job", zap.Stringer("job", b.job))
appendFakeBinlogIfNeeded(nil, commitTS)
continue
}

Expand All @@ -413,6 +432,7 @@ ForLoop:

if b.job.SchemaState == model.StateDeleteOnly && b.job.Type == model.ActionDropColumn {
log.Info("Syncer skips DeleteOnly DDL", zap.Stringer("job", b.job), zap.Int64("ts", b.GetCommitTs()))
appendFakeBinlogIfNeeded(nil, commitTS)
continue
}

Expand All @@ -427,6 +447,7 @@ ForLoop:
if s.filter.SkipSchemaAndTable(schema, table) {
log.Info("skip ddl by filter", zap.String("schema", schema), zap.String("table", table),
zap.String("sql", sql), zap.Int64("commit ts", commitTS))
appendFakeBinlogIfNeeded(nil, commitTS)
continue
}

Expand All @@ -439,14 +460,15 @@ ForLoop:
if s.cfg.DestDBType == "tidb" || s.cfg.DestDBType == "mysql" {
shouldSkip = true
} else {
appendFakeBinlogIfNeeded(nil, commitTS)
continue
}
}

// Add ddl item to downstream.
s.addDDLCount()
beginTime := time.Now()
lastAddComitTS = binlog.GetCommitTs()
lastAddCommitTS = binlog.GetCommitTs()

log.Info("add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed",
zap.String("sql", sql), zap.Int64("commit ts", binlog.CommitTs))
Expand Down
10 changes: 10 additions & 0 deletions pkg/util/ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tipb/go-binlog"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/net/context"
Expand Down Expand Up @@ -55,6 +56,15 @@ func GetTSO(pdCli pd.Client) (int64, error) {
return ts, nil
}

// GenFakeBinlog generates a fake binlog from given tso
func GenFakeBinlog(ts int64) *binlog.Binlog {
return &binlog.Binlog{
StartTs: ts,
Tp: binlog.BinlogType_Rollback,
CommitTs: ts,
}
}

// TSOToRoughTime translates tso to rough time that used to display
func TSOToRoughTime(ts int64) time.Time {
t := time.Unix(ts>>18/1000, 0)
Expand Down
8 changes: 1 addition & 7 deletions pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,13 +469,7 @@ func (s *Server) genFakeBinlog() (*pb.Binlog, error) {
return nil, errors.Trace(err)
}

bl := &binlog.Binlog{
StartTs: ts,
Tp: binlog.BinlogType_Rollback,
CommitTs: ts,
}

return bl, nil
return util.GenFakeBinlog(ts), nil
}

func (s *Server) writeFakeBinlog() (*pb.Binlog, error) {
Expand Down

0 comments on commit 20ddd85

Please sign in to comment.