Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workloadrepo: try to recover etcd snapID from table #59628

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/util/workloadrepo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ go_test(
srcs = ["worker_test.go"],
embed = [":workloadrepo"],
flaky = True,
shard_count = 13,
shard_count = 14,
deps = [
"//pkg/domain",
"//pkg/infoschema",
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/workloadrepo/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package workloadrepo

import (
"errors"
"time"

"github.com/pingcap/tidb/pkg/errno"
Expand Down Expand Up @@ -48,4 +49,6 @@ var (
errUnsupportedEtcdRequired = dbterror.ClassUtil.NewStdErr(errno.ErrNotSupportedYet, mysql.Message("etcd client required for workload repository", nil))
errWorkloadNotStarted = dbterror.ClassUtil.NewStdErr(errno.ErrNotSupportedYet, mysql.Message("Workload repository is not enabled", nil))
errCouldNotStartSnapshot = dbterror.ClassUtil.NewStdErr(errno.ErrUnknown, mysql.Message("Snapshot initiation failed", nil))

errKeyNotFound = errors.New("key not found")
)
25 changes: 22 additions & 3 deletions pkg/util/workloadrepo/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,28 @@ func (w *worker) etcdCAS(ctx context.Context, key, oval, nval string) error {
return nil
}

func queryMaxSnapID(ctx context.Context, sctx sessionctx.Context) (uint64, error) {
query := sqlescape.MustEscapeSQL("SELECT MAX(`SNAP_ID`) FROM %n.%n", WorkloadSchema, histSnapshotsTable)
rs, err := runQuery(ctx, sctx, query)
if err != nil {
return 0, err
}
if len(rs) > 0 {
if rs[0].IsNull(0) {
return 0, nil
}
return rs[0].GetUint64(0), nil
}
return 0, errors.New("no rows returned when querying max snap id")
}

func (w *worker) getSnapID(ctx context.Context) (uint64, error) {
snapIDStr, err := w.etcdGet(ctx, snapIDKey)
if err != nil {
return 0, err
}
if snapIDStr == "" {
// return zero when the key does not exist
return 0, nil
return 0, errKeyNotFound
}
return strconv.ParseUint(snapIDStr, 10, 64)
}
Expand Down Expand Up @@ -140,7 +154,12 @@ func (w *worker) takeSnapshot(ctx context.Context) (uint64, error) {
var snapID uint64
var err error
for range snapshotRetries {
isEmpty := false
snapID, err = w.getSnapID(ctx)
if stderrors.Is(err, errKeyNotFound) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some comments on when this may happen.

snapID, err = queryMaxSnapID(ctx, sess)
isEmpty = true
}
if err != nil {
err = fmt.Errorf("cannot get current snapid: %w", err)
continue
Expand All @@ -158,7 +177,7 @@ func (w *worker) takeSnapshot(ctx context.Context) (uint64, error) {
continue
}

if snapID == 0 {
if isEmpty {
err = w.etcdCreate(ctx, snapIDKey, strconv.FormatUint(snapID+1, 10))
} else {
err = w.etcdCAS(ctx, snapIDKey, strconv.FormatUint(snapID, 10), strconv.FormatUint(snapID+1, 10))
Expand Down
77 changes: 61 additions & 16 deletions pkg/util/workloadrepo/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,7 @@ func setupWorkerForTest(ctx context.Context, etcdCli *clientv3.Client, dom *doma
return wrk
}

func setupDomainAndContext(t *testing.T) (context.Context, kv.Storage, *domain.Domain, string) {
ctx := context.Background()
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
var cancel context.CancelFunc = nil
if ddl, ok := t.Deadline(); ok {
ctx, cancel = context.WithDeadline(ctx, ddl)
}
t.Cleanup(func() {
if cancel != nil {
cancel()
}
})

store, dom := testkit.CreateMockStoreAndDomain(t)

func setupEtcd(t *testing.T) string {
cfg := embed.NewConfig()
cfg.Dir = t.TempDir()

Expand All @@ -97,7 +83,25 @@ func setupDomainAndContext(t *testing.T) (context.Context, kv.Storage, *domain.D
require.False(t, true, "server took too long to start")
}

return ctx, store, dom, embedEtcd.Clients[0].Addr().String()
return embedEtcd.Clients[0].Addr().String()
}

func setupDomainAndContext(t *testing.T) (context.Context, kv.Storage, *domain.Domain, string) {
ctx := context.Background()
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
var cancel context.CancelFunc = nil
if ddl, ok := t.Deadline(); ok {
ctx, cancel = context.WithDeadline(ctx, ddl)
}
t.Cleanup(func() {
if cancel != nil {
cancel()
}
})

store, dom := testkit.CreateMockStoreAndDomain(t)
etcdAddr := setupEtcd(t)
return ctx, store, dom, etcdAddr
}

func setupWorker(ctx context.Context, t *testing.T, addr string, dom *domain.Domain, id string, testWorker bool) *worker {
Expand Down Expand Up @@ -857,3 +861,44 @@ func TestCalcNextTick(t *testing.T) {
require.True(t, calcNextTick(time.Date(2024, 12, 7, 2, 0, 0, 1, loc)) == time.Hour*24-time.Nanosecond)
require.True(t, calcNextTick(time.Date(2024, 12, 7, 1, 59, 59, 999999999, loc)) == time.Nanosecond)
}

func TestRecoverSnapID(t *testing.T) {
ctx, store, dom, addr := setupDomainAndContext(t)
worker := setupWorker(ctx, t, addr, dom, "worker1", true)
require.NoError(t, worker.setRepositoryDest(ctx, "table"))
now := time.Now()

require.Eventually(t, func() bool {
return worker.checkTablesExists(ctx, now)
}, time.Minute, time.Second)
tk := testkit.NewTestKit(t, store)
prevSnapID := uint64(0)
require.Eventually(t, func() bool {
res := tk.MustQuery("select max(snap_id) from workload_schema.hist_snapshots").Rows()
if len(res) == 0 || len(res[0]) == 0 {
return false
}
snapID, err := strconv.ParseUint(res[0][0].(string), 10, 64)
prevSnapID = snapID
return err == nil && snapID > 0
}, time.Minute, time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe time.Second should be time.Millisecond*100?

worker.stop()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably wait for the worker to stop.

Suggested change
// wait for worker to stop
require.Eventually(t, func() bool {
return worker.cancel == nil
}, time.Second*10, time.Millisecond*100)

etcd2 := setupEtcd(t)
worker2 := setupWorker(ctx, t, etcd2, dom, "worker2", true)
snapIDStr, err := worker2.etcdGet(ctx, snapIDKey)
require.Nil(t, err)
require.Equal(t, "", snapIDStr)

_, err = worker2.getSnapID(ctx)
require.EqualError(t, errKeyNotFound, err.Error())
newSnapID, err := queryMaxSnapID(ctx, worker2.getSessionWithRetry().(sessionctx.Context))
require.Nil(t, err)
require.Equal(t, prevSnapID, newSnapID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code assumes that prevSnapID has to match the last run. This based on timing. It might be better to just run a SQL query to validate that this value is one greater than max(snap_id).


require.NoError(t, worker2.setRepositoryDest(ctx, "table"))
require.Eventually(t, func() bool {
newSnapID, err = worker2.getSnapID(ctx)
return err == nil && newSnapID >= prevSnapID
}, time.Minute, time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would retry every 100 milliseconds instead.

}