Skip to content

Commit

Permalink
client-go: attach request source with retry info for coprocessor (pin…
Browse files Browse the repository at this point in the history
…gcap#46706) (pingcap#47003) (pingcap#6)

* client-go: attach request source with retry info for coprocessor (pingcap#46509) (pingcap#46706)

close pingcap#46514

* restore bazelversiong

Signed-off-by: you06 <[email protected]>

* remove unused field

Signed-off-by: you06 <[email protected]>

* update client-go

Signed-off-by: you06 <[email protected]>

---------

Signed-off-by: you06 <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>

Signed-off-by: you06 <[email protected]>
Co-authored-by: you06 <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
3 people authored and GitHub Enterprise committed Sep 19, 2023
1 parent 4cd95b2 commit 92cc413
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 13 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3703,8 +3703,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:u21KmROEJmJL+Wkqz4cUe2BpjmcobqsYVNGEbVWKmnU=",
version = "v2.0.4-0.20230829002742-dfae543556aa",
sum = "h1:tayF5Szzeemsrgmup5dh5Uom4BiwzmYTMQ84NdrA+68=",
version = "v2.0.4-0.20230918031736-9126d0716e90",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.4-0.20230829002742-dfae543556aa
github.com/tikv/client-go/v2 v2.0.4-0.20230829002742-32c4ef54d6ed
github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -935,8 +935,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.4-0.20230829002742-dfae543556aa h1:u21KmROEJmJL+Wkqz4cUe2BpjmcobqsYVNGEbVWKmnU=
github.com/tikv/client-go/v2 v2.0.4-0.20230829002742-dfae543556aa/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ=
github.com/tikv/client-go/v2 v2.0.4-0.20230918062126-32c4ef54d6ed h1:O3MQ1ucUSsH8jTalvX98NZk66YCaxSABRp5UgcPenIc=
github.com/tikv/client-go/v2 v2.0.4-0.20230918062126-32c4ef54d6ed/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ=
github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05 h1:e4hLUKfgfPeJPZwOfU+/I/03G0sn6IZqVcbX/5o+hvM=
github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05/go.mod h1:MLIl+d2WbOF4A3U88WKtyXrQQW417wZDDvBcq2IW9bQ=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
3 changes: 1 addition & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4173,9 +4173,8 @@ func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNo
if !s.isInternal() {
if txn, _ := s.Txn(false); txn != nil && txn.Valid() {
txn.SetOption(kv.RequestSourceType, stmtLabel)
} else {
s.sessionVars.RequestSourceType = stmtLabel
}
s.sessionVars.RequestSourceType = stmtLabel
return
}
if source := ctx.Value(kv.RequestSourceKey); source != nil {
Expand Down
25 changes: 19 additions & 6 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ type copTask struct {

// timeout value for one kv readonly request
tikvClientReadTimeout uint64
// firstReadType is used to indicate the type of first read when retrying.
firstReadType string
}

type batchedCopTask struct {
Expand Down Expand Up @@ -1022,13 +1024,17 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
RecordTimeStat: true,
RecordScanStat: true,
TaskId: worker.req.TaskID,
RequestSource: task.requestSource.GetRequestSource(),
})
req.InputRequestSource = task.requestSource.GetRequestSource()
if task.firstReadType != "" {
req.ReadType = task.firstReadType
req.IsRetryRequest = true
}
if worker.req.ResourceGroupTagger != nil {
worker.req.ResourceGroupTagger(req)
}
timeout := tikv.ReadTimeoutMedium
if task.tikvClientReadTimeout> 0 {
if task.tikvClientReadTimeout > 0 {
timeout = time.Duration(task.tikvClientReadTimeout) * time.Millisecond
}
req.StoreTp = getEndPointType(task.storeType)
Expand Down Expand Up @@ -1073,12 +1079,19 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
tidbmetrics.DistSQLCoprRespBodySize.WithLabelValues(storeAddr).Observe(float64(len(copResp.Data)))
}

var remains []*copTask
if worker.req.Paging.Enable {
return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, costTime)
remains, err = worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, costTime)
} else {
// Handles the response for non-paging copTask.
remains, err = worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, nil, costTime)
}

// Handles the response for non-paging copTask.
return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, nil, costTime)
if req.ReadType != "" {
for _, remain := range remains {
remain.firstReadType = req.ReadType
}
}
return remains, err
}

const (
Expand Down

0 comments on commit 92cc413

Please sign in to comment.