From d9f0cdf77a1a6747b8bb80e0de193bcf359e995f Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 12 Aug 2024 14:45:39 +0800 Subject: [PATCH] executor: try to optimize index join query by reduce cop task count Signed-off-by: crazycs520 --- pkg/executor/join/index_lookup_join.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/executor/join/index_lookup_join.go b/pkg/executor/join/index_lookup_join.go index e289ae7ab44b3..e35aed79908a8 100644 --- a/pkg/executor/join/index_lookup_join.go +++ b/pkg/executor/join/index_lookup_join.go @@ -186,7 +186,7 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { return nil } -func (e *IndexLookUpJoin) startWorkers(ctx context.Context) { +func (e *IndexLookUpJoin) startWorkers(ctx context.Context, initBatchSize int) { concurrency := e.Ctx().GetSessionVars().IndexLookupJoinConcurrency() if e.stats != nil { e.stats.concurrency = concurrency @@ -197,7 +197,7 @@ func (e *IndexLookUpJoin) startWorkers(ctx context.Context) { e.cancelFunc = cancelFunc innerCh := make(chan *lookUpJoinTask, concurrency) e.WorkerWg.Add(1) - go e.newOuterWorker(resultCh, innerCh).run(workerCtx, e.WorkerWg) + go e.newOuterWorker(resultCh, innerCh, initBatchSize).run(workerCtx, e.WorkerWg) for i := 0; i < concurrency; i++ { innerWorker := e.newInnerWorker(innerCh) e.WorkerWg.Add(1) @@ -205,15 +205,17 @@ func (e *IndexLookUpJoin) startWorkers(ctx context.Context) { } } -func (e *IndexLookUpJoin) newOuterWorker(resultCh, innerCh chan *lookUpJoinTask) *outerWorker { +func (e *IndexLookUpJoin) newOuterWorker(resultCh, innerCh chan *lookUpJoinTask, initBatchSize int) *outerWorker { + maxBatchSize := e.Ctx().GetSessionVars().IndexJoinBatchSize + batchSize := min(initBatchSize, maxBatchSize) ow := &outerWorker{ OuterCtx: e.OuterCtx, ctx: e.Ctx(), executor: e.Children(0), resultCh: resultCh, innerCh: innerCh, - batchSize: 32, - maxBatchSize: e.Ctx().GetSessionVars().IndexJoinBatchSize, + batchSize: batchSize, + maxBatchSize: maxBatchSize, parentMemTracker: e.memTracker, lookup: e, } @@ -273,7 +275,7 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork // Next implements the Executor interface. func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error { if !e.prepared { - e.startWorkers(ctx) + e.startWorkers(ctx, req.RequiredRows()) e.prepared = true } if e.IsOuterJoin {