Skip to content

Commit

Permalink
br/restore: Add conc control to upload for PiTR (#59696)
Browse files Browse the repository at this point in the history
close #59835
  • Loading branch information
YuJuncen authored Feb 28, 2025
1 parent d392685 commit d2ffdd0
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 11 deletions.
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ go_test(
],
embed = [":snap_client"],
flaky = True,
shard_count = 23,
shard_count = 24,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/restore/snap_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,10 @@ func makeDBPool(size uint, dbFactory func() (*tidallocdb.DB, error)) ([]*tidallo
}

func (rc *SnapClient) InstallPiTRSupport(ctx context.Context, deps PiTRCollDep) error {
if err := deps.LoadMaxCopyConcurrency(ctx, rc.concurrencyPerStore); err != nil {
return errors.Trace(err)
}

collector, err := newPiTRColl(ctx, deps)
if err != nil {
return errors.Trace(err)
Expand Down
46 changes: 41 additions & 5 deletions br/pkg/restore/snap_client/pitr_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
Expand All @@ -27,6 +28,10 @@ import (
"golang.org/x/sync/errgroup"
)

const (
DefaultMaxConcurrentCopy = 1024
)

type persistCall struct {
cx context.Context
cb func(error)
Expand Down Expand Up @@ -147,12 +152,15 @@ type pitrCollector struct {
// restoreUUID is the identity of this restoration.
// This will be kept among restarting from checkpoints.
restoreUUID uuid.UUID
// maxCopyConcurrency is the maximum number of concurrent copy operations.
maxCopyConcurrency int

// Mutable state.
ingestedSSTMeta ingestedSSTsMeta
ingestedSSTMetaLock sync.Mutex
putMigOnce sync.Once
writerRoutine persisterHandle
concControl *util.WorkerPool

// Delegates.

Expand Down Expand Up @@ -242,15 +250,15 @@ func (c *pitrCollector) onBatch(ctx context.Context, fileSets restore.BatchBacku

for _, file := range fileSet.SSTFiles {
fileCount += 1
eg.Go(func() error {
c.concControl.ApplyOnErrorGroup(eg, func() error {
if err := c.putSST(ectx, file); err != nil {
return errors.Annotatef(err, "failed to put sst %s", file.GetName())
}
return nil
})
}
for _, hint := range fileSet.RewriteRules.TableIDRemapHint {
eg.Go(func() error {
c.concControl.ApplyOnErrorGroup(eg, func() error {
if err := c.putRewriteRule(ectx, hint.Origin, hint.Rewritten); err != nil {
return errors.Annotatef(err, "failed to put rewrite rule of %v", fileSet.RewriteRules)
}
Expand Down Expand Up @@ -304,6 +312,7 @@ func (c *pitrCollector) putSST(ctx context.Context, f *pb.File) error {
}

begin := time.Now()
failpoint.InjectCall("put-sst")

f = util.ProtoV1Clone(f)
out := c.sstPath(f.Name)
Expand Down Expand Up @@ -440,6 +449,21 @@ func (c *pitrCollector) commit(ctx context.Context) (uint64, error) {
return ts, c.persistExtraBackupMeta(ctx)
}

func (c *pitrCollector) init() {
maxConc := c.maxCopyConcurrency
if maxConc <= 0 {
maxConc = DefaultMaxConcurrentCopy
}

c.setConcurrency(maxConc)
c.goPersister()
c.resetCommitting()
}

func (c *pitrCollector) setConcurrency(maxConc int) {
c.concControl = util.NewWorkerPool(uint(maxConc), "copy-sst")
}

func (c *pitrCollector) resetCommitting() {
c.ingestedSSTMeta = ingestedSSTsMeta{
rewrites: map[int64]int64{},
Expand All @@ -454,6 +478,18 @@ type PiTRCollDep struct {
PDCli pd.Client
EtcdCli *clientv3.Client
Storage *pb.StorageBackend

maxCopyConcurrency int
}

func (deps *PiTRCollDep) LoadMaxCopyConcurrency(ctx context.Context, maxConcPerTiKV uint) error {
stores, err := deps.PDCli.GetAllStores(ctx)
if err != nil {
return errors.Trace(err)
}
deps.maxCopyConcurrency = int(maxConcPerTiKV) * len(stores)
log.Info("Load max copy concurrency", zap.Int("maxCopyConcurrency", deps.maxCopyConcurrency), zap.Int("stores", len(stores)))
return nil
}

// newPiTRColl creates a new PiTR collector.
Expand All @@ -476,7 +512,8 @@ func newPiTRColl(ctx context.Context, deps PiTRCollDep) (*pitrCollector, error)
}

coll := &pitrCollector{
enabled: true,
enabled: true,
maxCopyConcurrency: deps.maxCopyConcurrency,
}

strg, err := storage.Create(ctx, ts[0].Info.Storage, false)
Expand All @@ -503,7 +540,6 @@ func newPiTRColl(ctx context.Context, deps PiTRCollDep) (*pitrCollector, error)
}
coll.restoreStorage = restoreStrg
coll.restoreSuccess = summary.Succeed
coll.goPersister()
coll.resetCommitting()
coll.init()
return coll, nil
}
49 changes: 44 additions & 5 deletions br/pkg/restore/snap_client/pitr_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ package snapclient
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/restore/utils"
Expand Down Expand Up @@ -82,9 +85,7 @@ func (p *pitrCollectorT) Reopen() {
}
p.success.Store(false)
p.coll = newColl

p.coll.resetCommitting()
p.coll.goPersister()
p.coll.init()
}

func (p pitrCollectorT) RequireCopied(extBk backuppb.IngestedSSTs, files ...string) {
Expand Down Expand Up @@ -129,8 +130,7 @@ func newPiTRCollForTest(t *testing.T) pitrCollectorT {
return tsoCnt.Add(1), nil
}
coll.restoreSuccess = restoreSuccess.Load
coll.goPersister()
coll.resetCommitting()
coll.init()

return pitrCollectorT{
t: t,
Expand Down Expand Up @@ -269,3 +269,42 @@ func TestConflict(t *testing.T) {

coll.Done()
}

func TestConcurrency(t *testing.T) {
coll := newPiTRCollForTest(t)
coll.coll.setConcurrency(2)

cnt := int64(0)
fence := make(chan struct{})

failpoint.EnableCall("github.com/pingcap/tidb/br/pkg/restore/snap_client/put-sst", func() {
atomic.AddInt64(&cnt, 1)
<-fence
})

cbs := []func() error{}
l := sync.Mutex{}
for i := range 10 {
batch := restore.BatchBackupFileSet{
backupFileSet(withFile(nameFile(fmt.Sprintf("foo%02d.txt", i)))),
}

go func() {
cb := coll.RestoreAFile(batch)
l.Lock()
cbs = append(cbs, cb)
l.Unlock()
}()
}

require.Eventually(t, func() bool {
return atomic.LoadInt64(&cnt) == 2
}, time.Second, 10*time.Millisecond)
close(fence)

for _, cb := range cbs {
require.NoError(t, cb())
}

coll.Done()
}

0 comments on commit d2ffdd0

Please sign in to comment.