Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync-diff-inspector: skip validation for tables that exist only upstream or downstream and print skipped information in summary and progress #693

Merged
merged 21 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sync_diff_inspector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ type Config struct {
CheckStructOnly bool `toml:"check-struct-only" json:"check-struct-only"`
// experimental feature: only check table data without table struct
CheckDataOnly bool `toml:"check-data-only" json:"-"`
// skip validation for tables that don't exist upstream or downstream
SkipNonExistingTable bool `toml:"skip-non-existing-table" json:"-"`
// DMAddr is dm-master's address; the format should be like "http://127.0.0.1:8261"
DMAddr string `toml:"dm-addr" json:"dm-addr"`
// DMTask string `toml:"dm-task" json:"dm-task"`
Expand Down Expand Up @@ -411,6 +413,7 @@ func NewConfig() *Config {
fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 4, "how many goroutines are created to check data")
fs.BoolVar(&cfg.ExportFixSQL, "export-fix-sql", true, "set true if want to compare rows or set to false will only compare checksum")
fs.BoolVar(&cfg.CheckStructOnly, "check-struct-only", false, "ignore check table's data")
fs.BoolVar(&cfg.SkipNonExistingTable, "skip-non-existing-table", false, "skip validation for tables that don't exist upstream or downstream")
fs.BoolVar(&cfg.CheckDataOnly, "check-data-only", false, "ignore check table's struct")

_ = fs.MarkHidden("check-data-only")
Expand Down
28 changes: 20 additions & 8 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,16 @@ func (df *Diff) StructEqual(ctx context.Context) error {
tableIndex = df.startRange.ChunkRange.Index.TableIndex
}
for ; tableIndex < len(tables); tableIndex++ {
isEqual, isSkip, err := df.compareStruct(ctx, tableIndex)
if err != nil {
return errors.Trace(err)
isEqual, isSkip, isAllTableExist := false, true, tables[tableIndex].TableLack
if common.AllTableExist(isAllTableExist) {
var err error
isEqual, isSkip, err = df.compareStruct(ctx, tableIndex)
if err != nil {
return errors.Trace(err)
}
}
progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip)
df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip)
progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip, isAllTableExist)
df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip, isAllTableExist)
}
return nil
}
Expand Down Expand Up @@ -411,12 +415,21 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
node: rangeInfo.ToNode(),
}
defer func() { df.sqlCh <- dml }()
tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()]
schema, table := tableDiff.Schema, tableDiff.Table
id := rangeInfo.ChunkRange.Index
if rangeInfo.ChunkRange.Type == chunk.Empty {
dml.node.State = checkpoints.IgnoreState
// for tables that don't exist upstream or downstream
if !common.AllTableExist(tableDiff.TableLack) {
upCount := df.upstream.GetCountForLackTable(ctx, rangeInfo)
downCount := df.downstream.GetCountForLackTable(ctx, rangeInfo)
df.report.SetTableDataCheckResult(schema, table, false, int(upCount), int(downCount), upCount, downCount, id)
return false
}
return true
}
tableDiff := df.downstream.GetTables()[rangeInfo.GetTableIndex()]
schema, table := tableDiff.Schema, tableDiff.Table

var state string = checkpoints.SuccessState

isEqual, upCount, downCount, err := df.compareChecksumAndGetCount(ctx, rangeInfo)
Expand Down Expand Up @@ -447,7 +460,6 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
isEqual = isDataEqual
}
dml.node.State = state
id := rangeInfo.ChunkRange.Index
df.report.SetTableDataCheckResult(schema, table, isEqual, dml.rowAdd, dml.rowDelete, upCount, downCount, id)
return isEqual
}
Expand Down
41 changes: 34 additions & 7 deletions sync_diff_inspector/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"os"
"strings"
"time"

"github.com/pingcap/tidb-tools/sync_diff_inspector/source/common"
)

type TableProgressPrinter struct {
Expand Down Expand Up @@ -53,7 +55,9 @@ const (
TABLE_STATE_RESULT_FAIL_STRUCTURE_PASS table_state_t = 0x40
TABLE_STATE_RESULT_DIFFERENT table_state_t = 0x80
TABLE_STATE_HEAD table_state_t = 0xff
TABLE_STATE_RESULT_MASK table_state_t = 0xf0
TABLE_STATE_RESULT_MASK table_state_t = 0xff0
TABLE_STATE_NOT_EXSIT_UPSTREAM table_state_t = 0x100
TABLE_STATE_NOT_EXSIT_DOWNSTREAM table_state_t = 0x200
)

type TableProgress struct {
Expand Down Expand Up @@ -127,11 +131,18 @@ func (tpp *TableProgressPrinter) UpdateTotal(name string, total int, stopUpdate
}
}

func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool) {
func (tpp *TableProgressPrinter) RegisterTable(name string, isFailed bool, isDone bool, isExist int) {
var state table_state_t
if isFailed {
if isDone {
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER
switch isExist {
case common.UpstreamTableLackFlag:
state = TABLE_STATE_NOT_EXSIT_UPSTREAM | TABLE_STATE_REGISTER
case common.DownstreamTableLackFlag:
state = TABLE_STATE_NOT_EXSIT_DOWNSTREAM | TABLE_STATE_REGISTER
default:
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE | TABLE_STATE_REGISTER
}
} else {
state = TABLE_STATE_RESULT_FAIL_STRUCTURE_CONTINUE | TABLE_STATE_REGISTER
}
Expand Down Expand Up @@ -181,6 +192,7 @@ func (tpp *TableProgressPrinter) PrintSummary() {
tpp.tableNums,
)
} else {
SkippedNum := 0
for p := tpp.tableFailList.Front(); p != nil; p = p.Next() {
tp := p.Value.(*TableProgress)
if tp.state&(TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE|TABLE_STATE_RESULT_FAIL_STRUCTURE_CONTINUE) != 0 {
Expand All @@ -189,10 +201,18 @@ func (tpp *TableProgressPrinter) PrintSummary() {
if tp.state&(TABLE_STATE_RESULT_DIFFERENT) != 0 {
fixStr = fmt.Sprintf("%sThe data of `%s` is not equal.\n", fixStr, tp.name)
}
if tp.state&(TABLE_STATE_NOT_EXSIT_DOWNSTREAM) != 0 {
fixStr = fmt.Sprintf("%sThe data of `%s` does not exist in downstream database.\n", fixStr, tp.name)
SkippedNum++
}
if tp.state&(TABLE_STATE_NOT_EXSIT_UPSTREAM) != 0 {
fixStr = fmt.Sprintf("%sThe data of `%s` does not exist in upstream database.\n", fixStr, tp.name)
SkippedNum++
}
}
fixStr = fmt.Sprintf(
"%s\nThe rest of the tables are all equal.\nThe patch file has been generated to './output_dir/patch.sql'\nYou can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
fixStr,
"%s\nThe rest of the tables are all equal.\nA total of %d tables have been compared, %d tables finished, %d tables failed, %d tables skipped.\nThe patch file has been generated to './output_dir/patch.sql'\nYou can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
fixStr, tpp.tableNums, tpp.tableNums-tpp.tableFailList.Len(), tpp.tableFailList.Len()-SkippedNum, SkippedNum,
)
}

Expand Down Expand Up @@ -337,6 +357,13 @@ func (tpp *TableProgressPrinter) flush(stateIsChanged bool) {
tpp.lines++
tpp.progressTableNums++
tp.state = TABLE_STATE_COMPARING
case TABLE_STATE_NOT_EXSIT_UPSTREAM, TABLE_STATE_NOT_EXSIT_DOWNSTREAM:
dynStr = fmt.Sprintf("%sComparing the table data of `%s` ...skipped\n", dynStr, tp.name)
tpp.tableFailList.PushBack(tp)
preNode := p.Prev()
tpp.tableList.Remove(p)
p = preNode
tpp.finishTableNums++
case TABLE_STATE_RESULT_FAIL_STRUCTURE_DONE:
fixStr = fmt.Sprintf("%sComparing the table structure of `%s` ... failure\n", fixStr, tp.name)
tpp.tableFailList.PushBack(tp)
Expand Down Expand Up @@ -410,9 +437,9 @@ func UpdateTotal(name string, total int, stopUpdate bool) {
}
}

func RegisterTable(name string, isFailed bool, isDone bool) {
func RegisterTable(name string, isFailed bool, isDone bool, isExist int) {
if progress_ != nil {
progress_.RegisterTable(name, isFailed, isDone)
progress_.RegisterTable(name, isFailed, isDone, isExist)
}
}

Expand Down
31 changes: 21 additions & 10 deletions sync_diff_inspector/progress/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ import (
"testing"
"time"

"github.com/pingcap/tidb-tools/sync_diff_inspector/source/common"
"github.com/stretchr/testify/require"
)

func TestProgress(t *testing.T) {
p := NewTableProgressPrinter(4, 0)
p.RegisterTable("1", true, true)
p := NewTableProgressPrinter(6, 0)
p.RegisterTable("1", true, true, common.AllTableExistFlag)
p.StartTable("1", 50, true)
p.RegisterTable("2", true, false)
p.RegisterTable("2", true, false, common.AllTableExistFlag)
p.StartTable("2", 2, true)
p.Inc("2")
p.RegisterTable("3", false, false)
p.RegisterTable("3", false, false, common.AllTableExistFlag)
p.StartTable("3", 1, false)
p.Inc("2")
p.Inc("3")
Expand All @@ -39,6 +40,10 @@ func TestProgress(t *testing.T) {
p.FailTable("4")
p.Inc("3")
p.Inc("4")
p.RegisterTable("5", true, true, common.UpstreamTableLackFlag)
p.StartTable("5", 1, true)
p.RegisterTable("6", true, true, common.DownstreamTableLackFlag)
p.StartTable("6", 1, true)
time.Sleep(500 * time.Millisecond)
p.Close()
buffer := new(bytes.Buffer)
Expand All @@ -47,18 +52,21 @@ func TestProgress(t *testing.T) {
require.Equal(
t,
buffer.String(),
"\x1b[1A\x1b[J\nSummary:\n\nThe structure of `1` is not equal.\nThe structure of `2` is not equal.\nThe data of `4` is not equal.\n"+
"\nThe rest of the tables are all equal.\nThe patch file has been generated to './output_dir/patch.sql'\n"+
"\x1b[1A\x1b[J\nSummary:\n\nThe structure of `1` is not equal.\nThe structure of `2` is not equal.\nThe data of `4` is not equal.\nThe data of `5` does not exist in upstream database.\nThe data of `6` does not exist in downstream database.\n"+
"\nThe rest of the tables are all equal.\nA total of 6 tables have been compared, 1 tables finished, 3 tables failed, 2 tables skipped.\nThe patch file has been generated to './output_dir/patch.sql'\n"+
"You can view the comparison details through './output_dir/sync_diff_inspector.log'\n\n",
)
}

func TestTableError(t *testing.T) {
p := NewTableProgressPrinter(4, 0)
p.RegisterTable("1", true, true)
p.RegisterTable("1", true, true, common.AllTableExistFlag)
p.StartTable("1", 50, true)
p.RegisterTable("2", true, true)
p.RegisterTable("2", true, true, common.AllTableExistFlag)
p.StartTable("2", 1, true)
p.RegisterTable("3", true, true, common.DownstreamTableLackFlag)
p.StartTable("3", 1, true)

p.Inc("2")
buffer := new(bytes.Buffer)
p.SetOutput(buffer)
Expand All @@ -73,16 +81,19 @@ func TestTableError(t *testing.T) {
"\x1b[2A\x1b[JComparing the table structure of `2` ... failure\n"+
"_____________________________________________________________________________\n"+
"Progress [==============================>------------------------------] 50% 0/0\n"+
"\x1b[2A\x1b[JComparing the table data of `3` ...skipped\n"+
"_____________________________________________________________________________\n"+
"Progress [=============================================>---------------] 75% 0/1\n"+
"\x1b[1A\x1b[J\nError in comparison process:\n[aaa]\n\n"+
"You can view the comparison details through './output_dir/sync_diff_inspector.log'\n",
)
}

func TestAllSuccess(t *testing.T) {
Init(2, 0)
RegisterTable("1", false, false)
RegisterTable("1", false, false, common.AllTableExistFlag)
StartTable("1", 1, true)
RegisterTable("2", false, false)
RegisterTable("2", false, false, common.AllTableExistFlag)
StartTable("2", 1, true)
Inc("1")
Inc("2")
Expand Down
46 changes: 32 additions & 14 deletions sync_diff_inspector/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type TableResult struct {
ChunkMap map[string]*ChunkResult `json:"chunk-result"` // `ChunkMap` stores the `ChunkResult` of each chunk of the table
UpCount int64 `json:"up-count"` // `UpCount` is the number of rows in the table from upstream
DownCount int64 `json:"down-count"` // `DownCount` is the number of rows in the table from downstream

TableLack int `json:"table-lack"`
}

// ChunkResult saves the information necessary to provide summary information
Expand All @@ -80,6 +80,7 @@ type Report struct {
Result string `json:"-"` // Result is pass or fail
PassNum int32 `json:"-"` // The pass number of tables
FailedNum int32 `json:"-"` // The failed number of tables
SkippedNum int32 `json:"-"` // The skipped number of tables
TableResults map[string]map[string]*TableResult `json:"table-results"` // TableResult saved the map of `schema` => `table` => `tableResult`
StartTime time.Time `json:"start-time"`
Duration time.Duration `json:"time-duration"`
Expand Down Expand Up @@ -131,6 +132,11 @@ func (r *Report) getDiffRows() [][]string {
}
diffRow := make([]string, 0)
diffRow = append(diffRow, dbutil.TableName(schema, table))
if !common.AllTableExist(result.TableLack) {
diffRow = append(diffRow, "skipped")
} else {
diffRow = append(diffRow, "succeed")
}
if !result.StructEqual {
diffRow = append(diffRow, "false")
} else {
Expand All @@ -154,7 +160,6 @@ func (r *Report) CalculateTotalSize(ctx context.Context, db *sql.DB) {
for schema, tableMap := range r.TableResults {
for table := range tableMap {
size, err := utils.GetTableSize(ctx, db, schema, table)

if size == 0 || err != nil {
log.Warn("fail to get the correct size of table, if you want to get the correct size, please analyze the corresponding tables", zap.String("table", dbutil.TableName(schema, table)), zap.Error(err))
} else {
Expand All @@ -166,18 +171,21 @@ func (r *Report) CalculateTotalSize(ctx context.Context, db *sql.DB) {

// CommitSummary tallies the per-table results and writes the summary to summary.txt in the task's output directory
func (r *Report) CommitSummary() error {
passNum, failedNum := int32(0), int32(0)
passNum, failedNum, skippedNum := int32(0), int32(0), int32(0)
for _, tableMap := range r.TableResults {
for _, result := range tableMap {
if result.StructEqual && result.DataEqual {
passNum++
} else if !common.AllTableExist(result.TableLack) {
skippedNum++
} else {
failedNum++
}
}
}
r.PassNum = passNum
r.FailedNum = failedNum
r.SkippedNum = skippedNum
summaryPath := filepath.Join(r.task.OutputDir, "summary.txt")
summaryFile, err := os.Create(summaryPath)
if err != nil {
Expand Down Expand Up @@ -208,11 +216,11 @@ func (r *Report) CommitSummary() error {
summaryFile.WriteString(tableString.String())
summaryFile.WriteString("\n\n")
}
if r.Result == Fail {
if r.Result == Fail || r.SkippedNum != 0 {
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
summaryFile.WriteString("The following tables contains inconsistent data\n\n")
tableString := &strings.Builder{}
table := tablewriter.NewWriter(tableString)
table.SetHeader([]string{"Table", "Structure equality", "Data diff rows", "UpCount", "DownCount"})
table.SetHeader([]string{"Table", "Result", "Structure equality", "Data diff rows", "UpCount", "DownCount"})
diffRows := r.getDiffRows()
for _, v := range diffRows {
table.Append(v)
Expand All @@ -228,26 +236,35 @@ func (r *Report) CommitSummary() error {

func (r *Report) Print(w io.Writer) error {
var summary strings.Builder
if r.Result == Pass {
summary.WriteString(fmt.Sprintf("A total of %d table have been compared and all are equal.\n", r.FailedNum+r.PassNum))
if r.Result == Pass && r.SkippedNum == 0 {
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
summary.WriteString(fmt.Sprintf("A total of %d table have been compared and all are equal.\n", r.FailedNum+r.PassNum+r.SkippedNum))
summary.WriteString(fmt.Sprintf("You can view the comparision details through '%s/%s'\n", r.task.OutputDir, config.LogFileName))
} else if r.Result == Fail {
} else if r.Result == Fail || r.SkippedNum != 0 {
for schema, tableMap := range r.TableResults {
for table, result := range tableMap {
if !result.StructEqual {
if result.DataSkip {
summary.WriteString(fmt.Sprintf("The structure of %s is not equal, and data-check is skipped\n", dbutil.TableName(schema, table)))
switch result.TableLack {
case common.UpstreamTableLackFlag:
summary.WriteString(fmt.Sprintf("The data of %s does not exist in upstream database\n", dbutil.TableName(schema, table)))
case common.DownstreamTableLackFlag:
summary.WriteString(fmt.Sprintf("The data of %s does not exist in downstream database\n", dbutil.TableName(schema, table)))
default:
summary.WriteString(fmt.Sprintf("The structure of %s is not equal, and data-check is skipped\n", dbutil.TableName(schema, table)))
}
} else {
summary.WriteString(fmt.Sprintf("The structure of %s is not equal\n", dbutil.TableName(schema, table)))
}
}
if !result.DataEqual {
if !result.DataEqual && common.AllTableExist(result.TableLack) {
summary.WriteString(fmt.Sprintf("The data of %s is not equal\n", dbutil.TableName(schema, table)))
}
}
}
summary.WriteString("\n")
summary.WriteString("The rest of tables are all equal.\n")
summary.WriteString("\n")
summary.WriteString(fmt.Sprintf("A total of %d tables have been compared, %d tables finished, %d tables failed, %d tables skipped.\n", r.FailedNum+r.PassNum+r.SkippedNum, r.PassNum, r.FailedNum, r.SkippedNum))
summary.WriteString(fmt.Sprintf("The patch file has been generated in \n\t'%s/'\n", r.task.FixDir))
summary.WriteString(fmt.Sprintf("You can view the comparision details through '%s/%s'\n", r.task.OutputDir, config.LogFileName))
} else {
Expand Down Expand Up @@ -295,13 +312,14 @@ func (r *Report) Init(tableDiffs []*common.TableDiff, sourceConfig [][]byte, tar
}

// SetTableStructCheckResult sets the struct check result for table.
func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, skip bool) {
func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, skip bool, exist int) {
r.Lock()
defer r.Unlock()
tableResult := r.TableResults[schema][table]
tableResult.StructEqual = equal
tableResult.DataSkip = skip
if !equal && r.Result != Error {
tableResult.TableLack = exist
if !equal && common.AllTableExist(tableResult.TableLack) && r.Result != Error {
r.Result = Fail
}
}
Expand All @@ -323,11 +341,11 @@ func (r *Report) SetTableDataCheckResult(schema, table string, equal bool, rowsA
}
result.ChunkMap[id.ToString()].RowsAdd += rowsAdd
result.ChunkMap[id.ToString()].RowsDelete += rowsDelete
if r.Result != Error {
if r.Result != Error && common.AllTableExist(result.TableLack) {
r.Result = Fail
}
}
if !equal && r.Result != Error {
if !equal && common.AllTableExist(result.TableLack) && r.Result != Error {
r.Result = Fail
}
}
Expand Down
Loading