From adc2d3b38ec50303e374ce996544a59778bd7374 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 9 Apr 2020 13:01:09 +0800 Subject: [PATCH] [store/tikv] support batch coprocessor for TiFlash (#16030) --- distsql/request_builder.go | 6 + executor/builder.go | 21 ++ executor/table_reader.go | 3 + go.mod | 2 +- go.sum | 4 +- kv/kv.go | 2 + session/session.go | 1 + sessionctx/variable/session.go | 8 + sessionctx/variable/sysvar.go | 1 + sessionctx/variable/tidb_vars.go | 5 + sessionctx/variable/varsutil.go | 9 + sessionctx/variable/varsutil_test.go | 1 + store/tikv/batch_coprocessor.go | 406 +++++++++++++++++++++++++++ store/tikv/client.go | 52 +++- store/tikv/coprocessor.go | 5 + store/tikv/region_request.go | 31 ++ store/tikv/region_request_test.go | 3 + store/tikv/tikvrpc/tikvrpc.go | 45 +++ 18 files changed, 598 insertions(+), 7 deletions(-) create mode 100644 store/tikv/batch_coprocessor.go diff --git a/distsql/request_builder.go b/distsql/request_builder.go index ad1161f98506e..872bc831818a5 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -138,6 +138,12 @@ func (builder *RequestBuilder) SetStoreType(storeType kv.StoreType) *RequestBuil return builder } +// SetAllowBatchCop sets `BatchCop` property. +func (builder *RequestBuilder) SetAllowBatchCop(batchCop bool) *RequestBuilder { + builder.Request.BatchCop = batchCop + return builder +} + func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel { switch builder.Tp { case kv.ReqTypeAnalyze: diff --git a/executor/builder.go b/executor/builder.go index 0d5d92fc90c51..dc1669ca5e45d 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2205,6 +2205,26 @@ func containsLimit(execs []*tipb.Executor) bool { return false } +// When allow batch cop is 1, only agg / topN uses batch cop. +// When allow batch cop is 2, every query uses batch cop. +func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) { + if e.storeType != kv.TiFlash || e.keepOrder { + return + } + switch e.ctx.GetSessionVars().AllowBatchCop { + case 1: + for _, p := range v.TablePlans { + switch p.(type) { + case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN: + e.batchCop = true + } + } + case 2: + e.batchCop = true + } + return +} + func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableReader) (*TableReaderExecutor, error) { dagReq, streaming, err := b.constructDAGReq(v.TablePlans) if err != nil { @@ -2235,6 +2255,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea plans: v.TablePlans, storeType: v.StoreType, } + e.setBatchCop(v) e.buildVirtualColumnInfo() if containsLimit(dagReq.Executors) { e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc) diff --git a/executor/table_reader.go b/executor/table_reader.go index 8af87b7c40ae5..071d00381679b 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -88,6 +88,8 @@ type TableReaderExecutor struct { virtualColumnIndex []int // virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns. virtualColumnRetFieldTypes []*types.FieldType + // batchCop indicates whether use super batch coprocessor request, only works for TiFlash engine. + batchCop bool } // Open initialzes necessary variables for using this executor. @@ -201,6 +203,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra SetFromSessionVars(e.ctx.GetSessionVars()). SetMemTracker(e.memTracker). SetStoreType(e.storeType). + SetAllowBatchCop(e.batchCop). 
Build() if err != nil { return nil, err diff --git a/go.mod b/go.mod index ad693571f02ae..a8cd8017c96bb 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798 github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904 + github.com/pingcap/kvproto v0.0.0-20200409034505-a5af800ca2ef github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd github.com/pingcap/parser v0.0.0-20200407074807-436f1c8c4cff github.com/pingcap/pd/v4 v4.0.0-beta.1.0.20200305072537-61d9f9cc35d3 diff --git a/go.sum b/go.sum index 7eb399ac04511..61172432b8914 100644 --- a/go.sum +++ b/go.sum @@ -264,8 +264,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200221034943-a2aa1d1e20a8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904 h1:pMFUXvhJ62hX8m0Q4RsL7L+hSW1mAMG26So5eFMoAtI= -github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200409034505-a5af800ca2ef h1:t+bOucRUlIlzW+6S32qG8ufu4iC8F8LEld4Rdhhp1Aw= +github.com/pingcap/kvproto v0.0.0-20200409034505-a5af800ca2ef/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/kv/kv.go b/kv/kv.go index 0c722a8ff46ac..105a544397d64 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -311,6 +311,8 @@ type Request struct { Cacheable bool // SchemaVer is for any schema-ful storage to validate schema correctness if necessary. SchemaVar int64 + // BatchCop indicates whether send batch coprocessor request to tiflash. + BatchCop bool } // ResultSubset represents a result subset from a single storage unit. diff --git a/session/session.go b/session/session.go index e8e51f216e48d..65b6639ede47d 100644 --- a/session/session.go +++ b/session/session.go @@ -1944,6 +1944,7 @@ var builtinGlobalVariable = []string{ variable.TiDBEnableNoopFuncs, variable.TiDBEnableIndexMerge, variable.TiDBTxnMode, + variable.TiDBAllowBatchCop, variable.TiDBRowFormatVersion, variable.TiDBEnableStmtSummary, variable.TiDBStmtSummaryInternalQuery, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 3af2f1721947f..2499c35221c8f 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -395,6 +395,10 @@ type SessionVars struct { // This variable is currently not recommended to be turned on. AllowWriteRowID bool + // AllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join. + // If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop. 
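	// As an illustration (the value mapping below follows setBatchCop in executor/builder.go; the
	// SQL statements are simply the usual way to change the `tidb_allow_batch_cop` variable and are
	// shown here as an example, not as part of this change):
	//   SET @@session.tidb_allow_batch_cop = 0; -- never send batch cop requests
	//   SET @@session.tidb_allow_batch_cop = 1; -- batch cop only for agg / topN plans on TiFlash (default)
	//   SET @@session.tidb_allow_batch_cop = 2; -- batch cop for every TiFlash table scan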
+ AllowBatchCop int + // CorrelationThreshold is the guard to enable row count estimation using column order correlation. CorrelationThreshold float64 @@ -721,6 +725,8 @@ func NewSessionVars() *SessionVars { } terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming)) + vars.AllowBatchCop = DefTiDBAllowBatchCop + var enableChunkRPC string if config.GetGlobalConfig().TiKVClient.EnableChunkRPC { enableChunkRPC = "1" @@ -1080,6 +1086,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.IndexLookupJoinConcurrency = tidbOptPositiveInt32(val, DefIndexLookupJoinConcurrency) case TiDBIndexJoinBatchSize: s.IndexJoinBatchSize = tidbOptPositiveInt32(val, DefIndexJoinBatchSize) + case TiDBAllowBatchCop: + s.AllowBatchCop = int(tidbOptInt64(val, DefTiDBAllowBatchCop)) case TiDBIndexLookupSize: s.IndexLookupSize = tidbOptPositiveInt32(val, DefIndexLookupSize) case TiDBHashJoinConcurrency: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 902eb33f5b93a..70822cc7bee42 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -642,6 +642,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)}, {ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)}, {ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)}, + {ScopeGlobal | ScopeSession, TiDBAllowBatchCop, strconv.Itoa(DefTiDBAllowBatchCop)}, {ScopeGlobal | ScopeSession, TiDBInitChunkSize, strconv.Itoa(DefInitChunkSize)}, {ScopeGlobal | ScopeSession, TiDBEnableCascadesPlanner, "0"}, {ScopeGlobal | ScopeSession, TiDBEnableIndexMerge, "0"}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 8d53ee4766007..a99068462555e 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -243,6 +243,10 @@ const ( // TiDBMaxChunkSize is used to control the max chunk size during query execution. TiDBMaxChunkSize = "tidb_max_chunk_size" + // TiDBAllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join. + // If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop. + TiDBAllowBatchCop = "tidb_allow_batch_cop" + // TiDBInitChunkSize is used to control the init chunk size during query execution. 
TiDBInitChunkSize = "tidb_init_chunk_size" @@ -442,6 +446,7 @@ const ( DefTiDBHashJoinConcurrency = 5 DefTiDBProjectionConcurrency = 4 DefTiDBOptimizerSelectivityLevel = 0 + DefTiDBAllowBatchCop = 1 DefTiDBTxnMode = "" DefTiDBRowFormatV1 = 1 DefTiDBRowFormatV2 = 2 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 782ab5f0d9f8f..0605bd31f3cd4 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -516,6 +516,15 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) } return value, nil + case TiDBAllowBatchCop: + v, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return value, ErrWrongTypeForVar.GenWithStackByArgs(name) + } + if v < 0 || v > 2 { + return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) + } + return value, nil case TiDBOptCPUFactor, TiDBOptCopCPUFactor, TiDBOptNetworkFactor, diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index d8d083ec53629..6ecd775febbe6 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -64,6 +64,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) { c.Assert(vars.IndexSerialScanConcurrency, Equals, DefIndexSerialScanConcurrency) c.Assert(vars.IndexLookupJoinConcurrency, Equals, DefIndexLookupJoinConcurrency) c.Assert(vars.HashJoinConcurrency, Equals, DefTiDBHashJoinConcurrency) + c.Assert(vars.AllowBatchCop, Equals, DefTiDBAllowBatchCop) c.Assert(vars.ProjectionConcurrency, Equals, int64(DefTiDBProjectionConcurrency)) c.Assert(vars.HashAggPartialConcurrency, Equals, DefTiDBHashAggPartialConcurrency) c.Assert(vars.HashAggFinalConcurrency, Equals, DefTiDBHashAggFinalConcurrency) diff --git a/store/tikv/batch_coprocessor.go b/store/tikv/batch_coprocessor.go new file mode 100644 index 0000000000000..7a446eec95d77 --- /dev/null +++ b/store/tikv/batch_coprocessor.go @@ -0,0 +1,406 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv + +import ( + "context" + "io" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" + "go.uber.org/zap" +) + +// batchCopTask comprises of multiple copTask that will send to same store. +type batchCopTask struct { + storeAddr string + cmdType tikvrpc.CmdType + + copTasks []copTaskAndRPCContext +} + +type batchCopResponse struct { + pbResp *coprocessor.BatchResponse + detail *execdetails.ExecDetails + + // batch Cop Response is yet to return startKey. So batchCop cannot retry partially. + startKey kv.Key + err error + respSize int64 + respTime time.Duration +} + +// GetData implements the kv.ResultSubset GetData interface. 
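// (batchCopResponse implements kv.ResultSubset, so results produced by the batch iterator flow
// back to the executor layer through the same interface as ordinary coprocessor responses.)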
+func (rs *batchCopResponse) GetData() []byte { + return rs.pbResp.Data +} + +// GetStartKey implements the kv.ResultSubset GetStartKey interface. +func (rs *batchCopResponse) GetStartKey() kv.Key { + return rs.startKey +} + +// GetExecDetails is unavailable currently, because TiFlash has not collected exec details for batch cop. +// TODO: Will fix in near future. +func (rs *batchCopResponse) GetExecDetails() *execdetails.ExecDetails { + return &execdetails.ExecDetails{} +} + +// MemSize returns how many bytes of memory this response use +func (rs *batchCopResponse) MemSize() int64 { + if rs.respSize != 0 { + return rs.respSize + } + + // ignore rs.err + rs.respSize += int64(cap(rs.startKey)) + if rs.detail != nil { + rs.respSize += int64(sizeofExecDetails) + if rs.detail.CommitDetail != nil { + rs.respSize += int64(sizeofCommitDetails) + } + } + if rs.pbResp != nil { + // Using a approximate size since it's hard to get a accurate value. + rs.respSize += int64(rs.pbResp.Size()) + } + return rs.respSize +} + +func (rs *batchCopResponse) RespTime() time.Duration { + return rs.respTime +} + +type copTaskAndRPCContext struct { + task *copTask + ctx *RPCContext +} + +func buildBatchCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv.Request) ([]*batchCopTask, error) { + start := time.Now() + const cmdType = tikvrpc.CmdBatchCop + rangesLen := ranges.len() + for { + var tasks []*copTask + appendTask := func(regionWithRangeInfo *KeyLocation, ranges *copRanges) { + tasks = append(tasks, &copTask{ + region: regionWithRangeInfo.Region, + ranges: ranges, + cmdType: cmdType, + storeType: req.StoreType, + }) + } + + err := splitRanges(bo, cache, ranges, appendTask) + if err != nil { + return nil, errors.Trace(err) + } + + var batchTasks []*batchCopTask + + storeTaskMap := make(map[string]*batchCopTask) + needRetry := false + for _, task := range tasks { + rpcCtx, err := cache.GetTiFlashRPCContext(bo, task.region) + if err != nil { + return nil, err + } + // If the region is not found in cache, it must be out + // of date and already be cleaned up. We should retry and generate new tasks. 
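			// Retrying simply restarts the enclosing for loop: the ranges are split against the
			// region cache again and the per-store grouping is rebuilt from scratch, because a
			// half-built batch could still reference the stale region.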
+ if rpcCtx == nil { + needRetry = true + break + } + if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok { + batchCop.copTasks = append(batchCop.copTasks, copTaskAndRPCContext{task: task, ctx: rpcCtx}) + } else { + batchTask := &batchCopTask{ + storeAddr: rpcCtx.Addr, + cmdType: cmdType, + copTasks: []copTaskAndRPCContext{{task, rpcCtx}}, + } + storeTaskMap[rpcCtx.Addr] = batchTask + } + } + if needRetry { + continue + } + for _, task := range storeTaskMap { + batchTasks = append(batchTasks, task) + } + + if elapsed := time.Since(start); elapsed > time.Millisecond*500 { + logutil.BgLogger().Warn("buildBatchCopTasks takes too much time", + zap.Duration("elapsed", elapsed), + zap.Int("range len", rangesLen), + zap.Int("task len", len(batchTasks))) + } + tikvTxnRegionsNumHistogramWithBatchCoprocessor.Observe(float64(len(batchTasks))) + return batchTasks, nil + } +} + +func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *kv.Variables) kv.Response { + if req.KeepOrder || req.Desc { + return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")} + } + ctx = context.WithValue(ctx, txnStartKey, req.StartTs) + bo := NewBackoffer(ctx, copBuildTaskMaxBackoff).WithVars(vars) + tasks, err := buildBatchCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req) + if err != nil { + return copErrorResponse{err} + } + it := &batchCopIterator{ + store: c.store, + req: req, + finishCh: make(chan struct{}), + vars: vars, + memTracker: req.MemTracker, + clientHelper: clientHelper{ + LockResolver: c.store.lockResolver, + RegionCache: c.store.regionCache, + Client: c.store.client, + minCommitTSPushed: &minCommitTSPushed{data: make(map[uint64]struct{}, 5)}, + }, + } + it.tasks = tasks + it.respChan = make(chan *batchCopResponse, 2048) + go it.run(ctx) + return it +} + +type batchCopIterator struct { + clientHelper + + store *tikvStore + req *kv.Request + finishCh chan struct{} + + tasks []*batchCopTask + + // Batch results are stored in respChan. + respChan chan *batchCopResponse + + vars *kv.Variables + + memTracker *memory.Tracker + + replicaReadSeed uint32 + + wg sync.WaitGroup + // closed represents when the Close is called. + // There are two cases we need to close the `finishCh` channel, one is when context is done, the other one is + // when the Close is called. we use atomic.CompareAndSwap `closed` to to make sure the channel is not closed twice. + closed uint32 +} + +func (b *batchCopIterator) run(ctx context.Context) { + // We run workers for every batch cop. + for _, task := range b.tasks { + b.wg.Add(1) + bo := NewBackoffer(ctx, copNextMaxBackoff).WithVars(b.vars) + go b.handleTask(ctx, bo, task) + } + b.wg.Wait() + close(b.respChan) +} + +// Next returns next coprocessor result. +// NOTE: Use nil to indicate finish, so if the returned ResultSubset is not nil, reader should continue to call Next(). 
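// A typical consumer loop therefore looks like the sketch below (illustrative only: `it` stands
// for the kv.Response returned by CopClient.Send and `consume` is a placeholder):
//
//	for {
//		rs, err := it.Next(ctx)
//		if err != nil {
//			return err
//		}
//		if rs == nil { // finished
//			break
//		}
//		consume(rs.GetData())
//	}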
+func (b *batchCopIterator) Next(ctx context.Context) (kv.ResultSubset, error) { + var ( + resp *batchCopResponse + ok bool + closed bool + ) + + // Get next fetched resp from chan + resp, ok, closed = b.recvFromRespCh(ctx) + if !ok || closed { + return nil, nil + } + + if resp.err != nil { + return nil, errors.Trace(resp.err) + } + + err := b.store.CheckVisibility(b.req.StartTs) + if err != nil { + return nil, errors.Trace(err) + } + return resp, nil +} + +func (b *batchCopIterator) recvFromRespCh(ctx context.Context) (resp *batchCopResponse, ok bool, exit bool) { + select { + case resp, ok = <-b.respChan: + case <-b.finishCh: + exit = true + case <-ctx.Done(): + // We select the ctx.Done() in the thread of `Next` instead of in the worker to avoid the cost of `WithCancel`. + if atomic.CompareAndSwapUint32(&b.closed, 0, 1) { + close(b.finishCh) + } + exit = true + } + return +} + +// Close releases the resource. +func (b *batchCopIterator) Close() error { + if atomic.CompareAndSwapUint32(&b.closed, 0, 1) { + close(b.finishCh) + } + b.wg.Wait() + return nil +} + +func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *batchCopTask) { + logutil.BgLogger().Debug("handle batch task") + tasks := []*batchCopTask{task} + for idx := 0; idx < len(tasks); idx++ { + ret, err := b.handleTaskOnce(ctx, bo, task) + if err != nil { + resp := &batchCopResponse{err: errors.Trace(err)} + b.sendToRespCh(resp) + break + } + tasks = append(tasks, ret...) + } + b.wg.Done() +} + +// Merge all ranges and request again. +func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) { + ranges := &copRanges{} + for _, taskCtx := range batchTask.copTasks { + taskCtx.task.ranges.do(func(ran *kv.KeyRange) { + ranges.mid = append(ranges.mid, *ran) + }) + } + return buildBatchCopTasks(bo, b.RegionCache, ranges, b.req) +} + +func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) { + logutil.BgLogger().Debug("handle batch task once") + sender := NewRegionBatchRequestSender(b.store.regionCache, b.store.client) + var regionInfos []*coprocessor.RegionInfo + for _, task := range task.copTasks { + regionInfos = append(regionInfos, &coprocessor.RegionInfo{ + RegionId: task.task.region.id, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: task.task.region.confVer, + Version: task.task.region.ver, + }, + Ranges: task.task.ranges.toPBRanges(), + }) + } + + copReq := coprocessor.BatchRequest{ + Tp: b.req.Tp, + StartTs: b.req.StartTs, + Data: b.req.Data, + SchemaVer: b.req.SchemaVar, + Regions: regionInfos, + } + + req := tikvrpc.NewRequest(task.cmdType, &copReq, kvrpcpb.Context{ + IsolationLevel: pbIsolationLevel(b.req.IsolationLevel), + Priority: kvPriorityToCommandPri(b.req.Priority), + NotFillCache: b.req.NotFillCache, + HandleTime: true, + ScanDetail: true, + }) + req.StoreTp = kv.TiFlash + + logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.copTasks))) + resp, retry, err := sender.sendReqToAddr(bo, task.copTasks, req, ReadTimeoutMedium) + // If there are store errors, we should retry for all regions. 
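	// A batch task spans many regions but the streamed BatchResponse reports no per-region
	// progress (batchCopResponse has no real startKey), so the only safe retry is to merge all
	// ranges of this task and rebuild the batch in retryBatchCopTask.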
+ if retry { + return b.retryBatchCopTask(ctx, bo, task) + } + if err != nil { + return nil, errors.Trace(err) + } + return nil, b.handleStreamedBatchCopResponse(ctx, bo, resp.Resp.(*tikvrpc.BatchCopStreamResponse), task) +} + +func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, bo *Backoffer, response *tikvrpc.BatchCopStreamResponse, task *batchCopTask) (err error) { + defer response.Close() + resp := response.BatchResponse + if resp == nil { + // streaming request returns io.EOF, so the first Response is nil. + return + } + for { + err = b.handleBatchCopResponse(bo, resp, task) + if err != nil { + return errors.Trace(err) + } + resp, err = response.Recv() + if err != nil { + if errors.Cause(err) == io.EOF { + return nil + } + + if err1 := bo.Backoff(boTiKVRPC, errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil { + return errors.Trace(err) + } + + // No coprocessor.Response for network error, rebuild task based on the last success one. + if errors.Cause(err) == context.Canceled { + logutil.BgLogger().Info("stream recv timeout", zap.Error(err)) + } else { + logutil.BgLogger().Info("stream unknown error", zap.Error(err)) + } + return errors.Trace(err) + } + } +} + +func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *coprocessor.BatchResponse, task *batchCopTask) (err error) { + if otherErr := response.GetOtherError(); otherErr != "" { + err = errors.Errorf("other error: %s", otherErr) + logutil.BgLogger().Warn("other error", + zap.Uint64("txnStartTS", b.req.StartTs), + zap.String("storeAddr", task.storeAddr), + zap.Error(err)) + return errors.Trace(err) + } + + b.sendToRespCh(&batchCopResponse{ + pbResp: response, + }) + + return +} + +func (b *batchCopIterator) sendToRespCh(resp *batchCopResponse) (exit bool) { + select { + case b.respChan <- resp: + case <-b.finishCh: + exit = true + } + return +} diff --git a/store/tikv/client.go b/store/tikv/client.go index df5b47b0c9a77..5af5aacf2b031 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -348,12 +348,20 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R client := tikvpb.NewTikvClient(clientConn) - if req.Type != tikvrpc.CmdCopStream { - ctx1, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - return tikvrpc.CallRPC(ctx1, client, req) + if req.Type == tikvrpc.CmdBatchCop { + return c.getBatchCopStreamResponse(ctx, client, req, timeout, connArray) } + if req.Type == tikvrpc.CmdCopStream { + return c.getCopStreamResponse(ctx, client, req, timeout, connArray) + } + + ctx1, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + return tikvrpc.CallRPC(ctx1, client, req) +} + +func (c *rpcClient) getCopStreamResponse(ctx context.Context, client tikvpb.TikvClient, req *tikvrpc.Request, timeout time.Duration, connArray *connArray) (*tikvrpc.Response, error) { // Coprocessor streaming request. // Use context to support timeout for grpc streaming client. ctx1, cancel := context.WithCancel(ctx) @@ -385,6 +393,42 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R } copStream.Response = first return resp, nil + +} + +func (c *rpcClient) getBatchCopStreamResponse(ctx context.Context, client tikvpb.TikvClient, req *tikvrpc.Request, timeout time.Duration, connArray *connArray) (*tikvrpc.Response, error) { + // Coprocessor streaming request. + // Use context to support timeout for grpc streaming client. 
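	// The cancel function ends up in copStream.Lease.Cancel and the lease is pushed onto
	// connArray.streamTimeout, so CheckStreamTimeoutLoop can cancel a Recv() that overruns the
	// Timeout recorded on the BatchCopStreamResponse.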
+ ctx1, cancel := context.WithCancel(ctx) + // Should NOT call defer cancel() here because it will cancel further stream.Recv() + // We put it in copStream.Lease.Cancel call this cancel at copStream.Close + // TODO: add unit test for SendRequest. + resp, err := tikvrpc.CallRPC(ctx1, client, req) + if err != nil { + cancel() + return nil, errors.Trace(err) + } + + // Put the lease object to the timeout channel, so it would be checked periodically. + copStream := resp.Resp.(*tikvrpc.BatchCopStreamResponse) + copStream.Timeout = timeout + copStream.Lease.Cancel = cancel + connArray.streamTimeout <- &copStream.Lease + + // Read the first streaming response to get CopStreamResponse. + // This can make error handling much easier, because SendReq() retry on + // region error automatically. + var first *coprocessor.BatchResponse + first, err = copStream.Recv() + if err != nil { + if errors.Cause(err) != io.EOF { + return nil, errors.Trace(err) + } + logutil.BgLogger().Debug("batch copstream returns nothing for the request.") + } + copStream.BatchResponse = first + return resp, nil + } func (c *rpcClient) Close() error { diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 4abd38f0f3e99..7b1e9ca2fc3b8 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -46,6 +46,7 @@ import ( ) var tikvTxnRegionsNumHistogramWithCoprocessor = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor") +var tikvTxnRegionsNumHistogramWithBatchCoprocessor = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor") // CopClient is coprocessor client. type CopClient struct { @@ -56,6 +57,10 @@ type CopClient struct { // Send builds the request and gets the coprocessor iterator response. func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables) kv.Response { + if req.StoreType == kv.TiFlash && req.BatchCop { + logutil.BgLogger().Debug("send batch requests") + return c.sendBatch(ctx, req, vars) + } ctx = context.WithValue(ctx, txnStartKey, req.StartTs) bo := NewBackoffer(ctx, copBuildTaskMaxBackoff).WithVars(vars) tasks, err := buildCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req) diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 1e20e09c6e229..ac671683eb58b 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -62,6 +62,37 @@ type RegionRequestSender struct { failStoreIDs map[uint64]struct{} } +// RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way. +type RegionBatchRequestSender struct { + RegionRequestSender +} + +// NewRegionBatchRequestSender creates a RegionBatchRequestSender object. +func NewRegionBatchRequestSender(cache *RegionCache, client Client) *RegionBatchRequestSender { + return &RegionBatchRequestSender{RegionRequestSender: RegionRequestSender{regionCache: cache, client: client}} +} + +func (ss *RegionBatchRequestSender) sendReqToAddr(bo *Backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, err error) { + // use the first ctx to send request, because every ctx has same address. 
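	// (buildBatchCopTasks grouped the tasks by rpcCtx.Addr, so every entry in ctxs resolves to the
	// same TiFlash store; the per-region epochs and ranges travel separately in the BatchRequest's
	// Regions field.)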
+ ctx := ctxs[0].ctx + if e := tikvrpc.SetContext(req, ctx.Meta, ctx.Peer); e != nil { + return nil, false, errors.Trace(e) + } + resp, err = ss.client.SendRequest(bo.ctx, ctx.Addr, req, timout) + if err != nil { + ss.rpcError = err + for _, failedCtx := range ctxs { + e := ss.onSendFail(bo, failedCtx.ctx, err) + if e != nil { + return nil, false, errors.Trace(e) + } + } + return nil, true, nil + } + // We don't need to process region error or lock error. Because TiFlash will retry by itself. + return +} + // NewRegionRequestSender creates a new sender. func NewRegionRequestSender(regionCache *RegionCache, client Client) *RegionRequestSender { return &RegionRequestSender{ diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index 3b5c5ce7a2526..e45b33cf57321 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -346,6 +346,9 @@ func (s *mockTikvGrpcServer) PhysicalScanLock(context.Context, *kvrpcpb.Physical func (s *mockTikvGrpcServer) Coprocessor(context.Context, *coprocessor.Request) (*coprocessor.Response, error) { return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) BatchCoprocessor(*coprocessor.BatchRequest, tikvpb.Tikv_BatchCoprocessorServer) error { + return errors.New("unreachable") +} func (s *mockTikvGrpcServer) Raft(tikvpb.Tikv_RaftServer) error { return errors.New("unreachable") } diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index 7dbd0d98d5fb0..0e54aee45c495 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -68,6 +68,7 @@ const ( CmdCop CmdType = 512 + iota CmdCopStream + CmdBatchCop CmdMvccGetByKey CmdType = 1024 + iota CmdMvccGetByStartTs @@ -136,6 +137,8 @@ func (t CmdType) String() string { return "Cop" case CmdCopStream: return "CopStream" + case CmdBatchCop: + return "BatchCop" case CmdMvccGetByKey: return "MvccGetByKey" case CmdMvccGetByStartTs: @@ -309,6 +312,11 @@ func (req *Request) Cop() *coprocessor.Request { return req.req.(*coprocessor.Request) } +// BatchCop returns coprocessor request in request. +func (req *Request) BatchCop() *coprocessor.BatchRequest { + return req.req.(*coprocessor.BatchRequest) +} + // MvccGetByKey returns MvccGetByKeyRequest in request. func (req *Request) MvccGetByKey() *kvrpcpb.MvccGetByKeyRequest { return req.req.(*kvrpcpb.MvccGetByKeyRequest) @@ -495,6 +503,14 @@ type CopStreamResponse struct { Lease // Shared by this object and a background goroutine. } +// BatchCopStreamResponse comprises the BatchCoprocessorClient , the first result and timeout detector. +type BatchCopStreamResponse struct { + tikvpb.Tikv_BatchCoprocessorClient + *coprocessor.BatchResponse + Timeout time.Duration + Lease // Shared by this object and a background goroutine. +} + // SetContext set the Context field for the given req to the specified ctx. 
func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { ctx := &req.Context @@ -561,6 +577,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.Cop().Context = ctx case CmdCopStream: req.Cop().Context = ctx + case CmdBatchCop: + req.BatchCop().Context = ctx case CmdMvccGetByKey: req.MvccGetByKey().Context = ctx case CmdMvccGetByStartTs: @@ -797,6 +815,12 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Resp = &CopStreamResponse{ Tikv_CoprocessorStreamClient: streamClient, } + case CmdBatchCop: + var streamClient tikvpb.Tikv_BatchCoprocessorClient + streamClient, err = client.BatchCoprocessor(ctx, req.BatchCop()) + resp.Resp = &BatchCopStreamResponse{ + Tikv_BatchCoprocessorClient: streamClient, + } case CmdMvccGetByKey: resp.Resp, err = client.MvccGetByKey(ctx, req.MvccGetByKey()) case CmdMvccGetByStartTs: @@ -858,6 +882,27 @@ func (resp *CopStreamResponse) Close() { } } +// Recv overrides the stream client Recv() function. +func (resp *BatchCopStreamResponse) Recv() (*coprocessor.BatchResponse, error) { + deadline := time.Now().Add(resp.Timeout).UnixNano() + atomic.StoreInt64(&resp.Lease.deadline, deadline) + + ret, err := resp.Tikv_BatchCoprocessorClient.Recv() + + atomic.StoreInt64(&resp.Lease.deadline, 0) // Stop the lease check. + return ret, errors.Trace(err) +} + +// Close closes the CopStreamResponse object. +func (resp *BatchCopStreamResponse) Close() { + atomic.StoreInt64(&resp.Lease.deadline, 1) + // We also call cancel here because CheckStreamTimeoutLoop + // is not guaranteed to cancel all items when it exits. + if resp.Lease.Cancel != nil { + resp.Lease.Cancel() + } +} + // CheckStreamTimeoutLoop runs periodically to check is there any stream request timeouted. // Lease is an object to track stream requests, call this function with "go CheckStreamTimeoutLoop()" // It is not guaranteed to call every Lease.Cancel() putting into channel when exits.
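
The core of the change is the grouping step in buildBatchCopTasks: region-level cop tasks whose TiFlash RPC context resolves to the same store address are folded into one batchCopTask, and a single streamed BatchRequest per store then carries all per-region epochs and ranges. Below is a minimal, self-contained sketch of that grouping under simplified, hypothetical types (regionTask, storeBatch and addrOf are stand-ins for this illustration, not names from the patch):

    package main

    import "fmt"

    // Simplified stand-ins for copTask / RPCContext, used only for this sketch.
    type regionTask struct {
    	regionID uint64
    	ranges   []string
    }

    type storeBatch struct {
    	storeAddr string
    	tasks     []regionTask
    }

    // groupByStore mirrors the storeTaskMap logic in buildBatchCopTasks: every region task is
    // appended to the batch of the store that serves it, yielding one batch per TiFlash store.
    func groupByStore(tasks []regionTask, addrOf func(regionID uint64) string) []storeBatch {
    	byAddr := make(map[string]*storeBatch)
    	for _, t := range tasks {
    		addr := addrOf(t.regionID)
    		b, ok := byAddr[addr]
    		if !ok {
    			b = &storeBatch{storeAddr: addr}
    			byAddr[addr] = b
    		}
    		b.tasks = append(b.tasks, t)
    	}
    	out := make([]storeBatch, 0, len(byAddr))
    	for _, b := range byAddr {
    		out = append(out, *b)
    	}
    	return out
    }

    func main() {
    	// Pretend regions 1 and 3 live on one TiFlash store and region 2 on another.
    	addrOf := func(id uint64) string {
    		if id == 2 {
    			return "tiflash-2:3930"
    		}
    		return "tiflash-1:3930"
    	}
    	tasks := []regionTask{{1, []string{"[a,b)"}}, {2, []string{"[b,c)"}}, {3, []string{"[c,d)"}}}
    	for _, b := range groupByStore(tasks, addrOf) {
    		fmt.Printf("%s -> %d region tasks\n", b.storeAddr, len(b.tasks))
    	}
    }

Grouping by store is what turns one coprocessor round trip per region into one streamed BatchRequest per TiFlash node, which is the purpose of the new CmdBatchCop RPC.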