Skip to content

Commit

Permalink
br: filter (start-ts, restore-ts) when restore data kv (#34703)
Browse files Browse the repository at this point in the history
close #33873
  • Loading branch information
joccau authored May 18, 2022
1 parent 95e359f commit 2faac4f
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 64 deletions.
4 changes: 2 additions & 2 deletions br/cmd/br/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func newStreamCheckCommand() *cobra.Command {
Short: "get the metadata of log dir.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return streamCommand(cmd, task.StreamCheck)
return streamCommand(cmd, task.StreamMetadata)
},
}
return command
Expand All @@ -171,7 +171,7 @@ func streamCommand(command *cobra.Command, cmdName string) error {
}

switch cmdName {
case task.StreamCheck:
case task.StreamMetadata:
{
// do nothing.
}
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1681,7 +1681,11 @@ func (rc *Client) RestoreKVFiles(
summary.CollectInt("File", 1)
log.Info("import files done", zap.String("name", file.Path), zap.Duration("take", time.Since(fileStart)))
}()
return rc.fileImporter.ImportKVFiles(ectx, file, rule, rc.restoreTS)
startTS := rc.startTS
if file.Cf == stream.DefaultCF {
startTS = rc.shiftStartTS
}
return rc.fileImporter.ImportKVFiles(ectx, file, rule, startTS, rc.restoreTS)
})
}
}
Expand Down
27 changes: 16 additions & 11 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,12 @@ func (importer *FileImporter) ImportKVFileForRegion(
ctx context.Context,
file *backuppb.DataFileInfo,
rule *RewriteRules,
restoreTs uint64,
startTS uint64,
restoreTS uint64,
info *RegionInfo,
) RPCResult {
// Try to download file.
result := importer.downloadAndApplyKVFile(ctx, file, rule, info, restoreTs)
result := importer.downloadAndApplyKVFile(ctx, file, rule, info, startTS, restoreTS)
if !result.OK() {
errDownload := result.Err
for _, e := range multierr.Errors(errDownload) {
Expand Down Expand Up @@ -380,11 +381,13 @@ func (importer *FileImporter) ClearFiles(ctx context.Context, pdClient pd.Client
return nil
}

// ImportKVFiles restores the kv events.
func (importer *FileImporter) ImportKVFiles(
ctx context.Context,
file *backuppb.DataFileInfo,
rule *RewriteRules,
restoreTs uint64,
startTS uint64,
restoreTS uint64,
) error {
startTime := time.Now()
log.Debug("import kv files", zap.String("file", file.Path))
Expand All @@ -401,7 +404,7 @@ func (importer *FileImporter) ImportKVFiles(
rs := utils.InitialRetryState(32, 100*time.Millisecond, 8*time.Second)
ctl := OverRegionsInRange(startKey, endKey, importer.metaClient, &rs)
err = ctl.Run(ctx, func(ctx context.Context, r *RegionInfo) RPCResult {
return importer.ImportKVFileForRegion(ctx, file, rule, restoreTs, r)
return importer.ImportKVFileForRegion(ctx, file, rule, startTS, restoreTS, r)
})

log.Debug("download and apply file done",
Expand Down Expand Up @@ -801,7 +804,8 @@ func (importer *FileImporter) downloadAndApplyKVFile(
file *backuppb.DataFileInfo,
rules *RewriteRules,
regionInfo *RegionInfo,
restoreTs uint64,
startTS uint64,
restoreTS uint64,
) RPCResult {
leader := regionInfo.Leader
if leader == nil {
Expand All @@ -823,12 +827,13 @@ func (importer *FileImporter) downloadAndApplyKVFile(
Name: file.Path,
Cf: file.Cf,
// TODO fill the length
Length: 0,
IsDelete: file.Type == backuppb.FileType_Delete,
RestoreTs: restoreTs,
StartKey: regionInfo.Region.GetStartKey(),
EndKey: regionInfo.Region.GetEndKey(),
Sha256: file.GetSha256(),
Length: 0,
IsDelete: file.Type == backuppb.FileType_Delete,
StartSnapshotTs: startTS,
RestoreTs: restoreTS,
StartKey: regionInfo.Region.GetStartKey(),
EndKey: regionInfo.Region.GetEndKey(),
Sha256: file.GetSha256(),
}

reqCtx := &kvrpcpb.Context{
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/restore/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,6 @@ func swapAndOverrideFile(ctx context.Context, s storage.ExternalStorage, path st
const (
// TruncateSafePointFileName is the filename that the ts(the log have been truncated) is saved into.
TruncateSafePointFileName = "v1_stream_trancate_safepoint.txt"
// GlobalCheckpointFileName is the filename that the ts(the global checkpoint) is saved into.
GlobalCheckpointFileName = "v1_stream_global_checkpoint.txt"
)

// GetTSFromFile gets the current truncate safepoint.
Expand Down
107 changes: 61 additions & 46 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"sort"
"strings"
Expand Down Expand Up @@ -67,7 +68,7 @@ var (
StreamResume = "log resume"
StreamStatus = "log status"
StreamTruncate = "log truncate"
StreamCheck = "log check"
StreamMetadata = "log metadata"

skipSummaryCommandList = map[string]struct{}{
StreamStatus: {},
Expand All @@ -87,7 +88,7 @@ var StreamCommandMap = map[string]func(c context.Context, g glue.Glue, cmdName s
StreamResume: RunStreamResume,
StreamStatus: RunStreamStatus,
StreamTruncate: RunStreamTruncate,
StreamCheck: RunStreamCheck,
StreamMetadata: RunStreamMetadata,
}

// StreamConfig specifies the configure about backup stream
Expand Down Expand Up @@ -137,14 +138,14 @@ func DefineStreamStartFlags(flags *pflag.FlagSet) {
flags.String(flagStreamEndTS, "2035-1-1 00:00:00", "end ts, indicate stopping observe after endTS"+
"support TSO or datetime")
_ = flags.MarkHidden(flagStreamEndTS)
flags.Int64(flagGCSafePointTTS, utils.DefaultBRGCSafePointTTL,
flags.Int64(flagGCSafePointTTS, utils.DefaultStreamStartSafePointTTL,
"the TTL (in seconds) that PD holds for BR's GC safepoint")
_ = flags.MarkHidden(flagGCSafePointTTS)
}

func DefineStreamPauseFlags(flags *pflag.FlagSet) {
DefineStreamCommonFlags(flags)
flags.Int64(flagGCSafePointTTS, utils.DefaultStreamGCSafePointTTL,
flags.Int64(flagGCSafePointTTS, utils.DefaultStreamPauseSafePointTTL,
"the TTL (in seconds) that PD holds for BR's GC safepoint")
}

Expand Down Expand Up @@ -230,7 +231,7 @@ func (cfg *StreamConfig) ParseStreamStartFromFlags(flags *pflag.FlagSet) error {
}

if cfg.SafePointTTL <= 0 {
cfg.SafePointTTL = utils.DefaultBRGCSafePointTTL
cfg.SafePointTTL = utils.DefaultStreamStartSafePointTTL
}

return nil
Expand All @@ -247,7 +248,7 @@ func (cfg *StreamConfig) ParseStreamPauseFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}
if cfg.SafePointTTL <= 0 {
cfg.SafePointTTL = utils.DefaultStreamGCSafePointTTL
cfg.SafePointTTL = utils.DefaultStreamPauseSafePointTTL
}
return nil
}
Expand Down Expand Up @@ -296,6 +297,10 @@ func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamS
if err != nil {
return nil, errors.Trace(err)
}
if backend.GetS3() == nil {
return nil, errors.Annotate(berrors.ErrStorageInvalidConfig,
"Only support s3 storage currently.")
}

opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
Expand Down Expand Up @@ -351,20 +356,14 @@ func (s *streamMgr) adjustAndCheckStartTS(ctx context.Context) error {
return nil
}

// setGCSafePoint specifies currentTS should belong to (gcSafePoint, currentTS),
// and set startTS as a serverSafePoint to PD
func (s *streamMgr) setGCSafePoint(ctx context.Context, safePoint uint64) error {
err := utils.CheckGCSafePoint(ctx, s.mgr.GetPDClient(), safePoint)
// setGCSafePoint sets the server safe point to PD.
func (s *streamMgr) setGCSafePoint(ctx context.Context, sp utils.BRServiceSafePoint) error {
err := utils.CheckGCSafePoint(ctx, s.mgr.GetPDClient(), sp.BackupTS)
if err != nil {
return errors.Annotatef(err,
"failed to check gc safePoint, ts %v", safePoint)
"failed to check gc safePoint, ts %v", sp.BackupTS)
}

sp := utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: s.cfg.SafePointTTL,
BackupTS: safePoint,
}
err = utils.UpdateServiceSafePoint(ctx, s.mgr.GetPDClient(), sp)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -508,7 +507,15 @@ func RunStreamStart(
if err = streamMgr.adjustAndCheckStartTS(ctx); err != nil {
return errors.Trace(err)
}
if err = streamMgr.setGCSafePoint(ctx, streamMgr.cfg.StartTS); err != nil {

if err = streamMgr.setGCSafePoint(
ctx,
utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: cfg.SafePointTTL,
BackupTS: cfg.StartTS,
},
); err != nil {
return errors.Trace(err)
}

Expand Down Expand Up @@ -558,7 +565,7 @@ func RunStreamStart(
return nil
}

func RunStreamCheck(
func RunStreamMetadata(
c context.Context,
g glue.Glue,
cmdName string,
Expand Down Expand Up @@ -614,34 +621,25 @@ func RunStreamStop(
}
defer streamMgr.close()

storage, err := cfg.makeStorage(ctx)
if err != nil {
return errors.Trace(err)
}

cli := stream.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
// to add backoff
ti, err := cli.GetTask(ctx, cfg.TaskName)
if err != nil {
return errors.Trace(err)
}

globalCheckpointTS, err := ti.GetGlobalCheckPointTS(ctx)
if err != nil {
if err = cli.DeleteTask(ctx, cfg.TaskName); err != nil {
return errors.Trace(err)
}

if err = restore.SetTSToFile(
ctx,
storage,
globalCheckpointTS,
restore.GlobalCheckpointFileName,
if err := streamMgr.setGCSafePoint(ctx,
utils.BRServiceSafePoint{
ID: buildPauseSafePointName(ti.Info.Name),
TTL: 0,
BackupTS: 0,
},
); err != nil {
return errors.Trace(err)
}

if err = cli.DeleteTask(ctx, cfg.TaskName); err != nil {
return errors.Trace(err)
log.Warn("failed to remove safe point", zap.String("error", err.Error()))
}

summary.Log(cmdName, logutil.StreamBackupTaskInfo(&ti.Info))
Expand Down Expand Up @@ -686,14 +684,22 @@ func RunStreamPause(
if err != nil {
return errors.Trace(err)
}
if err = streamMgr.setGCSafePoint(ctx, globalCheckPointTS); err != nil {
if err = streamMgr.setGCSafePoint(
ctx,
utils.BRServiceSafePoint{
ID: buildPauseSafePointName(ti.Info.Name),
TTL: cfg.SafePointTTL,
BackupTS: globalCheckPointTS,
},
); err != nil {
return errors.Trace(err)
}

err = cli.PauseTask(ctx, cfg.TaskName)
if err != nil {
return errors.Trace(err)
}

summary.Log(cmdName, logutil.StreamBackupTaskInfo(&ti.Info))
return nil
}
Expand Down Expand Up @@ -752,6 +758,18 @@ func RunStreamResume(
if err != nil {
return err
}

if err := streamMgr.setGCSafePoint(ctx,
utils.BRServiceSafePoint{
ID: buildPauseSafePointName(ti.Info.Name),
TTL: 0,
BackupTS: globalCheckPointTS,
},
); err != nil {
log.Warn("failed to remove safe point",
zap.Uint64("safe-point", globalCheckPointTS), zap.String("error", err.Error()))
}

summary.Log(cmdName, logutil.StreamBackupTaskInfo(&ti.Info))
return nil
}
Expand Down Expand Up @@ -1180,19 +1198,12 @@ func getLogRange(
}
logMinTS := mathutil.Max(logStartTS, truncateTS)

// get log global checkpoint ts from GlobalCheckpointFileName.
// If globalCheckpointTS equals 0, which represents the log task has not been stop.
logMaxTS, err := restore.GetTSFromFile(ctx, s, restore.GlobalCheckpointFileName)
// get max global resolved ts from metas.
logMaxTS, err := getGlobalResolvedTS(ctx, s)
if err != nil {
return 0, 0, errors.Trace(err)
}

// get max global resolved ts from metas.
if logMaxTS == 0 {
if logMaxTS, err = getGlobalResolvedTS(ctx, s); err != nil {
return 0, 0, errors.Trace(err)
}
}
logMaxTS = mathutil.Max(logMinTS, logMaxTS)

return logMinTS, logMaxTS, nil
}
Expand Down Expand Up @@ -1434,3 +1445,7 @@ func ShiftTS(startTS uint64) uint64 {
return oracle.ComposeTS(shiftPhysical, logical)
}
}

func buildPauseSafePointName(taskName string) string {
return fmt.Sprintf("%s_pause_safepoint", taskName)
}
6 changes: 4 additions & 2 deletions br/pkg/utils/safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ const (
checkGCSafePointGapTime = 5 * time.Second
// DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min.
DefaultBRGCSafePointTTL = 5 * 60
// DefaultStreamGCSafePointTTL specifies Keeping the GC safePoint at list 24h.
DefaultStreamGCSafePointTTL = 24 * 3600
// DefaultStreamStartSafePointTTL specifies keeping the server safepoint 30 mins when start task.
DefaultStreamStartSafePointTTL = 1800
// DefaultStreamPauseSafePointTTL specifies Keeping the server safePoint at list 24h when pause task.
DefaultStreamPauseSafePointTTL = 24 * 3600
)

// BRServiceSafePoint is metadata of service safe point from a BR 'instance'.
Expand Down

0 comments on commit 2faac4f

Please sign in to comment.