Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fast path point select #6937

Merged
merged 21 commits into from
Jul 30, 2018
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ IndexJoin_11 4166.67 root left outer join, inner:IndexLookUp_10, outer key:test.
└─TableScan_9 10.00 cop table:t2, keep order:false, stats:pseudo
explain update t1 set t1.c2 = 2 where t1.c1 = 1;
id count task operator info
TableReader_5 1.00 root data:TableScan_4
└─TableScan_4 1.00 cop table:t1, range:[1,1], keep order:false, stats:pseudo
Point_Select_1 1.00 root Point_Select
explain delete from t1 where t1.c2 = 1;
id count task operator info
IndexLookUp_9 10.00 root
Expand Down
3 changes: 1 addition & 2 deletions cmd/explaintest/r/explain_easy_stats.result
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ Projection_6 2481.25 root test.t1.c1, test.t1.c2, test.t1.c3, test.t2.c1, test.t
└─TableScan_20 1985.00 cop table:t2, keep order:false
explain update t1 set t1.c2 = 2 where t1.c1 = 1;
id count task operator info
TableReader_5 1.00 root data:TableScan_4
└─TableScan_4 1.00 cop table:t1, range:[1,1], keep order:false
Point_Select_1 1.00 root Point_Select
explain delete from t1 where t1.c2 = 1;
id count task operator info
IndexLookUp_9 0.00 root
Expand Down
2 changes: 2 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plan.Plan
case *plan.PhysicalTableReader:
tableScan := v.TablePlans[0].(*plan.PhysicalTableScan)
return len(tableScan.Ranges) == 1 && tableScan.Ranges[0].IsPoint(ctx.GetSessionVars().StmtCtx)
case *plan.PointSelectPlan:
return true
default:
return false
}
Expand Down
10 changes: 6 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildExecute(v)
case *plan.Explain:
return b.buildExplain(v)
case *plan.PointSelectPlan:
return b.buildPointSelect(v)
case *plan.Insert:
return b.buildInsert(v)
case *plan.LoadData:
Expand Down Expand Up @@ -1029,7 +1031,7 @@ func (b *executorBuilder) buildProjection(v *plan.PhysicalProjection) Executor {
// If the calculation row count for this Projection operator is smaller
// than a Chunk size, we turn back to the un-parallel Projection
// implementation to reduce the goroutine overhead.
if v.StatsInfo().Count() < int64(b.ctx.GetSessionVars().MaxChunkSize) {
if int64(v.StatsCount()) < int64(b.ctx.GetSessionVars().MaxChunkSize) {
e.numWorkers = 0
}
return e
Expand Down Expand Up @@ -1482,7 +1484,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plan.PhysicalTableReader) (*
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
} else {
e.feedback = statistics.NewQueryFeedback(ts.Table.ID, ts.Hist, ts.StatsInfo().Count(), ts.Desc)
e.feedback = statistics.NewQueryFeedback(ts.Table.ID, ts.Hist, int64(ts.StatsCount()), ts.Desc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why make this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go lint reports error because it returns an unexported type statsInfo.

}
collect := e.feedback.CollectFeedback(len(ts.Ranges))
e.dagPB.CollectRangeCounts = &collect
Expand Down Expand Up @@ -1537,7 +1539,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plan.PhysicalIndexReader) (*
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
} else {
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, is.StatsInfo().Count(), is.Desc)
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, int64(is.StatsCount()), is.Desc)
}
collect := e.feedback.CollectFeedback(len(is.Ranges))
e.dagPB.CollectRangeCounts = &collect
Expand Down Expand Up @@ -1604,7 +1606,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plan.PhysicalIndexLook
if containsLimit(indexReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
} else {
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, is.StatsInfo().Count(), is.Desc)
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, int64(is.StatsCount()), is.Desc)
}
// do not collect the feedback for table request.
collectTable := false
Expand Down
2 changes: 1 addition & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func isExpensiveQuery(p plan.Plan) bool {

func isPhysicalPlanExpensive(p plan.PhysicalPlan) bool {
expensiveRowThreshold := int64(config.GetGlobalConfig().Log.ExpensiveThreshold)
if p.StatsInfo().Count() > expensiveRowThreshold {
if int64(p.StatsCount()) > expensiveRowThreshold {
return true
}

Expand Down
186 changes: 186 additions & 0 deletions executor/point_select.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"golang.org/x/net/context"
)

func (b *executorBuilder) buildPointSelect(p *plan.PointSelectPlan) Executor {
return &PointSelectExecutor{
ctx: b.ctx,
schema: p.Schema(),
tblInfo: p.TblInfo,
idxInfo: p.IndexInfo,
idxVals: p.IndexValues,
handle: p.Handle,
startTS: b.getStartTS(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use math.MaxUint64 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The statement may be UPDATE/DELETE or in an explicit transaction.

}
}

// PointSelectExecutor executes point select query.
type PointSelectExecutor struct {
ctx sessionctx.Context
schema *expression.Schema
tps []*types.FieldType
tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
}

// Open implements the Executor interface.
func (e *PointSelectExecutor) Open(context.Context) error {
return nil
}

// Close implements the Executor interface.
func (e *PointSelectExecutor) Close() error {
return nil
}

// Next implements the Executor interface.
func (e *PointSelectExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
e.done = true
snapshot, err1 := e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.startTS})
if err1 != nil {
return errors.Trace(err1)
}
e.snapshot = snapshot
if e.idxInfo != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Next function is too long, this if branch can be extract to a function like getHandleFromIdxValue

for i := range e.idxVals {
colInfo := e.tblInfo.Columns[e.idxInfo.Columns[i].Offset]
casted, err := table.CastValue(e.ctx, e.idxVals[i], colInfo)
if err != nil {
return errors.Trace(err)
}
e.idxVals[i] = casted
}
encodedIdxVals, err := codec.EncodeKey(e.ctx.GetSessionVars().StmtCtx, nil, e.idxVals...)
if err != nil {
return errors.Trace(err)
}
idxKey := tablecodec.EncodeIndexSeekKey(e.tblInfo.ID, e.idxInfo.ID, encodedIdxVals)
handleVal, err := e.get(idxKey)
if err != nil && !kv.ErrNotExist.Equal(err) {
return errors.Trace(err)
}
if len(handleVal) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this happen? Is it equal to ErrNotExist ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the key is deleted, it can be found in mem buffer but handleVal length is 0.

return nil
}
h, err := tables.DecodeHandle(handleVal)
if err != nil {
return errors.Trace(err)
}
e.handle = h
}
key := tablecodec.EncodeRowKeyWithHandle(e.tblInfo.ID, e.handle)
val, err := e.get(key)
if err != nil && !kv.ErrNotExist.Equal(err) {
return errors.Trace(err)
}
if len(val) == 0 {
return nil
}
colIDs := make(map[int64]int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following code can also be extracted to a function extractColumnsFromValue, so the Next logic would be much more intuitive:

snapshot = GetSnapshot()
handle = getHandleFromIdxValue()
key = EncodeIndexSeekKey(handle)
val = e.get(key)
extractColumnsFromValue(val)

for i, col := range e.schema.Columns {
colIDs[col.ID] = i
}
colVals, err := tablecodec.CutRowNew(val, colIDs)
if err != nil {
return errors.Trace(err)
}
decoder := codec.NewDecoder(chk, e.ctx.GetSessionVars().Location())
for id, offset := range colIDs {
if e.tblInfo.PKIsHandle && mysql.HasPriKeyFlag(e.schema.Columns[offset].RetType.Flag) {
chk.AppendInt64(offset, e.handle)
continue
}
if id == model.ExtraHandleID {
chk.AppendInt64(offset, e.handle)
continue
}
colVal := colVals[offset]
if len(colVal) == 0 {
colInfo := getColInfoByID(e.tblInfo, id)
d, err1 := table.GetColOriginDefaultValue(e.ctx, colInfo)
if err1 != nil {
return errors.Trace(err1)
}
chk.AppendDatum(offset, &d)
continue
}
_, err = decoder.DecodeOne(colVals[offset], offset, e.schema.Columns[offset].RetType)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

func (e *PointSelectExecutor) get(key kv.Key) (val []byte, err error) {
txn := e.ctx.Txn()
if txn != nil && txn.Valid() && !txn.IsReadOnly() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this happen? PointUpdate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

return txn.Get(key)
}
return e.snapshot.Get(key)
}

func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
for _, col := range tbl.Columns {
if col.ID == colID {
return col
}
}
return nil
}

// Schema implements the Executor interface.
func (e *PointSelectExecutor) Schema() *expression.Schema {
return e.schema
}

func (e *PointSelectExecutor) retTypes() []*types.FieldType {
if e.tps == nil {
e.tps = make([]*types.FieldType, e.schema.Len())
for i := range e.schema.Columns {
e.tps[i] = e.schema.Columns[i].RetType
}
}
return e.tps
}

func (e *PointSelectExecutor) newChunk() *chunk.Chunk {
return chunk.NewChunkWithCapacity(e.retTypes(), 1)
}
2 changes: 0 additions & 2 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,8 +1581,6 @@ func (s *testIntegrationSuite) TestTimeBuiltin(c *C) {
// TODO: MySQL returns "<nil> <nil>".
result.Check(testkit.Rows("0000-00-01 <nil>"))
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1292|Incorrect datetime value: '0000-00-00 00:00:00'"))
result = tk.MustQuery("select str_to_date('2018-6-1', '%Y-%m-%d'), str_to_date('2018-6-1', '%Y-%c-%d'), str_to_date('59:20:1', '%s:%i:%k'), str_to_date('59:20:1', '%s:%i:%l')")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why it's removed?

result.Check(testkit.Rows("2018-06-01 2018-06-01 01:20:59 01:20:59"))

// for maketime
tk.MustExec(`drop table if exists t`)
Expand Down
7 changes: 6 additions & 1 deletion expression/simple_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ func ParseSimpleExpr(ctx sessionctx.Context, exprStr string, tableInfo *model.Ta
return nil, errors.Trace(err)
}
expr := stmts[0].(*ast.SelectStmt).Fields.Fields[0].Expr
rewriter := &simpleRewriter{tbl: tableInfo, ctx: ctx}
return RewriteSimpleExpr(ctx, tableInfo, expr)
}

// RewriteSimpleExpr rewrites simple ast.ExprNode to expression.Expression.
func RewriteSimpleExpr(ctx sessionctx.Context, tbl *model.TableInfo, expr ast.ExprNode) (Expression, error) {
rewriter := &simpleRewriter{tbl: tbl, ctx: ctx}
expr.Accept(rewriter)
if rewriter.err != nil {
return nil, errors.Trace(rewriter.err)
Expand Down
4 changes: 2 additions & 2 deletions plan/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type CheckIndex struct {
IdxName string
}

// CheckIndexRange is used for checking index data, output the index values that handle within begin and end.
// CheckIndexRange is used for checking index data, output the index values that Handle within begin and end.
type CheckIndexRange struct {
baseSchemaProducer

Expand Down Expand Up @@ -439,7 +439,7 @@ func (e *Explain) explainPlanInRowFormat(p PhysicalPlan, taskType, indent string
// operator id, task type, operator info, and the estemated row count.
func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent string, isLastChild bool) {
operatorInfo := p.ExplainInfo()
count := string(strconv.AppendFloat([]byte{}, p.StatsInfo().count, 'f', 2, 64))
count := string(strconv.AppendFloat([]byte{}, p.statsInfo().count, 'f', 2, 64))
row := []string{e.prettyIdentifier(p.ExplainID(), indent, isLastChild), count, taskType, operatorInfo}
e.Rows = append(e.Rows, row)
}
Expand Down
6 changes: 3 additions & 3 deletions plan/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (ds *DataSource) tryToGetDualTask() (task, error) {
func (ds *DataSource) findBestTask(prop *requiredProp) (t task, err error) {
// If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself.
// So here we do nothing.
// TODO: Add a special prop to handle IndexJoin's inner plan.
// TODO: Add a special prop to Handle IndexJoin's inner plan.
// Then we can remove forceToTableScan and forceToIndexScan.
if prop == nil {
return nil, nil
Expand Down Expand Up @@ -416,8 +416,8 @@ func (is *PhysicalIndexScan) initSchema(id int, idx *model.IndexInfo, isDoubleRe
break
}
}
// If it's double read case, the first index must return handle. So we should add extra handle column
// if there isn't a handle column.
// If it's double read case, the first index must return Handle. So we should add extra Handle column
// if there isn't a Handle column.
if isDoubleRead && !setHandle {
indexCols = append(indexCols, &expression.Column{FromID: id, ID: model.ExtraHandleID, Position: -1})
}
Expand Down
12 changes: 6 additions & 6 deletions plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega
b.optFlag = b.optFlag | flagBuildKeyInfo
b.optFlag = b.optFlag | flagAggregationOptimize
// We may apply aggregation eliminate optimization.
// So we add the flagMaxMinEliminate to try to convert max/min to topn and flagPushDownTopN to handle the newly added topn operator.
// So we add the flagMaxMinEliminate to try to convert max/min to topn and flagPushDownTopN to Handle the newly added topn operator.
b.optFlag = b.optFlag | flagMaxMinEliminate
b.optFlag = b.optFlag | flagPushDownTopN
// when we eliminate the max and min we may add `is not null` filter.
Expand Down Expand Up @@ -286,7 +286,7 @@ func (b *planBuilder) buildJoin(joinNode *ast.Join) LogicalPlan {
joinPlan.JoinType = InnerJoin
}

// Merge sub join's redundantSchema into this join plan. When handle query like
// Merge sub join's redundantSchema into this join plan. When Handle query like
// select t2.a from (t1 join t2 using (a)) join t3 using (a);
// we can simply search in the top level join plan to find redundant column.
var lRedundant, rRedundant *expression.Schema
Expand Down Expand Up @@ -1716,13 +1716,13 @@ func (ds *DataSource) newExtraHandleSchemaCol() *expression.Column {

// getStatsTable gets statistics information for a table specified by "tableID".
// A pseudo statistics table is returned in any of the following scenario:
// 1. tidb-server started and statistics handle has not been initialized.
// 1. tidb-server started and statistics Handle has not been initialized.
// 2. table row count from statistics is zero.
// 3. statistics is outdated.
func (b *planBuilder) getStatsTable(tblInfo *model.TableInfo) *statistics.Table {
statsHandle := domain.GetDomain(b.ctx).StatsHandle()

// 1. tidb-server started and statistics handle has not been initialized.
// 1. tidb-server started and statistics Handle has not been initialized.
if statsHandle == nil {
return statistics.PseudoTable(tblInfo)
}
Expand Down Expand Up @@ -1807,8 +1807,8 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan {
}
ds.SetSchema(schema)

// We append an extra handle column to the schema when "ds" is not a memory
// table e.g. table in the "INFORMATION_SCHEMA" database, and the handle
// We append an extra Handle column to the schema when "ds" is not a memory
// table e.g. table in the "INFORMATION_SCHEMA" database, and the Handle
// column is not the primary key of "ds".
isMemDB := infoschema.IsMemoryDB(ds.DBName.L)
if !isMemDB && handleCol == nil {
Expand Down
4 changes: 4 additions & 0 deletions plan/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type logicalOptRule interface {
// Optimize does optimization and creates a Plan.
// The node must be prepared first.
func Optimize(ctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (Plan, error) {
fp := tryFastPlan(ctx, node)
if fp != nil {
return fp, nil
}
ctx.GetSessionVars().PlanID = 0
builder := &planBuilder{
ctx: ctx,
Expand Down
Loading