use tidb_kv_read_timeout as first kv request timeout (#919) #948

Merged
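This PR makes the client use the shorter, session-configured tidb_kv_read_timeout for the first KV request attempt: when that attempt hits a deadline exceeded error, the replica is marked (deadlineErrUsingConfTimeout) and the request is fast-retried on another peer without invalidating the region, falling back to the default timeout once all replicas have been tried. The sketch below is a minimal, self-contained illustration of that retry idea; the helper names and timeout values are hypothetical, not the client-go API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendFn stands in for a single KV RPC attempt against one replica (hypothetical).
type sendFn func(ctx context.Context, replica string) error

// readWithEscalation sketches the retry idea: try each replica with the short,
// configured timeout first; if every attempt only hits deadline exceeded, retry
// once more with the default (longer) timeout instead of failing the request.
func readWithEscalation(send sendFn, replicas []string, short, def time.Duration) error {
	for _, r := range replicas {
		ctx, cancel := context.WithTimeout(context.Background(), short)
		err := send(ctx, r)
		cancel()
		if err == nil {
			return nil
		}
		if !errors.Is(err, context.DeadlineExceeded) {
			return err // non-timeout errors keep their usual handling
		}
		// Deadline exceeded under the short timeout: mark this replica and
		// fast-retry the next one without backing off or invalidating the region.
	}
	// All replicas timed out under the short timeout: fall back to the default timeout.
	ctx, cancel := context.WithTimeout(context.Background(), def)
	defer cancel()
	return send(ctx, replicas[0])
}

func main() {
	slowStore := func(ctx context.Context, replica string) error {
		select {
		case <-time.After(50 * time.Millisecond): // simulated slow store
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	err := readWithEscalation(slowStore, []string{"store-1", "store-2"}, 10*time.Millisecond, 200*time.Millisecond)
	fmt.Println(err) // <nil>: the fallback attempt with the default timeout succeeds
}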
6 changes: 6 additions & 0 deletions internal/client/client_batch.go
@@ -347,6 +347,12 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
}

func (a *batchConn) getClientAndSend() {
if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil {
if timeout, ok := val.(int); ok && timeout > 0 {
time.Sleep(time.Duration(timeout * int(time.Millisecond)))
}
}

// Choose a connection by round-robin.
var (
cli *batchCommandsClient
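The new mockBatchClientSendDelay failpoint above lets tests delay the batch send path so a short first-attempt timeout reliably fires. Below is a hedged test sketch of driving it with pingcap/failpoint; the "tikvclient/" failpoint path prefix is an assumption based on how client-go's util.EvalFailpoint wraps failpoints, and the test setup may additionally need failpoint support enabled for the client.

// Hypothetical test sketch; adjust the failpoint path if the repo registers it
// under a different prefix.
package client_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestBatchSendDelayTriggersFirstAttemptTimeout(t *testing.T) {
	// Sleep 100ms in getClientAndSend so a shorter first-attempt timeout fires.
	if err := failpoint.Enable("tikvclient/mockBatchClientSendDelay", "return(100)"); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable("tikvclient/mockBatchClientSendDelay"); err != nil {
			t.Fatal(err)
		}
	}()

	// ...send a request with a small MaxExecutionDurationMs here and assert that
	// the client marks the replica and retries instead of invalidating the region.
}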
94 changes: 70 additions & 24 deletions internal/locate/region_request.go
@@ -240,6 +240,8 @@ type replica struct {
peer *metapb.Peer
epoch uint32
attempts int
// deadlineErrUsingConfTimeout indicates that the replica has already been tried, but it returned a deadline exceeded error.
deadlineErrUsingConfTimeout bool
}

func (r *replica) isEpochStale() bool {
@@ -337,7 +339,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
// a request. So, before the new leader is elected, we should not send requests
// to the unreachable old leader to avoid unnecessary timeout.
if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
return nil, stateChanged{}
}
selector.targetIdx = state.leaderIdx
@@ -352,15 +354,15 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
return
}
if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
}
if liveness != reachable {
selector.invalidateReplicaStore(selector.targetReplica(), cause)
}
}

func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
}

// tryFollower is the state where we cannot access the known leader
@@ -372,22 +374,24 @@ func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
// the leader will be updated to replicas[0] and give it another chance.
type tryFollower struct {
stateBase
// if the leader is unavailable, but it still holds the leadership, fallbackFromLeader is true and replica read is enabled.
fallbackFromLeader bool
leaderIdx AccessIndex
lastIdx AccessIndex
labels []*metapb.StoreLabel
leaderIdx AccessIndex
lastIdx AccessIndex
labels []*metapb.StoreLabel
// fromAccessKnownLeader indicates whether this state was entered from `accessKnownLeader`.
fromAccessKnownLeader bool
}

func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
hasDeadlineExceededErr := false
filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) {
for i := 0; i < len(selector.replicas); i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
selectReplica := selector.replicas[idx]
hasDeadlineExceededErr = hasDeadlineExceededErr || selectReplica.deadlineErrUsingConfTimeout
if idx == state.leaderIdx {
continue
}
selectReplica := selector.replicas[idx]
if fn(selectReplica) && selectReplica.store.getLivenessState() != unreachable {
if fn(selectReplica) && selectReplica.store.getLivenessState() != unreachable && !selectReplica.deadlineErrUsingConfTimeout {
return idx, selectReplica
}
}
@@ -418,6 +422,10 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (

// If all followers are tried and fail, backoff and retry.
if selector.targetIdx < 0 {
if hasDeadlineExceededErr {
// When a deadline exceeded error is met, do a fast retry without invalidating the region cache.
return nil, nil
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
@@ -426,17 +434,17 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
if err != nil || rpcCtx == nil {
return rpcCtx, err
}
if state.fallbackFromLeader {
staleRead := false
rpcCtx.contextPatcher.staleRead = &staleRead
if !state.fromAccessKnownLeader {
replicaRead := true
rpcCtx.contextPatcher.replicaRead = &replicaRead
}
staleRead := false
rpcCtx.contextPatcher.staleRead = &staleRead
return rpcCtx, nil
}

func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
if !state.fallbackFromLeader {
if state.fromAccessKnownLeader {
peer := selector.targetReplica().peer
if !selector.region.switchWorkLeaderToPeer(peer) {
logutil.BgLogger().Warn("the store must exist",
@@ -623,16 +631,19 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
zap.Bool("leader-invalid", leaderInvalid),
zap.Any("labels", state.option.labels))
}
// If the leader was tried and received a deadline exceeded error, return nil to the upper layer so it retries with the default timeout.
if leader.deadlineErrUsingConfTimeout {
return nil, nil
}
if leaderInvalid {
// In stale-read, the request falls back to the leader after the local follower fails.
// If the leader is also unavailable, we can fall back to the followers and use the replica-read flag again;
// the remote followers have not been tried yet, and the local follower can retry without the stale-read flag.
if state.isStaleRead {
selector.state = &tryFollower{
fallbackFromLeader: true,
leaderIdx: state.leaderIdx,
lastIdx: state.leaderIdx,
labels: state.option.labels,
leaderIdx: state.leaderIdx,
lastIdx: state.leaderIdx,
labels: state.option.labels,
}
if leaderEpochStale {
selector.regionCache.scheduleReloadRegion(selector.region)
@@ -677,7 +688,8 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
}

func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
// The replica is not a candidate if its epoch is stale, its retries are exhausted, its store is unreachable, or it already returned a deadline exceeded error.
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout {
return false
}
if state.option.leaderOnly && idx == state.leaderIdx {
@@ -884,6 +896,16 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
s.state.onSendFailure(bo, s, err)
}

func (s *replicaSelector) onDeadlineExceeded() {
if target := s.targetReplica(); target != nil {
target.deadlineErrUsingConfTimeout = true
}
if accessLeader, ok := s.state.(*accessKnownLeader); ok {
// If the leader returned a deadline exceeded error, try to access a follower next time.
s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
}
}

func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
store := accessReplica.store
liveness := store.requestLiveness(bo, s.regionCache)
@@ -1484,7 +1506,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo
return nil, false, err
}
}
if e := s.onSendFail(bo, rpcCtx, err); e != nil {
if e := s.onSendFail(bo, rpcCtx, req, err); e != nil {
return nil, false, err
}
return nil, true, nil
@@ -1514,7 +1536,7 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) {
logutil.BgLogger().Warn("release store token failed, count equals to 0")
}

func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, err error) error {
func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, err error) error {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("regionRequest.onSendFail", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -1525,6 +1547,11 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
return errors.WithStack(err)
} else if LoadShuttingDown() > 0 {
return errors.WithStack(tikverr.ErrTiDBShuttingDown)
} else if errors.Cause(err) == context.DeadlineExceeded && req.MaxExecutionDurationMs < uint64(client.ReadTimeoutShort.Milliseconds()) {
if s.replicaSelector != nil {
s.replicaSelector.onDeadlineExceeded()
return nil
}
}
if status.Code(errors.Cause(err)) == codes.Canceled {
select {
@@ -1598,6 +1625,9 @@ func regionErrorToLabel(e *errorpb.Error) string {
} else if e.GetEpochNotMatch() != nil {
return "epoch_not_match"
} else if e.GetServerIsBusy() != nil {
if strings.Contains(e.GetServerIsBusy().GetReason(), "deadline is exceeded") {
return "deadline_exceeded"
}
return "server_is_busy"
} else if e.GetStaleCommand() != nil {
return "stale_command"
@@ -1628,10 +1658,16 @@ func regionErrorToLabel(e *errorpb.Error) string {
// the `mismatch peer id` error does not have a specific error type, so we have to match the error message.
// TODO: add a specific error type for `mismatch peer id`.
return "mismatch_peer_id"
} else if isDeadlineExceeded(e) {
return "deadline_exceeded"
}
return "unknown"
}

func isDeadlineExceeded(e *errorpb.Error) bool {
return strings.Contains(e.GetMessage(), "Deadline is exceeded")
}

func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error) (shouldRetry bool, err error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context()))
@@ -1725,8 +1761,13 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
return retry, err
}

if regionErr.GetServerIsBusy() != nil {
logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil {
if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") {
s.replicaSelector.onDeadlineExceeded()
return true, nil
}
logutil.Logger(bo.GetCtx()).Warn(
"tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
zap.Stringer("ctx", ctx))
if s.replicaSelector.canFallback2Follower() {
@@ -1846,7 +1887,12 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
return true, nil
}

logutil.BgLogger().Debug("tikv reports region failed",
if isDeadlineExceeded(regionErr) && s.replicaSelector != nil {
s.replicaSelector.onDeadlineExceeded()
}

logutil.Logger(bo.GetCtx()).Debug(
"tikv reports region failed",
zap.Stringer("regionErr", regionErr),
zap.Stringer("ctx", ctx))

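For context, the check added to onSendFail only treats context.DeadlineExceeded as retryable when req.MaxExecutionDurationMs is smaller than client.ReadTimeoutShort, i.e. when the caller shortened the first attempt via tidb_kv_read_timeout. The sketch below illustrates that caller-side convention; the helper, types, and constant value are hypothetical, not TiDB's or client-go's actual code.

package main

import (
	"fmt"
	"time"
)

// Stand-in for client.ReadTimeoutShort; the real constant lives in the client
// package and its exact value may differ.
var defaultReadTimeout = 20 * time.Second

type request struct {
	MaxExecutionDurationMs uint64
}

// applyFirstAttemptTimeout shortens only the first attempt. onSendFail treats a
// deadline error as retryable precisely because MaxExecutionDurationMs ends up
// smaller than the default read timeout.
func applyFirstAttemptTimeout(req *request, tidbKVReadTimeout time.Duration, attempt int) {
	if attempt == 0 && tidbKVReadTimeout > 0 && tidbKVReadTimeout < defaultReadTimeout {
		req.MaxExecutionDurationMs = uint64(tidbKVReadTimeout.Milliseconds())
		return
	}
	req.MaxExecutionDurationMs = uint64(defaultReadTimeout.Milliseconds())
}

func main() {
	req := &request{}
	applyFirstAttemptTimeout(req, 500*time.Millisecond, 0)
	fmt.Println(req.MaxExecutionDurationMs) // 500: shortened first attempt
	applyFirstAttemptTimeout(req, 500*time.Millisecond, 1)
	fmt.Println(req.MaxExecutionDurationMs) // 20000: retries use the default timeout
}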