diff --git a/DEPS.bzl b/DEPS.bzl
index 55923713a952a..4f11ed06cdd58 100644
--- a/DEPS.bzl
+++ b/DEPS.bzl
@@ -3061,9 +3061,9 @@ def go_deps():
         name = "com_github_pingcap_kvproto",
         build_file_proto_mode = "disable_global",
         importpath = "github.com/pingcap/kvproto",
-        sum = "h1:SnvWHM4JSkn9TFLIjrSRanpliqnmgk+y0MuoXC77y6I=",
-        version = "v0.0.0-20230524051921-3dc79e773139",
-    )
+        sum = "h1:tBKPWWqgWEBs04BV4UN7RhtUkZDs0oz+WyMbtRDVtL8=",
+        version = "v0.0.0-20230928035022-1bdcc25ed63c",
+    )
     go_repository(
         name = "com_github_pingcap_log",
         build_file_proto_mode = "disable_global",
diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go
index d70d9425e0653..5eca340f1e622 100644
--- a/br/cmd/br/main.go
+++ b/br/cmd/br/main.go
@@ -49,7 +49,7 @@ func main() {
         NewBackupCommand(),
         NewRestoreCommand(),
         NewStreamCommand(),
-        newOpeartorCommand(),
+        newOperatorCommand(),
     )
     // Outputs cmd.Print to stdout.
     rootCmd.SetOut(os.Stdout)
diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go
index 57ed59b224d06..2a6d80aa12ffa 100644
--- a/br/cmd/br/operator.go
+++ b/br/cmd/br/operator.go
@@ -11,7 +11,7 @@ import (
     "github.com/spf13/cobra"
 )
 
-func newOpeartorCommand() *cobra.Command {
+func newOperatorCommand() *cobra.Command {
     cmd := &cobra.Command{
         Use:   "operator ",
         Short: "utilities for operators like tidb-operator.",
@@ -26,14 +26,19 @@ func newOpeartorCommand() *cobra.Command {
         },
         Hidden: true,
     }
-    cmd.AddCommand(newPauseGcCommand())
+    cmd.AddCommand(newPrepareForSnapshotBackupCommand(
+        "pause-gc-and-schedulers",
+        "(Will be replaced with `prepare-for-snapshot-backup`) pause gc, schedulers and importing until the program exits."))
+    cmd.AddCommand(newPrepareForSnapshotBackupCommand(
+        "prepare-for-snapshot-backup",
+        "pause gc, schedulers and importing until the program exits, for snapshot backup."))
     return cmd
 }
 
-func newPauseGcCommand() *cobra.Command {
+func newPrepareForSnapshotBackupCommand(use string, short string) *cobra.Command {
     cmd := &cobra.Command{
-        Use:   "pause-gc-and-schedulers",
-        Short: "pause gc and schedulers to the ts until the program exits.",
+        Use:   use,
+        Short: short,
         Args:  cobra.NoArgs,
         RunE: func(cmd *cobra.Command, args []string) error {
             cfg := operator.PauseGcConfig{}
@@ -41,9 +46,9 @@ func newPauseGcCommand() *cobra.Command {
                 return err
             }
             ctx := GetDefaultContext()
-            return operator.PauseGCAndScheduler(ctx, &cfg)
+            return operator.AdaptEnvForSnapshotBackup(ctx, &cfg)
         },
     }
-    operator.DefineFlagsForPauseGcConfig(cmd.Flags())
+    operator.DefineFlagsForPrepareSnapBackup(cmd.Flags())
     return cmd
 }
diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go
index 2b7d76e28d795..e34805d8f77dc 100644
--- a/br/pkg/errors/errors.go
+++ b/br/pkg/errors/errors.go
@@ -111,4 +111,6 @@ var (
     ErrKVDownloadFailed = errors.Normalize("download sst failed", errors.RFCCodeText("BR:KV:ErrKVDownloadFailed"))
     // ErrKVIngestFailed indicates a generic, retryable ingest error.
     ErrKVIngestFailed = errors.Normalize("ingest sst failed", errors.RFCCodeText("BR:KV:ErrKVIngestFailed"))
+
+    ErrPossibleInconsistency = errors.Normalize("the cluster state might be inconsistent", errors.RFCCodeText("BR:KV:ErrPossibleInconsistency"))
 )
diff --git a/br/pkg/task/backup_ebs.go b/br/pkg/task/backup_ebs.go
index 5bcdb7977b3b8..0b59cf6f061a8 100644
--- a/br/pkg/task/backup_ebs.go
+++ b/br/pkg/task/backup_ebs.go
@@ -26,6 +26,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/conn"
     "github.com/pingcap/tidb/br/pkg/conn/util"
     "github.com/pingcap/tidb/br/pkg/glue"
+    "github.com/pingcap/tidb/br/pkg/logutil"
     "github.com/pingcap/tidb/br/pkg/metautil"
     "github.com/pingcap/tidb/br/pkg/pdutil"
     "github.com/pingcap/tidb/br/pkg/storage"
@@ -144,6 +145,16 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
         if e != nil {
             return errors.Trace(err)
         }
+        denyLightning := utils.NewSuspendImporting("backup_ebs_command", mgr.StoreManager)
+        _, err := denyLightning.DenyAllStores(ctx, utils.DefaultBRGCSafePointTTL)
+        if err != nil {
+            return errors.Annotate(err, "failed to deny lightning from running")
+        }
+        go func() {
+            if err := denyLightning.Keeper(ctx, utils.DefaultBRGCSafePointTTL); err != nil {
+                log.Warn("cannot keep denying importing, the backup archive may not be usable if an import was running.", logutil.ShortError(err))
+            }
+        }()
         defer func() {
             if ctx.Err() != nil {
                 log.Warn("context canceled, doing clean work with background context")
@@ -155,6 +166,13 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
             if restoreE := restoreFunc(ctx); restoreE != nil {
                 log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
             }
+            res, err := denyLightning.AllowAllStores(ctx)
+            if err != nil {
+                log.Warn("failed to restore importing, you may need to wait until you are able to start importing", zap.Duration("wait_for", utils.DefaultBRGCSafePointTTL))
+            }
+            if err := denyLightning.ConsistentWithPrev(res); err != nil {
+                log.Warn("lightning hasn't been denied, the backup archive may not be usable.", logutil.ShortError(err))
+            }
         }()
     }
 
diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel
index a291d68df5b12..5ce85cbd1313f 100644
--- a/br/pkg/task/operator/BUILD.bazel
+++ b/br/pkg/task/operator/BUILD.bazel
@@ -13,8 +13,10 @@ go_library(
         "//br/pkg/pdutil",
         "//br/pkg/task",
         "//br/pkg/utils",
+        "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_log//:log",
         "@com_github_spf13_pflag//:pflag",
+        "@org_golang_google_grpc//keepalive",
         "@org_golang_x_sync//errgroup",
         "@org_uber_go_zap//:zap",
     ],
diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go
index 95c922b1c19cf..909d18911c8d0 100644
--- a/br/pkg/task/operator/cmd.go
+++ b/br/pkg/task/operator/cmd.go
@@ -6,8 +6,10 @@ import (
     "context"
     "crypto/tls"
     "strings"
+    "sync"
     "time"
 
+    "github.com/pingcap/errors"
     "github.com/pingcap/log"
     "github.com/pingcap/tidb/br/pkg/logutil"
     "github.com/pingcap/tidb/br/pkg/pdutil"
@@ -15,6 +17,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/utils"
     "go.uber.org/zap"
     "golang.org/x/sync/errgroup"
+    "google.golang.org/grpc/keepalive"
 )
 
 func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) {
@@ -34,48 +37,129 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error)
     return mgr, nil
 }
 
-func cleanUpWith(f func(ctx context.Context)) {
-    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWith(f func(ctx context.Context)) {
+    _ = cx.cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil })
+}
+
+func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithErr(f func(ctx context.Context) error) error {
+    ctx, cancel := context.WithTimeout(context.Background(), cx.cfg.TTL)
     defer cancel()
-    f(ctx)
+    return f(ctx)
 }
 
-// PauseGCAndScheduler blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config.
+type AdaptEnvForSnapshotBackupContext struct {
+    context.Context
+
+    pdMgr *pdutil.PdController
+    kvMgr *utils.StoreManager
+    cfg   PauseGcConfig
+
+    rdGrp  sync.WaitGroup
+    runGrp *errgroup.Group
+}
+
+func (cx *AdaptEnvForSnapshotBackupContext) ReadyL(name string, notes ...zap.Field) {
+    logutil.CL(cx).Info("Stage ready.", append(notes, zap.String("component", name))...)
+    cx.rdGrp.Done()
+}
+
+func hintAllReady() {
+    // Hack: some versions of tidb-operator use the following two logs to check whether we are ready...
+    log.Info("Schedulers are paused.")
+    log.Info("GC is paused.")
+    log.Info("All ready.")
+}
+
+// AdaptEnvForSnapshotBackup blocks the current goroutine, pauses the GC safepoint and removes the schedulers according to the config.
 // This function will block until the context being canceled.
-func PauseGCAndScheduler(ctx context.Context, cfg *PauseGcConfig) error {
+func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
     mgr, err := dialPD(ctx, &cfg.Config)
     if err != nil {
-        return err
+        return errors.Annotate(err, "failed to dial PD")
     }
-
+    var tconf *tls.Config
+    if cfg.TLS.IsEnabled() {
+        tconf, err = cfg.TLS.ToTLSConfig()
+        if err != nil {
+            return errors.Annotate(err, "invalid tls config")
+        }
+    }
+    kvMgr := utils.NewStoreManager(mgr.GetPDClient(), keepalive.ClientParameters{
+        Time:    cfg.Config.GRPCKeepaliveTime,
+        Timeout: cfg.Config.GRPCKeepaliveTimeout,
+    }, tconf)
     eg, ectx := errgroup.WithContext(ctx)
+    cx := &AdaptEnvForSnapshotBackupContext{
+        Context: logutil.ContextWithField(ectx, zap.String("tag", "br_operator")),
+        pdMgr:   mgr,
+        kvMgr:   kvMgr,
+        cfg:     *cfg,
+        rdGrp:   sync.WaitGroup{},
+        runGrp:  eg,
+    }
+    cx.rdGrp.Add(3)
 
-    eg.Go(func() error { return pauseGCKeeper(ectx, cfg, mgr) })
-    eg.Go(func() error { return pauseSchedulerKeeper(ectx, mgr) })
+    eg.Go(func() error { return pauseGCKeeper(cx) })
+    eg.Go(func() error { return pauseSchedulerKeeper(cx) })
+    eg.Go(func() error { return pauseImporting(cx) })
+    go func() {
+        cx.rdGrp.Wait()
+        hintAllReady()
+    }()
 
     return eg.Wait()
 }
 
-func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdController) error {
+func pauseImporting(cx *AdaptEnvForSnapshotBackupContext) error {
+    denyLightning := utils.NewSuspendImporting("prepare_for_snapshot_backup", cx.kvMgr)
+    if _, err := denyLightning.DenyAllStores(cx, cx.cfg.TTL); err != nil {
+        return errors.Trace(err)
+    }
+    cx.ReadyL("pause_lightning")
+    cx.runGrp.Go(func() error {
+        err := denyLightning.Keeper(cx, cx.cfg.TTL)
+        if errors.Cause(err) != context.Canceled {
+            logutil.CL(cx).Warn("keeper encountered an error.", logutil.ShortError(err))
+        }
+        return cx.cleanUpWithErr(func(ctx context.Context) error {
+            for {
+                if ctx.Err() != nil {
+                    return errors.Annotate(ctx.Err(), "cleaning up timed out")
+                }
+                res, err := denyLightning.AllowAllStores(ctx)
+                if err != nil {
+                    logutil.CL(ctx).Warn("Failed to restore lightning, will retry.", logutil.ShortError(err))
+                    // Retry roughly 10 times within the cleanup TTL.
+                    time.Sleep(cx.cfg.TTL / 10)
+                    continue
+                }
+                return denyLightning.ConsistentWithPrev(res)
+            }
+        })
+    })
+    return nil
+}
+
+func pauseGCKeeper(ctx *AdaptEnvForSnapshotBackupContext) error {
     // Note: should we remove the service safepoint as soon as this exits?
     sp := utils.BRServiceSafePoint{
         ID:       utils.MakeSafePointID(),
-        TTL:      int64(cfg.TTL.Seconds()),
-        BackupTS: cfg.SafePoint,
+        TTL:      int64(ctx.cfg.TTL.Seconds()),
+        BackupTS: ctx.cfg.SafePoint,
     }
     if sp.BackupTS == 0 {
-        rts, err := ctl.GetMinResolvedTS(ctx)
+        rts, err := ctx.pdMgr.GetMinResolvedTS(ctx)
         if err != nil {
             return err
         }
-        log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
+        logutil.CL(ctx).Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
         sp.BackupTS = rts
     }
-    err := utils.StartServiceSafePointKeeper(ctx, ctl.GetPDClient(), sp)
+    err := utils.StartServiceSafePointKeeper(ctx, ctx.pdMgr.GetPDClient(), sp)
     if err != nil {
         return err
     }
-    log.Info("GC is paused.", zap.Object("safepoint", sp))
+    ctx.ReadyL("pause_gc", zap.Object("safepoint", sp))
     // Note: in fact we can directly return here.
     // But the name `keeper` implies once the function exits,
     // the GC should be resume, so let's block here.
@@ -83,10 +167,10 @@ func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdContro
     return nil
 }
 
-func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error {
-    undo, err := ctl.RemoveAllPDSchedulers(ctx)
+func pauseSchedulerKeeper(ctx *AdaptEnvForSnapshotBackupContext) error {
+    undo, err := ctx.pdMgr.RemoveAllPDSchedulers(ctx)
     if undo != nil {
-        defer cleanUpWith(func(ctx context.Context) {
+        defer ctx.cleanUpWith(func(ctx context.Context) {
             if err := undo(ctx); err != nil {
                 log.Warn("failed to restore pd scheduler.", logutil.ShortError(err))
             }
@@ -95,7 +179,7 @@ func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error {
     if err != nil {
         return err
     }
-    log.Info("Schedulers are paused.")
+    ctx.ReadyL("pause_scheduler")
     // Wait until the context canceled.
     // So we can properly do the clean up work.
     <-ctx.Done()
diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go
index eb7e12a49af56..998fdc64d961e 100644
--- a/br/pkg/task/operator/config.go
+++ b/br/pkg/task/operator/config.go
@@ -16,7 +16,7 @@ type PauseGcConfig struct {
     TTL       time.Duration `json:"ttl" yaml:"ttl"`
 }
 
-func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) {
+func DefineFlagsForPrepareSnapBackup(f *pflag.FlagSet) {
     _ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.")
     _ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.")
 }
diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel
index 2484988c870dd..b455237dbf506 100644
--- a/br/pkg/utils/BUILD.bazel
+++ b/br/pkg/utils/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
         "schema.go",
         "sensitive.go",
         "store_manager.go",
+        "suspend_importing.go",
         "worker.go",
     ],
     importpath = "github.com/pingcap/tidb/br/pkg/utils",
@@ -36,6 +37,7 @@ go_library(
         "//parser/types",
         "//sessionctx",
         "//util",
+        "//util/engine",
         "//util/sqlexec",
         "@com_github_cheggaaa_pb_v3//:pb",
         "@com_github_cznic_mathutil//:mathutil",
@@ -43,6 +45,7 @@ go_library(
        "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_pingcap_kvproto//pkg/brpb",
+        "@com_github_pingcap_kvproto//pkg/import_sstpb",
         "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_log//:log",
         "@com_github_tikv_client_go_v2//oracle",
@@ -80,6 +83,7 @@ go_test(
         "safe_point_test.go",
         "schema_test.go",
         "sensitive_test.go",
+        "suspend_importing_test.go",
     ],
     embed = [":utils"],
     flaky = True,
@@ -103,11 +107,15 @@ go_test(
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_kvproto//pkg/brpb",
         "@com_github_pingcap_kvproto//pkg/encryptionpb",
+        "@com_github_pingcap_kvproto//pkg/import_sstpb",
+        "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_stretchr_testify//require",
         "@com_github_tikv_client_go_v2//tikv",
         "@com_github_tikv_pd_client//:client",
+        "@org_golang_google_grpc//:grpc",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//status",
+        "@org_golang_x_sync//errgroup",
         "@org_uber_go_goleak//:goleak",
         "@org_uber_go_multierr//:multierr",
     ],
diff --git a/br/pkg/utils/store_manager.go b/br/pkg/utils/store_manager.go
index 8dd50a2a57c82..9cf0c3c27ee9c 100644
--- a/br/pkg/utils/store_manager.go
+++ b/br/pkg/utils/store_manager.go
@@ -147,6 +147,7 @@ func (mgr *StoreManager) getGrpcConnLocked(ctx context.Context, storeID uint64)
     if addr == "" {
         addr = store.GetAddress()
     }
+    log.Info("StoreManager: dialing to store.", zap.String("address", addr), zap.Uint64("store-id", storeID))
     conn, err := grpc.DialContext(
         ctx,
         addr,
diff --git a/br/pkg/utils/suspend_importing.go b/br/pkg/utils/suspend_importing.go
new file mode 100644
index 0000000000000..c2df70229c525
--- /dev/null
+++ b/br/pkg/utils/suspend_importing.go
@@ -0,0 +1,144 @@
+package utils
+
+import (
+    "context"
+    "time"
+
+    "github.com/pingcap/errors"
+    "github.com/pingcap/kvproto/pkg/import_sstpb"
+    "github.com/pingcap/kvproto/pkg/metapb"
+    berrors "github.com/pingcap/tidb/br/pkg/errors"
+    "github.com/pingcap/tidb/br/pkg/logutil"
+    "github.com/pingcap/tidb/util/engine"
+    pd "github.com/tikv/pd/client"
+    "go.uber.org/zap"
+    "google.golang.org/grpc"
+)
+
+const (
+    DenyLightningUpdateFrequency = 5
+)
+
+func (mgr *StoreManager) GetAllStores(ctx context.Context) ([]*metapb.Store, error) {
+    return mgr.PDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
+}
+
+func (mgr *StoreManager) GetDenyLightningClient(ctx context.Context, storeID uint64) (SuspendImportingClient, error) {
+    var cli import_sstpb.ImportSSTClient
+    err := mgr.WithConn(ctx, storeID, func(cc *grpc.ClientConn) {
+        cli = import_sstpb.NewImportSSTClient(cc)
+    })
+    if err != nil {
+        return nil, err
+    }
+    return cli, nil
+}
+
+type SuspendImportingEnv interface {
+    GetAllStores(ctx context.Context) ([]*metapb.Store, error)
+    GetDenyLightningClient(ctx context.Context, storeID uint64) (SuspendImportingClient, error)
+}
+
+type SuspendImportingClient interface {
+    // Temporarily disable ingest / download / write, because data listeners don't support capturing imported data.
+    SuspendImportRPC(ctx context.Context, in *import_sstpb.SuspendImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.SuspendImportRPCResponse, error)
+}
+
+type SuspendImporting struct {
+    env  SuspendImportingEnv
+    name string
+}
+
+func NewSuspendImporting(name string, env SuspendImportingEnv) *SuspendImporting {
+    return &SuspendImporting{
+        env:  env,
+        name: name,
+    }
+}
+
+// DenyAllStores tries to deny all current stores' lightning execution for the given period of time.
+// Returns a map from store ID to whether that store was already denying import tasks.
+func (d *SuspendImporting) DenyAllStores(ctx context.Context, dur time.Duration) (map[uint64]bool, error) {
+    return d.forEachStores(ctx, func() *import_sstpb.SuspendImportRPCRequest {
+        return &import_sstpb.SuspendImportRPCRequest{
+            ShouldSuspendImports: true,
+            DurationInSecs:       uint64(dur.Seconds()),
+            Caller:               d.name,
+        }
+    })
+}
+
+func (d *SuspendImporting) AllowAllStores(ctx context.Context) (map[uint64]bool, error) {
+    return d.forEachStores(ctx, func() *import_sstpb.SuspendImportRPCRequest {
+        return &import_sstpb.SuspendImportRPCRequest{
+            ShouldSuspendImports: false,
+            Caller:               d.name,
+        }
+    })
+}
+
+// forEachStores sends the request to each reachable store.
+// Returns a map from store ID to whether that store was already denying import tasks.
+func (d *SuspendImporting) forEachStores(ctx context.Context, makeReq func() *import_sstpb.SuspendImportRPCRequest) (map[uint64]bool, error) {
+    stores, err := d.env.GetAllStores(ctx)
+    if err != nil {
+        return nil, errors.Annotate(err, "failed to get all stores")
+    }
+
+    result := map[uint64]bool{}
+    for _, store := range stores {
+        logutil.CL(ctx).Info("Handling store.", zap.Stringer("store", store))
+        if engine.IsTiFlash(store) {
+            logutil.CL(ctx).Info("Store is tiflash, skipping.", zap.Stringer("store", store))
+            continue
+        }
+        cli, err := d.env.GetDenyLightningClient(ctx, store.Id)
+        if err != nil {
+            return nil, errors.Annotatef(err, "failed to get client for store %d", store.Id)
+        }
+        req := makeReq()
+        resp, err := cli.SuspendImportRPC(ctx, req)
+        if err != nil {
+            return nil, errors.Annotatef(err, "failed to deny lightning rpc for store %d", store.Id)
+        }
+        result[store.Id] = resp.AlreadySuspended
+    }
+    return result, nil
+}
+
+// ConsistentWithPrev checks whether a result returned by `DenyAllStores` keeps consistency with the last request,
+// i.e. whether there was a gap during which a store was not denying import requests.
+func (d *SuspendImporting) ConsistentWithPrev(result map[uint64]bool) error {
+    for storeId, denied := range result {
+        if !denied {
+            return errors.Annotatef(berrors.ErrPossibleInconsistency, "failed to keep imports to store %d denied, the state might be inconsistent", storeId)
+        }
+    }
+    return nil
+}
+
+func (d *SuspendImporting) Keeper(ctx context.Context, ttl time.Duration) error {
+    lastSuccess := time.Now()
+    t := time.NewTicker(ttl / DenyLightningUpdateFrequency)
+    defer t.Stop()
+    for {
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        case <-t.C:
+            res, err := d.DenyAllStores(ctx, ttl)
+            if err != nil {
+                if time.Since(lastSuccess) < ttl {
+                    logutil.CL(ctx).Warn("Failed to send deny request to one of the stores.", logutil.ShortError(err))
+                    continue
+                }
+                return err
+            }
+            if err := d.ConsistentWithPrev(res); err != nil {
+                return err
+            }
+
+            lastSuccess = time.Now()
+        }
+    }
+}
diff --git a/br/pkg/utils/suspend_importing_test.go b/br/pkg/utils/suspend_importing_test.go
new file mode 100644
index 0000000000000..8ee04af072048
--- /dev/null
+++ b/br/pkg/utils/suspend_importing_test.go
@@ -0,0 +1,209 @@
+package utils_test
+
+import (
+    "context"
+    "fmt"
+    "sync"
+    "testing"
+    "time"
+
+    "github.com/pingcap/errors"
+    "github.com/pingcap/kvproto/pkg/import_sstpb"
+    "github.com/pingcap/kvproto/pkg/metapb"
+    "github.com/pingcap/tidb/br/pkg/utils"
+    "github.com/stretchr/testify/require"
+    "golang.org/x/sync/errgroup"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/status"
+)
+
+type ImportTargetStore struct {
+    mu                  sync.Mutex
+    Id                  uint64
+    LastSuccessDenyCall time.Time
+    SuspendImportFor    time.Duration
+    SuspendedImport     bool
+
+    ErrGen func() error
+}
+
+type ImportTargetStores struct {
+    mu    sync.Mutex
+    items map[uint64]*ImportTargetStore
+}
+
+func initWithIDs(ids []int) *ImportTargetStores {
+    ss := &ImportTargetStores{
+        items: map[uint64]*ImportTargetStore{},
+    }
+    for _, id := range ids {
+        store := new(ImportTargetStore)
+        store.Id = uint64(id)
+        ss.items[uint64(id)] = store
+    }
+    return ss
+}
+
+func (s *ImportTargetStores) GetAllStores(ctx context.Context) ([]*metapb.Store, error) {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
+    stores := make([]*metapb.Store, 0, len(s.items))
+    for _, store := range s.items {
+        stores = append(stores, &metapb.Store{Id: store.Id})
+    }
+    return stores, nil
+}
+
+func (s *ImportTargetStores) GetDenyLightningClient(ctx context.Context, storeID uint64) (utils.SuspendImportingClient, error) {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
+    store, ok := s.items[storeID]
+    if !ok {
+        return nil, errors.Trace(fmt.Errorf("store %d not found", storeID))
+    }
+
+    return store, nil
+}
+
+// Temporarily disable ingest / download / write, because data listeners don't support capturing imported data.
+func (s *ImportTargetStore) SuspendImportRPC(ctx context.Context, in *import_sstpb.SuspendImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.SuspendImportRPCResponse, error) {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
+    if s.ErrGen != nil {
+        if err := s.ErrGen(); err != nil {
+            return nil, s.ErrGen()
+        }
+    }
+
+    suspended := s.SuspendedImport
+    if in.ShouldSuspendImports {
+        s.SuspendedImport = true
+        s.SuspendImportFor = time.Duration(in.DurationInSecs) * time.Second
+        s.LastSuccessDenyCall = time.Now()
+    } else {
+        s.SuspendedImport = false
+    }
+    return &import_sstpb.SuspendImportRPCResponse{
+        AlreadySuspended: suspended,
+    }, nil
+}
+
+func (s *ImportTargetStores) assertAllStoresDenied(t *testing.T) {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
+    for _, store := range s.items {
+        func() {
+            store.mu.Lock()
+            defer store.mu.Unlock()
+
+            require.True(t, store.SuspendedImport, "ID = %d", store.Id)
+            require.Less(t, time.Since(store.LastSuccessDenyCall), store.SuspendImportFor, "ID = %d", store.Id)
+        }()
+    }
+}
+
+func TestBasic(t *testing.T) {
+    req := require.New(t)
+
+    ss := initWithIDs([]int{1, 4, 5})
+    deny := utils.NewSuspendImporting(t.Name(), ss)
+
+    ctx := context.Background()
+    res, err := deny.DenyAllStores(ctx, 10*time.Second)
+    req.NoError(err)
+    req.Error(deny.ConsistentWithPrev(res))
+    for id, inner := range ss.items {
+        req.True(inner.SuspendedImport, "at %d", id)
+        req.Equal(inner.SuspendImportFor, 10*time.Second, "at %d", id)
+    }
+
+    res, err = deny.DenyAllStores(ctx, 10*time.Second)
+    req.NoError(err)
+    req.NoError(deny.ConsistentWithPrev(res))
+
+    res, err = deny.AllowAllStores(ctx)
+    req.NoError(err)
+    req.NoError(deny.ConsistentWithPrev(res))
+}
+
+func TestKeeperError(t *testing.T) {
+    req := require.New(t)
+
+    ctx := context.Background()
+    ss := initWithIDs([]int{1, 4, 5})
+    deny := utils.NewSuspendImporting(t.Name(), ss)
+    ttl := time.Second
+
+    now := time.Now()
+    triggeredErr := uint32(0)
+    _, err := deny.DenyAllStores(ctx, ttl)
+    req.NoError(err)
+
+    ss.items[4].ErrGen = func() error {
+        if time.Since(now) > 600*time.Millisecond {
+            return nil
+        }
+        triggeredErr += 1
+        return status.Error(codes.Unavailable, "the store is slacking.")
+    }
+
+    cx, cancel := context.WithCancel(ctx)
+
+    wg := new(errgroup.Group)
+    wg.Go(func() error { return deny.Keeper(cx, ttl) })
+    time.Sleep(ttl)
+    cancel()
+    req.ErrorIs(wg.Wait(), context.Canceled)
+    req.Positive(triggeredErr)
+}
+
+func TestKeeperErrorExit(t *testing.T) {
+    req := require.New(t)
+
+    ctx := context.Background()
+    ss := initWithIDs([]int{1, 4, 5})
+    deny := utils.NewSuspendImporting(t.Name(), ss)
+    ttl := time.Second
+
+    triggeredErr := uint32(0)
+    _, err := deny.DenyAllStores(ctx, ttl)
+    req.NoError(err)
+
+    ss.items[4].ErrGen = func() error {
+        triggeredErr += 1
+        return status.Error(codes.Unavailable, "the store is slacking.")
+    }
+
+    wg := new(errgroup.Group)
+    wg.Go(func() error { return deny.Keeper(ctx, ttl) })
+    time.Sleep(ttl)
+    req.Error(wg.Wait())
+    req.Positive(triggeredErr)
+}
+
+func TestKeeperCalled(t *testing.T) {
+    req := require.New(t)
+
+    ctx := context.Background()
+    ss := initWithIDs([]int{1, 4, 5})
+    deny := utils.NewSuspendImporting(t.Name(), ss)
+    ttl := 1 * time.Second
+
+    _, err := deny.DenyAllStores(ctx, ttl)
+    req.NoError(err)
+
+    cx, cancel := context.WithCancel(ctx)
+    wg := new(errgroup.Group)
+    wg.Go(func() error { return deny.Keeper(cx, ttl) })
+    for i := 0; i < 20; i++ {
+        ss.assertAllStoresDenied(t)
+        time.Sleep(ttl / 10)
+    }
+    cancel()
+    req.ErrorIs(wg.Wait(), context.Canceled)
+}
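Reviewer note (not part of the patch): the sketch below shows how the SuspendImporting helper added above is meant to be driven end to end: deny imports on every store, keep the suspension refreshed with Keeper, then lift it and call ConsistentWithPrev to detect gaps. The wrapper name suspendImportsWhile and the caller package are hypothetical; only the utils APIs it calls come from this diff.

package snapshotprep // hypothetical caller, for illustration only

import (
    "context"
    "time"

    "github.com/pingcap/tidb/br/pkg/utils"
)

// suspendImportsWhile runs `work` while SST imports (tidb-lightning / restore)
// are suspended on every TiKV store, mirroring pauseImporting in cmd.go above.
func suspendImportsWhile(ctx context.Context, env utils.SuspendImportingEnv, ttl time.Duration, work func(context.Context) error) error {
    deny := utils.NewSuspendImporting("my_caller", env)
    // Ask every (non-TiFlash) store to reject import RPCs for the next `ttl`.
    if _, err := deny.DenyAllStores(ctx, ttl); err != nil {
        return err
    }
    keeperCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    go func() {
        // Keeper re-sends the suspend request every ttl/DenyLightningUpdateFrequency,
        // so the suspension never expires while `work` is still running.
        _ = deny.Keeper(keeperCtx, ttl)
    }()

    workErr := work(ctx)

    // Lift the suspension; every store should report it was still suspended,
    // otherwise there was a window in which an import could have slipped in.
    res, err := deny.AllowAllStores(context.Background())
    if err != nil {
        return err
    }
    if err := deny.ConsistentWithPrev(res); err != nil {
        return err
    }
    return workErr
}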
diff --git a/errors.toml b/errors.toml
index b793e284e314a..80cb7623922a8 100644
--- a/errors.toml
+++ b/errors.toml
@@ -146,6 +146,11 @@ error = '''
 storage is not tikv
 '''
 
+["BR:KV:ErrPossibleInconsistency"]
+error = '''
+the cluster state might be inconsistent
+'''
+
 ["BR:PD:ErrPDBatchScanRegion"]
 error = '''
 batch scan region
diff --git a/go.mod b/go.mod
index 3caef0c1552bd..9e46f5cbb1ac1 100644
--- a/go.mod
+++ b/go.mod
@@ -71,7 +71,7 @@ require (
     github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
     github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
     github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
-    github.com/pingcap/kvproto v0.0.0-20230524051921-3dc79e773139
+    github.com/pingcap/kvproto v0.0.0-20230928035022-1bdcc25ed63c
     github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
     github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
     github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
diff --git a/go.sum b/go.sum
index 110d9be26fb21..75600b08fe5f7 100644
--- a/go.sum
+++ b/go.sum
@@ -780,9 +780,10 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059/go.mod h1:fMRU1BA1y+r89
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
 github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
-github.com/pingcap/kvproto v0.0.0-20230524051921-3dc79e773139 h1:SnvWHM4JSkn9TFLIjrSRanpliqnmgk+y0MuoXC77y6I=
-github.com/pingcap/kvproto v0.0.0-20230524051921-3dc79e773139/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
+github.com/pingcap/kvproto v0.0.0-20230726063044-73d6d7f3756b/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
+github.com/pingcap/kvproto v0.0.0-20230928035022-1bdcc25ed63c h1:tBKPWWqgWEBs04BV4UN7RhtUkZDs0oz+WyMbtRDVtL8=
+github.com/pingcap/kvproto v0.0.0-20230928035022-1bdcc25ed63c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
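Integrator note (outside the patch): hintAllReady keeps emitting the legacy "Schedulers are paused." and "GC is paused." lines precisely because some tidb-operator versions watch the command's output for them. A hypothetical watcher is sketched below; the function name, the stream it reads (stdout vs. a log file), and the plain-text log format are all assumptions.

package snapshotprep // hypothetical, for illustration only

import (
    "bufio"
    "errors"
    "io"
    "strings"
)

// waitUntilReady scans the output of `br operator prepare-for-snapshot-backup`
// until both readiness hints emitted by hintAllReady have been seen.
func waitUntilReady(output io.Reader) error {
    hints := map[string]bool{
        "Schedulers are paused.": false,
        "GC is paused.":          false,
    }
    sc := bufio.NewScanner(output)
    for sc.Scan() {
        line := sc.Text()
        remaining := 0
        for hint := range hints {
            if !hints[hint] && strings.Contains(line, hint) {
                hints[hint] = true
            }
            if !hints[hint] {
                remaining++
            }
        }
        if remaining == 0 {
            return nil
        }
    }
    if err := sc.Err(); err != nil {
        return err
    }
    return errors.New("process exited before reporting readiness")
}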