Skip to content

Commit

Permalink
br: error out if pitr with table filter has exchange partition in/out…
Browse files Browse the repository at this point in the history
… filter range (#59683)

close #59723
  • Loading branch information
Tristan1900 authored Mar 1, 2025
1 parent b31b12d commit c2588e1
Show file tree
Hide file tree
Showing 5 changed files with 780 additions and 8 deletions.
7 changes: 7 additions & 0 deletions br/pkg/restore/log_client/batch_meta_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(

// add to table rename history
mp.tableHistoryManager.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)

// track partitions if this is a partitioned table
if tableInfo.Partition != nil {
for _, def := range tableInfo.Partition.Definitions {
mp.tableHistoryManager.AddPartitionHistory(def.ID, tableInfo.Name.String(), dbID, tableInfo.ID)
}
}
}
}
}
Expand Down
24 changes: 20 additions & 4 deletions br/pkg/stream/table_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package stream

// TableLocationInfo stores the table name, db id, and parent table id if is a partition
type TableLocationInfo struct {
DbID int64
TableName string
DbID int64
TableName string
IsPartition bool
ParentTableID int64 // only meaningful when IsPartition is true
}

type LogBackupTableHistoryManager struct {
Expand All @@ -36,12 +38,26 @@ func NewTableHistoryManager() *LogBackupTableHistoryManager {
// AddTableHistory adds or updates history for a regular table
func (info *LogBackupTableHistoryManager) AddTableHistory(tableId int64, tableName string, dbID int64) {
locationInfo := TableLocationInfo{
DbID: dbID,
TableName: tableName,
DbID: dbID,
TableName: tableName,
IsPartition: false,
ParentTableID: 0,
}
info.addHistory(tableId, locationInfo)
}

// AddPartitionHistory adds or updates history for a partition
func (info *LogBackupTableHistoryManager) AddPartitionHistory(partitionID int64, tableName string,
dbID int64, parentTableID int64) {
locationInfo := TableLocationInfo{
DbID: dbID,
TableName: tableName,
IsPartition: true,
ParentTableID: parentTableID,
}
info.addHistory(partitionID, locationInfo)
}

// addHistory is a helper method to maintain the history
func (info *LogBackupTableHistoryManager) addHistory(id int64, locationInfo TableLocationInfo) {
existing, exists := info.tableNameHistory[id]
Expand Down
90 changes: 88 additions & 2 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,11 @@ func handleTableRenames(
start := dbIDAndTableName[0]
end := dbIDAndTableName[1]

// skip if it contains partition
if start.IsPartition || end.IsPartition {
continue
}

startDBName, exists := getDBNameFromIDInBackup(start.DbID, snapshotDBMap, history)
if !exists {
continue
Expand Down Expand Up @@ -1488,6 +1493,84 @@ func handleTableRenames(
}
}

// shouldRestoreTable checks if a table or partition is being tracked for restore
func shouldRestoreTable(
dbID int64,
tableName string,
isPartition bool,
parentTableID int64,
snapshotDBMap map[int64]*metautil.Database,
history *stream.LogBackupTableHistoryManager,
cfg *RestoreConfig,
) bool {
if isPartition {
return cfg.PiTRTableTracker.ContainsTableId(dbID, parentTableID)
}
dbName, exists := getDBNameFromIDInBackup(dbID, snapshotDBMap, history)
if !exists {
return false
}
return utils.MatchTable(cfg.TableFilter, dbName, tableName, cfg.WithSysTable)
}

// handlePartitionExchanges checks for partition exchanges and returns an error if a partition
// was exchanged between tables where one is in the filter and one is not
func handlePartitionExchanges(
history *stream.LogBackupTableHistoryManager,
snapshotDBMap map[int64]*metautil.Database,
cfg *RestoreConfig,
) error {
for tableId, dbIDAndTableName := range history.GetTableHistory() {
start := dbIDAndTableName[0]
end := dbIDAndTableName[1]

// skip if both are not partition
if !start.IsPartition && !end.IsPartition {
continue
}

// skip if parent table id are the same (if it's a table, parent table id will be 0)
if start.ParentTableID == end.ParentTableID {
continue
}

restoreStart := shouldRestoreTable(start.DbID, start.TableName, start.IsPartition, start.ParentTableID,
snapshotDBMap, history, cfg)
restoreEnd := shouldRestoreTable(end.DbID, end.TableName, end.IsPartition, end.ParentTableID,
snapshotDBMap, history, cfg)

// error out if partition is exchanged between tables where one should restore and one shouldn't
if restoreStart != restoreEnd {
startDBName, exists := getDBNameFromIDInBackup(start.DbID, snapshotDBMap, history)
if !exists {
startDBName = fmt.Sprintf("(unknown db name %d)", start.DbID)
}
endDBName, exists := getDBNameFromIDInBackup(end.DbID, snapshotDBMap, history)
if !exists {
endDBName = fmt.Sprintf("(unknown db name %d)", end.DbID)
}

return errors.Annotatef(berrors.ErrRestoreModeMismatch,
"partition exchange detected: partition ID %d was exchanged from table '%s.%s' (ID: %d) "+
"eventually to table '%s.%s' (ID: %d), which is not supported in table filter",
tableId, startDBName, start.TableName, start.ParentTableID,
endDBName, end.TableName, end.ParentTableID)
}

// if we reach here, it will only be both are restore or not restore,
// if it's table, need to add to table tracker, this is for table created during log backup.
// if it's table and exist in snapshot, the actual table and files should already been added
// since matches filter.
if restoreStart && !start.IsPartition {
cfg.PiTRTableTracker.TrackTableId(start.DbID, tableId)
}
if restoreEnd && !end.IsPartition {
cfg.PiTRTableTracker.TrackTableId(end.DbID, tableId)
}
}
return nil
}

func AdjustTablesToRestoreAndCreateTableTracker(
logBackupTableHistory *stream.LogBackupTableHistoryManager,
cfg *RestoreConfig,
Expand All @@ -1498,6 +1581,7 @@ func AdjustTablesToRestoreAndCreateTableTracker(
) (err error) {
// build tracker for pitr restore to use later
piTRIdTracker := utils.NewPiTRIdTracker()
cfg.PiTRTableTracker = piTRIdTracker

// track newly created databases
newlyCreatedDBs := logBackupTableHistory.GetNewlyCreatedDBHistory()
Expand All @@ -1510,15 +1594,17 @@ func AdjustTablesToRestoreAndCreateTableTracker(
// first handle table renames to determine which tables we need
handleTableRenames(logBackupTableHistory, snapshotDBMap, cfg, tableMap, dbMap, fileMap, piTRIdTracker)

// handle partition exchange if needed in future
// handle partition exchange after all tables are tracked
if err := handlePartitionExchanges(logBackupTableHistory, snapshotDBMap, cfg); err != nil {
return err
}

// track all snapshot tables that's going to restore in PiTR tracker
for tableID, table := range tableMap {
piTRIdTracker.TrackTableId(table.DB.ID, tableID)
}

log.Info("pitr table tracker", zap.String("map", piTRIdTracker.String()))
cfg.PiTRTableTracker = piTRIdTracker
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1981,7 +1981,10 @@ func buildPauseSafePointName(taskName string) string {
return fmt.Sprintf("%s_pause_safepoint", taskName)
}

func checkPiTRRequirements(mgr *conn.Mgr) error {
func checkPiTRRequirements(mgr *conn.Mgr, hasExplicitFilter bool) error {
if hasExplicitFilter {
return nil
}
return restore.AssertUserDBsEmpty(mgr.GetDomain())
}

Expand Down Expand Up @@ -2057,7 +2060,7 @@ func generatePiTRTaskInfo(
// Only when use checkpoint and not the first execution,
// skip checking requirements.
log.Info("check pitr requirements for the first execution")
if err := checkPiTRRequirements(mgr); err != nil {
if err := checkPiTRRequirements(mgr, cfg.ExplicitFilter); err != nil {
// delay cluster checks after we get the backupmeta.
// for the case that the restore inc + log backup,
// we can still restore them.
Expand Down
Loading

0 comments on commit c2588e1

Please sign in to comment.