Skip to content

Commit

Permalink
executor: refine explain analyze (#7888)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and alivxxx committed Oct 16, 2018
1 parent de6b582 commit 458c0d1
Show file tree
Hide file tree
Showing 16 changed files with 178 additions and 149 deletions.
8 changes: 4 additions & 4 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,9 +502,9 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro

// Next implements the Executor Next interface.
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.isUnparallelExec {
Expand Down Expand Up @@ -761,9 +761,9 @@ func (e *StreamAggExec) Close() error {

// Next implements the Executor Next interface.
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for !e.executed && chk.NumRows() < e.maxChunkSize {
Expand Down
38 changes: 6 additions & 32 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,41 +659,15 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) Executor {

// buildExplain builds a explain executor. `e.rows` collects final result to `ExplainExec`.
func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
if v.Analyze {
stmt := &ExecStmt{
InfoSchema: GetInfoSchema(b.ctx),
Plan: v.ExecPlan,
StmtNode: v.ExecStmt,
Ctx: b.ctx,
}
b.ctx.GetSessionVars().StmtCtx.RuntimeStats = execdetails.NewRuntimeStats()
ctx := context.Background()
rs, err := stmt.Exec(ctx)
if err != nil {
return nil
}
if rs != nil {
chk := rs.NewChunk()
for {
err := rs.Next(ctx, chk)
if err != nil {
return nil
}
if chk.NumRows() == 0 {
break
}
}
}
}
v.PrepareRows()
e := &ExplainExec{
explainExec := &ExplainExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
explain: v,
}
e.rows = make([][]string, 0, len(v.Rows))
for _, row := range v.Rows {
e.rows = append(e.rows, row)
if v.Analyze {
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
explainExec.analyzeExec = b.build(v.ExecPlan)
}
return e
return explainExec
}

func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor {
Expand Down
17 changes: 10 additions & 7 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ func (e *IndexReaderExecutor) Close() error {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
err := e.result.Next(ctx, chk)
if err != nil {
Expand Down Expand Up @@ -458,7 +458,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) {
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
e.tblWorkerWg.Add(lookupConcurrencyLimit)
e.baseExecutor.ctx.GetSessionVars().StmtCtx.RuntimeStats.GetRuntimeStat(e.id + "_tableReader")
for i := 0; i < lookupConcurrencyLimit; i++ {
worker := &tableWorker{
workCh: workCh,
Expand All @@ -480,7 +479,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
}

func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []int64) (Executor, error) {
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, &TableReaderExecutor{
tableReaderExec := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.id+"_tableReader"),
table: e.table,
physicalTableID: e.physicalTableID,
Expand All @@ -489,7 +488,11 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
}, handles)
}
// We assign `nil` to `runtimeStats` to forbidden `TableWorker` driven `IndexLookupExecutor`'s runtime stats collecting,
// because TableWorker information isn't showing in explain result now.
tableReaderExec.runtimeStats = nil
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles)
if err != nil {
log.Error(err)
return nil, errors.Trace(err)
Expand Down Expand Up @@ -518,9 +521,9 @@ func (e *IndexLookUpExecutor) Close() error {

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
Expand Down
34 changes: 18 additions & 16 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type baseExecutor struct {
maxChunkSize int
children []Executor
retFieldTypes []*types.FieldType
runtimeStat *execdetails.RuntimeStat
runtimeStats *execdetails.RuntimeStats
}

// Open initializes children recursively and "childrenResults" according to children's schemas.
Expand Down Expand Up @@ -134,7 +134,9 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
schema: schema,
initCap: ctx.GetSessionVars().MaxChunkSize,
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
runtimeStat: ctx.GetSessionVars().StmtCtx.RuntimeStats.GetRuntimeStat(id),
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.Get(e.id)
}
if schema != nil {
cols := schema.Columns
Expand Down Expand Up @@ -177,9 +179,9 @@ type CancelDDLJobsExec struct {

// Next implements the Executor Next interface.
func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.jobIDs) {
Expand Down Expand Up @@ -623,9 +625,9 @@ type LimitExec struct {

// Next implements the Executor Next interface.
func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.cursor >= e.end {
Expand Down Expand Up @@ -746,9 +748,9 @@ func (e *TableDualExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TableDualExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.numReturned >= e.numDualRows {
Expand Down Expand Up @@ -801,9 +803,9 @@ func (e *SelectionExec) Close() error {

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)

Expand Down Expand Up @@ -880,9 +882,9 @@ type TableScanExec struct {

// Next implements the Executor Next interface.
func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.isVirtualTable {
Expand Down Expand Up @@ -984,9 +986,9 @@ func (e *MaxOneRowExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.evaluated {
Expand Down Expand Up @@ -1130,9 +1132,9 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) {

// Next implements the Executor Next interface.
func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if !e.initialized {
Expand Down
46 changes: 44 additions & 2 deletions executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"github.com/cznic/mathutil"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
)
Expand All @@ -23,18 +24,39 @@ import (
type ExplainExec struct {
baseExecutor

rows [][]string
cursor int
explain *core.Explain
analyzeExec Executor
rows [][]string
cursor int
}

// Open implements the Executor Open interface.
func (e *ExplainExec) Open(ctx context.Context) error {
if e.analyzeExec != nil {
return e.analyzeExec.Open(ctx)
}
return nil
}

// Close implements the Executor Close interface.
func (e *ExplainExec) Close() error {
if e.analyzeExec != nil {
e.analyzeExec.Close()
}
e.rows = nil
return nil
}

// Next implements the Executor Next interface.
func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.rows == nil {
var err error
e.rows, err = e.generateExplainInfo(ctx)
if err != nil {
return err
}
}

chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.rows) {
return nil
Expand All @@ -49,3 +71,23 @@ func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
e.cursor += numCurRows
return nil
}

func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) {
if e.analyzeExec != nil {
chk := e.analyzeExec.newFirstChunk()
for {
err := e.analyzeExec.Next(ctx, chk)
if err != nil {
return nil, err
}
if chk.NumRows() == 0 {
break
}
}
}
e.explain.RenderResult()
if e.analyzeExec != nil {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = nil
}
return e.explain.Rows, nil
}
4 changes: 2 additions & 2 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork

// Next implements the Executor interface.
func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
e.joinResult.Reset()
Expand Down
8 changes: 4 additions & 4 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,9 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
// step 1. fetch data from inner child and build a hash table;
// step 2. fetch data from outer child in a background goroutine and probe the hash table in multiple join workers.
func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
if !e.prepared {
e.innerFinished = make(chan error, 1)
Expand Down Expand Up @@ -726,9 +726,9 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {

// Next implements the Executor interface.
func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
Expand Down
4 changes: 2 additions & 2 deletions executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ func (e *MergeJoinExec) prepare(ctx context.Context, chk *chunk.Chunk) error {

// Next implements the Executor Next interface.
func (e *MergeJoinExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.prepared {
Expand Down
4 changes: 2 additions & 2 deletions executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
// +------------------------------+ +----------------------+
//
func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.isUnparallelExec() {
Expand Down
8 changes: 4 additions & 4 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func (e *SortExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.fetched {
Expand Down Expand Up @@ -301,9 +301,9 @@ func (e *TopNExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TopNExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.fetched {
Expand Down
4 changes: 2 additions & 2 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
func (e *TableReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
if err := e.resultHandler.nextChunk(ctx, chk); err != nil {
e.feedback.Invalidate()
Expand Down
Loading

0 comments on commit 458c0d1

Please sign in to comment.