Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Incremental BR: support DDL #155

Merged
merged 14 commits into from
Feb 20, 2020
Merged
7 changes: 6 additions & 1 deletion cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
return err
}

ddlJobs, err := backup.GetBackupDDLJobs(mgr.GetDomain(), lastBackupTS, backupTS)
if err != nil {
return err
}

// The number of regions need to backup
approximateRegions := 0
for _, r := range ranges {
Expand Down Expand Up @@ -166,7 +171,7 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
// Checksum has finished
close(updateCh)

err = client.SaveBackupMeta(ctx)
err = client.SaveBackupMeta(ctx, ddlJobs)
if err != nil {
return err
}
Expand Down
56 changes: 52 additions & 4 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package cmd

import (
"context"
"sort"
"strings"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/session"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
Expand Down Expand Up @@ -92,14 +94,15 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error

files := make([]*backup.File, 0)
tables := make([]*utils.Table, 0)
ddlJobs := make([]*model.Job, 0)

defer summary.Summary(cmdName)

switch {
case len(dbName) == 0 && len(tableName) == 0:
// full restore
for _, db := range client.GetDatabases() {
err = client.CreateDatabase(db.Schema)
err = client.CreateDatabase(db.Info)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -108,33 +111,73 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error
}
tables = append(tables, db.Tables...)
}
ddlJobs = client.GetDDLJobs()
case len(dbName) != 0 && len(tableName) == 0:
// database restore
db := client.GetDatabase(dbName)
if db == nil {
return errors.Errorf("database %s not found in backup", dbName)
}
err = client.CreateDatabase(db.Schema)
err = client.CreateDatabase(db.Info)
if err != nil {
return errors.Trace(err)
}
for _, table := range db.Tables {
files = append(files, table.Files...)
}
tables = db.Tables
allDDLJobs := client.GetDDLJobs()
// Sort the ddl jobs by schema version in descending order.
sort.Slice(allDDLJobs, func(i, j int) bool {
return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion
})
// The map resolves a corner case.
// Let "t=2" denote that the id of database "t" is 2. Consider the DDL execution sequence:
// rename "a" to "b"(a=1) -> drop "b"(b=1) -> create "b"(b=2) -> rename "b" to "a"(a=2)
// In this case we cannot find the "create" DDL by name and id alone.
// To cover this case, we must track all ids the database ever had.
dbIDs := make(map[int64]bool)
for _, job := range allDDLJobs {
if job.SchemaID == db.Info.ID ||
(job.BinlogInfo.DBInfo != nil && job.BinlogInfo.DBInfo.ID == db.Info.ID) ||
dbIDs[job.SchemaID] {
dbIDs[job.SchemaID] = true
if job.BinlogInfo.DBInfo != nil {
dbIDs[job.BinlogInfo.DBInfo.ID] = true
}
ddlJobs = append(ddlJobs, job)
}
}
case len(dbName) != 0 && len(tableName) != 0:
// table restore
db := client.GetDatabase(dbName)
if db == nil {
return errors.Errorf("database %s not found in backup", dbName)
}
err = client.CreateDatabase(db.Schema)
err = client.CreateDatabase(db.Info)
if err != nil {
return errors.Trace(err)
}
table := db.GetTable(tableName)
files = table.Files
tables = append(tables, table)
allDDLJobs := client.GetDDLJobs()
// Sort the ddl jobs by schema version in descending order.
sort.Slice(allDDLJobs, func(i, j int) bool {
return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion
})
tableIDs := make(map[int64]bool)
for _, job := range allDDLJobs {
if job.SchemaID == table.Info.ID ||
(job.BinlogInfo.TableInfo != nil && job.BinlogInfo.TableInfo.ID == table.Info.ID) ||
tableIDs[job.SchemaID] {
tableIDs[job.SchemaID] = true
if job.BinlogInfo.TableInfo != nil {
tableIDs[job.BinlogInfo.TableInfo.ID] = true
}
ddlJobs = append(ddlJobs, job)
}
}
default:
return errors.New("must set db when table was set")
}
Expand All @@ -152,10 +195,15 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error
}
ranges, err := restore.ValidateFileRanges(files, rewriteRules)
if err != nil {
return err
return errors.Trace(err)
}
summary.CollectInt("restore ranges", len(ranges))

err = client.ExecDDLs(ddlJobs)
if err != nil {
return errors.Trace(err)
}

// Redirect to log if there is no log file to avoid unreadable output.
updateCh := utils.StartProgress(
ctx,
Expand Down
10 changes: 5 additions & 5 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,19 +216,19 @@ func newBackupMetaCommand() *cobra.Command {
newTable := new(model.TableInfo)
tableID, _ := tableIDAllocator.Alloc()
newTable.ID = int64(tableID)
newTable.Name = table.Schema.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Schema.Indices))
for i, indexInfo := range table.Schema.Indices {
newTable.Name = table.Info.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Info.Indices))
for i, indexInfo := range table.Info.Indices {
indexID, _ := indexIDAllocator.Alloc()
newTable.Indices[i] = &model.IndexInfo{
ID: int64(indexID),
Name: indexInfo.Name,
}
}
rules := restore.GetRewriteRules(newTable, table.Schema, 0)
rules := restore.GetRewriteRules(newTable, table.Info, 0)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
tableIDMap[table.Schema.ID] = int64(tableID)
tableIDMap[table.Info.ID] = int64(tableID)
}
// Validate rewrite rules
for _, file := range files {
Expand Down
51 changes: 50 additions & 1 deletion pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -121,7 +122,12 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend
}

// SaveBackupMeta saves the current backup meta at the given path.
func (bc *Client) SaveBackupMeta(ctx context.Context) error {
func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) error {
ddlJobsData, err := json.Marshal(ddlJobs)
if err != nil {
return errors.Trace(err)
}
bc.backupMeta.Ddls = ddlJobsData
backupMetaData, err := proto.Marshal(&bc.backupMeta)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -276,6 +282,49 @@ LoadDb:
return ranges, backupSchemas, nil
}

// GetBackupDDLJobs collects the DDL jobs that completed in the time window
// (lastBackupTS, backupTS]. It reads both DDL job queues and the job history
// at the backupTS snapshot, and keeps only finished jobs whose schema version
// is newer than the schema version at lastBackupTS.
func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*model.Job, error) {
	newestMeta, err := dom.GetSnapshotMeta(backupTS)
	if err != nil {
		return nil, errors.Trace(err)
	}
	oldestMeta, err := dom.GetSnapshotMeta(lastBackupTS)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// Jobs whose schema version is at or below this were already covered
	// by the previous backup, so they are filtered out below.
	lastSchemaVersion, err := oldestMeta.GetSchemaVersion()
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Gather candidates from both in-flight queues and the history list.
	var allJobs []*model.Job
	defaultJobs, err := newestMeta.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	allJobs = append(allJobs, defaultJobs...)
	addIndexJobs, err := newestMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	allJobs = append(allJobs, addIndexJobs...)
	historyJobs, err := newestMeta.GetAllHistoryDDLJobs()
	if err != nil {
		return nil, errors.Trace(err)
	}
	allJobs = append(allJobs, historyJobs...)

	// Keep only jobs that are done and became visible after the last backup.
	completedJobs := make([]*model.Job, 0, len(allJobs))
	for _, job := range allJobs {
		if job.State == model.JobStateDone &&
			job.BinlogInfo != nil &&
			job.BinlogInfo.SchemaVersion > lastSchemaVersion {
			completedJobs = append(completedJobs, job)
		}
	}
	return completedJobs, nil
}

// BackupRanges make a backup of the given key ranges.
func (bc *Client) BackupRanges(
ctx context.Context,
Expand Down
8 changes: 4 additions & 4 deletions pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func buildChecksumRequest(
reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1))
var oldTableID int64
if oldTable != nil {
oldTableID = oldTable.Schema.ID
oldTableID = oldTable.Info.ID
}
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS)
if err != nil {
Expand All @@ -72,7 +72,7 @@ func buildChecksumRequest(
for _, partDef := range partDefs {
var oldPartID int64
if oldTable != nil {
for _, oldPartDef := range oldTable.Schema.Partition.Definitions {
for _, oldPartDef := range oldTable.Info.Partition.Definitions {
if oldPartDef.Name == partDef.Name {
oldPartID = oldPartDef.ID
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func buildRequest(
}
var oldIndexInfo *model.IndexInfo
if oldTable != nil {
for _, oldIndex := range oldTable.Schema.Indices {
for _, oldIndex := range oldTable.Info.Indices {
if oldIndex.Name == indexInfo.Name {
oldIndexInfo = oldIndex
break
Expand All @@ -117,7 +117,7 @@ func buildRequest(
if oldIndexInfo == nil {
log.Panic("index not found",
zap.Reflect("table", tableInfo),
zap.Reflect("oldTable", oldTable.Schema),
zap.Reflect("oldTable", oldTable.Info),
zap.Stringer("index", indexInfo.Name))
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/checksum/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *testChecksumSuite) TestChecksum(c *C) {
// Test rewrite rules
tk.MustExec("alter table t1 add index i2(a);")
tableInfo1 = s.getTableInfo(c, "test", "t1")
oldTable := utils.Table{Schema: tableInfo1}
oldTable := utils.Table{Info: tableInfo1}
exe2, err = NewExecutorBuilder(tableInfo2, math.MaxUint64).
SetOldTable(&oldTable).Build()
c.Assert(err, IsNil)
Expand Down
Loading