diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 72563096b..1b016ba8b 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -48,6 +48,13 @@ type ClientMgr interface { Close() } +// Checksum is the checksum of some backup files calculated by CollectChecksums. +type Checksum struct { + Crc64Xor uint64 + TotalKvs uint64 + TotalBytes uint64 +} + // Maximum total sleep time(in ms) for kv/cop commands. const ( backupFineGrainedMaxBackoff = 80000 @@ -748,8 +755,59 @@ func SendBackup( return nil } -// FastChecksum check data integrity by xor all(sst_checksum) per table -func (bc *Client) FastChecksum() (bool, error) { +// ChecksumMatches tests whether the "local" checksum matches the checksum from TiKV. +func (bc *Client) ChecksumMatches(local []Checksum) (bool, error) { + if len(local) != len(bc.backupMeta.Schemas) { + return false, nil + } + + for i, schema := range bc.backupMeta.Schemas { + localChecksum := local[i] + dbInfo := &model.DBInfo{} + err := json.Unmarshal(schema.Db, dbInfo) + if err != nil { + log.Error("failed in fast checksum, and cannot parse db info.") + return false, err + } + tblInfo := &model.TableInfo{} + err = json.Unmarshal(schema.Table, tblInfo) + if err != nil { + log.Error("failed in fast checksum, and cannot parse table info.") + return false, err + } + if localChecksum.Crc64Xor != schema.Crc64Xor || + localChecksum.TotalBytes != schema.TotalBytes || + localChecksum.TotalKvs != schema.TotalKvs { + log.Error("failed in fast checksum", + zap.Stringer("db", dbInfo.Name), + zap.Stringer("table", tblInfo.Name), + zap.Uint64("origin tidb crc64", schema.Crc64Xor), + zap.Uint64("calculated crc64", localChecksum.Crc64Xor), + zap.Uint64("origin tidb total kvs", schema.TotalKvs), + zap.Uint64("calculated total kvs", localChecksum.TotalKvs), + zap.Uint64("origin tidb total bytes", schema.TotalBytes), + zap.Uint64("calculated total bytes", localChecksum.TotalBytes), + ) + return false, nil + } + log.Info("fast checksum success", + 
+ zap.String("database", dbInfo.Name.L), + zap.String("table", tblInfo.Name.L)) + } + return true, nil +} + +// CollectFileInfo collects ungrouped file summary information, like kv count and size. +func (bc *Client) CollectFileInfo() { + for _, file := range bc.backupMeta.Files { + summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs) + summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes) + } +} + +// CollectChecksums calculates the checksum of local files by xor all(sst_checksum) per table; +// it returns the calculated checksums and leaves the comparison to the caller. +func (bc *Client) CollectChecksums() ([]Checksum, error) { start := time.Now() defer func() { elapsed := time.Since(start) @@ -758,19 +816,20 @@ func (bc *Client) FastChecksum() (bool, error) { dbs, err := utils.LoadBackupTables(&bc.backupMeta) if err != nil { - return false, err + return nil, err } + checksums := make([]Checksum, 0, len(bc.backupMeta.Schemas)) for _, schema := range bc.backupMeta.Schemas { dbInfo := &model.DBInfo{} err = json.Unmarshal(schema.Db, dbInfo) if err != nil { - return false, err + return nil, err } tblInfo := &model.TableInfo{} err = json.Unmarshal(schema.Table, tblInfo) if err != nil { - return false, err + return nil, err } tbl := dbs[dbInfo.Name.String()].GetTable(tblInfo.Name.String()) @@ -785,25 +844,16 @@ func (bc *Client) FastChecksum() (bool, error) { summary.CollectSuccessUnit(summary.TotalKV, 1, totalKvs) summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes) - - if schema.Crc64Xor == checksum && schema.TotalKvs == totalKvs && schema.TotalBytes == totalBytes { - log.Info("fast checksum success", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name)) - } else { - log.Error("failed in fast checksum", - zap.String("database", dbInfo.Name.String()), - zap.String("table", tblInfo.Name.String()), - zap.Uint64("origin tidb crc64", schema.Crc64Xor), - zap.Uint64("calculated crc64", checksum), - zap.Uint64("origin tidb total kvs", schema.TotalKvs), - zap.Uint64("calculated
total kvs", totalKvs), - zap.Uint64("origin tidb total bytes", schema.TotalBytes), - zap.Uint64("calculated total bytes", totalBytes), - ) - return false, nil + log.Info("fast checksum calculated", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name)) + localChecksum := Checksum{ + Crc64Xor: checksum, + TotalKvs: totalKvs, + TotalBytes: totalBytes, } + checksums = append(checksums, localChecksum) } - return true, nil + return checksums, nil } // CompleteMeta wait response of admin checksum from TiDB to complete backup meta @@ -815,3 +865,14 @@ func (bc *Client) CompleteMeta(backupSchemas *Schemas) error { bc.backupMeta.Schemas = schemas return nil } + +// CopyMetaFrom copies schema metadata directly from pending backupSchemas, without calculating checksum. +// Use this when the user skips checksum generation. +func (bc *Client) CopyMetaFrom(backupSchemas *Schemas) { + schemas := make([]*kvproto.Schema, 0, len(backupSchemas.schemas)) + for _, v := range backupSchemas.schemas { + schema := v + schemas = append(schemas, &schema) + } + bc.backupMeta.Schemas = schemas +} diff --git a/pkg/backup/schema.go b/pkg/backup/schema.go index 73a62477d..d1b5943a8 100644 --- a/pkg/backup/schema.go +++ b/pkg/backup/schema.go @@ -36,7 +36,6 @@ type Schemas struct { backupSchemaCh chan backup.Schema errCh chan error wg *sync.WaitGroup - skipChecksum bool } func newBackupSchemas() *Schemas { @@ -57,11 +56,6 @@ func (pending *Schemas) pushPending( pending.schemas[name] = schema } -// SetSkipChecksum sets whether it should skip checksum -func (pending *Schemas) SetSkipChecksum(skip bool) { - pending.skipChecksum = skip -} - // Start backups schemas func (pending *Schemas) Start( ctx context.Context, @@ -81,12 +75,6 @@ func (pending *Schemas) Start( workerPool.Apply(func() { defer pending.wg.Done() - if pending.skipChecksum { - pending.backupSchemaCh <- schema - updateCh.Inc() - return - } - start := time.Now() table := model.TableInfo{} err := json.Unmarshal(schema.Table,
&table) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index fde382fb0..2b685e599 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -650,6 +650,14 @@ func (rc *Client) ValidateChecksum( workers.Apply(func() { defer wg.Done() + if table.NoChecksum() { + log.Info("table doesn't have checksum, skipping checksum", + zap.Stringer("db", table.Db.Name), + zap.Stringer("table", table.Info.Name)) + updateCh.Inc() + return + } + startTS, err := rc.GetTS(ctx) if err != nil { errCh <- errors.Trace(err) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 040be0444..43a426b0f 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -7,6 +7,8 @@ import ( "strconv" "time" + "github.com/pingcap/br/pkg/utils" + "github.com/pingcap/errors" kvproto "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/log" @@ -180,39 +182,32 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig // Backup has finished updateCh.Close() - // Checksum - backupSchemasConcurrency := backup.DefaultSchemaConcurrency - if backupSchemas.Len() < backupSchemasConcurrency { - backupSchemasConcurrency = backupSchemas.Len() - } - updateCh = g.StartProgress( - ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress) - backupSchemas.SetSkipChecksum(!cfg.Checksum) - backupSchemas.Start( - ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh) - - err = client.CompleteMeta(backupSchemas) - if err != nil { - return err - } - - if cfg.LastBackupTS == 0 { - var valid bool - valid, err = client.FastChecksum() + // Checksum from server, and then fulfill the backup metadata. 
+ if cfg.Checksum { + backupSchemasConcurrency := utils.MinInt(backup.DefaultSchemaConcurrency, backupSchemas.Len()) + updateCh = g.StartProgress( + ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress) + backupSchemas.Start( + ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh) + err = client.CompleteMeta(backupSchemas) if err != nil { return err } - if !valid { - log.Error("backup FastChecksum mismatch!") - return errors.Errorf("mismatched checksum") + // Checksum has finished + updateCh.Close() + // collect file information. + err = checkChecksums(client, cfg) + if err != nil { + return err } - } else { - // Since we don't support checksum for incremental data, fast checksum should be skipped. - log.Info("Skip fast checksum in incremental backup") + // When user specified not to calculate checksum, don't calculate checksum. + // Just... copy schemas from origin. + log.Info("Skip fast checksum because user requirement.") + client.CopyMetaFrom(backupSchemas) + // Anyway, let's collect file info for summary. + client.CollectFileInfo() } - // Checksum has finished - updateCh.Close() err = client.SaveBackupMeta(ctx, ddlJobs) if err != nil { @@ -224,6 +219,30 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return nil } +// checkChecksums checks the checksum of the client, once failed, +// returning an error with message: "mismatched checksum". +func checkChecksums(client *backup.Client, cfg *BackupConfig) error { + checksums, err := client.CollectChecksums() + if err != nil { + return err + } + if cfg.LastBackupTS == 0 { + var matches bool + matches, err = client.ChecksumMatches(checksums) + if err != nil { + return err + } + if !matches { + log.Error("backup FastChecksum mismatch!") + return errors.New("mismatched checksum") + } + return nil + } + // Since we don't support checksum for incremental data, fast checksum should be skipped.
+ log.Info("Skip fast checksum in incremental backup") + return nil +} + // parseTSString port from tidb setSnapshotTS func parseTSString(ts string) (uint64, error) { if len(ts) == 0 { diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 16061bd94..4cd470367 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -224,10 +224,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } // Restore sst files in batch. - batchSize := int(cfg.Concurrency) - if batchSize > maxRestoreBatchSizeLimit { - batchSize = maxRestoreBatchSizeLimit // 256 - } + batchSize := utils.MinInt(int(cfg.Concurrency), maxRestoreBatchSizeLimit) tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly) if err != nil { @@ -242,9 +239,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if len(ranges) == 0 { break } - if batchSize > len(ranges) { - batchSize = len(ranges) - } + batchSize = utils.MinInt(batchSize, len(ranges)) var rangeBatch []rtree.Range ranges, rangeBatch = ranges[batchSize:], ranges[0:batchSize:batchSize] @@ -287,14 +282,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf updateCh.Close() // Checksum - updateCh = g.StartProgress( - ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress) - err = client.ValidateChecksum( - ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh) - if err != nil { - return err + if cfg.Checksum { + updateCh = g.StartProgress( + ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress) + err = client.ValidateChecksum( + ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh) + if err != nil { + return err + } + updateCh.Close() } - updateCh.Close() // Set task summary to success status. 
summary.SetSuccessStatus(true) diff --git a/pkg/utils/math.go b/pkg/utils/math.go new file mode 100644 index 000000000..00c8dcc4b --- /dev/null +++ b/pkg/utils/math.go @@ -0,0 +1,12 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package utils + +// MinInt returns the smaller of its two integer arguments. +func MinInt(x, y int) int { + if x < y { + return x + } + + return y +} diff --git a/pkg/utils/math_test.go b/pkg/utils/math_test.go new file mode 100644 index 000000000..90c8e6bed --- /dev/null +++ b/pkg/utils/math_test.go @@ -0,0 +1,17 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package utils + +import ( + . "github.com/pingcap/check" +) + +type testMathSuite struct{} + +var _ = Suite(&testMathSuite{}) + +func (*testMathSuite) TestMinInt(c *C) { + c.Assert(MinInt(1, 2), Equals, 1) + c.Assert(MinInt(2, 1), Equals, 1) + c.Assert(MinInt(1, 1), Equals, 1) +} diff --git a/pkg/utils/schema.go b/pkg/utils/schema.go index 5ac439e36..a135dac1c 100644 --- a/pkg/utils/schema.go +++ b/pkg/utils/schema.go @@ -33,6 +33,11 @@ type Table struct { TiFlashReplicas int } +// NoChecksum checks whether the table has a calculated checksum. +func (tbl *Table) NoChecksum() bool { + return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0 +} + // Database wraps the schema and tables of a database. type Database struct { Info *model.DBInfo diff --git a/tests/br_skip_checksum/run.sh b/tests/br_skip_checksum/run.sh new file mode 100755 index 000000000..f4b93adea --- /dev/null +++ b/tests/br_skip_checksum/run.sh @@ -0,0 +1,88 @@ +#!/bin/sh +# +# Copyright 2020 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +DB="$TEST_NAME" +TABLE="usertable" +DB_COUNT=3 + +for i in $(seq $DB_COUNT); do + run_sql "CREATE DATABASE $DB${i};" + go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB${i} +done + +for i in $(seq $DB_COUNT); do + row_count_ori[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') +done + +# backup full, skipping checksum generation. +echo "backup start..." +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --checksum=false + +for i in $(seq $DB_COUNT); do + run_sql "DROP DATABASE $DB${i};" +done + +# restore full, skipping checksum validation. +echo "restore start..." +run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --ratelimit 1024 --checksum=false + +for i in $(seq $DB_COUNT); do + row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') +done + +fail=false +for i in $(seq $DB_COUNT); do + if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then + fail=true + echo "TEST: [$TEST_NAME] fail on database $DB${i}" + fi + echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}" +done + +if $fail; then + echo "TEST: [$TEST_NAME] failed on restore with skipping checksum!" + exit 1 +fi + +# Let's drop the databases again, then try to restore without disabling checksum. +for i in $(seq $DB_COUNT); do + run_sql "DROP DATABASE $DB${i};" +done +echo "restore(with checksum) start..."
+run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --ratelimit 1024 + +for i in $(seq $DB_COUNT); do + row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') +done + +for i in $(seq $DB_COUNT); do + if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then + fail=true + echo "TEST: [$TEST_NAME] fail on database $DB${i}" + fi + echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}" +done + +if $fail; then + echo "TEST: [$TEST_NAME] failed on restore without skipping checksum!" + exit 1 +else + echo "TEST $TEST_NAME passed." +fi + +for i in $(seq $DB_COUNT); do + run_sql "DROP DATABASE $DB${i};" +done diff --git a/tests/br_skip_checksum/workload b/tests/br_skip_checksum/workload new file mode 100644 index 000000000..84335df96 --- /dev/null +++ b/tests/br_skip_checksum/workload @@ -0,0 +1,12 @@ +recordcount=100 +operationcount=0 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform \ No newline at end of file