Skip to content

Commit

Permalink
ddl: use latest PD address to register lightning (#48687) (#55937)
Browse files Browse the repository at this point in the history
close #48680
  • Loading branch information
ti-chi-bot authored Nov 6, 2024
1 parent a92e555 commit bb5d8a8
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 10 deletions.
20 changes: 16 additions & 4 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,10 +772,15 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt
if err != nil {
return model.ReorgTypeNone, err
}
var pdLeaderAddr string
if d != nil {
//nolint:forcetypeassert
pdLeaderAddr = d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
}
if variable.EnableDistTask.Load() {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli)
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, pdLeaderAddr)
} else {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil)
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil, pdLeaderAddr)
}
if err != nil {
return model.ReorgTypeNone, err
Expand Down Expand Up @@ -927,7 +932,12 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
if ok && bc.Done() {
return true, 0, nil
}
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, nil)
var pdLeaderAddr string
if d != nil {
//nolint:forcetypeassert
pdLeaderAddr = d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
}
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, nil, pdLeaderAddr)
if err != nil {
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
return false, ver, errors.Trace(err)
Expand Down Expand Up @@ -1849,7 +1859,9 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
return errors.New("unexpected error, can't find index info")
}
if indexInfo.Unique {
bc, err := ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, reorgInfo.ID, nil)
//nolint:forcetypeassert
pdLeaderAddr := w.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
bc, err := ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, reorgInfo.ID, nil, pdLeaderAddr)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// BackendCtxMgr is used to manage the backend context.
type BackendCtxMgr interface {
CheckAvailable() (bool, error)
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error)
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
}

// Register creates a new backend and registers it to the backend context.
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error) {
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string) (BackendCtx, error) {
bc, exist := m.Load(jobID)
if !exist {
m.memRoot.RefreshConsumption()
Expand All @@ -91,6 +91,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
logutil.BgLogger().Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
cfg.Lightning.TiDB.PdAddr = pdAddr
bd, err := createLocalBackend(ctx, cfg)
if err != nil {
logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
Expand Down
1 change: 0 additions & 1 deletion ddl/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func genConfig(memRoot MemRoot, jobID int64, unique bool) (*Config, error) {
} else {
cfg.TikvImporter.DuplicateResolution = lightning.DupeResAlgNone
}
cfg.TiDB.PdAddr = tidbCfg.Path
cfg.TiDB.Host = "127.0.0.1"
cfg.TiDB.StatusPort = int(tidbCfg.Status.StatusPort)
// Set TLS related information
Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
}

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client) (BackendCtx, error) {
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client, _ string) (BackendCtx, error) {
logutil.BgLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil
Expand Down
4 changes: 3 additions & 1 deletion ddl/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -156,7 +157,8 @@ func (b *backfillSchedulerHandle) InitSubtaskExecEnv(ctx context.Context) error
logutil.BgLogger().Info("[ddl] lightning init subtask exec env")
d := b.d

bc, err := ingest.LitBackCtxMgr.Register(d.ctx, b.index.Unique, b.job.ID, d.etcdCli)
pdLeaderAddr := d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
bc, err := ingest.LitBackCtxMgr.Register(d.ctx, b.index.Unique, b.job.ID, d.etcdCli, pdLeaderAddr)
if err != nil {
logutil.BgLogger().Warn("[ddl] lightning register error", zap.Error(err))
return err
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {
tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")

// Mock there is a running ingest job.
_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil)
_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil, realtikvtest.PDAddr)
require.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(2)
Expand Down
3 changes: 3 additions & 0 deletions tests/realtikvtest/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ var (
// TiKVPath is the path of the TiKV Storage.
TiKVPath = flag.String("tikv-path", "tikv://127.0.0.1:2379?disableGC=true", "TiKV addr")

// PDAddr is the address of PD.
PDAddr = "127.0.0.1:2379"

// KeyspaceName is an option to specify the name of keyspace that the tests run on,
// this option is only valid while the flag WithRealTiKV is set.
KeyspaceName = flag.String("keyspace-name", "", "the name of keyspace that the tests run on")
Expand Down

0 comments on commit bb5d8a8

Please sign in to comment.