Skip to content

Commit

Permalink
Merge branch 'master' into fixConcurrentTest
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD authored Mar 10, 2022
2 parents cfd5447 + 19a1fcf commit ebb9f63
Show file tree
Hide file tree
Showing 4 changed files with 818 additions and 21 deletions.
65 changes: 50 additions & 15 deletions dm/syncer/data_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ const (
// change of table
// binlog changes are clustered into table changes
// the validator validates changes of table-grain at a time.
//nolint
type tableChange struct {
table *validateTableInfo
rows map[string]*rowChange
Expand All @@ -77,12 +76,11 @@ type tableChange struct {
type rowChange struct {
table *validateTableInfo
key string
pkValues []string
pkValues []string // todo: might be removed in later pr
data []interface{}
tp rowChangeType
lastMeetTS int64 // the last meet timestamp(in seconds)
//nolint
failedCnt int // failed count
failedCnt int // failed count
}

// DataValidator
Expand Down Expand Up @@ -116,6 +114,7 @@ type DataValidator struct {

result pb.ProcessResult
validateInterval time.Duration
workers []*validateWorker
changeEventCount []atomic.Int64
workerCnt int

Expand All @@ -133,6 +132,7 @@ func NewContinuousDataValidator(cfg *config.SubTaskConfig, syncerObj *Syncer) *D

v.workerCnt = cfg.ValidatorCfg.WorkerCount
v.changeEventCount = make([]atomic.Int64, 3)
v.workers = make([]*validateWorker, v.workerCnt)
v.validateInterval = validationInterval

v.unsupportedTable = make(map[string]string)
Expand All @@ -144,7 +144,7 @@ func (v *DataValidator) initialize() error {
v.ctx, v.cancel = context.WithCancel(context.Background())
v.tctx = tcontext.NewContext(v.ctx, v.L)
v.result.Reset()
// todo: many place may put into this channel, choose a proper channel size or enhance error handling
// todo: enhance error handling
v.errChan = make(chan error, 10)

newCtx, cancelFunc := context.WithTimeout(v.ctx, unit.DefaultInitTimeout)
Expand Down Expand Up @@ -206,17 +206,34 @@ func (v *DataValidator) Start(expect pb.Stage) {
}

v.wg.Add(1)
go func() {
defer v.wg.Done()
v.doValidate()
}()
go v.doValidate()

v.wg.Add(1)
go v.printStatusRoutine()

v.errProcessWg.Add(1)
go v.errorProcessRoutine()

v.stage = pb.Stage_Running
}

func (v *DataValidator) printStatusRoutine() {
defer v.wg.Done()
for {
select {
case <-v.ctx.Done():
return
case <-time.After(checkInterval):
// todo: status about pending row changes
v.L.Info("processed event status",
zap.Int64("insert", v.changeEventCount[rowInsert].Load()),
zap.Int64("update", v.changeEventCount[rowUpdated].Load()),
zap.Int64("delete", v.changeEventCount[rowDeleted].Load()),
)
}
}
}

func (v *DataValidator) fillResult(err error, needLock bool) {
if needLock {
v.Lock()
Expand Down Expand Up @@ -292,6 +309,8 @@ func (v *DataValidator) waitSyncerRunning() error {

// doValidate: runs in a separate goroutine.
func (v *DataValidator) doValidate() {
defer v.wg.Done()

if err := v.waitSyncerRunning(); err != nil {
v.errChan <- terror.Annotate(err, "failed to wait syncer running")
return
Expand All @@ -311,6 +330,11 @@ func (v *DataValidator) doValidate() {

v.L.Info("start continuous validation")
v.startValidateWorkers()
defer func() {
for _, worker := range v.workers {
worker.close()
}
}()

currLoc := location.CloneWithFlavor(v.cfg.Flavor)
for {
Expand Down Expand Up @@ -389,9 +413,20 @@ func (v *DataValidator) Stage() pb.Stage {
}

func (v *DataValidator) startValidateWorkers() {
v.wg.Add(v.workerCnt)
for i := 0; i < v.workerCnt; i++ {
worker := newValidateWorker(v, i)
v.workers[i] = worker
go func() {
v.wg.Done()
worker.run()
}()
}
}

func (v *DataValidator) dispatchRowChange(key string, row *rowChange) {
hashVal := int(utils.GenHashKey(key)) % v.workerCnt
v.workers[hashVal].rowChangeCh <- row
}

func (v *DataValidator) processRowsEvent(header *replication.EventHeader, ev *replication.RowsEvent) error {
Expand Down Expand Up @@ -479,7 +514,7 @@ func (v *DataValidator) processRowsEvent(header *replication.EventHeader, ev *re
row := ev.Rows[i]
pkValue := make([]string, len(pk.Columns))
for _, idx := range pkIndices {
pkValue[idx] = string(genColData(row[idx]))
pkValue[idx] = genColData(row[idx])
}
key := genRowKey(pkValue)

Expand All @@ -488,7 +523,7 @@ func (v *DataValidator) processRowsEvent(header *replication.EventHeader, ev *re
afterRow := ev.Rows[i+1]
afterPkValue := make([]string, len(pk.Columns))
for _, idx := range pkIndices {
afterPkValue[idx] = string(genColData(afterRow[idx]))
afterPkValue[idx] = genColData(afterRow[idx])
}
afterKey := genRowKey(afterPkValue)
if afterKey != key {
Expand Down Expand Up @@ -542,13 +577,13 @@ func genRowKey(pkValues []string) string {
return strings.Join(pkValues, "-")
}

func genColData(v interface{}) []byte {
func genColData(v interface{}) string {
switch dv := v.(type) {
case []byte:
return dv
return string(dv)
case string:
return []byte(dv)
return dv
}
s := fmt.Sprintf("%v", v)
return []byte(s)
return s
}
12 changes: 6 additions & 6 deletions dm/syncer/data_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func genSubtaskConfig(t *testing.T) *config.SubTaskConfig {
return cfg
}

//nolint
func genDBConn(t *testing.T, db *sql.DB, cfg *config.SubTaskConfig) *dbconn.DBConn {
t.Helper()
baseDB := conn.NewBaseDB(db, func() {})
Expand Down Expand Up @@ -424,6 +423,7 @@ func TestValidatorDoValidate(t *testing.T) {
streamer: mockStreamer,
closed: false,
}
validator.wg.Add(1) // wg.Done is run in doValidate
validator.doValidate()
require.Equal(t, int64(1), validator.changeEventCount[rowInsert].Load())
require.Equal(t, int64(1), validator.changeEventCount[rowUpdated].Load())
Expand All @@ -448,13 +448,13 @@ func TestValidatorGetRowChangeType(t *testing.T) {

func TestValidatorGenColData(t *testing.T) {
res := genColData(1)
require.Equal(t, "1", string(res))
require.Equal(t, "1", res)
res = genColData(1.2)
require.Equal(t, "1.2", string(res))
require.Equal(t, "1.2", res)
res = genColData("abc")
require.Equal(t, "abc", string(res))
require.Equal(t, "abc", res)
res = genColData([]byte{'\x01', '\x02', '\x03'})
require.Equal(t, "\x01\x02\x03", string(res))
require.Equal(t, "\x01\x02\x03", res)
res = genColData(decimal.NewFromInt(222123123))
require.Equal(t, "222123123", string(res))
require.Equal(t, "222123123", res)
}
Loading

0 comments on commit ebb9f63

Please sign in to comment.