From 593c33dec31aaa954028b3d2f961e0c4bb27bc13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Fri, 10 Apr 2020 18:11:32 +0800 Subject: [PATCH] backup,restore: fix --checksum flag. (#223) * backup,restore: work on progress to fix a bug that causes --checksum flag won't work properly. Some code of backup and restore ignored the flag (a.k.a. config.Checksum), so when checksum is disabled, we will face failure. * backup: backup will report total bytes and kvs when checksums check disabled. Some code of backup and restore ignored the flag (a.k.a. config.Checksum), so when checksum is disabled, we will face failure. * backup: backup will report total bytes and kvs when checksums check disabled. Some code of backup and restore ignored the flag (a.k.a. config.Checksum), so when checksum is disabled, we will face failure. * backup: add log to ChecksumMatches and new version of FastChecksum. Some of log has been lose. They are in ChecksumMatches now. * restore: restore could find non-checksum tables and skip them automatically. for backup, ChecksumMatches returns error now. * misc: add document for Table::NoChecksum. * backup: omit checksum progress bar when user specify `--checksum=false`. * backup: `CopyMetaFrom` overrides original `client.Schemes` instead of append at its end. * backup: refactor about checksum logic, fix a bug. the bug would cause: when multi tables are backup, the metadata contains only one table. * backup: do some lints. * backup,restore: do some refactor so that cyclomatic complexity won't be too large. * misc: don't use underscore on receiver. * backup: print "quick checksum success" message per table. ...to make br_full_index happy! * backup: refactor a MinInt pattern. * backup: Apply suggestions from code review Co-Authored-By: kennytm Co-authored-by: 3pointer Co-authored-by: kennytm --- pkg/backup/client.go | 103 +++++++++++++++++++++++++------- pkg/backup/schema.go | 12 ---- pkg/restore/client.go | 8 +++ pkg/task/backup.go | 73 +++++++++++++--------- pkg/task/restore.go | 25 ++++---- pkg/utils/math.go | 12 ++++ pkg/utils/math_test.go | 17 ++++++ pkg/utils/schema.go | 5 ++ tests/br_skip_checksum/run.sh | 88 +++++++++++++++++++++++++++ tests/br_skip_checksum/workload | 12 ++++ 10 files changed, 281 insertions(+), 74 deletions(-) create mode 100644 pkg/utils/math.go create mode 100644 pkg/utils/math_test.go create mode 100755 tests/br_skip_checksum/run.sh create mode 100644 tests/br_skip_checksum/workload diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 72563096b..1b016ba8b 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -48,6 +48,13 @@ type ClientMgr interface { Close() } +// Checksum is the checksum of some backup files calculated by CollectChecksums. +type Checksum struct { + Crc64Xor uint64 + TotalKvs uint64 + TotalBytes uint64 +} + // Maximum total sleep time(in ms) for kv/cop commands. const ( backupFineGrainedMaxBackoff = 80000 @@ -748,8 +755,59 @@ func SendBackup( return nil } -// FastChecksum check data integrity by xor all(sst_checksum) per table -func (bc *Client) FastChecksum() (bool, error) { +// ChecksumMatches tests whether the "local" checksum matches the checksum from TiKV. +func (bc *Client) ChecksumMatches(local []Checksum) (bool, error) { + if len(local) != len(bc.backupMeta.Schemas) { + return false, nil + } + + for i, schema := range bc.backupMeta.Schemas { + localChecksum := local[i] + dbInfo := &model.DBInfo{} + err := json.Unmarshal(schema.Db, dbInfo) + if err != nil { + log.Error("failed in fast checksum, and cannot parse db info.") + return false, err + } + tblInfo := &model.TableInfo{} + err = json.Unmarshal(schema.Table, tblInfo) + if err != nil { + log.Error("failed in fast checksum, and cannot parse table info.") + return false, err + } + if localChecksum.Crc64Xor != schema.Crc64Xor || + localChecksum.TotalBytes != schema.TotalBytes || + localChecksum.TotalKvs != schema.TotalKvs { + log.Error("failed in fast checksum", + zap.Stringer("db", dbInfo.Name), + zap.Stringer("table", tblInfo.Name), + zap.Uint64("origin tidb crc64", schema.Crc64Xor), + zap.Uint64("calculated crc64", localChecksum.Crc64Xor), + zap.Uint64("origin tidb total kvs", schema.TotalKvs), + zap.Uint64("calculated total kvs", localChecksum.TotalKvs), + zap.Uint64("origin tidb total bytes", schema.TotalBytes), + zap.Uint64("calculated total bytes", localChecksum.TotalBytes), + ) + return false, nil + } + log.Info("fast checksum success", + zap.String("database", dbInfo.Name.L), + zap.String("table", tblInfo.Name.L)) + } + return true, nil +} + +// CollectFileInfo collects ungrouped file summary information, like kv count and size. +func (bc *Client) CollectFileInfo() { + for _, file := range bc.backupMeta.Files { + summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs) + summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes) + } +} + +// CollectChecksums check data integrity by xor all(sst_checksum) per table +// it returns the checksum of all local files. +func (bc *Client) CollectChecksums() ([]Checksum, error) { start := time.Now() defer func() { elapsed := time.Since(start) @@ -758,19 +816,20 @@ func (bc *Client) FastChecksum() (bool, error) { dbs, err := utils.LoadBackupTables(&bc.backupMeta) if err != nil { - return false, err + return nil, err } + checksums := make([]Checksum, 0, len(bc.backupMeta.Schemas)) for _, schema := range bc.backupMeta.Schemas { dbInfo := &model.DBInfo{} err = json.Unmarshal(schema.Db, dbInfo) if err != nil { - return false, err + return nil, err } tblInfo := &model.TableInfo{} err = json.Unmarshal(schema.Table, tblInfo) if err != nil { - return false, err + return nil, err } tbl := dbs[dbInfo.Name.String()].GetTable(tblInfo.Name.String()) @@ -785,25 +844,16 @@ func (bc *Client) FastChecksum() (bool, error) { summary.CollectSuccessUnit(summary.TotalKV, 1, totalKvs) summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes) - - if schema.Crc64Xor == checksum && schema.TotalKvs == totalKvs && schema.TotalBytes == totalBytes { - log.Info("fast checksum success", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name)) - } else { - log.Error("failed in fast checksum", - zap.String("database", dbInfo.Name.String()), - zap.String("table", tblInfo.Name.String()), - zap.Uint64("origin tidb crc64", schema.Crc64Xor), - zap.Uint64("calculated crc64", checksum), - zap.Uint64("origin tidb total kvs", schema.TotalKvs), - zap.Uint64("calculated total kvs", totalKvs), - zap.Uint64("origin tidb total bytes", schema.TotalBytes), - zap.Uint64("calculated total bytes", totalBytes), - ) - return false, nil + log.Info("fast checksum calculated", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name)) + localChecksum := Checksum{ + Crc64Xor: checksum, + TotalKvs: totalKvs, + TotalBytes: totalBytes, } + checksums = append(checksums, localChecksum) } - return true, nil + return checksums, nil } // CompleteMeta wait response of admin checksum from TiDB to complete backup meta @@ -815,3 +865,14 @@ func (bc *Client) CompleteMeta(backupSchemas *Schemas) error { bc.backupMeta.Schemas = schemas return nil } + +// CopyMetaFrom copies schema metadata directly from pending backupSchemas, without calculating checksum. +// use this when user skip the checksum generating. +func (bc *Client) CopyMetaFrom(backupSchemas *Schemas) { + schemas := make([]*kvproto.Schema, 0, len(backupSchemas.schemas)) + for _, v := range backupSchemas.schemas { + schema := v + schemas = append(schemas, &schema) + } + bc.backupMeta.Schemas = schemas +} diff --git a/pkg/backup/schema.go b/pkg/backup/schema.go index 73a62477d..d1b5943a8 100644 --- a/pkg/backup/schema.go +++ b/pkg/backup/schema.go @@ -36,7 +36,6 @@ type Schemas struct { backupSchemaCh chan backup.Schema errCh chan error wg *sync.WaitGroup - skipChecksum bool } func newBackupSchemas() *Schemas { @@ -57,11 +56,6 @@ func (pending *Schemas) pushPending( pending.schemas[name] = schema } -// SetSkipChecksum sets whether it should skip checksum -func (pending *Schemas) SetSkipChecksum(skip bool) { - pending.skipChecksum = skip -} - // Start backups schemas func (pending *Schemas) Start( ctx context.Context, @@ -81,12 +75,6 @@ func (pending *Schemas) Start( workerPool.Apply(func() { defer pending.wg.Done() - if pending.skipChecksum { - pending.backupSchemaCh <- schema - updateCh.Inc() - return - } - start := time.Now() table := model.TableInfo{} err := json.Unmarshal(schema.Table, &table) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index fde382fb0..2b685e599 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -650,6 +650,14 @@ func (rc *Client) ValidateChecksum( workers.Apply(func() { defer wg.Done() + if table.NoChecksum() { + log.Info("table doesn't have checksum, skipping checksum", + zap.Stringer("db", table.Db.Name), + zap.Stringer("table", table.Info.Name)) + updateCh.Inc() + return + } + startTS, err := rc.GetTS(ctx) if err != nil { errCh <- errors.Trace(err) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 040be0444..43a426b0f 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -7,6 +7,8 @@ import ( "strconv" "time" + "github.com/pingcap/br/pkg/utils" + "github.com/pingcap/errors" kvproto "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/log" @@ -180,39 +182,32 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig // Backup has finished updateCh.Close() - // Checksum - backupSchemasConcurrency := backup.DefaultSchemaConcurrency - if backupSchemas.Len() < backupSchemasConcurrency { - backupSchemasConcurrency = backupSchemas.Len() - } - updateCh = g.StartProgress( - ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress) - backupSchemas.SetSkipChecksum(!cfg.Checksum) - backupSchemas.Start( - ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh) - - err = client.CompleteMeta(backupSchemas) - if err != nil { - return err - } - - if cfg.LastBackupTS == 0 { - var valid bool - valid, err = client.FastChecksum() + // Checksum from server, and then fulfill the backup metadata. + if cfg.Checksum { + backupSchemasConcurrency := utils.MinInt(backup.DefaultSchemaConcurrency, backupSchemas.Len()) + updateCh = g.StartProgress( + ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress) + backupSchemas.Start( + ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh) + err = client.CompleteMeta(backupSchemas) if err != nil { return err } - if !valid { - log.Error("backup FastChecksum mismatch!") - return errors.Errorf("mismatched checksum") + // Checksum has finished + updateCh.Close() + // collect file information. + err = checkChecksums(client, cfg) + if err != nil { + return err } - } else { - // Since we don't support checksum for incremental data, fast checksum should be skipped. - log.Info("Skip fast checksum in incremental backup") + // When user specified not to calculate checksum, don't calculate checksum. + // Just... copy schemas from origin. + log.Info("Skip fast checksum because user requirement.") + client.CopyMetaFrom(backupSchemas) + // Anyway, let's collect file info for summary. + client.CollectFileInfo() } - // Checksum has finished - updateCh.Close() err = client.SaveBackupMeta(ctx, ddlJobs) if err != nil { @@ -224,6 +219,30 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return nil } +// checkChecksums checks the checksum of the client, once failed, +// returning a error with message: "mismatched checksum". +func checkChecksums(client *backup.Client, cfg *BackupConfig) error { + checksums, err := client.CollectChecksums() + if err != nil { + return err + } + if cfg.LastBackupTS == 0 { + var matches bool + matches, err = client.ChecksumMatches(checksums) + if err != nil { + return err + } + if !matches { + log.Error("backup FastChecksum mismatch!") + return errors.New("mismatched checksum") + } + return nil + } + // Since we don't support checksum for incremental data, fast checksum should be skipped. + log.Info("Skip fast checksum in incremental backup") + return nil +} + // parseTSString port from tidb setSnapshotTS func parseTSString(ts string) (uint64, error) { if len(ts) == 0 { diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 16061bd94..4cd470367 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -224,10 +224,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } // Restore sst files in batch. - batchSize := int(cfg.Concurrency) - if batchSize > maxRestoreBatchSizeLimit { - batchSize = maxRestoreBatchSizeLimit // 256 - } + batchSize := utils.MinInt(int(cfg.Concurrency), maxRestoreBatchSizeLimit) tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly) if err != nil { @@ -242,9 +239,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if len(ranges) == 0 { break } - if batchSize > len(ranges) { - batchSize = len(ranges) - } + batchSize = utils.MinInt(batchSize, len(ranges)) var rangeBatch []rtree.Range ranges, rangeBatch = ranges[batchSize:], ranges[0:batchSize:batchSize] @@ -287,14 +282,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf updateCh.Close() // Checksum - updateCh = g.StartProgress( - ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress) - err = client.ValidateChecksum( - ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh) - if err != nil { - return err + if cfg.Checksum { + updateCh = g.StartProgress( + ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress) + err = client.ValidateChecksum( + ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh) + if err != nil { + return err + } + updateCh.Close() } - updateCh.Close() // Set task summary to success status. summary.SetSuccessStatus(true) diff --git a/pkg/utils/math.go b/pkg/utils/math.go new file mode 100644 index 000000000..00c8dcc4b --- /dev/null +++ b/pkg/utils/math.go @@ -0,0 +1,12 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package utils + +// MinInt choice smaller integer from its two arguments. +func MinInt(x, y int) int { + if x < y { + return x + } + + return y +} diff --git a/pkg/utils/math_test.go b/pkg/utils/math_test.go new file mode 100644 index 000000000..90c8e6bed --- /dev/null +++ b/pkg/utils/math_test.go @@ -0,0 +1,17 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package utils + +import ( + . "github.com/pingcap/check" +) + +type testMathSuite struct{} + +var _ = Suite(&testMathSuite{}) + +func (*testMathSuite) TestMinInt(c *C) { + c.Assert(MinInt(1, 2), Equals, 1) + c.Assert(MinInt(2, 1), Equals, 1) + c.Assert(MinInt(1, 1), Equals, 1) +} diff --git a/pkg/utils/schema.go b/pkg/utils/schema.go index 5ac439e36..a135dac1c 100644 --- a/pkg/utils/schema.go +++ b/pkg/utils/schema.go @@ -33,6 +33,11 @@ type Table struct { TiFlashReplicas int } +// NoChecksum checks whether the table has a calculated checksum. +func (tbl *Table) NoChecksum() bool { + return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0 +} + // Database wraps the schema and tables of a database. type Database struct { Info *model.DBInfo diff --git a/tests/br_skip_checksum/run.sh b/tests/br_skip_checksum/run.sh new file mode 100755 index 000000000..f4b93adea --- /dev/null +++ b/tests/br_skip_checksum/run.sh @@ -0,0 +1,88 @@ +#!/bin/sh +# +# Copyright 2020 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +DB="$TEST_NAME" +TABLE="usertable" +DB_COUNT=3 + +for i in $(seq $DB_COUNT); do + run_sql "CREATE DATABASE $DB${i};" + go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB${i} +done + +for i in $(seq $DB_COUNT); do + row_count_ori[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') +done + +# backup full, skipping generate checksum. +echo "backup start..." +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --checksum=false + +for i in $(seq $DB_COUNT); do + run_sql "DROP DATABASE $DB${i};" +done + +# restore full, skipping genreate checksum. +echo "restore start..." +run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --ratelimit 1024 --checksum=false + +for i in $(seq $DB_COUNT); do + row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') +done + +fail=false +for i in $(seq $DB_COUNT); do + if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then + fail=true + echo "TEST: [$TEST_NAME] fail on database $DB${i}" + fi + echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}" +done + +if $fail; then + echo "TEST: [$TEST_NAME] failed on restore with skipping checksum!" + exit 1 +fi + +# Let drop it again. Try to restore without disable checksum. +for i in $(seq $DB_COUNT); do + run_sql "DROP DATABASE $DB${i};" +done +echo "restore(with checksum) start..." +run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --ratelimit 1024 + +for i in $(seq $DB_COUNT); do + row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') +done + +for i in $(seq $DB_COUNT); do + if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then + fail=true + echo "TEST: [$TEST_NAME] fail on database $DB${i}" + fi + echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}" +done + +if $fail; then + echo "TEST: [$TEST_NAME] failed on restore without skipping checksum!" + exit 1 +else + echo "TEST $TEST_NAME passed." +fi + +for i in $(seq $DB_COUNT); do + run_sql "DROP DATABASE $DB${i};" +done diff --git a/tests/br_skip_checksum/workload b/tests/br_skip_checksum/workload new file mode 100644 index 000000000..84335df96 --- /dev/null +++ b/tests/br_skip_checksum/workload @@ -0,0 +1,12 @@ +recordcount=100 +operationcount=0 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform \ No newline at end of file