Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session, tikv: allocate task IDs for distsql requests #16520

Merged
merged 10 commits into from
May 13, 2020
Merged
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.Concurrency = sv.DistSQLScanConcurrency
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.TaskID = sv.StmtCtx.TaskID
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.GetReplicaRead()
if sv.SnapshotInfoschema != nil {
Expand Down
1 change: 1 addition & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
var batchGetter kv.BatchGetter = snapshot
if txn.Valid() {
batchGetter = kv.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot)
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
TimeZone: vars.Location(),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), vars.MemQuotaQuery),
DiskTracker: disk.NewTracker(stringutil.MemoizeStr(s.Text), -1),
TaskID: stmtctx.AllocateTaskID(),
}
if config.GetGlobalConfig().OOMUseTmpStorage && GlobalDiskUsageTracker != nil {
sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker)
Expand Down
1 change: 1 addition & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
var tblID int64
if e.partInfo != nil {
tblID = e.partInfo.ID
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20200424032552-6650270c39c3
github.com/pingcap/kvproto v0.0.0-20200428135407-0f5ffe459677
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200507022230-f3bf29096657
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2
Expand Down
27 changes: 2 additions & 25 deletions go.sum

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const (
SnapshotTS
// Set replica read
ReplicaRead
// Set task ID
TaskID
)

// Priority value for transaction priority.
Expand Down Expand Up @@ -319,6 +321,8 @@ type Request struct {
SchemaVar int64
// BatchCop indicates whether send batch coprocessor request to tiflash.
BatchCop bool
// TaskID is a unique ID for an execution of a statement.
TaskID uint64
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down
14 changes: 14 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3205,3 +3205,17 @@ func (s *testSchemaSuite) TestTxnSize(c *C) {
c.Assert(err, IsNil)
c.Assert(txn.Size() > 0, IsTrue)
}

// TestPerStmtTaskID verifies that each statement executed inside a
// transaction is assigned a fresh StmtCtx.TaskID.
func (s *testSessionSuite2) TestPerStmtTaskID(c *C) {
	tk := testkit.NewTestKitWithInit(c, s.store)
	tk.MustExec("create table task_id (v int)")

	tk.MustExec("begin")
	tk.MustExec("select * from task_id where v > 10")
	firstTaskID := tk.Se.GetSessionVars().StmtCtx.TaskID
	tk.MustExec("select * from task_id where v < 5")
	secondTaskID := tk.Se.GetSessionVars().StmtCtx.TaskID
	tk.MustExec("commit")

	// Two statements within the same transaction must not share a task ID.
	c.Assert(firstTaskID != secondTaskID, IsTrue)
}
10 changes: 10 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/parser"
Expand All @@ -38,6 +39,13 @@ const (
WarnLevelNote = "Note"
)

// taskIDAlloc is the process-wide counter backing AllocateTaskID; it is
// only ever incremented atomically. NOTE(review): being server-level, IDs
// are unique within one tidb-server but not across servers.
var taskIDAlloc uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taskIDAlloc is a server-level variable here.
Can we define it as a session-level variable to reduce the contention on it?


// AllocateTaskID allocates a new unique ID for a statement execution.
// IDs are drawn from a single atomic counter, so they are unique within
// this tidb-server process (not globally across a cluster).
func AllocateTaskID() uint64 {
return atomic.AddUint64(&taskIDAlloc, 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a chance that different tidb-servers use the same task ID.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, how about moving this calculation logic into the request sender side?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TiDB's logic is too complicated, I'm not sure if start ts exists when task id is assigned.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the statement idx, starts from 0 as the task id in the transaction?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original author also suggests using the ID for tracing. I'm not sure whether allocating it as a statement index would affect that purpose.

}

// SQLWarn relates a sql warning and it's level.
type SQLWarn struct {
Level string
Expand Down Expand Up @@ -144,6 +152,7 @@ type StatementContext struct {
LockKeysDuration time.Duration
LockKeysCount int32
TblInfo2UnionScan map[*model.TableInfo]bool
TaskID uint64 // unique ID for an execution of a statement
}

// StmtHints are SessionVars related sql hints.
Expand Down Expand Up @@ -465,6 +474,7 @@ func (sc *StatementContext) ResetForRetry() {
sc.BaseRowID = 0
sc.TableIDs = sc.TableIDs[:0]
sc.IndexNames = sc.IndexNames[:0]
sc.TaskID = AllocateTaskID()
}

// MergeExecDetails merges a single region execution details into self, used to print
Expand Down
1 change: 1 addition & 0 deletions store/tikv/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta
NotFillCache: b.req.NotFillCache,
HandleTime: true,
ScanDetail: true,
TaskId: b.req.TaskID,
})
req.StoreTp = kv.TiFlash

Expand Down
1 change: 1 addition & 0 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
NotFillCache: worker.req.NotFillCache,
HandleTime: true,
ScanDetail: true,
TaskId: worker.req.TaskID,
})
req.StoreTp = task.storeType
startTime := time.Now()
Expand Down
1 change: 1 addition & 0 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (s *Scanner) getData(bo *Backoffer) error {
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.replicaRead, s.snapshot.replicaReadSeed, pb.Context{
Priority: s.snapshot.priority,
NotFillCache: s.snapshot.notFillCache,
TaskId: s.snapshot.taskID,
})
resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type tikvSnapshot struct {
vars *kv.Variables
replicaRead kv.ReplicaReadType
replicaReadSeed uint32
taskID uint64
minCommitTSPushed

// Cache the result of BatchGet.
Expand Down Expand Up @@ -228,6 +229,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
}, s.replicaRead, s.replicaReadSeed, pb.Context{
Priority: s.priority,
NotFillCache: s.notFillCache,
TaskId: s.taskID,
})

resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, ReadTimeoutMedium, kv.TiKV, "")
Expand Down Expand Up @@ -338,6 +340,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
}, s.replicaRead, s.replicaReadSeed, pb.Context{
Priority: s.priority,
NotFillCache: s.notFillCache,
TaskId: s.taskID,
})
for {
loc, err := s.store.regionCache.LocateKey(bo, k)
Expand Down Expand Up @@ -405,6 +408,8 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
s.replicaRead = val.(kv.ReplicaReadType)
case kv.Priority:
s.priority = kvPriorityToCommandPri(val.(int))
case kv.TaskID:
s.taskID = val.(uint64)
}
}

Expand Down