
Commit

This is an automated cherry-pick of #52503
Signed-off-by: ti-chi-bot <[email protected]>
tangenta authored and ti-chi-bot committed May 27, 2024
1 parent 0bbeb15 commit 39a3f5f
Showing 12 changed files with 832 additions and 80 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
@@ -59,6 +59,7 @@ go_library(
"//pkg/util/compress",
"//pkg/util/engine",
"//pkg/util/hack",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"//pkg/util/ranger",
"@com_github_cockroachdb_pebble//:pebble",
4 changes: 4 additions & 0 deletions pkg/ddl/ddl.go
@@ -752,6 +752,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
if err != nil {
logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
}
d.runningJobs.clear()
})

d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
@@ -790,12 +791,15 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
defer d.sessPool.Put(ctx)

ingest.InitGlobalLightningEnv()
<<<<<<< HEAD
d.ownerManager.SetRetireOwnerHook(func() {
// Since this instance is not DDL owner anymore, we clean up the processing job info.
if ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.MarkJobFinish()
}
})
=======
>>>>>>> 5814957ace2 (ddl: fix runnable ingest job checking (#52503))

return nil
}
81 changes: 81 additions & 0 deletions pkg/ddl/ddl_running_jobs.go
@@ -22,15 +22,34 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

type runningJobs struct {
sync.RWMutex
<<<<<<< HEAD
ids map[int64]struct{}
runningSchema map[string]map[string]struct{} // database -> table -> struct{}
runningJobIDs string
=======
// processingIDs records the IDs of the jobs that are being processed by a worker.
processingIDs map[int64]struct{}
processingIDsStr string

// unfinishedIDs records the IDs of the jobs that are not finished yet.
// It is not necessarily being processed by a worker.
unfinishedIDs map[int64]struct{}
unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}

// processingReorgJobID records the ID of the ingest job that is being processed by a worker.
// TODO(tangenta): remove this when we support running multiple concurrent ingest jobs.
processingIngestJobID int64
lastLoggingTime time.Time
>>>>>>> 5814957ace2 (ddl: fix runnable ingest job checking (#52503))
}

func newRunningJobs() *runningJobs {
@@ -40,11 +59,33 @@ func newRunningJobs() *runningJobs {
}
}

<<<<<<< HEAD
=======
func (j *runningJobs) clear() {
j.Lock()
defer j.Unlock()
j.unfinishedIDs = make(map[int64]struct{})
j.unfinishedSchema = make(map[string]map[string]struct{})
}

>>>>>>> 5814957ace2 (ddl: fix runnable ingest job checking (#52503))
func (j *runningJobs) add(job *model.Job) {
j.Lock()
defer j.Unlock()
j.ids[job.ID] = struct{}{}
j.updateInternalRunningJobIDs()
<<<<<<< HEAD
=======
if isIngestJob(job) {
j.processingIngestJobID = job.ID
}

if _, ok := j.unfinishedIDs[job.ID]; ok {
// Already exists, no need to add it again.
return
}
j.unfinishedIDs[job.ID] = struct{}{}
>>>>>>> 5814957ace2 (ddl: fix runnable ingest job checking (#52503))
for _, info := range job.GetInvolvingSchemaInfo() {
if _, ok := j.runningSchema[info.Database]; !ok {
j.runningSchema[info.Database] = make(map[string]struct{})
@@ -58,19 +99,52 @@ func (j *runningJobs) remove(job *model.Job) {
defer j.Unlock()
delete(j.ids, job.ID)
j.updateInternalRunningJobIDs()
<<<<<<< HEAD
for _, info := range job.GetInvolvingSchemaInfo() {
if db, ok := j.runningSchema[info.Database]; ok {
delete(db, info.Table)
}
if len(j.runningSchema[info.Database]) == 0 {
delete(j.runningSchema, info.Database)
=======
if isIngestJob(job) && job.ID == j.processingIngestJobID {
j.processingIngestJobID = 0
}

if job.IsFinished() || job.IsSynced() {
delete(j.unfinishedIDs, job.ID)
for _, info := range job.GetInvolvingSchemaInfo() {
if db, ok := j.unfinishedSchema[info.Database]; ok {
delete(db, info.Table)
}
if len(j.unfinishedSchema[info.Database]) == 0 {
delete(j.unfinishedSchema, info.Database)
}
>>>>>>> 5814957ace2 (ddl: fix runnable ingest job checking (#52503))
}
}
}

func (j *runningJobs) checkRunnable(job *model.Job) bool {
j.RLock()
defer j.RUnlock()
<<<<<<< HEAD
=======
if _, ok := j.processingIDs[job.ID]; ok {
// Already processing by a worker. Skip running it again.
return false
}
if isIngestJob(job) && j.processingIngestJobID != 0 {
// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
if time.Since(j.lastLoggingTime) > 1*time.Minute {
logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
zap.String("category", "ddl-ingest"),
zap.Int64("processing job ID", j.processingIngestJobID))
j.lastLoggingTime = time.Now()
}
return false
}
>>>>>>> 5814957ace2 (ddl: fix runnable ingest job checking (#52503))
for _, info := range job.GetInvolvingSchemaInfo() {
if _, ok := j.runningSchema[model.InvolvingAll]; ok {
return false
@@ -93,6 +167,7 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
return true
}

<<<<<<< HEAD
func (j *runningJobs) allIDs() string {
j.RLock()
defer j.RUnlock()
@@ -110,4 +185,10 @@ func (j *runningJobs) updateInternalRunningJobIDs() {
i++
}
j.runningJobIDs = sb.String()
=======
func isIngestJob(job *model.Job) bool {
return (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg
>>>>>>> 5814957ace2 (ddl: fix runnable ingest job checking (#52503))
}
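
For orientation, here is a minimal, self-contained sketch of the gating idea the post-#52503 side of this file introduces: at most one ingest (fast-reorg) job may be marked as processing at a time, and checkRunnable rejects a second one while the first is in flight. The types below (job, runningJobs) are simplified stand-ins for illustration, not the actual TiDB definitions.

// Sketch only: simplified stand-ins for model.Job and the runningJobs gate.
package main

import (
	"fmt"
	"sync"
)

type jobType int

const (
	actionAddIndex jobType = iota
	actionAddPrimaryKey
	actionOther
)

type job struct {
	ID        int64
	Type      jobType
	FastReorg bool
}

// isIngestJob mirrors the check in the diff: add-index/add-primary-key with fast reorg enabled.
func isIngestJob(j *job) bool {
	return (j.Type == actionAddIndex || j.Type == actionAddPrimaryKey) && j.FastReorg
}

type runningJobs struct {
	sync.Mutex
	processingIngestJobID int64 // 0 means no ingest job currently holds the slot
}

// checkRunnable reports whether a worker may pick up the job now.
func (r *runningJobs) checkRunnable(j *job) bool {
	r.Lock()
	defer r.Unlock()
	if isIngestJob(j) && r.processingIngestJobID != 0 && r.processingIngestJobID != j.ID {
		// Only one ingest job at a time, to bound the CPU/memory used by the backfill.
		return false
	}
	return true
}

func (r *runningJobs) add(j *job) {
	r.Lock()
	defer r.Unlock()
	if isIngestJob(j) {
		r.processingIngestJobID = j.ID
	}
}

func (r *runningJobs) remove(j *job) {
	r.Lock()
	defer r.Unlock()
	if isIngestJob(j) && r.processingIngestJobID == j.ID {
		r.processingIngestJobID = 0
	}
}

func main() {
	r := &runningJobs{}
	a := &job{ID: 1, Type: actionAddIndex, FastReorg: true}
	b := &job{ID: 2, Type: actionAddPrimaryKey, FastReorg: true}
	fmt.Println(r.checkRunnable(a)) // true
	r.add(a)
	fmt.Println(r.checkRunnable(b)) // false: job 1 holds the ingest slot
	r.remove(a)
	fmt.Println(r.checkRunnable(b)) // true
}
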
11 changes: 0 additions & 11 deletions pkg/ddl/ddl_worker.go
@@ -27,7 +27,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/kv"
@@ -576,7 +575,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
startTime := time.Now()
defer func() {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
markJobFinish(job)
}()

if jobNeedGC(job) {
@@ -622,15 +620,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
return errors.Trace(err)
}

func markJobFinish(job *model.Job) {
if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg &&
ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.MarkJobFinish()
}
}

func (w *worker) writeDDLSeqNum(job *model.Job) {
w.ddlSeqNumMu.Lock()
w.ddlSeqNumMu.seqNum++
8 changes: 8 additions & 0 deletions pkg/ddl/index.go
@@ -42,6 +42,11 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
<<<<<<< HEAD

Check failure on line 45 in pkg/ddl/index.go (GitHub Actions, Compile for FreeBSD / macos-latest / ubuntu-latest): missing import path
=======
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/common"
>>>>>>> 5814957ace2 (ddl: fix runnable ingest job checking (#52503))
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
@@ -843,6 +848,9 @@ func cleanupSortPath(ctx context.Context, currentJobID int64) error {
logutil.Logger(ctx).Warn(ingest.LitErrCleanSortPath, zap.Error(err))
return nil
}
failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
close(local.WaitRMFolderChForTest)
})
}
}
return nil
66 changes: 33 additions & 33 deletions pkg/ddl/ingest/backend_mgr.go
@@ -19,15 +19,21 @@ import (
"fmt"
"math"
"strconv"
"sync"
"time"

<<<<<<< HEAD

Check failure on line 24 in pkg/ddl/ingest/backend_mgr.go (GitHub Actions, Compile for FreeBSD / macos-latest / ubuntu-latest): missing import path
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/config"
=======
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/config"
>>>>>>> 5814957ace2 (ddl: fix runnable ingest job checking (#52503))
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/logutil"
kvutil "github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
)

@@ -37,18 +43,12 @@ type BackendCtxMgr interface {
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)

MarkJobProcessing(jobID int64) (ok bool)
MarkJobFinish()
}

type litBackendCtxMgr struct {
generic.SyncMap[int64, *litBackendCtx]
memRoot MemRoot
diskRoot DiskRoot
processingJobID int64
lastLoggingTime time.Time
mu sync.Mutex
memRoot MemRoot
diskRoot DiskRoot
}

func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
@@ -69,30 +69,6 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
return mgr
}

// MarkJobProcessing marks ingest backfill is processing.
func (m *litBackendCtxMgr) MarkJobProcessing(jobID int64) bool {
m.mu.Lock()
defer m.mu.Unlock()
if m.processingJobID == 0 || m.processingJobID == jobID {
m.processingJobID = jobID
return true
}
if time.Since(m.lastLoggingTime) > 1*time.Minute {
logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
zap.String("category", "ddl-ingest"),
zap.Int64("processing job ID", m.processingJobID))
m.lastLoggingTime = time.Now()
}
return false
}

// MarkJobFinish marks ingest backfill is finished.
func (m *litBackendCtxMgr) MarkJobFinish() {
m.mu.Lock()
m.processingJobID = 0
m.mu.Unlock()
}

// CheckAvailable checks if the ingest backfill is available.
func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
if err := m.diskRoot.PreCheckUsage(); err != nil {
@@ -102,6 +78,9 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
return true, nil
}

// ResignOwnerForTest is only used for test.
var ResignOwnerForTest = atomic.NewBool(false)

// Register creates a new backend and registers it to the backend context.
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error) {
bc, exist := m.Load(jobID)
@@ -123,8 +102,29 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
return nil, err
}

<<<<<<< HEAD
bcCtx := newBackendContext(ctx, jobID, bd, cfg.Lightning, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient)
m.Store(jobID, bcCtx)
=======
m.memRoot.RefreshConsumption()
ok := m.memRoot.CheckConsume(StructSizeBackendCtx)
if !ok {
return nil, genBackendAllocMemFailedErr(ctx, m.memRoot, jobID)
}
cfg, err := genConfig(ctx, m.memRoot, jobID, unique, resourceGroupName)
if err != nil {
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
failpoint.Inject("beforeCreateLocalBackend", func() {
ResignOwnerForTest.Store(true)
})
bd, err := createLocalBackend(ctx, cfg, pdSvcDiscovery)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
>>>>>>> 5814957ace2 (ddl: fix runnable ingest job checking (#52503))

m.memRoot.Consume(StructSizeBackendCtx)
logutil.Logger(ctx).Info(LitInfoCreateBackend, zap.Int64("job ID", jobID),
9 changes: 0 additions & 9 deletions pkg/ddl/ingest/mock.go
@@ -42,15 +42,6 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken
}
}

// MarkJobProcessing implements BackendCtxMgr.MarkJobProcessing interface.
func (*MockBackendCtxMgr) MarkJobProcessing(_ int64) bool {
return true
}

// MarkJobFinish implements BackendCtxMgr.MarkJobFinish interface.
func (*MockBackendCtxMgr) MarkJobFinish() {
}

// CheckAvailable implements BackendCtxMgr.Available interface.
func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
return len(m.runningJobs) == 0, nil
20 changes: 9 additions & 11 deletions pkg/ddl/job_table.go
@@ -224,17 +224,6 @@ func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
if !d.runningJobs.checkRunnable(job) {
return false, nil
}
if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.State == model.JobStateQueueing &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg &&
ingest.LitBackCtxMgr != nil {
succeed := ingest.LitBackCtxMgr.MarkJobProcessing(job.ID)
if !succeed {
// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
return false, nil
}
}
// Check if there is any block ddl running, like drop schema and flashback cluster.
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+
"(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+
@@ -273,6 +262,15 @@ func (d *ddl) startDispatchLoop() {
time.Sleep(dispatchLoopWaitingDuration)
continue
}
failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
if ingest.ResignOwnerForTest.Load() {
err2 := d.ownerManager.ResignOwner(context.Background())
if err2 != nil {
logutil.BgLogger().Info("resign meet error", zap.Error(err2))
}
ingest.ResignOwnerForTest.Store(false)
}
})
select {
case <-d.ddlJobCh:
case <-ticker.C:
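
The test hook threaded through backend_mgr.go and this dispatch loop follows a simple pattern: a package-level atomic flag (ingest.ResignOwnerForTest) is set inside one failpoint and observed, then reset, inside another. Below is a small sketch of just that flag pattern, with hypothetical function names and without the failpoint plumbing or the real owner manager.

// Sketch only: package-level atomic test flag, set by one path and consumed by another.
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// resignOwnerForTest mimics ingest.ResignOwnerForTest: a test-only toggle.
var resignOwnerForTest = atomic.NewBool(false)

// beforeCreateBackend stands in for the code path that arms the flag
// (in the real code this happens inside the beforeCreateLocalBackend failpoint).
func beforeCreateBackend() {
	resignOwnerForTest.Store(true)
}

// dispatchLoopIteration stands in for one pass of the dispatch loop check.
func dispatchLoopIteration() {
	if resignOwnerForTest.Load() {
		fmt.Println("would resign DDL ownership here")
		resignOwnerForTest.Store(false)
	}
}

func main() {
	dispatchLoopIteration() // flag not set: nothing happens
	beforeCreateBackend()
	dispatchLoopIteration() // flag set: prints the resign message once
}
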
