From 9f08a5c74e1102cd237e719815710f81218b1f5b Mon Sep 17 00:00:00 2001 From: zyguan Date: Sun, 26 Jan 2025 10:14:22 +0000 Subject: [PATCH 1/5] locate: refactor RegionRequestSender.SendReqCtx Signed-off-by: zyguan --- internal/locate/region_request.go | 633 +++++++++++++++--------------- 1 file changed, 324 insertions(+), 309 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 16e56881ce..5cd2c6a77b 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -732,328 +732,171 @@ func IsFakeRegionError(err *errorpb.Error) bool { const slowLogSendReqTime = 100 * time.Millisecond -// SendReqCtx sends a request to tikv server and return response and RPCCtx of this RPC. -func (s *RegionRequestSender) SendReqCtx( +type sendReqState struct { + *RegionRequestSender + + vars struct { + rpcCtx *RPCContext + resp *tikvrpc.Response + regionErr *errorpb.Error + err error + msg string + sendTimes int + } +} + +func (s *sendReqState) next( bo *retry.Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, et tikvrpc.EndpointType, - opts ...StoreSelectorOption, -) ( - resp *tikvrpc.Response, - rpcCtx *RPCContext, - retryTimes int, - err error, -) { - if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("regionRequest.SendReqCtx", opentracing.ChildOf(span.Context())) - defer span1.Finish() - bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) - } - - if resp, err = failpointSendReqResult(req, et); err != nil || resp != nil { - return + opts []StoreSelectorOption, +) (done bool) { + // check whether the session/query is killed during the Next() + if err := bo.CheckKilled(); err != nil { + s.vars.resp, s.vars.err = nil, err + return true } - if err = s.validateReadTS(bo.GetCtx(), req); err != nil { - logutil.Logger(bo.GetCtx()).Error("validate read ts failed for request", zap.Stringer("reqType", req.Type), zap.Stringer("req", req.Req.(fmt.Stringer)), zap.Stringer("context", &req.Context), zap.Stack("stack"), zap.Error(err)) - return nil, nil, 0, err + // handle send error + if s.vars.err != nil { + if e := s.onSendFail(bo, s.vars.rpcCtx, req, s.vars.err); e != nil { + s.vars.rpcCtx, s.vars.resp = nil, nil + s.vars.msg = fmt.Sprintf("failed to handle send error: %v", s.vars.err) + return true + } + s.vars.err = nil } - // If the MaxExecutionDurationMs is not set yet, we set it to be the RPC timeout duration - // so TiKV can give up the requests whose response TiDB cannot receive due to timeout. 
- if req.Context.MaxExecutionDurationMs == 0 { - req.Context.MaxExecutionDurationMs = uint64(timeout.Milliseconds()) + // handle region error + if s.vars.regionErr != nil { + retry, err := s.onRegionError(bo, s.vars.rpcCtx, req, s.vars.regionErr) + if err != nil { + s.vars.rpcCtx, s.vars.resp = nil, nil + s.vars.msg = fmt.Sprintf("failed to handle region error: %v", err) + return true + } + if !retry { + s.vars.msg = fmt.Sprintf("met unretriable region error: %T", s.vars.regionErr) + return true + } + s.vars.regionErr = nil } - s.reset() - startTime := time.Now() - startBackOff := bo.GetTotalSleep() - retryTimes = 0 - defer func() { - if retryTimes > 0 { - metrics.TiKVRequestRetryTimesHistogram.Observe(float64(retryTimes)) - } - }() + s.vars.rpcCtx, s.vars.resp = nil, nil - if req.StaleRead { - defer func() { - if retryTimes == 0 { - metrics.StaleReadHitCounter.Add(1) - } else { - metrics.StaleReadMissCounter.Add(1) - } - }() + s.vars.rpcCtx, s.vars.err = s.getRPCContext(bo, req, regionID, et, opts...) + if s.vars.err != nil { + return true } - for { - if retryTimes > 0 { - if retryTimes%100 == 0 { - logutil.Logger(bo.GetCtx()).Warn( - "retry", - zap.Uint64("region", regionID.GetID()), - zap.Int("times", retryTimes), - ) - } - } + if s.vars.rpcCtx == nil { + // TODO(youjiali1995): remove it when using the replica selector for all requests. + // If the region is not found in cache, it must be out + // of date and already be cleaned up. We can skip the + // RPC by returning RegionError directly. - rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...) - if err != nil { - return nil, nil, retryTimes, err - } - - if _, err := util.EvalFailpoint("invalidCacheAndRetry"); err == nil { - // cooperate with tikvclient/setGcResolveMaxBackoff - if c := bo.GetCtx().Value("injectedBackoff"); c != nil { - resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) - return resp, nil, retryTimes, err - } - } - if rpcCtx == nil { - // TODO(youjiali1995): remove it when using the replica selector for all requests. - // If the region is not found in cache, it must be out - // of date and already be cleaned up. We can skip the - // RPC by returning RegionError directly. - - // TODO: Change the returned error to something like "region missing in cache", - // and handle this error like EpochNotMatch, which means to re-split the request and retry. - if s.replicaSelector != nil { - if err := s.replicaSelector.backoffOnNoCandidate(bo); err != nil { - return nil, nil, retryTimes, err - } - if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 { - s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) - } - } - resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) - return resp, nil, retryTimes, err - } - // patch the access location if it is not set under region request sender. which includes the coprocessor, - // txn relative tikv request. - // note: MPP not use this path. need specified in the MPP layer. 
- patchAccessLocation := func() { - if s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels) { - req.AccessLocation = kv.AccessLocalZone - } else { - req.AccessLocation = kv.AccessCrossZone + // TODO: Change the returned error to something like "region missing in cache", + // and handle this error like EpochNotMatch, which means to re-split the request and retry. + if s.replicaSelector != nil { + if s.vars.err = s.replicaSelector.backoffOnNoCandidate(bo); s.vars.err != nil { + return true } } - if s.replicaSelector != nil && - s.replicaSelector.target != nil && - req.AccessLocation == kv.AccessUnknown && - len(s.replicaSelector.option.labels) != 0 { - // patch the access location if it is not set under region request sender. - patchAccessLocation() - } - logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr) - s.storeAddr = rpcCtx.Addr + s.vars.regionErr = &errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}} + s.vars.resp, s.vars.err = tikvrpc.GenRegionErrorResp(req, s.vars.regionErr) + s.vars.msg = "throwing pseudo region error due to no replica available" + return true + } - if _, err := util.EvalFailpoint("beforeSendReqToRegion"); err == nil { - if hook := bo.GetCtx().Value("sendReqToRegionHook"); hook != nil { - h := hook.(func(*tikvrpc.Request)) - h(req) - } + if s.replicaSelector != nil && + s.replicaSelector.target != nil && + req.AccessLocation == kv.AccessUnknown && + len(s.replicaSelector.option.labels) != 0 { + // patch the access location if it is not set under region request sender. + if s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels) { + req.AccessLocation = kv.AccessLocalZone + } else { + req.AccessLocation = kv.AccessCrossZone } + } - req.Context.ClusterId = rpcCtx.ClusterID - if req.InputRequestSource != "" && s.replicaSelector != nil { - patchRequestSource(req, s.replicaSelector.replicaType()) - } - // RPCClient.SendRequest will attach `req.Context` thus skip attaching here to reduce overhead. 
- if err := tikvrpc.SetContextNoAttach(req, rpcCtx.Meta, rpcCtx.Peer); err != nil { - return nil, nil, retryTimes, err - } - if s.replicaSelector != nil { - if err := s.replicaSelector.backoffOnRetry(rpcCtx.Store, bo); err != nil { - return nil, nil, retryTimes, err - } - } + logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, s.vars.rpcCtx.Addr) + s.storeAddr = s.vars.rpcCtx.Addr - var retry bool - resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) - req.IsRetryRequest = true - if err != nil { - if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 { - msg := fmt.Sprintf("send request failed, err: %v", err.Error()) - s.logSendReqError(bo, msg, regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) - } - return nil, nil, retryTimes, err + if _, err := util.EvalFailpoint("beforeSendReqToRegion"); err == nil { + if hook := bo.GetCtx().Value("sendReqToRegionHook"); hook != nil { + h := hook.(func(*tikvrpc.Request)) + h(req) } + } - if _, err1 := util.EvalFailpoint("afterSendReqToRegion"); err1 == nil { - if hook := bo.GetCtx().Value("sendReqToRegionFinishHook"); hook != nil { - h := hook.(func(*tikvrpc.Request, *tikvrpc.Response, error)) - h(req, resp, err) - } + req.IsRetryRequest = s.vars.sendTimes > 0 + req.Context.ClusterId = s.vars.rpcCtx.ClusterID + if req.InputRequestSource != "" && s.replicaSelector != nil { + patchRequestSource(req, s.replicaSelector.replicaType()) + } + // RPCClient.SendRequest will attach `req.Context` thus skip attaching here to reduce overhead. + if s.vars.err = tikvrpc.SetContextNoAttach(req, s.vars.rpcCtx.Meta, s.vars.rpcCtx.Peer); s.vars.err != nil { + return true + } + if s.replicaSelector != nil { + if s.vars.err = s.replicaSelector.backoffOnRetry(s.vars.rpcCtx.Store, bo); s.vars.err != nil { + return true } + } - // recheck whether the session/query is killed during the Next() - if err2 := bo.CheckKilled(); err2 != nil { - return nil, nil, retryTimes, err2 - } - if val, err := util.EvalFailpoint("mockRetrySendReqToRegion"); err == nil { - if val.(bool) { - retry = true - } - } - if retry { - retryTimes++ - continue + // judge the store limit switch. + if limit := kv.StoreLimit.Load(); limit > 0 { + if s.vars.err = s.getStoreToken(s.vars.rpcCtx.Store, limit); s.vars.err != nil { + return true } + defer s.releaseStoreToken(s.vars.rpcCtx.Store) + } - var regionErr *errorpb.Error - regionErr, err = resp.GetRegionError() - if err != nil { - return nil, nil, retryTimes, err + sendCtx := s.send(bo, req, timeout) + s.vars.sendTimes++ + + if s.vars.err != nil { + // Because in rpc logic, context.Cancel() will be transferred to rpcContext.Cancel error. For rpcContext cancel, + // we need to retry the request. But for context cancel active, for example, limitExec gets the required rows, + // we shouldn't retry the request, it will go to backoff and hang in retry logic. 
+ if sendCtx.Err() != nil && errors.Cause(sendCtx.Err()) == context.Canceled { + metrics.TiKVRPCErrorCounter.WithLabelValues("context-canceled", storeIDLabel(s.vars.rpcCtx)).Inc() + return true } - if regionErr != nil { - retry, err = s.onRegionError(bo, rpcCtx, req, regionErr) - if err != nil { - if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 { - msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error()) - s.logSendReqError(bo, msg, regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) - } - return nil, nil, retryTimes, err - } - if retry { - retryTimes++ - continue - } - if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 { - s.logSendReqError(bo, "send request meet region error without retry", regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) - } - } else { - if s.replicaSelector != nil { - s.replicaSelector.onSendSuccess(req) - } + if val, e := util.EvalFailpoint("noRetryOnRpcError"); e == nil && val.(bool) { + return true } - - return resp, rpcCtx, retryTimes, nil + // need to handle send error + return false } -} -func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, cost time.Duration, currentBackoffMs int, timeout time.Duration) { - var builder strings.Builder - // build the total round stats string. - builder.WriteString("{total-backoff: ") - builder.WriteString(util.FormatDuration(time.Duration(bo.GetTotalSleep() * int(time.Millisecond)))) - builder.WriteString(", total-backoff-times: ") - builder.WriteString(strconv.Itoa(bo.GetTotalBackoffTimes())) - if s.Stats != nil { - builder.WriteString(", total-rpc: {") - builder.WriteString(s.Stats.String()) - builder.WriteString("}") + if val, err := util.EvalFailpoint("mockRetrySendReqToRegion"); err == nil && val.(bool) { + // force retry + return false } - builder.WriteString("}") - totalRoundStats := builder.String() - // build the current round stats string. 
- builder.Reset() - builder.WriteString("{time: ") - builder.WriteString(util.FormatDuration(cost)) - builder.WriteString(", backoff: ") - builder.WriteString(util.FormatDuration(time.Duration(currentBackoffMs * int(time.Millisecond)))) - builder.WriteString(", timeout: ") - builder.WriteString(util.FormatDuration(timeout)) - builder.WriteString(", req-max-exec-timeout: ") - builder.WriteString(util.FormatDuration(time.Duration(int64(req.Context.MaxExecutionDurationMs) * int64(time.Millisecond)))) - builder.WriteString(", retry-times: ") - builder.WriteString(strconv.Itoa(retryTimes)) - if s.AccessStats != nil { - builder.WriteString(", replica-access: {") - builder.WriteString(s.AccessStats.String()) - builder.WriteString("}") + s.vars.regionErr, s.vars.err = s.vars.resp.GetRegionError() + if s.vars.err != nil { + s.vars.rpcCtx, s.vars.resp = nil, nil + return true + } else if s.vars.regionErr != nil { + // need to handle region error + return false } - builder.WriteString("}") - currentRoundStats := builder.String() - logutil.Logger(bo.GetCtx()).Info(msg, - zap.Uint64("req-ts", req.GetStartTS()), - zap.String("req-type", req.Type.String()), - zap.String("region", regionID.String()), - zap.String("replica-read-type", req.ReplicaReadType.String()), - zap.Bool("stale-read", req.StaleRead), - zap.Stringer("request-sender", s), - zap.String("total-round-stats", totalRoundStats), - zap.String("current-round-stats", currentRoundStats)) -} - -// RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx. -type RPCCancellerCtxKey struct{} -// RPCCanceller is rpc send cancelFunc collector. -type RPCCanceller struct { - sync.Mutex - allocID int - cancels map[int]func() - cancelled bool -} - -// NewRPCanceller creates RPCCanceller with init state. -func NewRPCanceller() *RPCCanceller { - return &RPCCanceller{cancels: make(map[int]func())} -} - -// WithCancel generates new context with cancel func. -func (h *RPCCanceller) WithCancel(ctx context.Context) (context.Context, func()) { - nctx, cancel := context.WithCancel(ctx) - h.Lock() - if h.cancelled { - h.Unlock() - cancel() - return nctx, func() {} - } - id := h.allocID - h.allocID++ - h.cancels[id] = cancel - h.Unlock() - return nctx, func() { - cancel() - h.Lock() - delete(h.cancels, id) - h.Unlock() + if s.replicaSelector != nil { + s.replicaSelector.onSendSuccess(req) } -} -// CancelAll cancels all inflight rpc context. -func (h *RPCCanceller) CancelAll() { - h.Lock() - for _, c := range h.cancels { - c() - } - h.cancelled = true - h.Unlock() + return true } -func fetchRespInfo(resp *tikvrpc.Response) string { - var extraInfo string - if resp == nil || resp.Resp == nil { - extraInfo = "nil response" - } else { - regionErr, e := resp.GetRegionError() - if e != nil { - extraInfo = e.Error() - } else if regionErr != nil { - extraInfo = regionErr.String() - } else if prewriteResp, ok := resp.Resp.(*kvrpcpb.PrewriteResponse); ok { - extraInfo = prewriteResp.String() - } - } - return extraInfo -} - -func (s *RegionRequestSender) sendReqToRegion( - bo *retry.Backoffer, rpcCtx *RPCContext, req *tikvrpc.Request, timeout time.Duration, -) (resp *tikvrpc.Response, retry bool, err error) { - // judge the store limit switch. 
- if limit := kv.StoreLimit.Load(); limit > 0 { - if err := s.getStoreToken(rpcCtx.Store, limit); err != nil { - return nil, false, err - } - defer s.releaseStoreToken(rpcCtx.Store) - } - - ctx := bo.GetCtx() +func (s *sendReqState) send(bo *retry.Backoffer, req *tikvrpc.Request, timeout time.Duration) (ctx context.Context) { + rpcCtx := s.vars.rpcCtx + ctx = bo.GetCtx() if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil { var cancel context.CancelFunc ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx) @@ -1107,13 +950,13 @@ func (s *RegionRequestSender) sendReqToRegion( zap.Stringer("req", req.Req.(fmt.Stringer)), zap.Stringer("ctx", &req.Context), ) injectFailOnSend = true - err = errors.New("injected RPC error on send") + s.vars.err = errors.New("injected RPC error on send") } } if !injectFailOnSend { start := time.Now() - resp, err = s.client.SendRequest(ctx, sendToAddr, req, timeout) + s.vars.resp, s.vars.err = s.client.SendRequest(ctx, sendToAddr, req, timeout) rpcDuration := time.Since(start) if s.replicaSelector != nil { recordAttemptedTime(s.replicaSelector, rpcDuration) @@ -1127,9 +970,10 @@ func (s *RegionRequestSender) sendReqToRegion( if val, fpErr := util.EvalFailpoint("tikvStoreRespResult"); fpErr == nil { if val.(bool) { if req.Type == tikvrpc.CmdCop && bo.GetTotalSleep() == 0 { - return &tikvrpc.Response{ + s.vars.resp, s.vars.err = &tikvrpc.Response{ Resp: &coprocessor.Response{RegionError: &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}}, - }, false, nil + }, nil + return } } } @@ -1152,10 +996,9 @@ func (s *RegionRequestSender) sendReqToRegion( logutil.Logger(ctx).Info( "[failpoint] injected RPC error on recv", zap.Stringer("type", req.Type), zap.Stringer("req", req.Req.(fmt.Stringer)), zap.Stringer("ctx", &req.Context), - zap.Error(err), zap.String("extra response info", fetchRespInfo(resp)), + zap.Error(s.vars.err), zap.String("extra response info", fetchRespInfo(s.vars.resp)), ) - err = errors.New("injected RPC error on recv") - resp = nil + s.vars.resp, s.vars.err = nil, errors.New("injected RPC error on recv") } } @@ -1165,15 +1008,14 @@ func (s *RegionRequestSender) sendReqToRegion( cancel() <-ctx1.Done() ctx = ctx1 - err = ctx.Err() - resp = nil + s.vars.resp, s.vars.err = nil, ctx.Err() } } if _, e := util.EvalFailpoint("onRPCFinishedHook"); e == nil { if hook := bo.GetCtx().Value("onRPCFinishedHook"); hook != nil { h := hook.(func(*tikvrpc.Request, *tikvrpc.Response, error) (*tikvrpc.Response, error)) - resp, err = h(req, resp, err) + s.vars.resp, s.vars.err = h(req, s.vars.resp, s.vars.err) } } } @@ -1182,42 +1024,215 @@ func (s *RegionRequestSender) sendReqToRegion( fromStore := strconv.FormatUint(rpcCtx.ProxyStore.storeID, 10) toStore := strconv.FormatUint(rpcCtx.Store.storeID, 10) result := "ok" - if err != nil { + if s.vars.err != nil { result = "fail" } metrics.TiKVForwardRequestCounter.WithLabelValues(fromStore, toStore, req.Type.String(), result).Inc() } - if err != nil { + if err := s.vars.err; err != nil { if isRPCError(err) { s.rpcError = err } if s.Stats != nil { errStr := getErrMsg(err) s.Stats.RecordRPCErrorStats(errStr) - s.recordRPCAccessInfo(req, rpcCtx, errStr) - } - // Because in rpc logic, context.Cancel() will be transferred to rpcContext.Cancel error. For rpcContext cancel, - // we need to retry the request. But for context cancel active, for example, limitExec gets the required rows, - // we shouldn't retry the request, it will go to backoff and hang in retry logic. 
- if ctx.Err() != nil && errors.Cause(ctx.Err()) == context.Canceled { - metrics.TiKVRPCErrorCounter.WithLabelValues("context-canceled", storeIDLabel(rpcCtx)).Inc() - return nil, false, errors.WithStack(ctx.Err()) + s.recordRPCAccessInfo(req, s.vars.rpcCtx, errStr) } + } + return +} - if val, e := util.EvalFailpoint("noRetryOnRpcError"); e == nil { - if val.(bool) { - return nil, false, err +// SendReqCtx sends a request to tikv server and return response and RPCCtx of this RPC. +func (s *RegionRequestSender) SendReqCtx( + bo *retry.Backoffer, + req *tikvrpc.Request, + regionID RegionVerID, + timeout time.Duration, + et tikvrpc.EndpointType, + opts ...StoreSelectorOption, +) ( + resp *tikvrpc.Response, + rpcCtx *RPCContext, + retryTimes int, + err error, +) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("regionRequest.SendReqCtx", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } + + if resp, err = failpointSendReqResult(req, et); err != nil || resp != nil { + return + } + + if err = s.validateReadTS(bo.GetCtx(), req); err != nil { + logutil.Logger(bo.GetCtx()).Error("validate read ts failed for request", zap.Stringer("reqType", req.Type), zap.Stringer("req", req.Req.(fmt.Stringer)), zap.Stringer("context", &req.Context), zap.Stack("stack"), zap.Error(err)) + return nil, nil, 0, err + } + + // If the MaxExecutionDurationMs is not set yet, we set it to be the RPC timeout duration + // so TiKV can give up the requests whose response TiDB cannot receive due to timeout. + if req.Context.MaxExecutionDurationMs == 0 { + req.Context.MaxExecutionDurationMs = uint64(timeout.Milliseconds()) + } + + state := &sendReqState{RegionRequestSender: s} + defer func() { + if retryTimes := state.vars.sendTimes - 1; retryTimes > 0 { + metrics.TiKVRequestRetryTimesHistogram.Observe(float64(retryTimes)) + } + if req.StaleRead { + if state.vars.sendTimes == 1 { + metrics.StaleReadHitCounter.Add(1) + } else { + metrics.StaleReadMissCounter.Add(1) } } - if e := s.onSendFail(bo, rpcCtx, req, err); e != nil { - return nil, false, err + }() + + s.reset() + startTime := time.Now() + startBackOff := bo.GetTotalSleep() + + for !state.next(bo, req, regionID, timeout, et, opts) { + if retryTimes := state.vars.sendTimes - 1; retryTimes > 0 && retryTimes%100 == 0 { + logutil.Logger(bo.GetCtx()).Warn("retry", zap.Uint64("region", regionID.GetID()), zap.Int("times", retryTimes)) } - return nil, true, nil } + + if state.vars.err == nil { + resp, rpcCtx = state.vars.resp, state.vars.rpcCtx + } else { + err = state.vars.err + } + if state.vars.sendTimes > 1 { + retryTimes = state.vars.sendTimes - 1 + } + + if len(state.vars.msg) > 0 || err != nil { + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 { + msg := state.vars.msg + if len(msg) == 0 { + msg = fmt.Sprintf("send request failed: %v", err) + } + s.logSendReqError(bo, msg, regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } + } + return } +func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, cost time.Duration, currentBackoffMs int, timeout time.Duration) { + var builder strings.Builder + // build the total round stats string. 
+ builder.WriteString("{total-backoff: ") + builder.WriteString(util.FormatDuration(time.Duration(bo.GetTotalSleep() * int(time.Millisecond)))) + builder.WriteString(", total-backoff-times: ") + builder.WriteString(strconv.Itoa(bo.GetTotalBackoffTimes())) + if s.Stats != nil { + builder.WriteString(", total-rpc: {") + builder.WriteString(s.Stats.String()) + builder.WriteString("}") + } + builder.WriteString("}") + totalRoundStats := builder.String() + + // build the current round stats string. + builder.Reset() + builder.WriteString("{time: ") + builder.WriteString(util.FormatDuration(cost)) + builder.WriteString(", backoff: ") + builder.WriteString(util.FormatDuration(time.Duration(currentBackoffMs * int(time.Millisecond)))) + builder.WriteString(", timeout: ") + builder.WriteString(util.FormatDuration(timeout)) + builder.WriteString(", req-max-exec-timeout: ") + builder.WriteString(util.FormatDuration(time.Duration(int64(req.Context.MaxExecutionDurationMs) * int64(time.Millisecond)))) + builder.WriteString(", retry-times: ") + builder.WriteString(strconv.Itoa(retryTimes)) + if s.AccessStats != nil { + builder.WriteString(", replica-access: {") + builder.WriteString(s.AccessStats.String()) + builder.WriteString("}") + } + builder.WriteString("}") + currentRoundStats := builder.String() + logutil.Logger(bo.GetCtx()).Info(msg, + zap.Uint64("req-ts", req.GetStartTS()), + zap.String("req-type", req.Type.String()), + zap.String("region", regionID.String()), + zap.String("replica-read-type", req.ReplicaReadType.String()), + zap.Bool("stale-read", req.StaleRead), + zap.Stringer("request-sender", s), + zap.String("total-round-stats", totalRoundStats), + zap.String("current-round-stats", currentRoundStats)) +} + +// RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx. +type RPCCancellerCtxKey struct{} + +// RPCCanceller is rpc send cancelFunc collector. +type RPCCanceller struct { + sync.Mutex + allocID int + cancels map[int]func() + cancelled bool +} + +// NewRPCanceller creates RPCCanceller with init state. +func NewRPCanceller() *RPCCanceller { + return &RPCCanceller{cancels: make(map[int]func())} +} + +// WithCancel generates new context with cancel func. +func (h *RPCCanceller) WithCancel(ctx context.Context) (context.Context, func()) { + nctx, cancel := context.WithCancel(ctx) + h.Lock() + if h.cancelled { + h.Unlock() + cancel() + return nctx, func() {} + } + id := h.allocID + h.allocID++ + h.cancels[id] = cancel + h.Unlock() + return nctx, func() { + cancel() + h.Lock() + delete(h.cancels, id) + h.Unlock() + } +} + +// CancelAll cancels all inflight rpc context. 
+func (h *RPCCanceller) CancelAll() { + h.Lock() + for _, c := range h.cancels { + c() + } + h.cancelled = true + h.Unlock() +} + +func fetchRespInfo(resp *tikvrpc.Response) string { + var extraInfo string + if resp == nil || resp.Resp == nil { + extraInfo = "nil response" + } else { + regionErr, e := resp.GetRegionError() + if e != nil { + extraInfo = e.Error() + } else if regionErr != nil { + extraInfo = regionErr.String() + } else if prewriteResp, ok := resp.Resp.(*kvrpcpb.PrewriteResponse); ok { + extraInfo = prewriteResp.String() + } + } + return extraInfo +} + func isRPCError(err error) bool { // exclude ErrClientResourceGroupThrottled return err != nil && errs.ErrClientResourceGroupThrottled.NotEqual(err) From b571c0825f5790098adc6ac23098d35615900b50 Mon Sep 17 00:00:00 2001 From: zyguan Date: Sun, 26 Jan 2025 10:27:33 +0000 Subject: [PATCH 2/5] try to fix ut Signed-off-by: zyguan --- internal/locate/region_request.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 5cd2c6a77b..d56d5b323c 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -804,7 +804,7 @@ func (s *sendReqState) next( return true } } - s.vars.regionErr = &errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}} + s.vars.regionErr = &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}} s.vars.resp, s.vars.err = tikvrpc.GenRegionErrorResp(req, s.vars.regionErr) s.vars.msg = "throwing pseudo region error due to no replica available" return true From d6ce406b299edfd9a421ecbc11fc3f29fdb3e37b Mon Sep 17 00:00:00 2001 From: zyguan Date: Sun, 26 Jan 2025 10:35:03 +0000 Subject: [PATCH 3/5] fix an anothor issue Signed-off-by: zyguan --- internal/locate/region_request.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index d56d5b323c..f0e6e01ed0 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -774,6 +774,7 @@ func (s *sendReqState) next( retry, err := s.onRegionError(bo, s.vars.rpcCtx, req, s.vars.regionErr) if err != nil { s.vars.rpcCtx, s.vars.resp = nil, nil + s.vars.err = err s.vars.msg = fmt.Sprintf("failed to handle region error: %v", err) return true } From 0b6c751f4c3990273c7d77144765b275be9bc236 Mon Sep 17 00:00:00 2001 From: zyguan Date: Sun, 26 Jan 2025 13:34:04 +0000 Subject: [PATCH 4/5] fix TestIsRetryRequestFlagWithRegionError Signed-off-by: zyguan --- internal/locate/region_request.go | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index f0e6e01ed0..8ee38285a8 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -786,12 +786,24 @@ func (s *sendReqState) next( } s.vars.rpcCtx, s.vars.resp = nil, nil + if !req.IsRetryRequest && s.vars.sendTimes > 0 { + req.IsRetryRequest = true + } s.vars.rpcCtx, s.vars.err = s.getRPCContext(bo, req, regionID, et, opts...) 
if s.vars.err != nil { return true } + if _, err := util.EvalFailpoint("invalidCacheAndRetry"); err == nil { + // cooperate with tikvclient/setGcResolveMaxBackoff + if c := bo.GetCtx().Value("injectedBackoff"); c != nil { + s.vars.regionErr = &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}} + s.vars.resp, s.vars.err = tikvrpc.GenRegionErrorResp(req, s.vars.regionErr) + return true + } + } + if s.vars.rpcCtx == nil { // TODO(youjiali1995): remove it when using the replica selector for all requests. // If the region is not found in cache, it must be out @@ -826,14 +838,6 @@ func (s *sendReqState) next( logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, s.vars.rpcCtx.Addr) s.storeAddr = s.vars.rpcCtx.Addr - if _, err := util.EvalFailpoint("beforeSendReqToRegion"); err == nil { - if hook := bo.GetCtx().Value("sendReqToRegionHook"); hook != nil { - h := hook.(func(*tikvrpc.Request)) - h(req) - } - } - - req.IsRetryRequest = s.vars.sendTimes > 0 req.Context.ClusterId = s.vars.rpcCtx.ClusterID if req.InputRequestSource != "" && s.replicaSelector != nil { patchRequestSource(req, s.replicaSelector.replicaType()) @@ -848,6 +852,13 @@ func (s *sendReqState) next( } } + if _, err := util.EvalFailpoint("beforeSendReqToRegion"); err == nil { + if hook := bo.GetCtx().Value("sendReqToRegionHook"); hook != nil { + h := hook.(func(*tikvrpc.Request)) + h(req) + } + } + // judge the store limit switch. if limit := kv.StoreLimit.Load(); limit > 0 { if s.vars.err = s.getStoreToken(s.vars.rpcCtx.Store, limit); s.vars.err != nil { From 7498d18a4141fa62a5b860792d9239c638ee59fe Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 18 Feb 2025 12:27:14 +0000 Subject: [PATCH 5/5] add some comments to sendReqState Signed-off-by: zyguan --- internal/locate/region_request.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 8ee38285a8..f62665d9c0 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -732,6 +732,8 @@ func IsFakeRegionError(err *errorpb.Error) bool { const slowLogSendReqTime = 100 * time.Millisecond +// sendReqState represents the state of sending request with retry, which allows us to construct a state and start to +// retry from that state. type sendReqState struct { *RegionRequestSender @@ -745,6 +747,10 @@ type sendReqState struct { } } +// next encapsulates one iteration of the retry loop. calling `next` will handle send error (s.vars.err) or region error +// (s.vars.regionErr) if one of them exists. When the error is retriable, `next` then constructs a new RPCContext and +// sends the request again. `next` returns true if the retry loop should stop, either because the request is done or +// exhausted (cannot complete by retrying). func (s *sendReqState) next( bo *retry.Backoffer, req *tikvrpc.Request,
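
Note on the shape of the refactor: the series replaces the single retry loop in SendReqCtx with a small state machine. sendReqState carries the per-attempt variables (rpcCtx, resp, regionErr, err, sendTimes), and next() performs one iteration: handle any pending send or region error, resolve a fresh RPCContext, send once. A reduced sketch of how SendReqCtx drives it after PATCH 1/5 follows; it is an illustration only, not code from the patch. The name sendReqCtxSketch is invented, it assumes it sits in internal/locate alongside the patched code, and tracing, failpoints, metrics and slow-log reporting are elided.

// sendReqCtxSketch is an illustrative, reduced rendering of SendReqCtx after
// this series; it is not part of the patch.
func sendReqCtxSketch(
	s *RegionRequestSender,
	bo *retry.Backoffer,
	req *tikvrpc.Request,
	regionID RegionVerID,
	timeout time.Duration,
	et tikvrpc.EndpointType,
	opts ...StoreSelectorOption,
) (resp *tikvrpc.Response, rpcCtx *RPCContext, retryTimes int, err error) {
	s.reset()
	state := &sendReqState{RegionRequestSender: s}
	// Each call to next() is one iteration of the old loop: it first handles a
	// pending send error (state.vars.err) or region error (state.vars.regionErr),
	// then resolves a fresh RPCContext and sends the request once. It returns
	// true when the request is done or cannot be completed by retrying.
	for !state.next(bo, req, regionID, timeout, et, opts) {
	}
	// The outcome is left in state.vars, exactly as SendReqCtx reads it.
	if state.vars.err == nil {
		resp, rpcCtx = state.vars.resp, state.vars.rpcCtx
	} else {
		err = state.vars.err
	}
	if state.vars.sendTimes > 1 {
		retryTimes = state.vars.sendTimes - 1
	}
	return
}

The payoff, per the doc comment added in PATCH 5/5, is that retry progress becomes a value: a sendReqState can be constructed and the retry resumed from that state, instead of the state living only in SendReqCtx's local variables.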