Skip to content

Commit

Permalink
Merge branch 'get-token-metrics' of github.com:tiancaiamao/tidb into …
Browse files Browse the repository at this point in the history
…get-token-metrics
  • Loading branch information
tiancaiamao committed Jul 23, 2018
2 parents ca395ae + 1cab197 commit 78d07f9
Show file tree
Hide file tree
Showing 14 changed files with 318 additions and 183 deletions.
12 changes: 10 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ type TiKVClient struct {
// GrpcConnectionCount is the max gRPC connections that will be established
// with each tikv-server.
GrpcConnectionCount uint `toml:"grpc-connection-count" json:"grpc-connection-count"`
// After a duration of this time in seconds if the client doesn't see any activity it pings
// the server to see if the transport is still alive.
GrpcKeepAliveTime uint `toml:"grpc-keepalive-time" json:"grpc-keepalive-time"`
// After having pinged for keepalive check, the client waits for a duration of Timeout in seconds
// and if no activity is seen even after that the connection is closed.
GrpcKeepAliveTimeout uint `toml:"grpc-keepalive-timeout" json:"grpc-keepalive-timeout"`
// CommitTimeout is the max time which command 'commit' will wait.
CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"`
}
Expand Down Expand Up @@ -301,8 +307,10 @@ var defaultConf = Config{
Reporter: OpenTracingReporter{},
},
TiKVClient: TiKVClient{
GrpcConnectionCount: 16,
CommitTimeout: "41s",
GrpcConnectionCount: 16,
GrpcKeepAliveTime: 10,
GrpcKeepAliveTimeout: 3,
CommitTimeout: "41s",
},
Binlog: Binlog{
WriteTimeout: "15s",
Expand Down
8 changes: 8 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ local-agent-host-port = ""
# Max gRPC connections that will be established with each tikv-server.
grpc-connection-count = 16

# After a duration of this time in seconds if the client doesn't see any activity it pings
# the server to see if the transport is still alive.
grpc-keepalive-time = 10

# After having pinged for keepalive check, the client waits for a duration of Timeout in seconds
# and if no activity is seen even after that the connection is closed.
grpc-keepalive-timeout = 3

# max time for commit command, must be twice bigger than raft election timeout.
commit-timeout = "41s"

Expand Down
1 change: 1 addition & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func buildIndexColumns(columns []*model.ColumnInfo, idxColNames []*ast.IndexColN
Name: col.Name,
Offset: col.Offset,
Length: ic.Length,
Tp: &col.FieldType,
})
}

Expand Down
3 changes: 3 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ func (b *executorBuilder) buildDDL(v *plan.DDL) Executor {
return e
}

// buildExplain builds a explain executor. `e.rows` collects final result to `ExplainExec`.
func (b *executorBuilder) buildExplain(v *plan.Explain) Executor {
e := &ExplainExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
Expand Down Expand Up @@ -1494,6 +1495,8 @@ func buildNoRangeTableReader(b *executorBuilder, v *plan.PhysicalTableReader) (*
return e, nil
}

// buildTableReader builds a table reader executor. It first build a no range table reader,
// and then update it ranges from table scan plan.
func (b *executorBuilder) buildTableReader(v *plan.PhysicalTableReader) *TableReaderExecutor {
ret, err := buildNoRangeTableReader(b, v)
if err != nil {
Expand Down
168 changes: 0 additions & 168 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,114 +172,6 @@ func handleIsExtra(col *expression.Column) bool {
return false
}

// TableReaderExecutor sends dag request and reads table data from kv layer.
type TableReaderExecutor struct {
baseExecutor

table table.Table
tableID int64
keepOrder bool
desc bool
ranges []*ranger.Range
dagPB *tipb.DAGRequest
// columns are only required by union scan.
columns []*model.ColumnInfo

// resultHandler handles the order of the result. Since (MAXInt64, MAXUint64] stores before [0, MaxInt64] physically
// for unsigned int.
resultHandler *tableResultHandler
streaming bool
feedback *statistics.QueryFeedback

// corColInFilter tells whether there's correlated column in filter.
corColInFilter bool
// corColInAccess tells whether there's correlated column in access conditions.
corColInAccess bool
plans []plan.PhysicalPlan
}

// Close implements the Executor Close interface.
func (e *TableReaderExecutor) Close() error {
e.ctx.StoreQueryFeedback(e.feedback)
err := e.resultHandler.Close()
return errors.Trace(err)
}

// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
func (e *TableReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
err := e.resultHandler.nextChunk(ctx, chk)
if err != nil {
e.feedback.Invalidate()
}
return errors.Trace(err)
}

// Open initialzes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
span, ctx := startSpanFollowsContext(ctx, "executor.TableReader.Open")
defer span.Finish()

var err error
if e.corColInFilter {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
if err != nil {
return errors.Trace(err)
}
}
if e.corColInAccess {
ts := e.plans[0].(*plan.PhysicalTableScan)
access := ts.AccessCondition
pkTP := ts.Table.GetPkColInfo().FieldType
e.ranges, err = ranger.BuildTableRange(access, e.ctx.GetSessionVars().StmtCtx, &pkTP)
if err != nil {
return errors.Trace(err)
}
}

e.resultHandler = &tableResultHandler{}
firstPartRanges, secondPartRanges := splitRanges(e.ranges, e.keepOrder)
firstResult, err := e.buildResp(ctx, firstPartRanges)
if err != nil {
e.feedback.Invalidate()
return errors.Trace(err)
}
if len(secondPartRanges) == 0 {
e.resultHandler.open(nil, firstResult)
return nil
}
var secondResult distsql.SelectResult
secondResult, err = e.buildResp(ctx, secondPartRanges)
if err != nil {
e.feedback.Invalidate()
return errors.Trace(err)
}
e.resultHandler.open(firstResult, secondResult)
return nil
}

// buildResp first build request and send it to tikv using distsql.Select. It uses SelectResut returned by the callee
// to fetch all results.
func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
kvReq, err := builder.SetTableRanges(e.tableID, ranges, e.feedback).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return nil, errors.Trace(err)
}
result, err := distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback)
if err != nil {
return nil, errors.Trace(err)
}
result.Fetch(ctx)
return result, nil
}

func splitRanges(ranges []*ranger.Range, keepOrder bool) ([]*ranger.Range, []*ranger.Range) {
if len(ranges) == 0 || ranges[0].LowVal[0].Kind() == types.KindInt64 {
return ranges, nil
Expand Down Expand Up @@ -922,63 +814,3 @@ func GetLackHandles(expectedHandles []int64, obtainedHandlesMap map[int64]struct

return diffHandles
}

type tableResultHandler struct {
// If the pk is unsigned and we have KeepOrder=true.
// optionalResult handles the request whose range is in signed int range.
// result handles the request whose range is exceed signed int range.
// Otherwise, we just set optionalFinished true and the result handles the whole ranges.
optionalResult distsql.SelectResult
result distsql.SelectResult

optionalFinished bool
}

func (tr *tableResultHandler) open(optionalResult, result distsql.SelectResult) {
if optionalResult == nil {
tr.optionalFinished = true
tr.result = result
return
}
tr.optionalResult = optionalResult
tr.result = result
tr.optionalFinished = false
}

func (tr *tableResultHandler) nextChunk(ctx context.Context, chk *chunk.Chunk) error {
if !tr.optionalFinished {
err := tr.optionalResult.Next(ctx, chk)
if err != nil {
return errors.Trace(err)
}
if chk.NumRows() > 0 {
return nil
}
tr.optionalFinished = true
}
return tr.result.Next(ctx, chk)
}

func (tr *tableResultHandler) nextRaw(ctx context.Context) (data []byte, err error) {
if !tr.optionalFinished {
data, err = tr.optionalResult.NextRaw(ctx)
if err != nil {
return nil, errors.Trace(err)
}
if data != nil {
return data, nil
}
tr.optionalFinished = true
}
data, err = tr.result.NextRaw(ctx)
if err != nil {
return nil, errors.Trace(err)
}
return data, nil
}

func (tr *tableResultHandler) Close() error {
err := closeAll(tr.optionalResult, tr.result)
tr.optionalResult, tr.result = nil, nil
return errors.Trace(err)
}
Loading

0 comments on commit 78d07f9

Please sign in to comment.