From a1905aa449cf732d884a15da123e5017753a238d Mon Sep 17 00:00:00 2001 From: Will DeVries Date: Mon, 6 Jan 2025 22:36:36 -0800 Subject: [PATCH 1/6] Simplify the snapshot code. --- pkg/executor/workloadrepo.go | 6 +- pkg/util/workloadrepo/const.go | 11 ++-- pkg/util/workloadrepo/snapshot.go | 91 +++++++++------------------- pkg/util/workloadrepo/worker.go | 21 ++++--- pkg/util/workloadrepo/worker_test.go | 4 +- 5 files changed, 53 insertions(+), 80 deletions(-) diff --git a/pkg/executor/workloadrepo.go b/pkg/executor/workloadrepo.go index 965fa602b0602..316c7c9d59448 100644 --- a/pkg/executor/workloadrepo.go +++ b/pkg/executor/workloadrepo.go @@ -22,7 +22,7 @@ import ( ) // TakeSnapshot is a hook from workload repo that may trigger manual snapshot. -var TakeSnapshot func() error +var TakeSnapshot func(context.Context) error // WorkloadRepoCreateExec indicates WorkloadRepoCreate executor. type WorkloadRepoCreateExec struct { @@ -30,9 +30,9 @@ type WorkloadRepoCreateExec struct { } // Next implements the Executor Next interface. -func (*WorkloadRepoCreateExec) Next(context.Context, *chunk.Chunk) error { +func (*WorkloadRepoCreateExec) Next(ctx context.Context, _ *chunk.Chunk) error { if TakeSnapshot != nil { - return TakeSnapshot() + return TakeSnapshot(ctx) } return nil } diff --git a/pkg/util/workloadrepo/const.go b/pkg/util/workloadrepo/const.go index df29d5b81a75e..6752ef473c17a 100644 --- a/pkg/util/workloadrepo/const.go +++ b/pkg/util/workloadrepo/const.go @@ -24,12 +24,9 @@ import ( ) const ( - ownerKey = "/tidb/workloadrepo/owner" - promptKey = "workloadrepo" - snapIDKey = "/tidb/workloadrepo/snap_id" - snapCommandKey = "/tidb/workloadrepo/snap_command" - - snapCommandTake = "take_snapshot" + ownerKey = "/tidb/workloadrepo/owner" + promptKey = "workloadrepo" + snapIDKey = "/tidb/workloadrepo/snap_id" etcdOpTimeout = 5 * time.Second snapshotRetries = 5 @@ -49,4 +46,6 @@ var ( errWrongValueForVar = dbterror.ClassUtil.NewStd(errno.ErrWrongValueForVar) errUnsupportedEtcdRequired = dbterror.ClassUtil.NewStdErr(errno.ErrNotSupportedYet, mysql.Message("etcd client required for workload repository", nil)) + errWorkloadNotStarted = dbterror.ClassUtil.NewStdErr(errno.ErrNotSupportedYet, mysql.Message("Workload repository is not enabled", nil)) + errCouldNotStartSnapshot = dbterror.ClassUtil.NewStdErr(errno.ErrUnknown, mysql.Message("Snapshot initiation failed", nil)) ) diff --git a/pkg/util/workloadrepo/snapshot.go b/pkg/util/workloadrepo/snapshot.go index fd14e7bc4bf7d..6bc330dea9c9f 100644 --- a/pkg/util/workloadrepo/snapshot.go +++ b/pkg/util/workloadrepo/snapshot.go @@ -89,16 +89,6 @@ func (w *worker) getSnapID(ctx context.Context) (uint64, error) { return strconv.ParseUint(snapIDStr, 10, 64) } -func (w *worker) updateSnapID(ctx context.Context, oid, nid uint64) error { - return w.etcdCAS(ctx, snapIDKey, - strconv.FormatUint(oid, 10), - strconv.FormatUint(nid, 10)) -} - -func (w *worker) createSnapID(ctx context.Context, nid uint64) error { - return w.etcdCreate(ctx, snapIDKey, strconv.FormatUint(nid, 10)) -} - func upsertHistSnapshot(ctx context.Context, sctx sessionctx.Context, snapID uint64) error { // TODO: fill DB_VER, WR_VER snapshotsInsert := sqlescape.MustEscapeSQL("INSERT INTO %n.%n (`BEGIN_TIME`, `SNAP_ID`) VALUES (now(), %%?) ON DUPLICATE KEY UPDATE `BEGIN_TIME` = now()", @@ -107,7 +97,11 @@ func upsertHistSnapshot(ctx context.Context, sctx sessionctx.Context, snapID uin return err } -func updateHistSnapshot(ctx context.Context, sctx sessionctx.Context, snapID uint64, errs []error) error { +func (w *worker) updateHistSnapshot(ctx context.Context, snapID uint64, errs []error) error { + _sessctx := w.getSessionWithRetry() + defer w.sesspool.Put(_sessctx) + sctx := _sessctx.(sessionctx.Context) + var nerr any if err := stderrors.Join(errs...); err != nil { nerr = err.Error() @@ -136,34 +130,17 @@ func (w *worker) snapshotTable(ctx context.Context, snapID uint64, rt *repositor return nil } -func (w *worker) takeSnapshot(ctx context.Context, sess sessionctx.Context, sendCommand bool) { - // coordination logic - if !w.owner.IsOwner() { - if sendCommand { - command, err := w.etcdGet(ctx, snapCommandKey) - if err != nil { - logutil.BgLogger().Info("workload repository cannot get current snap command value", zap.NamedError("err", err)) - return - } - - if command == "" { - err = w.etcdCreate(ctx, snapCommandKey, snapCommandTake) - } else { - err = w.etcdCAS(ctx, snapCommandKey, command, snapCommandTake) - } - - if err != nil { - logutil.BgLogger().Info("workload repository cannot send snapshot command", zap.NamedError("err", err)) - return - } - } - return - } +func (w *worker) takeSnapshot(ctx context.Context) (uint64, error) { + _sessctx := w.getSessionWithRetry() + defer w.sesspool.Put(_sessctx) + sess := _sessctx.(sessionctx.Context) + var snapID uint64 + var err error = nil for range snapshotRetries { - snapID, err := w.getSnapID(ctx) + snapID, err = w.getSnapID(ctx) if err != nil { - logutil.BgLogger().Info("workload repository cannot get current snapid", zap.NamedError("err", err)) + err = fmt.Errorf("cannot get current snapid: %w", err) continue } @@ -174,57 +151,43 @@ func (w *worker) takeSnapshot(ctx context.Context, sess sessionctx.Context, send // due to another owner winning the etcd CAS loop. // While undesirable, this scenario is acceptable since both owners would // likely share similar datetime values and same cluster version. - if err := upsertHistSnapshot(ctx, sess, snapID+1); err != nil { - logutil.BgLogger().Info("workload repository could not insert into hist_snapshots", zap.NamedError("err", err)) + if err = upsertHistSnapshot(ctx, sess, snapID+1); err != nil { + err = fmt.Errorf("could not insert into hist_snapshots: %w", err) continue } if snapID == 0 { - err = w.createSnapID(ctx, snapID+1) + err = w.etcdCreate(ctx, snapIDKey, strconv.FormatUint(snapID+1, 10)) } else { - err = w.updateSnapID(ctx, snapID, snapID+1) + err = w.etcdCAS(ctx, snapIDKey, strconv.FormatUint(snapID, 10), strconv.FormatUint(snapID+1, 10)) } if err != nil { - logutil.BgLogger().Info("workload repository cannot update current snapid", zap.Uint64("new_id", snapID), zap.NamedError("err", err)) + err = fmt.Errorf("cannot update current snapid to %d: %w", snapID, err) continue } - logutil.BgLogger().Info("workload repository fired snapshot", zap.String("owner", w.instanceID), zap.Uint64("snapID", snapID+1)) break } + + // return the last error seen, if it ended on an error + return snapID, err } func (w *worker) startSnapshot(_ctx context.Context) func() { return func() { w.resetSnapshotInterval(w.snapshotInterval) - _sessctx := w.getSessionWithRetry() - defer w.sesspool.Put(_sessctx) - sess := _sessctx.(sessionctx.Context) - // this is for etcd watch // other wise wch won't be collected after the exit of this function ctx, cancel := context.WithCancel(_ctx) defer cancel() snapIDCh := w.etcdClient.Watch(ctx, snapIDKey) - snapCmdCh := w.etcdClient.Watch(ctx, snapCommandKey) for { select { case <-ctx.Done(): return - case resp := <-snapCmdCh: - if len(resp.Events) < 1 { - continue - } - - // same as snapID events - // we only catch the last event if possible - snapCommandStr := string(resp.Events[len(resp.Events)-1].Kv.Value) - if snapCommandStr == snapCommandTake { - w.takeSnapshot(ctx, sess, false) - } case resp := <-snapIDCh: if len(resp.Events) < 1 { // since there is no event, we don't know the latest snapid either @@ -261,13 +224,17 @@ func (w *worker) startSnapshot(_ctx context.Context) func() { } wg.Wait() - if err := updateHistSnapshot(ctx, sess, snapID, errs); err != nil { + if err := w.updateHistSnapshot(ctx, snapID, errs); err != nil { logutil.BgLogger().Info("workload repository snapshot failed: could not update hist_snapshots", zap.NamedError("err", err)) } - case <-w.snapshotChan: - w.takeSnapshot(ctx, sess, true) case <-w.snapshotTicker.C: - w.takeSnapshot(ctx, sess, false) + if w.owner.IsOwner() { + if snapID, err := w.takeSnapshot(ctx); err != nil { + logutil.BgLogger().Info("workload repository snapshot failed", zap.NamedError("err", err)) + } else { + logutil.BgLogger().Info("workload repository ran snapshot", zap.String("owner", w.instanceID), zap.Uint64("snapID", snapID)) + } + } } } } diff --git a/pkg/util/workloadrepo/worker.go b/pkg/util/workloadrepo/worker.go index 3705ff67466cd..a86ed6f6019b1 100644 --- a/pkg/util/workloadrepo/worker.go +++ b/pkg/util/workloadrepo/worker.go @@ -121,17 +121,26 @@ type worker struct { samplingTicker *time.Ticker snapshotInterval int32 snapshotTicker *time.Ticker - snapshotChan chan struct{} retentionDays int32 } var workerCtx = worker{} -func takeSnapshot() error { - if workerCtx.snapshotChan == nil { - return errors.New("Workload repository is not enabled yet") +func takeSnapshot(ctx context.Context) error { + workerCtx.Lock() + defer workerCtx.Unlock() + + if !workerCtx.enabled { + return errWorkloadNotStarted.GenWithStackByArgs() } - workerCtx.snapshotChan <- struct{}{} + + if snapID, err := workerCtx.takeSnapshot(ctx); err != nil { + logutil.BgLogger().Info("workload repository manual snapshot failed", zap.String("owner", workerCtx.instanceID), zap.NamedError("err", err)) + return errCouldNotStartSnapshot.GenWithStackByArgs() + } else { + logutil.BgLogger().Info("workload repository ran manual snapshot", zap.String("owner", workerCtx.instanceID), zap.Uint64("snapID", snapID)) + } + return nil } @@ -360,7 +369,6 @@ func (w *worker) start() error { } _ = stmtsummary.StmtSummaryByDigestMap.SetHistoryEnabled(false) - w.snapshotChan = make(chan struct{}, 1) ctx, cancel := context.WithCancel(context.Background()) w.cancel = cancel w.wg.RunWithRecover(w.startRepository(ctx), func(err any) { @@ -388,7 +396,6 @@ func (w *worker) stop() { } w.cancel = nil - w.snapshotChan = nil } // setRepositoryDest will change the dest of workload snapshot. diff --git a/pkg/util/workloadrepo/worker_test.go b/pkg/util/workloadrepo/worker_test.go index 77436eff594d3..50c44c0c89fc1 100644 --- a/pkg/util/workloadrepo/worker_test.go +++ b/pkg/util/workloadrepo/worker_test.go @@ -169,13 +169,13 @@ func TestRaceToCreateTablesWorker(t *testing.T) { require.Len(t, res, 0) // manually trigger snapshot by sending a tick to all workers - wrk1.snapshotChan <- struct{}{} + wrk1.takeSnapshot(ctx) require.Eventually(t, func() bool { res := tk.MustQuery("select snap_id, count(*) from workload_schema.hist_snapshots group by snap_id").Rows() return len(res) == 1 }, time.Minute, time.Second) - wrk2.snapshotChan <- struct{}{} + wrk2.takeSnapshot(ctx) require.Eventually(t, func() bool { res := tk.MustQuery("select snap_id, count(*) from workload_schema.hist_snapshots group by snap_id").Rows() return len(res) == 2 From a33f0a28060a9a780d55e2f70bc860b95b083deb Mon Sep 17 00:00:00 2001 From: Will DeVries Date: Fri, 31 Jan 2025 20:49:51 -0800 Subject: [PATCH 2/6] Fix ci issues. --- pkg/util/workloadrepo/snapshot.go | 2 +- pkg/util/workloadrepo/worker.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/util/workloadrepo/snapshot.go b/pkg/util/workloadrepo/snapshot.go index 6bc330dea9c9f..59ef00754409a 100644 --- a/pkg/util/workloadrepo/snapshot.go +++ b/pkg/util/workloadrepo/snapshot.go @@ -136,7 +136,7 @@ func (w *worker) takeSnapshot(ctx context.Context) (uint64, error) { sess := _sessctx.(sessionctx.Context) var snapID uint64 - var err error = nil + var err error for range snapshotRetries { snapID, err = w.getSnapID(ctx) if err != nil { diff --git a/pkg/util/workloadrepo/worker.go b/pkg/util/workloadrepo/worker.go index a86ed6f6019b1..a2e643320f49b 100644 --- a/pkg/util/workloadrepo/worker.go +++ b/pkg/util/workloadrepo/worker.go @@ -139,9 +139,8 @@ func takeSnapshot(ctx context.Context) error { return errCouldNotStartSnapshot.GenWithStackByArgs() } else { logutil.BgLogger().Info("workload repository ran manual snapshot", zap.String("owner", workerCtx.instanceID), zap.Uint64("snapID", snapID)) + return nil } - - return nil } func init() { From aaa969f95033a46c85ef4382d2a589dc52da28c8 Mon Sep 17 00:00:00 2001 From: Will DeVries Date: Fri, 31 Jan 2025 20:59:52 -0800 Subject: [PATCH 3/6] Fix ci issues. --- pkg/util/workloadrepo/worker.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/util/workloadrepo/worker.go b/pkg/util/workloadrepo/worker.go index a2e643320f49b..9f2971eeb7a28 100644 --- a/pkg/util/workloadrepo/worker.go +++ b/pkg/util/workloadrepo/worker.go @@ -134,13 +134,14 @@ func takeSnapshot(ctx context.Context) error { return errWorkloadNotStarted.GenWithStackByArgs() } - if snapID, err := workerCtx.takeSnapshot(ctx); err != nil { + snapID, err := workerCtx.takeSnapshot(ctx) + if err != nil { logutil.BgLogger().Info("workload repository manual snapshot failed", zap.String("owner", workerCtx.instanceID), zap.NamedError("err", err)) return errCouldNotStartSnapshot.GenWithStackByArgs() - } else { - logutil.BgLogger().Info("workload repository ran manual snapshot", zap.String("owner", workerCtx.instanceID), zap.Uint64("snapID", snapID)) - return nil } + + logutil.BgLogger().Info("workload repository ran manual snapshot", zap.String("owner", workerCtx.instanceID), zap.Uint64("snapID", snapID)) + return nil } func init() { From c47c469227559bb04065293ad0a26e6bc7912724 Mon Sep 17 00:00:00 2001 From: Will DeVries Date: Mon, 10 Feb 2025 11:13:33 -0800 Subject: [PATCH 4/6] Add comments to clarify code. --- pkg/util/workloadrepo/snapshot.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/util/workloadrepo/snapshot.go b/pkg/util/workloadrepo/snapshot.go index 59ef00754409a..6e25555ae1b75 100644 --- a/pkg/util/workloadrepo/snapshot.go +++ b/pkg/util/workloadrepo/snapshot.go @@ -130,6 +130,8 @@ func (w *worker) snapshotTable(ctx context.Context, snapID uint64, rt *repositor return nil } +// takeSnapshot increments the value of snapIDKey, which triggers the tidb +// nodes to run the snapshot process. See the code in startSnapshot(). func (w *worker) takeSnapshot(ctx context.Context) (uint64, error) { _sessctx := w.getSessionWithRetry() defer w.sesspool.Put(_sessctx) @@ -189,6 +191,7 @@ func (w *worker) startSnapshot(_ctx context.Context) func() { case <-ctx.Done(): return case resp := <-snapIDCh: + // This case is triggered by both by w.snapshotInterval and the SQL command, which calls w.takeSnapshot() directly. if len(resp.Events) < 1 { // since there is no event, we don't know the latest snapid either // really should not happen except creation From cbcc317a8f7bfac67d76bd02a6c0b711a64b3c6a Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 19 Feb 2025 11:28:58 +0800 Subject: [PATCH 5/6] workloadrepo: try to recover etcd snapID from table Signed-off-by: xhe --- pkg/util/workloadrepo/const.go | 3 ++ pkg/util/workloadrepo/snapshot.go | 25 +++++++-- pkg/util/workloadrepo/worker_test.go | 76 ++++++++++++++++++++++------ 3 files changed, 85 insertions(+), 19 deletions(-) diff --git a/pkg/util/workloadrepo/const.go b/pkg/util/workloadrepo/const.go index 6752ef473c17a..15868c0b63efa 100644 --- a/pkg/util/workloadrepo/const.go +++ b/pkg/util/workloadrepo/const.go @@ -15,6 +15,7 @@ package workloadrepo import ( + "errors" "time" "github.com/pingcap/tidb/pkg/errno" @@ -48,4 +49,6 @@ var ( errUnsupportedEtcdRequired = dbterror.ClassUtil.NewStdErr(errno.ErrNotSupportedYet, mysql.Message("etcd client required for workload repository", nil)) errWorkloadNotStarted = dbterror.ClassUtil.NewStdErr(errno.ErrNotSupportedYet, mysql.Message("Workload repository is not enabled", nil)) errCouldNotStartSnapshot = dbterror.ClassUtil.NewStdErr(errno.ErrUnknown, mysql.Message("Snapshot initiation failed", nil)) + + errKeyNotFound = errors.New("key not found") ) diff --git a/pkg/util/workloadrepo/snapshot.go b/pkg/util/workloadrepo/snapshot.go index 6e25555ae1b75..8a9673e0d79d4 100644 --- a/pkg/util/workloadrepo/snapshot.go +++ b/pkg/util/workloadrepo/snapshot.go @@ -77,14 +77,28 @@ func (w *worker) etcdCAS(ctx context.Context, key, oval, nval string) error { return nil } +func queryMaxSnapID(ctx context.Context, sctx sessionctx.Context) (uint64, error) { + query := sqlescape.MustEscapeSQL("SELECT MAX(`SNAP_ID`) FROM %n.%n", WorkloadSchema, histSnapshotsTable) + rs, err := runQuery(ctx, sctx, query) + if err != nil { + return 0, err + } + if len(rs) > 0 { + if rs[0].IsNull(0) { + return 0, nil + } + return rs[0].GetUint64(0), nil + } + return 0, errors.New("no rows returned when querying max snap id") +} + func (w *worker) getSnapID(ctx context.Context) (uint64, error) { snapIDStr, err := w.etcdGet(ctx, snapIDKey) if err != nil { return 0, err } if snapIDStr == "" { - // return zero when the key does not exist - return 0, nil + return 0, errKeyNotFound } return strconv.ParseUint(snapIDStr, 10, 64) } @@ -140,7 +154,12 @@ func (w *worker) takeSnapshot(ctx context.Context) (uint64, error) { var snapID uint64 var err error for range snapshotRetries { + isEmpty := false snapID, err = w.getSnapID(ctx) + if stderrors.Is(err, errKeyNotFound) { + snapID, err = queryMaxSnapID(ctx, sess) + isEmpty = true + } if err != nil { err = fmt.Errorf("cannot get current snapid: %w", err) continue @@ -158,7 +177,7 @@ func (w *worker) takeSnapshot(ctx context.Context) (uint64, error) { continue } - if snapID == 0 { + if isEmpty { err = w.etcdCreate(ctx, snapIDKey, strconv.FormatUint(snapID+1, 10)) } else { err = w.etcdCAS(ctx, snapIDKey, strconv.FormatUint(snapID, 10), strconv.FormatUint(snapID+1, 10)) diff --git a/pkg/util/workloadrepo/worker_test.go b/pkg/util/workloadrepo/worker_test.go index 50c44c0c89fc1..a4efe98a049ae 100644 --- a/pkg/util/workloadrepo/worker_test.go +++ b/pkg/util/workloadrepo/worker_test.go @@ -59,21 +59,7 @@ func setupWorkerForTest(ctx context.Context, etcdCli *clientv3.Client, dom *doma return wrk } -func setupDomainAndContext(t *testing.T) (context.Context, kv.Storage, *domain.Domain, string) { - ctx := context.Background() - ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers) - var cancel context.CancelFunc = nil - if ddl, ok := t.Deadline(); ok { - ctx, cancel = context.WithDeadline(ctx, ddl) - } - t.Cleanup(func() { - if cancel != nil { - cancel() - } - }) - - store, dom := testkit.CreateMockStoreAndDomain(t) - +func setupEtcd(t *testing.T) string { cfg := embed.NewConfig() cfg.Dir = t.TempDir() @@ -97,7 +83,25 @@ func setupDomainAndContext(t *testing.T) (context.Context, kv.Storage, *domain.D require.False(t, true, "server took too long to start") } - return ctx, store, dom, embedEtcd.Clients[0].Addr().String() + return embedEtcd.Clients[0].Addr().String() +} + +func setupDomainAndContext(t *testing.T) (context.Context, kv.Storage, *domain.Domain, string) { + ctx := context.Background() + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers) + var cancel context.CancelFunc = nil + if ddl, ok := t.Deadline(); ok { + ctx, cancel = context.WithDeadline(ctx, ddl) + } + t.Cleanup(func() { + if cancel != nil { + cancel() + } + }) + + store, dom := testkit.CreateMockStoreAndDomain(t) + etcdAddr := setupEtcd(t) + return ctx, store, dom, etcdAddr } func setupWorker(ctx context.Context, t *testing.T, addr string, dom *domain.Domain, id string, testWorker bool) *worker { @@ -857,3 +861,43 @@ func TestCalcNextTick(t *testing.T) { require.True(t, calcNextTick(time.Date(2024, 12, 7, 2, 0, 0, 1, loc)) == time.Hour*24-time.Nanosecond) require.True(t, calcNextTick(time.Date(2024, 12, 7, 1, 59, 59, 999999999, loc)) == time.Nanosecond) } + +func TestRecoverSnapID(t *testing.T) { + ctx, store, dom, addr := setupDomainAndContext(t) + worker := setupWorker(ctx, t, addr, dom, "worker1", true) + require.NoError(t, worker.setRepositoryDest(ctx, "table")) + now := time.Now() + + require.Eventually(t, func() bool { + return worker.checkTablesExists(ctx, now) + }, time.Minute, time.Second) + tk := testkit.NewTestKit(t, store) + prevSnapID := uint64(0) + require.Eventually(t, func() bool { + res := tk.MustQuery("select max(snap_id) from workload_schema.hist_snapshots").Rows() + if len(res) == 0 || len(res[0]) == 0 { + return false + } + snapID, err := strconv.ParseUint(res[0][0].(string), 10, 64) + prevSnapID = snapID + return err == nil && snapID > 0 + }, time.Minute, time.Second) + worker.stop() + + etcd2 := setupEtcd(t) + worker2 := setupWorker(ctx, t, etcd2, dom, "worker2", true) + snapIDStr, err := worker2.etcdGet(ctx, snapIDKey) + require.Equal(t, "", snapIDStr) + + _, err = worker2.getSnapID(ctx) + require.EqualError(t, errKeyNotFound, err.Error()) + newSnapID, err := queryMaxSnapID(ctx, worker2.getSessionWithRetry().(sessionctx.Context)) + require.Nil(t, err) + require.Equal(t, prevSnapID, newSnapID) + + require.NoError(t, worker2.setRepositoryDest(ctx, "table")) + require.Eventually(t, func() bool { + newSnapID, err = worker2.getSnapID(ctx) + return err == nil && newSnapID >= prevSnapID + }, time.Minute, time.Second) +} From b387176e832efc723deeafa5fecfe4fdcba62c8c Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 20 Feb 2025 12:48:14 +0800 Subject: [PATCH 6/6] fix ci Signed-off-by: xhe --- pkg/util/workloadrepo/BUILD.bazel | 2 +- pkg/util/workloadrepo/worker_test.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/util/workloadrepo/BUILD.bazel b/pkg/util/workloadrepo/BUILD.bazel index b885213d5662a..65fbac6f8bbce 100644 --- a/pkg/util/workloadrepo/BUILD.bazel +++ b/pkg/util/workloadrepo/BUILD.bazel @@ -50,7 +50,7 @@ go_test( srcs = ["worker_test.go"], embed = [":workloadrepo"], flaky = True, - shard_count = 13, + shard_count = 14, deps = [ "//pkg/domain", "//pkg/infoschema", diff --git a/pkg/util/workloadrepo/worker_test.go b/pkg/util/workloadrepo/worker_test.go index a4efe98a049ae..97272d8b5a33f 100644 --- a/pkg/util/workloadrepo/worker_test.go +++ b/pkg/util/workloadrepo/worker_test.go @@ -887,6 +887,7 @@ func TestRecoverSnapID(t *testing.T) { etcd2 := setupEtcd(t) worker2 := setupWorker(ctx, t, etcd2, dom, "worker2", true) snapIDStr, err := worker2.etcdGet(ctx, snapIDKey) + require.Nil(t, err) require.Equal(t, "", snapIDStr) _, err = worker2.getSnapID(ctx)