Skip to content

Commit

Permalink
executor: avoid ProjectoinExec's goroutine leak (#14127)
Browse files Browse the repository at this point in the history
  • Loading branch information
fzhedu authored and sre-bot committed Dec 25, 2019
1 parent efd34cb commit aa2f716
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 10 deletions.
35 changes: 26 additions & 9 deletions executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package executor
import (
"context"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mathutil"
)

// ExplainExec represents an explain executor.
Expand Down Expand Up @@ -70,23 +71,39 @@ func (e *ExplainExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) {
func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string, err error) {
closed := false
defer func() {
if !closed && e.analyzeExec != nil {
err = e.analyzeExec.Close()
closed = true
}
}()
if e.analyzeExec != nil {
chk := newFirstChunk(e.analyzeExec)
var nextErr, closeErr error
for {
err := Next(ctx, e.analyzeExec, chk)
if err != nil {
return nil, err
}
if chk.NumRows() == 0 {
nextErr = Next(ctx, e.analyzeExec, chk)
if nextErr != nil || chk.NumRows() == 0 {
break
}
}
if err := e.analyzeExec.Close(); err != nil {
closeErr = e.analyzeExec.Close()
closed = true
if nextErr != nil {
if closeErr != nil {
err = errors.New(nextErr.Error() + ", " + closeErr.Error())
} else {
err = nextErr
}
} else if closeErr != nil {
err = closeErr
}
if err != nil {
return nil, err
}
}
if err := e.explain.RenderResult(); err != nil {
if err = e.explain.RenderResult(); err != nil {
return nil, err
}
if e.analyzeExec != nil {
Expand Down
92 changes: 92 additions & 0 deletions executor/explain_unit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"errors"
"testing"

"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mock"
)

var (
_ Executor = &mockErrorOperator{}
)

type mockErrorOperator struct {
baseExecutor
toPanic bool
closed bool
}

func (e *mockErrorOperator) Open(ctx context.Context) error {
return nil
}

func (e *mockErrorOperator) Next(ctx context.Context, req *chunk.Chunk) error {
if e.toPanic {
panic("next panic")
} else {
return errors.New("next error")
}
}

func (e *mockErrorOperator) Close() error {
e.closed = true
return errors.New("close error")
}

func getColumns() []*expression.Column {
return []*expression.Column{
{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
}
}

// close() must be called after next() to avoid goroutines leak
func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
schema := expression.NewSchema(getColumns()...)
baseExec := newBaseExecutor(ctx, schema, nil)
explainExec := &ExplainExec{
baseExecutor: baseExec,
explain: nil,
}
// mockErrorOperator returns errors
mockOper := mockErrorOperator{baseExec, false, false}
explainExec.analyzeExec = &mockOper
tmpCtx := context.Background()
_, err := explainExec.generateExplainInfo(tmpCtx)

expectedStr := "next error, close error"
if err.Error() != expectedStr || !mockOper.closed {
t.Errorf(err.Error())
}
// mockErrorOperator panic
mockOper = mockErrorOperator{baseExec, true, false}
explainExec.analyzeExec = &mockOper
defer func() {
if panicErr := recover(); panicErr == nil || !mockOper.closed {
t.Errorf("panic test failed: without panic or close() is not called")
}
}()
_, err = explainExec.generateExplainInfo(tmpCtx)
}
1 change: 0 additions & 1 deletion executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ func (e *ProjectionExec) parallelExecute(ctx context.Context, chk *chunk.Chunk)
if err != nil {
return err
}

mSize := output.chk.MemoryUsage()
chk.SwapColumns(output.chk)
e.memTracker.Consume(output.chk.MemoryUsage() - mSize)
Expand Down

0 comments on commit aa2f716

Please sign in to comment.