Skip to content

Commit

Permalink
ddl: fix job's row count for global sort (#59898)
Browse files Browse the repository at this point in the history
close #59897
  • Loading branch information
tangenta authored Mar 4, 2025
1 parent bbb9623 commit a6094f7
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 102 deletions.
144 changes: 50 additions & 94 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ func NewAddIndexIngestPipeline(
srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, backendCtx)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt,
reorgMeta.GetBatchSize(), rm, backendCtx)
ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool,
tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, rowCntListener)
ingestOp := NewIndexIngestOperator(ctx, copCtx, sessPool,
tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta)
sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, rowCntListener)

operator.Compose[TableScanTask](srcOp, scanOp)
Expand Down Expand Up @@ -667,19 +667,17 @@ func NewWriteExternalStoreOperator(
writers = append(writers, writer)
}

return &indexIngestExternalWorker{
indexIngestBaseWorker: indexIngestBaseWorker{
ctx: ctx,
tbl: tbl,
indexes: indexes,
copCtx: copCtx,
se: nil,
sessPool: sessPool,
writers: writers,
srcChunkPool: srcChunkPool,
reorgMeta: reorgMeta,
totalCount: totalCount,
},
return &indexIngestWorker{
ctx: ctx,
tbl: tbl,
indexes: indexes,
copCtx: copCtx,
se: nil,
sessPool: sessPool,
writers: writers,
srcChunkPool: srcChunkPool,
reorgMeta: reorgMeta,
totalCount: totalCount,
}
})
return &WriteExternalStoreOperator{
Expand All @@ -700,7 +698,6 @@ func (o *WriteExternalStoreOperator) Close() error {
type IndexWriteResult struct {
ID int
Added int
Total int
}

// IndexIngestOperator writes index records to ingest engine.
Expand All @@ -712,15 +709,13 @@ type IndexIngestOperator struct {
func NewIndexIngestOperator(
ctx *OperatorCtx,
copCtx copr.CopContext,
backendCtx ingest.BackendCtx,
sessPool opSessPool,
tbl table.PhysicalTable,
indexes []table.Index,
engines []ingest.Engine,
srcChunkPool *sync.Pool,
concurrency int,
reorgMeta *model.DDLReorgMeta,
rowCntListener RowCountListener,
) *IndexIngestOperator {
writerCfg := getLocalWriterConfig(len(indexes), concurrency)

Expand All @@ -742,77 +737,25 @@ func NewIndexIngestOperator(
writers = append(writers, writer)
}

return &indexIngestLocalWorker{
indexIngestBaseWorker: indexIngestBaseWorker{
ctx: ctx,
tbl: tbl,
indexes: indexes,
copCtx: copCtx,

se: nil,
sessPool: sessPool,
writers: writers,
srcChunkPool: srcChunkPool,
reorgMeta: reorgMeta,
},
backendCtx: backendCtx,
rowCntListener: rowCntListener,
return &indexIngestWorker{
ctx: ctx,
tbl: tbl,
indexes: indexes,
copCtx: copCtx,

se: nil,
sessPool: sessPool,
writers: writers,
srcChunkPool: srcChunkPool,
reorgMeta: reorgMeta,
}
})
return &IndexIngestOperator{
AsyncOperator: operator.NewAsyncOperator[IndexRecordChunk, IndexWriteResult](ctx, pool),
}
}

// indexIngestExternalWorker handles index record chunks on the path that
// uses external writers (presumably the global-sort/cloud path — confirm
// against NewWriteExternalStoreOperator). All shared chunk-writing state
// and logic live in the embedded indexIngestBaseWorker.
type indexIngestExternalWorker struct {
	indexIngestBaseWorker
}

// HandleTask writes one scanned record chunk via the base worker and
// forwards the result downstream. The source chunk is returned to the
// pool in all cases, including error returns.
func (w *indexIngestExternalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
	chunk := ck.Chunk
	defer func() {
		if chunk != nil {
			w.srcChunkPool.Put(chunk)
		}
	}()

	result, err := w.indexIngestBaseWorker.HandleTask(ck)
	if err != nil {
		// Propagate through the operator context; the pipeline handles shutdown.
		w.ctx.onError(err)
		return
	}
	send(result)
}

// indexIngestLocalWorker handles index record chunks on the local-backend
// path: besides the shared base-worker logic it reports written row counts
// to rowCntListener and triggers quota-based ingestion through backendCtx
// (see HandleTask).
type indexIngestLocalWorker struct {
	indexIngestBaseWorker
	// backendCtx is used to ingest when the write quota is exceeded and to
	// read the total key count after a write.
	backendCtx ingest.BackendCtx
	// rowCntListener receives the number of rows added per handled chunk.
	rowCntListener RowCountListener
}

// HandleTask writes one scanned record chunk via the base worker, reports
// the added row count, ingests if the write quota is exceeded, and sends
// the result (with the backend's total key count) downstream. The source
// chunk is always returned to the pool.
func (w *indexIngestLocalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
	chunk := ck.Chunk
	defer func() {
		if chunk != nil {
			w.srcChunkPool.Put(chunk)
		}
	}()

	res, err := w.indexIngestBaseWorker.HandleTask(ck)
	if err != nil {
		w.ctx.onError(err)
		return
	}
	// Nothing was written for this chunk; no bookkeeping and no result to send.
	if res.Added == 0 {
		return
	}

	w.rowCntListener.Written(res.Added)
	if err := w.backendCtx.IngestIfQuotaExceeded(w.ctx, ck.ID, res.Added); err != nil {
		w.ctx.onError(err)
		return
	}
	res.Total = w.backendCtx.TotalKeyCount()
	send(res)
}

type indexIngestBaseWorker struct {
type indexIngestWorker struct {
ctx *OperatorCtx

tbl table.PhysicalTable
Expand All @@ -830,23 +773,28 @@ type indexIngestBaseWorker struct {
totalCount *atomic.Int64
}

func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk) (IndexWriteResult, error) {
func (w *indexIngestWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
defer func() {
if ck.Chunk != nil {
w.srcChunkPool.Put(ck.Chunk)
}
}()
failpoint.Inject("injectPanicForIndexIngest", func() {
panic("mock panic")
})

result := IndexWriteResult{
ID: rs.ID,
ID: ck.ID,
}
w.initSessCtx()
count, _, err := w.WriteChunk(&rs)
count, _, err := w.WriteChunk(&ck)
if err != nil {
w.ctx.onError(err)
return result, err
return
}
if count == 0 {
logutil.Logger(w.ctx).Info("finish a index ingest task", zap.Int("id", rs.ID))
return result, nil
logutil.Logger(w.ctx).Info("finish a index ingest task", zap.Int("id", ck.ID))
return
}
if w.totalCount != nil {
w.totalCount.Add(int64(count))
Expand All @@ -855,10 +803,10 @@ func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk) (IndexWriteResul
if ResultCounterForTest != nil {
ResultCounterForTest.Add(1)
}
return result, nil
send(result)
}

func (w *indexIngestBaseWorker) initSessCtx() {
func (w *indexIngestWorker) initSessCtx() {
if w.se == nil {
sessCtx, err := w.sessPool.Get()
if err != nil {
Expand All @@ -874,7 +822,7 @@ func (w *indexIngestBaseWorker) initSessCtx() {
}
}

func (w *indexIngestBaseWorker) Close() {
func (w *indexIngestWorker) Close() {
// TODO(lance6716): unify the real write action for engineInfo and external
// writer.
for _, writer := range w.writers {
Expand All @@ -894,7 +842,7 @@ func (w *indexIngestBaseWorker) Close() {
}

// WriteChunk will write index records to lightning engine.
func (w *indexIngestBaseWorker) WriteChunk(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) {
func (w *indexIngestWorker) WriteChunk(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) {
failpoint.Inject("mockWriteLocalError", func(_ failpoint.Value) {
failpoint.Return(0, nil, errors.New("mock write local error"))
})
Expand Down Expand Up @@ -955,20 +903,28 @@ func (s *indexWriteResultSink) collectResult() error {
select {
case <-s.ctx.Done():
return s.ctx.Err()
case _, ok := <-s.source.Channel():
case rs, ok := <-s.source.Channel():
if !ok {
err := s.flush()
if err != nil {
s.ctx.onError(err)
}
if s.backendCtx != nil {
if s.backendCtx != nil { // for local sort only
total := s.backendCtx.TotalKeyCount()
if total > 0 {
s.rowCntListener.SetTotal(total)
}
}
return err
}
s.rowCntListener.Written(rs.Added)
if s.backendCtx != nil { // for local sort only
err := s.backendCtx.IngestIfQuotaExceeded(s.ctx, rs.ID, rs.Added)
if err != nil {
s.ctx.onError(err)
return err
}
}
}
}
}
Expand Down
15 changes: 12 additions & 3 deletions tests/realtikvtest/addindextest2/global_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ func checkFileCleaned(t *testing.T, jobID int64, sortStorageURI string) {
require.Equal(t, 0, len(statFiles))
}

// checkDataAndShowJobs runs an admin consistency check on table t, then
// asserts that the most recent DDL job used the cloud (global-sort) ingest
// path and reports the expected row count.
func checkDataAndShowJobs(t *testing.T, tk *testkit.TestKit, count int) {
	tk.MustExec("admin check table t;")
	rs := tk.MustQuery("admin show ddl jobs 1;").Rows()
	require.Len(t, rs, 1)
	// Column 12 appears to be the job-type/ingest-mode column — it should
	// mention both "ingest" and "cloud" for global sort. TODO confirm the
	// column index against the `admin show ddl jobs` output schema.
	require.Contains(t, rs[0][12], "ingest")
	require.Contains(t, rs[0][12], "cloud")
	// require.Equal takes (t, expected, actual): the expected count goes
	// first so failure messages label the values correctly.
	require.Equal(t, strconv.Itoa(count), rs[0][7])
}

func TestGlobalSortBasic(t *testing.T) {
gcsHost, gcsPort, cloudStorageURI := genStorageURI(t)
opt := fakestorage.Options{
Expand Down Expand Up @@ -120,18 +129,18 @@ func TestGlobalSortBasic(t *testing.T) {
})

tk.MustExec("alter table t add index idx(a);")
tk.MustExec("admin check table t;")
checkDataAndShowJobs(t, tk, size)
<-ch
checkFileCleaned(t, jobID, cloudStorageURI)

testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/forceMergeSort", "return()")
tk.MustExec("alter table t add index idx1(a);")
tk.MustExec("admin check table t;")
checkDataAndShowJobs(t, tk, size)
<-ch
checkFileCleaned(t, jobID, cloudStorageURI)

tk.MustExec("alter table t add unique index idx2(a);")
tk.MustExec("admin check table t;")
checkDataAndShowJobs(t, tk, size)
<-ch
checkFileCleaned(t, jobID, cloudStorageURI)
}
Expand Down
9 changes: 4 additions & 5 deletions tests/realtikvtest/addindextest3/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func TestBackfillOperators(t *testing.T) {
src := testutil.NewOperatorTestSource(chunkResults...)
reorgMeta := ddl.NewDDLReorgMeta(tk.Session())
ingestOp := ddl.NewIndexIngestOperator(
opCtx, copCtx, bcCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine},
srcChkPool, 3, reorgMeta, &ddl.EmptyRowCntListener{})
opCtx, copCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine},
srcChkPool, 3, reorgMeta)
sink := testutil.NewOperatorTestSink[ddl.IndexWriteResult]()

operator.Compose[ddl.IndexRecordChunk](src, ingestOp)
Expand Down Expand Up @@ -443,9 +443,8 @@ func TestTuneWorkerPoolSize(t *testing.T) {
require.NoError(t, err)
defer bcCtx.Close()
mockEngine := ingest.NewMockEngineInfo(nil)
ingestOp := ddl.NewIndexIngestOperator(opCtx, copCtx, bcCtx, sessPool, pTbl, []table.Index{index},
[]ingest.Engine{mockEngine}, nil, 2, nil,
&ddl.EmptyRowCntListener{})
ingestOp := ddl.NewIndexIngestOperator(opCtx, copCtx, sessPool, pTbl, []table.Index{index},
[]ingest.Engine{mockEngine}, nil, 2, nil)

ingestOp.Open()
require.Equal(t, ingestOp.GetWorkerPoolSize(), int32(2))
Expand Down

0 comments on commit a6094f7

Please sign in to comment.