Skip to content

Commit

Permalink
*: fast point select path
Browse files Browse the repository at this point in the history
Skip optimizer and coprocessor for point select.
  • Loading branch information
coocood committed Jul 10, 2018
1 parent 4af1d23 commit 2198b32
Show file tree
Hide file tree
Showing 18 changed files with 700 additions and 77 deletions.
3 changes: 1 addition & 2 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ IndexJoin_11 4166.67 root left outer join, inner:IndexLookUp_10, outer key:test.
└─TableScan_9 10000.00 cop table:t2, keep order:false
explain update t1 set t1.c2 = 2 where t1.c1 = 1;
id count task operator info
TableReader_5 1.00 root data:TableScan_4
└─TableScan_4 1.00 cop table:t1, range:[1,1], keep order:false
Point_Select_1 1.00 root Point_Select
explain delete from t1 where t1.c2 = 1;
id count task operator info
IndexLookUp_9 10.00 root
Expand Down
3 changes: 1 addition & 2 deletions cmd/explaintest/r/explain_easy_stats.result
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ Projection_6 2481.25 root test.t1.c1, test.t1.c2, test.t1.c3, test.t2.c1, test.t
└─TableScan_20 1985.00 cop table:t2, keep order:false
explain update t1 set t1.c2 = 2 where t1.c1 = 1;
id count task operator info
TableReader_5 1.00 root data:TableScan_4
└─TableScan_4 1.00 cop table:t1, range:[1,1], keep order:false
Point_Select_1 1.00 root Point_Select
explain delete from t1 where t1.c2 = 1;
id count task operator info
IndexLookUp_9 0.00 root
Expand Down
2 changes: 2 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plan.Plan
case *plan.PhysicalTableReader:
tableScan := v.TablePlans[0].(*plan.PhysicalTableScan)
return len(tableScan.Ranges) == 1 && tableScan.Ranges[0].IsPoint(ctx.GetSessionVars().StmtCtx)
case *plan.PointSelectPlan:
return true
default:
return false
}
Expand Down
2 changes: 2 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildExecute(v)
case *plan.Explain:
return b.buildExplain(v)
case *plan.PointSelectPlan:
return b.buildPointSelect(v)
case *plan.Insert:
return b.buildInsert(v)
case *plan.LoadData:
Expand Down
181 changes: 181 additions & 0 deletions executor/point_select.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"golang.org/x/net/context"
)

func (b *executorBuilder) buildPointSelect(p *plan.PointSelectPlan) Executor {
return &PointSelectExecutor{
ctx: b.ctx,
schema: p.Schema(),
tblInfo: p.TblInfo,
idxInfo: p.IndexInfo,
idxVals: p.IndexValues,
handle: p.Handle,
startTS: b.getStartTS(),
}
}

type PointSelectExecutor struct {
ctx sessionctx.Context
schema *expression.Schema
tps []*types.FieldType
tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
}

func (e *PointSelectExecutor) Open(context.Context) error {
return nil
}

func (e *PointSelectExecutor) Close() error {
return nil
}

func (e *PointSelectExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
e.done = true
snapshot, err := e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.startTS})
if err != nil {
return errors.Trace(err)
}
e.snapshot = snapshot
if e.idxInfo != nil {
for i := range e.idxVals {
colInfo := e.tblInfo.Columns[e.idxInfo.Columns[i].Offset]
casted, err := table.CastValue(e.ctx, e.idxVals[i], colInfo)
if err != nil {
return errors.Trace(err)
}
e.idxVals[i] = casted
}
encodedIdxVals, err := codec.EncodeKey(e.ctx.GetSessionVars().StmtCtx, nil, e.idxVals...)
if err != nil {
return errors.Trace(err)
}
idxKey := tablecodec.EncodeIndexSeekKey(e.tblInfo.ID, e.idxInfo.ID, encodedIdxVals)
handleVal, err := e.get(idxKey)
if err != nil && !kv.ErrNotExist.Equal(err) {
return errors.Trace(err)
}
if len(handleVal) == 0 {
return nil
}
h, err := tables.DecodeHandle(handleVal)
if err != nil {
return errors.Trace(err)
}
e.handle = h
}
key := tablecodec.EncodeRowKeyWithHandle(e.tblInfo.ID, e.handle)
val, err := e.get(key)
if err != nil && !kv.ErrNotExist.Equal(err) {
return errors.Trace(err)
}
if len(val) == 0 {
return nil
}
colIDs := make(map[int64]int)
for i, col := range e.schema.Columns {
colIDs[col.ID] = i
}
colVals, err := tablecodec.CutRowNew(val, colIDs)
if err != nil {
return errors.Trace(err)
}
decoder := codec.NewDecoder(chk, e.ctx.GetSessionVars().GetTimeZone())
for id, offset := range colIDs {
if e.tblInfo.PKIsHandle && mysql.HasPriKeyFlag(e.schema.Columns[offset].RetType.Flag) {
chk.AppendInt64(offset, e.handle)
continue
}
if id == model.ExtraHandleID {
chk.AppendInt64(offset, e.handle)
continue
}
colVal := colVals[offset]
if len(colVal) == 0 {
colInfo := getColInfoByID(e.tblInfo, id)
d, err1 := table.GetColOriginDefaultValue(e.ctx, colInfo)
if err1 != nil {
return errors.Trace(err1)
}
chk.AppendDatum(offset, &d)
continue
}
_, err = decoder.DecodeOne(colVals[offset], offset, e.schema.Columns[offset].RetType)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

func (e *PointSelectExecutor) get(key kv.Key) (val []byte, err error) {
txn := e.ctx.Txn()
if txn != nil && txn.Valid() && !txn.IsReadOnly() {
return txn.Get(key)
}
return e.snapshot.Get(key)
}

func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
for _, col := range tbl.Columns {
if col.ID == colID {
return col
}
}
return nil
}

func (e *PointSelectExecutor) Schema() *expression.Schema {
return e.schema
}

func (e *PointSelectExecutor) retTypes() []*types.FieldType {
if e.tps == nil {
e.tps = make([]*types.FieldType, e.schema.Len())
for i := range e.schema.Columns {
e.tps[i] = e.schema.Columns[i].RetType
}
}
return e.tps
}

func (e *PointSelectExecutor) newChunk() *chunk.Chunk {
return chunk.NewChunkWithCapacity(e.retTypes(), 1)
}
2 changes: 0 additions & 2 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,8 +1581,6 @@ func (s *testIntegrationSuite) TestTimeBuiltin(c *C) {
// TODO: MySQL returns "<nil> <nil>".
result.Check(testkit.Rows("0000-00-01 <nil>"))
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1292|Incorrect datetime value: '0000-00-00 00:00:00'"))
result = tk.MustQuery("select str_to_date('2018-6-1', '%Y-%m-%d'), str_to_date('2018-6-1', '%Y-%c-%d'), str_to_date('59:20:1', '%s:%i:%k'), str_to_date('59:20:1', '%s:%i:%l')")
result.Check(testkit.Rows("2018-06-01 2018-06-01 01:20:59 01:20:59"))

// for maketime
tk.MustExec(`drop table if exists t`)
Expand Down
7 changes: 6 additions & 1 deletion expression/simple_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ func ParseSimpleExpr(ctx sessionctx.Context, exprStr string, tableInfo *model.Ta
return nil, errors.Trace(err)
}
expr := stmts[0].(*ast.SelectStmt).Fields.Fields[0].Expr
rewriter := &simpleRewriter{tbl: tableInfo, ctx: ctx}
return RewriteSimpleExpr(ctx, tableInfo, expr)
}

// RewriteSimpleExpr rewrites simple ast.ExprNode to expression.Expression.
func RewriteSimpleExpr(ctx sessionctx.Context, tbl *model.TableInfo, expr ast.ExprNode) (Expression, error) {
rewriter := &simpleRewriter{tbl: tbl, ctx: ctx}
expr.Accept(rewriter)
if rewriter.err != nil {
return nil, errors.Trace(rewriter.err)
Expand Down
2 changes: 1 addition & 1 deletion plan/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type CheckIndex struct {
IdxName string
}

// CheckIndexRange is used for checking index data, output the index values that handle within begin and end.
// CheckIndexRange is used for checking index data, output the index values that Handle within begin and end.
type CheckIndexRange struct {
baseSchemaProducer

Expand Down
6 changes: 3 additions & 3 deletions plan/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (ds *DataSource) tryToGetDualTask() (task, error) {
func (ds *DataSource) findBestTask(prop *requiredProp) (t task, err error) {
// If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself.
// So here we do nothing.
// TODO: Add a special prop to handle IndexJoin's inner plan.
// TODO: Add a special prop to Handle IndexJoin's inner plan.
// Then we can remove forceToTableScan and forceToIndexScan.
if prop == nil {
return nil, nil
Expand Down Expand Up @@ -444,8 +444,8 @@ func (is *PhysicalIndexScan) initSchema(id int, idx *model.IndexInfo, isDoubleRe
break
}
}
// If it's double read case, the first index must return handle. So we should add extra handle column
// if there isn't a handle column.
// If it's double read case, the first index must return Handle. So we should add extra Handle column
// if there isn't a Handle column.
if isDoubleRead && !setHandle {
indexCols = append(indexCols, &expression.Column{FromID: id, ID: model.ExtraHandleID, Position: -1})
}
Expand Down
12 changes: 6 additions & 6 deletions plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega
b.optFlag = b.optFlag | flagBuildKeyInfo
b.optFlag = b.optFlag | flagAggregationOptimize
// We may apply aggregation eliminate optimization.
// So we add the flagMaxMinEliminate to try to convert max/min to topn and flagPushDownTopN to handle the newly added topn operator.
// So we add the flagMaxMinEliminate to try to convert max/min to topn and flagPushDownTopN to Handle the newly added topn operator.
b.optFlag = b.optFlag | flagMaxMinEliminate
b.optFlag = b.optFlag | flagPushDownTopN
// when we eliminate the max and min we may add `is not null` filter.
Expand Down Expand Up @@ -286,7 +286,7 @@ func (b *planBuilder) buildJoin(joinNode *ast.Join) LogicalPlan {
joinPlan.JoinType = InnerJoin
}

// Merge sub join's redundantSchema into this join plan. When handle query like
// Merge sub join's redundantSchema into this join plan. When Handle query like
// select t2.a from (t1 join t2 using (a)) join t3 using (a);
// we can simply search in the top level join plan to find redundant column.
var lRedundant, rRedundant *expression.Schema
Expand Down Expand Up @@ -1716,13 +1716,13 @@ func (ds *DataSource) newExtraHandleSchemaCol() *expression.Column {

// getStatsTable gets statistics information for a table specified by "tableID".
// A pseudo statistics table is returned in any of the following scenario:
// 1. tidb-server started and statistics handle has not been initialized.
// 1. tidb-server started and statistics Handle has not been initialized.
// 2. table row count from statistics is zero.
// 3. statistics is outdated.
func (b *planBuilder) getStatsTable(tblInfo *model.TableInfo) *statistics.Table {
statsHandle := domain.GetDomain(b.ctx).StatsHandle()

// 1. tidb-server started and statistics handle has not been initialized.
// 1. tidb-server started and statistics Handle has not been initialized.
if statsHandle == nil {
return statistics.PseudoTable(tblInfo)
}
Expand Down Expand Up @@ -1806,8 +1806,8 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan {
}
ds.SetSchema(schema)

// We append an extra handle column to the schema when "ds" is not a memory
// table e.g. table in the "INFORMATION_SCHEMA" database, and the handle
// We append an extra Handle column to the schema when "ds" is not a memory
// table e.g. table in the "INFORMATION_SCHEMA" database, and the Handle
// column is not the primary key of "ds".
isMemDB := infoschema.IsMemoryDB(ds.DBName.L)
if !isMemDB && handleCol == nil {
Expand Down
4 changes: 4 additions & 0 deletions plan/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type logicalOptRule interface {
// Optimize does optimization and creates a Plan.
// The node must be prepared first.
func Optimize(ctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (Plan, error) {
fp := tryFastPlan(ctx, node)
if fp != nil {
return fp, nil
}
ctx.GetSessionVars().PlanID = 0
builder := &planBuilder{
ctx: ctx,
Expand Down
2 changes: 1 addition & 1 deletion plan/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func SetPBColumnsDefaultValue(ctx sessionctx.Context, pbColumns []*tipb.ColumnIn
}

// SupportStreaming returns true if a pushed down operation supports using coprocessor streaming API.
// Note that this function handle pushed down physical plan only! It's called in constructDAGReq.
// Note that this function Handle pushed down physical plan only! It's called in constructDAGReq.
// Some plans are difficult (if possible) to implement streaming, and some are pointless to do so.
// TODO: Support more kinds of physical plan.
func SupportStreaming(p PhysicalPlan) bool {
Expand Down
Loading

0 comments on commit 2198b32

Please sign in to comment.