Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync-diff-inspector: skip validation for tables that exist only upstream or downstream and print skipped information in summary and progress #693

Merged
merged 21 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ func (df *Diff) StructEqual(ctx context.Context) error {
tableIndex = df.startRange.ChunkRange.Index.TableIndex
}
for ; tableIndex < len(tables); tableIndex++ {
isEqual, isSkip, isAllTableExist := false, true, tables[tableIndex].NeedSkippedTable
if source.AllTableExist(tables[tableIndex]) {
isEqual, isSkip, isAllTableExist := false, true, tables[tableIndex].TableLack
if common.AllTableExist(tables[tableIndex].TableLack) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if common.AllTableExist(tables[tableIndex].TableLack) {
if common.AllTableExist(isAllTableExist) {

var err error
isEqual, isSkip, err = df.compareStruct(ctx, tableIndex)
if err != nil {
Expand Down Expand Up @@ -421,9 +421,9 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
if rangeInfo.ChunkRange.Type == chunk.Empty {
dml.node.State = checkpoints.IgnoreState
// for tables that don't exist upstream or downstream
if !source.AllTableExist(tableDiff) {
upCount, _ := dbutil.GetRowCount(ctx, df.upstream.GetDB(), schema, table, "", nil)
downCount, _ := dbutil.GetRowCount(ctx, df.downstream.GetDB(), schema, table, "", nil)
if !common.AllTableExist(tableDiff.TableLack) {
upCount := df.upstream.GetCountAndCrc32(ctx, rangeInfo).Count
downCount := df.downstream.GetCountAndCrc32(ctx, rangeInfo).Count
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only need the row count here, but this function also computes the checksum. I'm afraid that will hurt efficiency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. I defined a new interface method, GetCountForLackTable, in the next commit (34293bb).

df.report.SetTableDataCheckResult(schema, table, false, int(upCount), int(downCount), upCount, downCount, id)
return false
}
Expand Down
31 changes: 10 additions & 21 deletions sync_diff_inspector/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (r *Report) getDiffRows() [][]string {
}
diffRow := make([]string, 0)
diffRow = append(diffRow, dbutil.TableName(schema, table))
if !AllTableExist(result) {
if !common.AllTableExist(result.TableLack) {
diffRow = append(diffRow, "skipped")
} else {
diffRow = append(diffRow, "succeed")
Expand Down Expand Up @@ -176,7 +176,7 @@ func (r *Report) CommitSummary() error {
for _, result := range tableMap {
if result.StructEqual && result.DataEqual {
passNum++
} else if !AllTableExist(result) {
} else if !common.AllTableExist(result.TableLack) {
skippedNum++
} else {
failedNum++
Expand Down Expand Up @@ -244,18 +244,19 @@ func (r *Report) Print(w io.Writer) error {
for table, result := range tableMap {
if !result.StructEqual {
if result.DataSkip {
if UpstreamTableLack(result) {
switch result.TableLack {
case common.UpstreamTableLackFlag:
summary.WriteString(fmt.Sprintf("The data of %s does not exist in upstream database\n", dbutil.TableName(schema, table)))
} else if DownstreamTableLack(result) {
case common.DownstreamTableLackFlag:
summary.WriteString(fmt.Sprintf("The data of %s does not exist in downstream database\n", dbutil.TableName(schema, table)))
} else {
default:
summary.WriteString(fmt.Sprintf("The structure of %s is not equal, and data-check is skipped\n", dbutil.TableName(schema, table)))
}
} else {
summary.WriteString(fmt.Sprintf("The structure of %s is not equal\n", dbutil.TableName(schema, table)))
}
}
if !result.DataEqual && AllTableExist(result) {
if !result.DataEqual && common.AllTableExist(result.TableLack) {
summary.WriteString(fmt.Sprintf("The data of %s is not equal\n", dbutil.TableName(schema, table)))
}
}
Expand Down Expand Up @@ -318,7 +319,7 @@ func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, ski
tableResult.StructEqual = equal
tableResult.DataSkip = skip
tableResult.TableLack = exist
if !equal && AllTableExist(tableResult) && r.Result != Error {
if !equal && common.AllTableExist(tableResult.TableLack) && r.Result != Error {
r.Result = Fail
}
}
Expand All @@ -340,11 +341,11 @@ func (r *Report) SetTableDataCheckResult(schema, table string, equal bool, rowsA
}
result.ChunkMap[id.ToString()].RowsAdd += rowsAdd
result.ChunkMap[id.ToString()].RowsDelete += rowsDelete
if r.Result != Error && AllTableExist(result) {
if r.Result != Error && common.AllTableExist(result.TableLack) {
r.Result = Fail
}
}
if !equal && AllTableExist(result) && r.Result != Error {
if !equal && common.AllTableExist(result.TableLack) && r.Result != Error {
r.Result = Fail
}
}
Expand Down Expand Up @@ -416,15 +417,3 @@ func (r *Report) GetSnapshot(chunkID *chunk.ChunkID, schema, table string) (*Rep
task: task,
}, nil
}

func AllTableExist(result *TableResult) bool {
return result.TableLack == common.AllTableExistFlag
}

func UpstreamTableLack(result *TableResult) bool {
return result.TableLack == common.UpstreamTableLackFlag
}

func DownstreamTableLack(result *TableResult) bool {
return result.TableLack == common.DownstreamTableLackFlag
}
2 changes: 1 addition & 1 deletion sync_diff_inspector/source/chunks_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (t *ChunksIterator) produceChunks(ctx context.Context, startRange *splitter
for ; t.nextTableIndex < len(t.TableDiffs); t.nextTableIndex++ {
curTableIndex := t.nextTableIndex
// skip data-check, but still need to send an empty chunk to make checkpoint continuous
if t.TableDiffs[curTableIndex].IgnoreDataCheck || !AllTableExist(t.TableDiffs[curTableIndex]) {
if t.TableDiffs[curTableIndex].IgnoreDataCheck || !common.AllTableExist(t.TableDiffs[curTableIndex].TableLack) {
pool.Apply(func() {
table := t.TableDiffs[curTableIndex]
progressID := dbutil.TableName(table.Schema, table.Table)
Expand Down
12 changes: 8 additions & 4 deletions sync_diff_inspector/source/common/table_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,18 @@ type TableDiff struct {

ChunkSize int64 `json:"chunk-size"`

// NeedSkippedTable = 1: the table only exists downstream,
// NeedSkippedTable = -1: the table only exists upstream,
// NeedSkippedTable = 0: the table exists both upstream and downstream.
NeedSkippedTable int `json:"-"`
// TableLack = 1: the table only exists downstream,
// TableLack = -1: the table only exists upstream,
// TableLack = 0: the table exists both upstream and downstream.
TableLack int `json:"-"`
}

// Values stored in TableDiff.TableLack to record on which side of the
// comparison, if any, a table is missing.
const (
	// UpstreamTableLackFlag means the table only exists downstream.
	UpstreamTableLackFlag = 1
	// DownstreamTableLackFlag means the table only exists upstream.
	DownstreamTableLackFlag = -1
	// AllTableExistFlag means the table exists both upstream and downstream.
	AllTableExistFlag = 0
)

// AllTableExist reports whether tableLack equals AllTableExistFlag, i.e.
// whether the table is present on both the upstream and downstream side.
func AllTableExist(tableLack int) bool {
	switch tableLack {
	case AllTableExistFlag:
		return true
	default:
		return false
	}
}
70 changes: 37 additions & 33 deletions sync_diff_inspector/source/mysql_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func getMatchedSourcesForTable(sourceTablesMap map[string][]*common.TableShardSo
log.Fatal("unreachable, source tables map shouldn't be nil.")
}
matchSources, ok := sourceTablesMap[utils.UniqueID(table.Schema, table.Table)]
if !ok {
if !ok && common.AllTableExist(table.TableLack) {
log.Fatal("unreachable, no match source tables in mysql shard source.")
}
return matchSources
Expand Down Expand Up @@ -97,42 +97,44 @@ func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitte
beginTime := time.Now()
table := s.tableDiffs[tableRange.GetTableIndex()]
chunk := tableRange.GetChunk()

// for tables that do not exist upstream or downstream
if !AllTableExist(table) {
return &ChecksumInfo{
Count: 0,
}
}
matchSources := getMatchedSourcesForTable(s.sourceTablesMap, table)
infoCh := make(chan *ChecksumInfo, len(s.sourceTablesMap))

for _, ms := range matchSources {
go func(ms *common.TableShardSource) {
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args)
infoCh <- &ChecksumInfo{
Checksum: checksum,
Count: count,
Err: err,
}
}(ms)
}
defer close(infoCh)

var (
err error
totalCount int64
totalChecksum int64
)
matchSources := getMatchedSourcesForTable(s.sourceTablesMap, table)

if common.AllTableExist(table.TableLack) {
infoCh := make(chan *ChecksumInfo, len(s.sourceTablesMap))
for _, ms := range matchSources {
go func(ms *common.TableShardSource) {
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args)
infoCh <- &ChecksumInfo{
Checksum: checksum,
Count: count,
Err: err,
}
}(ms)
}
defer close(infoCh)

for range matchSources {
info := <-infoCh
// catch the first error
if err == nil && info.Err != nil {
err = info.Err
for range matchSources {
info := <-infoCh
// catch the first error
if err == nil && info.Err != nil {
err = info.Err
}
totalCount += info.Count
totalChecksum ^= info.Checksum
}
} else {
var count int64
if matchSources != nil {
for _, ms := range matchSources {
count, err = dbutil.GetRowCount(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, "", nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error will be returned in ChecksumInfo. Is that expected?

totalCount += count
}
}
totalCount += info.Count
totalChecksum ^= info.Checksum
}

cost := time.Since(beginTime)
Expand Down Expand Up @@ -169,7 +171,7 @@ func (s *MySQLSources) GetRowsIterator(ctx context.Context, tableRange *splitter

table := s.tableDiffs[tableRange.GetTableIndex()]
// for tables that do not exist upstream or downstream
if !AllTableExist(table) {
if !common.AllTableExist(table.TableLack) {
return nil, nil
}
matchSources := getMatchedSourcesForTable(s.sourceTablesMap, table)
Expand Down Expand Up @@ -234,7 +236,7 @@ func (s *MySQLSources) GetSnapshot() string {
func (s *MySQLSources) GetSourceStructInfo(ctx context.Context, tableIndex int) ([]*model.TableInfo, error) {
tableDiff := s.GetTables()[tableIndex]
// for tables that do not exist upstream or downstream
if !AllTableExist(tableDiff) {
if !common.AllTableExist(tableDiff.TableLack) {
return nil, nil
}
tableSources := getMatchedSourcesForTable(s.sourceTablesMap, tableDiff)
Expand Down Expand Up @@ -334,12 +336,14 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*
}
}
uniqueId := utils.UniqueID(targetSchema, targetTable)
var isMatched bool
// get all tables from all source db instance
if f.MatchTable(targetSchema, targetTable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var isMatched bool
// get all tables from all source db instance
if f.MatchTable(targetSchema, targetTable) {
isMatched := f.MatchTable(targetSchema, targetTable)
if isMatched {

// if match the filter, we should respect it and check target has this table later.
sourceTablesAfterRoute[uniqueId] = struct{}{}
isMatched = true
}
if _, ok := targetUniqueTableMap[uniqueId]; !ok {
if _, ok := targetUniqueTableMap[uniqueId]; !ok && !(isMatched && skipNonExistingTable) {
continue
}
maxSourceRouteTableCount[uniqueId]++
Expand Down
14 changes: 5 additions & 9 deletions sync_diff_inspector/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ func checkTableMatched(tableDiffs []*common.TableDiff, targetMap map[string]stru
return tableDiffs, errors.Errorf("the source has no table to be compared. target-table is `%s`", tableDiff)
}
index := tableIndexMap[tableDiff]
if tableDiffs[index].NeedSkippedTable == 0 {
tableDiffs[index].NeedSkippedTable = common.UpstreamTableLackFlag
if tableDiffs[index].TableLack == 0 {
tableDiffs[index].TableLack = common.UpstreamTableLackFlag
log.Info("the source has no table to be compared", zap.String("target-table", tableDiff))
}
}
Expand All @@ -400,9 +400,9 @@ func checkTableMatched(tableDiffs []*common.TableDiff, targetMap map[string]stru
}
slice := strings.Split(strings.Replace(tableDiff, "`", "", -1), ".")
tableDiffs = append(tableDiffs, &common.TableDiff{
Schema: slice[0],
Table: slice[1],
NeedSkippedTable: common.DownstreamTableLackFlag,
Schema: slice[0],
Table: slice[1],
TableLack: common.DownstreamTableLackFlag,
})
log.Info("the target has no table to be compared", zap.String("source-table", tableDiff))
}
Expand All @@ -419,7 +419,3 @@ func getIndexMapForTable(tableDiffs []*common.TableDiff) map[string]int {
}
return tableIndexMap
}

func AllTableExist(tableDiffs *common.TableDiff) bool {
return tableDiffs.NeedSkippedTable == common.AllTableExistFlag
}
6 changes: 3 additions & 3 deletions sync_diff_inspector/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ func TestCheckTableMatched(t *testing.T) {

tables, err = checkTableMatched(tableDiffs, tmap, smap, true)
require.NoError(t, err)
require.Equal(t, 0, tables[0].NeedSkippedTable)
require.Equal(t, 1, tables[1].NeedSkippedTable)
require.Equal(t, -1, tables[2].NeedSkippedTable)
require.Equal(t, 0, tables[0].TableLack)
require.Equal(t, 1, tables[1].TableLack)
require.Equal(t, -1, tables[2].TableLack)
}
17 changes: 15 additions & 2 deletions sync_diff_inspector/source/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,20 @@ func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.
beginTime := time.Now()
table := s.tableDiffs[tableRange.GetTableIndex()]
chunk := tableRange.GetChunk()
var (
checksum int64
count int64
err error
)

matchSource := getMatchSource(s.sourceTableMap, table)
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args)
if common.AllTableExist(table.TableLack) {
count, checksum, err = utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args)
} else {
if matchSource != nil {
count, err = dbutil.GetRowCount(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, "", nil)
}
}

cost := time.Since(beginTime)
return &ChecksumInfo{
Expand Down Expand Up @@ -236,11 +247,13 @@ func NewTiDBSource(ctx context.Context, tableDiffs []*common.TableDiff, ds *conf
}

uniqueId := utils.UniqueID(targetSchema, targetTable)
var isMatched bool
if f.MatchTable(targetSchema, targetTable) {
// if match the filter, we should respect it and check target has this table later.
sourceTablesAfterRoute[uniqueId] = struct{}{}
isMatched = true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified.

if _, ok := targetUniqueTableMap[uniqueId]; ok {
if _, ok := targetUniqueTableMap[uniqueId]; ok || (isMatched && skipNonExistingTable) {
if _, ok := sourceTableMap[uniqueId]; ok {
log.Error("TiDB source don't support compare multiple source tables with one downstream table," +
" if this happening when diff on same instance is fine. otherwise we are not guarantee this diff result is right")
Expand Down
Loading