Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: move irrelevant code out of package "distsql" #5893

Merged
merged 4 commits into from
Feb 24, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 8 additions & 91 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -68,11 +66,10 @@ type PartialResult interface {
}

type selectResult struct {
label string
aggregate bool
resp kv.Response
label string
resp kv.Response

results chan newResultWithErr
results chan resultWithErr
closed chan struct{}

rowLen int
Expand All @@ -86,7 +83,7 @@ type selectResult struct {
partialCount int64 // number of partial results.
}

type newResultWithErr struct {
type resultWithErr struct {
result []byte
err error
}
Expand All @@ -107,15 +104,15 @@ func (r *selectResult) fetch(goCtx goctx.Context) {
for {
resultSubset, err := r.resp.Next(goCtx)
if err != nil {
r.results <- newResultWithErr{err: errors.Trace(err)}
r.results <- resultWithErr{err: errors.Trace(err)}
return
}
if resultSubset == nil {
return
}

select {
case r.results <- newResultWithErr{result: resultSubset}:
case r.results <- resultWithErr{result: resultSubset}:
case <-r.closed:
// If selectResult called Close() already, make fetch goroutine exit.
return
Expand Down Expand Up @@ -321,7 +318,7 @@ func Select(goCtx goctx.Context, ctx sessionctx.Context, kvReq *kv.Request, fiel
return &selectResult{
label: "dag",
resp: resp,
results: make(chan newResultWithErr, kvReq.Concurrency),
results: make(chan resultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
rowLen: len(fieldTypes),
fieldTypes: fieldTypes,
Expand All @@ -338,7 +335,7 @@ func Analyze(ctx goctx.Context, client kv.Client, kvReq *kv.Request) (SelectResu
result := &selectResult{
label: "analyze",
resp: resp,
results: make(chan newResultWithErr, kvReq.Concurrency),
results: make(chan resultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
}
return result, nil
Expand All @@ -348,83 +345,3 @@ func Analyze(ctx goctx.Context, client kv.Client, kvReq *kv.Request) (SelectResu
const (
codeInvalidResp = 1
)

// FieldTypeFromPBColumn creates a types.FieldType from tipb.ColumnInfo.
func FieldTypeFromPBColumn(col *tipb.ColumnInfo) *types.FieldType {
return &types.FieldType{
Tp: byte(col.GetTp()),
Flag: uint(col.Flag),
Flen: int(col.GetColumnLen()),
Decimal: int(col.GetDecimal()),
Elems: col.Elems,
Collate: mysql.Collations[uint8(col.GetCollation())],
}
}

func columnToProto(c *model.ColumnInfo) *tipb.ColumnInfo {
pc := &tipb.ColumnInfo{
ColumnId: c.ID,
Collation: collationToProto(c.FieldType.Collate),
ColumnLen: int32(c.FieldType.Flen),
Decimal: int32(c.FieldType.Decimal),
Flag: int32(c.Flag),
Elems: c.Elems,
}
pc.Tp = int32(c.FieldType.Tp)
return pc
}

// TODO: update it when more collate is supported.
func collationToProto(c string) int32 {
v := mysql.CollationNames[c]
if v == mysql.BinaryCollationID {
return int32(mysql.BinaryCollationID)
}
// We only support binary and utf8_bin collation.
// Setting other collations to utf8_bin for old data compatibility.
// For the data created when we didn't enforce utf8_bin collation in create table.
return int32(mysql.DefaultCollationID)
}

// ColumnsToProto converts a slice of model.ColumnInfo to a slice of tipb.ColumnInfo.
func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool) []*tipb.ColumnInfo {
cols := make([]*tipb.ColumnInfo, 0, len(columns))
for _, c := range columns {
col := columnToProto(c)
// TODO: Here `PkHandle`'s meaning is changed, we will change it to `IsHandle` when tikv's old select logic
// is abandoned.
if (pkIsHandle && mysql.HasPriKeyFlag(c.Flag)) || c.ID == model.ExtraHandleID {
col.PkHandle = true
} else {
col.PkHandle = false
}
cols = append(cols, col)
}
return cols
}

// IndexToProto converts a model.IndexInfo to a tipb.IndexInfo.
func IndexToProto(t *model.TableInfo, idx *model.IndexInfo) *tipb.IndexInfo {
pi := &tipb.IndexInfo{
TableId: t.ID,
IndexId: idx.ID,
Unique: idx.Unique,
}
cols := make([]*tipb.ColumnInfo, 0, len(idx.Columns)+1)
for _, c := range idx.Columns {
cols = append(cols, columnToProto(t.Columns[c.Offset]))
}
if t.PKIsHandle {
// Coprocessor needs to know PKHandle column info, so we need to append it.
for _, col := range t.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
colPB := columnToProto(col)
colPB.PkHandle = true
cols = append(cols, colPB)
break
}
}
}
pi.Columns = cols
return pi
}
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plan.AnalyzeColumnsTa
BucketSize: maxBucketSize,
SampleSize: maxRegionSampleSize,
SketchSize: maxSketchSize,
ColumnsInfo: distsql.ColumnsToProto(cols, task.TableInfo.PKIsHandle),
ColumnsInfo: plan.ColumnsToProto(cols, task.TableInfo.PKIsHandle),
CmsketchDepth: &depth,
CmsketchWidth: &width,
}
Expand Down
74 changes: 71 additions & 3 deletions plan/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ package plan

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -93,7 +93,7 @@ func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error)
columns := p.Columns
tsExec := &tipb.TableScan{
TableId: p.Table.ID,
Columns: distsql.ColumnsToProto(columns, p.Table.PKIsHandle),
Columns: ColumnsToProto(columns, p.Table.PKIsHandle),
Desc: p.Desc,
}
err := setPBColumnsDefaultValue(ctx, tsExec.Columns, p.Columns)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (p *PhysicalIndexScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error)
idxExec := &tipb.IndexScan{
TableId: p.Table.ID,
IndexId: p.Index.ID,
Columns: distsql.ColumnsToProto(columns, p.Table.PKIsHandle),
Columns: ColumnsToProto(columns, p.Table.PKIsHandle),
Desc: p.Desc,
}
unique := checkCoverIndex(p.Index, p.Ranges)
Expand Down Expand Up @@ -162,3 +162,71 @@ func setPBColumnsDefaultValue(ctx sessionctx.Context, pbColumns []*tipb.ColumnIn
}
return nil
}

// ColumnsToProto converts a slice of model.ColumnInfo to a slice of tipb.ColumnInfo.
func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool) []*tipb.ColumnInfo {
cols := make([]*tipb.ColumnInfo, 0, len(columns))
for _, c := range columns {
col := columnToProto(c)
// TODO: Here `PkHandle`'s meaning is changed, we will change it to `IsHandle` when tikv's old select logic
// is abandoned.
if (pkIsHandle && mysql.HasPriKeyFlag(c.Flag)) || c.ID == model.ExtraHandleID {
col.PkHandle = true
} else {
col.PkHandle = false
}
cols = append(cols, col)
}
return cols
}

// IndexToProto converts a model.IndexInfo to a tipb.IndexInfo.
func IndexToProto(t *model.TableInfo, idx *model.IndexInfo) *tipb.IndexInfo {
pi := &tipb.IndexInfo{
TableId: t.ID,
IndexId: idx.ID,
Unique: idx.Unique,
}
cols := make([]*tipb.ColumnInfo, 0, len(idx.Columns)+1)
for _, c := range idx.Columns {
cols = append(cols, columnToProto(t.Columns[c.Offset]))
}
if t.PKIsHandle {
// Coprocessor needs to know PKHandle column info, so we need to append it.
for _, col := range t.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
colPB := columnToProto(col)
colPB.PkHandle = true
cols = append(cols, colPB)
break
}
}
}
pi.Columns = cols
return pi
}

func columnToProto(c *model.ColumnInfo) *tipb.ColumnInfo {
pc := &tipb.ColumnInfo{
ColumnId: c.ID,
Collation: collationToProto(c.FieldType.Collate),
ColumnLen: int32(c.FieldType.Flen),
Decimal: int32(c.FieldType.Decimal),
Flag: int32(c.Flag),
Elems: c.Elems,
}
pc.Tp = int32(c.FieldType.Tp)
return pc
}

// TODO: update it when more collate is supported.
func collationToProto(c string) int32 {
v := mysql.CollationNames[c]
if v == mysql.BinaryCollationID {
return int32(mysql.BinaryCollationID)
}
// We only support binary and utf8_bin collation.
// Setting other collations to utf8_bin for old data compatibility.
// For the data created when we didn't enforce utf8_bin collation in create table.
return int32(mysql.DefaultCollationID)
}
43 changes: 4 additions & 39 deletions distsql/distsql_test.go → plan/plan_to_pb_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016 PingCAP, Inc.
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -11,26 +11,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package distsql
package plan

import (
"errors"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
)

func TestT(t *testing.T) {
CustomVerboseFlag = true
TestingT(t)
}

var _ = Suite(&testDistsqlSuite{})

type testDistsqlSuite struct{}
Expand All @@ -45,9 +36,8 @@ func (s *testDistsqlSuite) TestColumnToProto(c *C) {
FieldType: *tp,
}
pc := columnToProto(col)
c.Assert(pc.GetFlag(), Equals, int32(10))
ntp := FieldTypeFromPBColumn(pc)
c.Assert(ntp, DeepEquals, tp)
expect := &tipb.ColumnInfo{ColumnId: 0, Tp: 3, Collation: 83, ColumnLen: -1, Decimal: -1, Flag: 10, Elems: []string(nil), DefaultVal: []uint8(nil), PkHandle: false, XXX_unrecognized: []uint8(nil)}
c.Assert(pc, DeepEquals, expect)

cols := []*model.ColumnInfo{col, col}
pcs := ColumnsToProto(cols, false)
Expand Down Expand Up @@ -131,28 +121,3 @@ func (s *testDistsqlSuite) TestIndexToProto(c *C) {
c.Assert(pIdx.IndexId, Equals, int64(1))
c.Assert(pIdx.Unique, Equals, true)
}

type mockResponse struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not used any more?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure, reverted.

count int
}

func (resp *mockResponse) Next(ctx goctx.Context) ([]byte, error) {
resp.count++
if resp.count == 100 {
return nil, errors.New("error happened")
}
return mockSubresult(), nil
}

func (resp *mockResponse) Close() error {
return nil
}

func mockSubresult() []byte {
resp := new(tipb.SelectResponse)
b, err := resp.Marshal()
if err != nil {
panic(err)
}
return b
}
3 changes: 1 addition & 2 deletions store/mockstore/mocktikv/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
Expand Down Expand Up @@ -133,7 +132,7 @@ func (h *rpcHandler) handleAnalyzeColumnsReq(req *coprocessor.Request, analyzeRe
for i := range e.fields {
rf := new(ast.ResultField)
rf.Column = new(model.ColumnInfo)
rf.Column.FieldType = *distsql.FieldTypeFromPBColumn(columns[i])
rf.Column.FieldType = *fieldTypeFromPBColumn(columns[i])
e.fields[i] = rf
}

Expand Down
15 changes: 13 additions & 2 deletions store/mockstore/mocktikv/cop_handler_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -353,7 +352,7 @@ func (e *evalContext) setColumnInfo(cols []*tipb.ColumnInfo) {
e.colIDs = make(map[int64]int)
e.fieldTps = make([]*types.FieldType, 0, len(e.columnInfos))
for i, col := range e.columnInfos {
ft := distsql.FieldTypeFromPBColumn(col)
ft := fieldTypeFromPBColumn(col)
e.fieldTps = append(e.fieldTps, ft)
e.colIDs[col.GetColumnId()] = i
}
Expand Down Expand Up @@ -614,3 +613,15 @@ func extractOffsetsInExpr(expr *tipb.Expr, columns []*tipb.ColumnInfo, collector
}
return collector, nil
}

// fieldTypeFromPBColumn creates a types.FieldType from tipb.ColumnInfo.
func fieldTypeFromPBColumn(col *tipb.ColumnInfo) *types.FieldType {
return &types.FieldType{
Tp: byte(col.GetTp()),
Flag: uint(col.Flag),
Flen: int(col.GetColumnLen()),
Decimal: int(col.GetDecimal()),
Elems: col.Elems,
Collate: mysql.Collations[uint8(col.GetCollation())],
}
}
2 changes: 1 addition & 1 deletion util/arena/arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewAllocator(capacity int) *SimpleAllocator {
// Alloc implements Allocator.AllocBytes interface.
func (s *SimpleAllocator) Alloc(capacity int) []byte {
if s.off+capacity < cap(s.arena) {
slice := s.arena[s.off : s.off : s.off+capacity]
slice := s.arena[s.off:s.off : s.off+capacity]
s.off += capacity
return slice
}
Expand Down