Skip to content

Commit

Permalink
session, tikv: allocate task IDs for distsql requests (#16520)
Browse files Browse the repository at this point in the history
  • Loading branch information
sticnarf authored May 13, 2020
1 parent ddd8f37 commit 526a711
Show file tree
Hide file tree
Showing 13 changed files with 43 additions and 26 deletions.
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.Concurrency = sv.DistSQLScanConcurrency
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.TaskID = sv.StmtCtx.TaskID
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.GetReplicaRead()
if sv.SnapshotInfoschema != nil {
Expand Down
1 change: 1 addition & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
var batchGetter kv.BatchGetter = snapshot
if txn.Valid() {
batchGetter = kv.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot)
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
TimeZone: vars.Location(),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), vars.MemQuotaQuery),
DiskTracker: disk.NewTracker(stringutil.MemoizeStr(s.Text), -1),
TaskID: stmtctx.AllocateTaskID(),
}
if config.GetGlobalConfig().OOMUseTmpStorage && GlobalDiskUsageTracker != nil {
sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker)
Expand Down
1 change: 1 addition & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
var tblID int64
if e.partInfo != nil {
tblID = e.partInfo.ID
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20200424032552-6650270c39c3
github.com/pingcap/kvproto v0.0.0-20200428135407-0f5ffe459677
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200507022230-f3bf29096657
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2
Expand Down
27 changes: 2 additions & 25 deletions go.sum

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const (
SnapshotTS
// Set replica read
ReplicaRead
// Set task ID
TaskID
)

// Priority value for transaction priority.
Expand Down Expand Up @@ -319,6 +321,8 @@ type Request struct {
SchemaVar int64
// BatchCop indicates whether send batch coprocessor request to tiflash.
BatchCop bool
// TaskID is an unique ID for an execution of a statement
TaskID uint64
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down
14 changes: 14 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3205,3 +3205,17 @@ func (s *testSchemaSuite) TestTxnSize(c *C) {
c.Assert(err, IsNil)
c.Assert(txn.Size() > 0, IsTrue)
}

func (s *testSessionSuite2) TestPerStmtTaskID(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table task_id (v int)")

tk.MustExec("begin")
tk.MustExec("select * from task_id where v > 10")
taskID1 := tk.Se.GetSessionVars().StmtCtx.TaskID
tk.MustExec("select * from task_id where v < 5")
taskID2 := tk.Se.GetSessionVars().StmtCtx.TaskID
tk.MustExec("commit")

c.Assert(taskID1 != taskID2, IsTrue)
}
10 changes: 10 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/parser"
Expand All @@ -38,6 +39,13 @@ const (
WarnLevelNote = "Note"
)

var taskIDAlloc uint64

// AllocateTaskID allocates a new unique ID for a statement execution
func AllocateTaskID() uint64 {
return atomic.AddUint64(&taskIDAlloc, 1)
}

// SQLWarn relates a sql warning and it's level.
type SQLWarn struct {
Level string
Expand Down Expand Up @@ -144,6 +152,7 @@ type StatementContext struct {
LockKeysDuration time.Duration
LockKeysCount int32
TblInfo2UnionScan map[*model.TableInfo]bool
TaskID uint64 // unique ID for an execution of a statement
}

// StmtHints are SessionVars related sql hints.
Expand Down Expand Up @@ -465,6 +474,7 @@ func (sc *StatementContext) ResetForRetry() {
sc.BaseRowID = 0
sc.TableIDs = sc.TableIDs[:0]
sc.IndexNames = sc.IndexNames[:0]
sc.TaskID = AllocateTaskID()
}

// MergeExecDetails merges a single region execution details into self, used to print
Expand Down
1 change: 1 addition & 0 deletions store/tikv/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta
NotFillCache: b.req.NotFillCache,
HandleTime: true,
ScanDetail: true,
TaskId: b.req.TaskID,
})
req.StoreTp = kv.TiFlash

Expand Down
1 change: 1 addition & 0 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
NotFillCache: worker.req.NotFillCache,
HandleTime: true,
ScanDetail: true,
TaskId: worker.req.TaskID,
})
req.StoreTp = task.storeType
startTime := time.Now()
Expand Down
1 change: 1 addition & 0 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (s *Scanner) getData(bo *Backoffer) error {
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.replicaRead, s.snapshot.replicaReadSeed, pb.Context{
Priority: s.snapshot.priority,
NotFillCache: s.snapshot.notFillCache,
TaskId: s.snapshot.taskID,
})
resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type tikvSnapshot struct {
vars *kv.Variables
replicaRead kv.ReplicaReadType
replicaReadSeed uint32
taskID uint64
minCommitTSPushed

// Cache the result of BatchGet.
Expand Down Expand Up @@ -228,6 +229,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
}, s.replicaRead, s.replicaReadSeed, pb.Context{
Priority: s.priority,
NotFillCache: s.notFillCache,
TaskId: s.taskID,
})

resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, ReadTimeoutMedium, kv.TiKV, "")
Expand Down Expand Up @@ -338,6 +340,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
}, s.replicaRead, s.replicaReadSeed, pb.Context{
Priority: s.priority,
NotFillCache: s.notFillCache,
TaskId: s.taskID,
})
for {
loc, err := s.store.regionCache.LocateKey(bo, k)
Expand Down Expand Up @@ -405,6 +408,8 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
s.replicaRead = val.(kv.ReplicaReadType)
case kv.Priority:
s.priority = kvPriorityToCommandPri(val.(int))
case kv.TaskID:
s.taskID = val.(uint64)
}
}

Expand Down

0 comments on commit 526a711

Please sign in to comment.