From ea943b893215f5f7f733fa65bfdf4a0426be91f2 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 30 Jan 2023 17:51:46 +0800 Subject: [PATCH 1/6] prevent from restore point to cluster running log backup Signed-off-by: Leavrth --- br/pkg/task/stream.go | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 8596eb89d6974..7e381aba7ad49 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -862,8 +862,8 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre return nil } -func checkConfigForStatus(cfg *StreamConfig) error { - if len(cfg.PD) == 0 { +func checkConfigForStatus(pd []string) error { + if len(pd) == 0 { return errors.Annotatef(berrors.ErrInvalidArgument, "the command needs access to PD, please specify `-u` or `--pd`") } @@ -913,7 +913,7 @@ func RunStreamStatus( ctx = opentracing.ContextWithSpan(ctx, span1) } - if err := checkConfigForStatus(cfg); err != nil { + if err := checkConfigForStatus(cfg.PD); err != nil { return err } ctl, err := makeStatusController(ctx, cfg, g) @@ -1028,6 +1028,27 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre return nil } +// checkTaskExists checks whether there is a log backup task running. +// If so, return an error. +func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { + if err := checkConfigForStatus(cfg.PD); err != nil { + return err + } + etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config) + if err != nil { + return err + } + cli := streamhelper.NewMetaDataClient(etcdCLI) + tasks, err := cli.GetAllTasks(ctx) + if err != nil { + return err + } + if len(tasks) > 0 { + return errors.Errorf("There is a log backup task running: %s, please stop the task before restore", tasks[0].Info.Name) + } + return nil +} + // RunStreamRestore restores stream log. func RunStreamRestore( c context.Context, @@ -1044,6 +1065,10 @@ func RunStreamRestore( ctx = opentracing.ContextWithSpan(ctx, span1) } + if err := checkTaskExists(ctx, cfg); err != nil { + return errors.Trace(err) + } + logInfo, err := getLogRange(ctx, &cfg.Config) if err != nil { return errors.Trace(err) From b81bd68781ddd33213607ff86dbed2cf643a6a8e Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 30 Jan 2023 18:40:49 +0800 Subject: [PATCH 2/6] close the etcd client in time Signed-off-by: Leavrth --- br/pkg/task/stream.go | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 7e381aba7ad49..823b1fdcae968 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1039,6 +1039,7 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { return err } cli := streamhelper.NewMetaDataClient(etcdCLI) + defer cli.Close() tasks, err := cli.GetAllTasks(ctx) if err != nil { return err From 75374c5f8a9fa682346ce70ac3dd7fe2d388e9e5 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 3 Feb 2023 16:26:42 +0800 Subject: [PATCH 3/6] resolve call cycle Signed-off-by: Leavrth --- br/pkg/backup/push.go | 3 +-- br/pkg/task/restore.go | 7 +++++++ br/pkg/task/stream.go | 6 +----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 2ffffe690ffe5..ed929a12895a4 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -218,10 +218,9 @@ func (push *pushDown) pushBackup( if len(errMsg) <= 0 { errMsg = errPb.Msg } - return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s %s", + return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s", store.GetId(), redact.String(store.GetAddress()), - req.StorageBackend.String(), errMsg, ) } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 527e07f28ea27..b4092e454e079 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -474,10 +474,17 @@ func IsStreamRestore(cmdName string) bool { // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { + if err := checkTaskExists(c, cfg); err != nil { + return errors.Trace(err) + } + if IsStreamRestore(cmdName) { return RunStreamRestore(c, g, cmdName, cfg) } + return runRestore(c, g, cmdName, cfg) +} +func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { cfg.Adjust() defer summary.Summary(cmdName) ctx, cancel := context.WithCancel(c) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 823b1fdcae968..34269baa945f9 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1066,10 +1066,6 @@ func RunStreamRestore( ctx = opentracing.ContextWithSpan(ctx, span1) } - if err := checkTaskExists(ctx, cfg); err != nil { - return errors.Trace(err) - } - logInfo, err := getLogRange(ctx, &cfg.Config) if err != nil { return errors.Trace(err) @@ -1115,7 +1111,7 @@ func RunStreamRestore( logStorage := cfg.Config.Storage cfg.Config.Storage = cfg.FullBackupStorage // TiFlash replica is restored to down-stream on 'pitr' currently. - if err = RunRestore(ctx, g, FullRestoreCmd, cfg); err != nil { + if err = runRestore(ctx, g, FullRestoreCmd, cfg); err != nil { return errors.Trace(err) } cfg.Config.Storage = logStorage From faec281597eb73e15cf1bb37291ab2e5561ce540 Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Fri, 3 Feb 2023 16:28:22 +0800 Subject: [PATCH 4/6] Update br/pkg/task/stream.go Co-authored-by: Zak Zhao <57036248+joccau@users.noreply.github.com> --- br/pkg/task/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 34269baa945f9..c40ec5905edfc 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1045,7 +1045,7 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { return err } if len(tasks) > 0 { - return errors.Errorf("There is a log backup task running: %s, please stop the task before restore", tasks[0].Info.Name) + return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster.", tasks[0].Info.Name) } return nil } From 406522a46c9dfb0acd7a19075de8f3e58a7dcb56 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 7 Feb 2023 16:31:16 +0800 Subject: [PATCH 5/6] add more info in error Signed-off-by: Leavrth --- br/pkg/task/restore.go | 2 +- br/pkg/task/stream.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index b4092e454e079..601897883e727 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -475,7 +475,7 @@ func IsStreamRestore(cmdName string) bool { // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { if err := checkTaskExists(c, cfg); err != nil { - return errors.Trace(err) + return errors.Annotate(err, "failed to check task exits") } if IsStreamRestore(cmdName) { diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index c40ec5905edfc..93e0f90f4de67 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1039,7 +1039,11 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { return err } cli := streamhelper.NewMetaDataClient(etcdCLI) - defer cli.Close() + defer func() { + if err := cli.Close(); err != nil { + log.Error("failed to close the etcd client", zap.Error(err)) + } + }() tasks, err := cli.GetAllTasks(ctx) if err != nil { return err From 7b38f78f99726f4121f82ee2fa4fd74701e29ff9 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 7 Feb 2023 18:24:43 +0800 Subject: [PATCH 6/6] add integration test Signed-off-by: Leavrth --- br/pkg/task/stream.go | 2 +- br/tests/br_restore_log_task_enable/run.sh | 56 ++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 br/tests/br_restore_log_task_enable/run.sh diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 93e0f90f4de67..6a0eaada78679 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1049,7 +1049,7 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { return err } if len(tasks) > 0 { - return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster.", tasks[0].Info.Name) + return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name) } return nil } diff --git a/br/tests/br_restore_log_task_enable/run.sh b/br/tests/br_restore_log_task_enable/run.sh new file mode 100644 index 0000000000000..923f8fe7c2b33 --- /dev/null +++ b/br/tests/br_restore_log_task_enable/run.sh @@ -0,0 +1,56 @@ +#!/bin/sh +# +# Copyright 2022 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux +DB="$TEST_NAME" +TABLE="usertable" + +# start log task +run_br log start --task-name 1234 -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR + +run_sql "CREATE DATABASE $DB;" +run_sql "CREATE TABLE $DB.$TABLE (id int);" +run_sql "INSERT INTO $DB.$TABLE VALUES (1), (2), (3);" + +# backup full +run_br backup full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR + +# clean db +run_sql "DROP DATABASE $DB;" + +# restore full (should be failed) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1 + +# restore point (should be failed) +run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1 + +# pause log task +run_br log pause --task-name 1234 --pd $PD_ADDR + +# restore full (should be failed) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1 + +# restore point (should be failed) +run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1 + +# stop log task +run_br log stop --task-name 1234 --pd $PD_ADDR + +# restore full (should be success) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR + +# clean db +run_sql "DROP DATABASE $DB"