diff --git a/error/error.go b/error/error.go index 62a75ff656..e198763bba 100644 --- a/error/error.go +++ b/error/error.go @@ -64,7 +64,8 @@ var ( // ErrTiFlashServerTimeout is the error when tiflash server is timeout. ErrTiFlashServerTimeout = errors.New("tiflash server timeout") // ErrQueryInterrupted is the error when the query is interrupted. - ErrQueryInterrupted = errors.New("query interruppted") + // This is deprecated. Keep it only to pass CI :-(. We can remove this later. + ErrQueryInterrupted = errors.New("query interrupted") // ErrTiKVStaleCommand is the error that the command is stale in tikv. ErrTiKVStaleCommand = errors.New("tikv stale command") // ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced. @@ -96,11 +97,19 @@ var ( // ErrIsWitness is the error when a request is send to a witness. ErrIsWitness = errors.New("peer is witness") // ErrUnknown is the unknow error. - ErrUnknown = errors.New("unknow") + ErrUnknown = errors.New("unknown") // ErrResultUndetermined is the error when execution result is unknown. ErrResultUndetermined = errors.New("execution result undetermined") ) +type ErrQueryInterruptedWithSignal struct { + Signal uint32 +} + +func (e ErrQueryInterruptedWithSignal) Error() string { + return fmt.Sprintf("query interrupted by signal %d", e.Signal) +} + // MismatchClusterID represents the message that the cluster ID of the PD client does not match the PD. const MismatchClusterID = "mismatch cluster id" diff --git a/internal/client/retry/backoff.go b/internal/client/retry/backoff.go new file mode 100644 index 0000000000..229d9766cb --- /dev/null +++ b/internal/client/retry/backoff.go @@ -0,0 +1,399 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// NOTE: The code in this file is based on code from the +// TiDB project, licensed under the Apache License v 2.0 +// +// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/retry/backoff.go +// + +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "bytes" + "context" + "fmt" + "math" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/log" + "github.com/pkg/errors" + tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/internal/logutil" + "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/util" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// Backoffer is a utility for retrying queries. +type Backoffer struct { + ctx context.Context + + fn map[string]backoffFn + maxSleep int + totalSleep int + excludedSleep int + + vars *kv.Variables + noop bool + + errors []error + configs []*Config + backoffSleepMS map[string]int + backoffTimes map[string]int + parent *Backoffer +} + +type txnStartCtxKeyType struct{} + +// TxnStartKey is a key for transaction start_ts info in context.Context. +var TxnStartKey interface{} = txnStartCtxKeyType{} + +// NewBackoffer (Deprecated) creates a Backoffer with maximum sleep time(in ms). +func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer { + return &Backoffer{ + ctx: ctx, + maxSleep: maxSleep, + vars: kv.DefaultVars, + } +} + +// NewBackofferWithVars creates a Backoffer with maximum sleep time(in ms) and kv.Variables. +func NewBackofferWithVars(ctx context.Context, maxSleep int, vars *kv.Variables) *Backoffer { + return NewBackoffer(ctx, maxSleep).withVars(vars) +} + +// NewNoopBackoff create a Backoffer do nothing just return error directly +func NewNoopBackoff(ctx context.Context) *Backoffer { + return &Backoffer{ctx: ctx, noop: true} +} + +// withVars sets the kv.Variables to the Backoffer and return it. +func (b *Backoffer) withVars(vars *kv.Variables) *Backoffer { + if vars != nil { + b.vars = vars + } + // maxSleep is the max sleep time in millisecond. + // When it is multiplied by BackOffWeight, it should not be greater than MaxInt32. + if b.maxSleep > 0 && math.MaxInt32/b.vars.BackOffWeight >= b.maxSleep { + b.maxSleep *= b.vars.BackOffWeight + } + return b +} + +// Backoff sleeps a while base on the Config and records the error message. +// It returns a retryable error if total sleep time exceeds maxSleep. +func (b *Backoffer) Backoff(cfg *Config, err error) error { + if span := opentracing.SpanFromContext(b.ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan(fmt.Sprintf("tikv.backoff.%s", cfg), opentracing.ChildOf(span.Context())) + defer span1.Finish() + opentracing.ContextWithSpan(b.ctx, span1) + } + return b.BackoffWithCfgAndMaxSleep(cfg, -1, err) +} + +// BackoffWithMaxSleepTxnLockFast sleeps a while base on the MaxSleepTxnLock and records the error message +// and never sleep more than maxSleepMs for each sleep. +func (b *Backoffer) BackoffWithMaxSleepTxnLockFast(maxSleepMs int, err error) error { + cfg := BoTxnLockFast + return b.BackoffWithCfgAndMaxSleep(cfg, maxSleepMs, err) +} + +// BackoffWithCfgAndMaxSleep sleeps a while base on the Config and records the error message +// and never sleep more than maxSleepMs for each sleep. +func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err error) error { + if strings.Contains(err.Error(), tikverr.MismatchClusterID) { + logutil.Logger(b.ctx).Fatal("critical error", zap.Error(err)) + } + select { + case <-b.ctx.Done(): + return errors.WithStack(err) + default: + } + if b.noop { + return err + } + maxBackoffTimeExceeded := (b.totalSleep - b.excludedSleep) >= b.maxSleep + maxExcludedTimeExceeded := false + if maxLimit, ok := isSleepExcluded[cfg.name]; ok { + maxExcludedTimeExceeded = b.excludedSleep >= maxLimit && b.excludedSleep >= b.maxSleep + } + maxTimeExceeded := maxBackoffTimeExceeded || maxExcludedTimeExceeded + if b.maxSleep > 0 && maxTimeExceeded { + longestSleepCfg, longestSleepTime := b.longestSleepCfg() + errMsg := fmt.Sprintf("%s backoffer.maxSleep %dms is exceeded, errors:", cfg.String(), b.maxSleep) + for i, err := range b.errors { + // Print only last 3 errors for non-DEBUG log levels. + if log.GetLevel() == zapcore.DebugLevel || i >= len(b.errors)-3 { + errMsg += "\n" + err.Error() + } + } + var backoffDetail bytes.Buffer + totalTimes := 0 + for name, times := range b.backoffTimes { + totalTimes += times + if backoffDetail.Len() > 0 { + backoffDetail.WriteString(", ") + } + backoffDetail.WriteString(name) + backoffDetail.WriteString(":") + backoffDetail.WriteString(strconv.Itoa(times)) + } + errMsg += fmt.Sprintf("\ntotal-backoff-times: %v, backoff-detail: %v, maxBackoffTimeExceeded: %v, maxExcludedTimeExceeded: %v", + totalTimes, backoffDetail.String(), maxBackoffTimeExceeded, maxExcludedTimeExceeded) + returnedErr := err + if longestSleepCfg != nil { + errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime) + returnedErr = longestSleepCfg.err + } + logutil.Logger(b.ctx).Warn(errMsg) + // Use the backoff type that contributes most to the timeout to generate a MySQL error. + return errors.WithStack(returnedErr) + } + b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano))) + b.configs = append(b.configs, cfg) + + // Lazy initialize. + if b.fn == nil { + b.fn = make(map[string]backoffFn) + } + f, ok := b.fn[cfg.name] + if !ok { + f = cfg.createBackoffFn(b.vars) + b.fn[cfg.name] = f + } + realSleep := f(b.ctx, maxSleepMs) + if cfg.metric != nil { + (*cfg.metric).Observe(float64(realSleep) / 1000) + } + + b.totalSleep += realSleep + if _, ok := isSleepExcluded[cfg.name]; ok { + b.excludedSleep += realSleep + } + if b.backoffSleepMS == nil { + b.backoffSleepMS = make(map[string]int) + } + b.backoffSleepMS[cfg.name] += realSleep + if b.backoffTimes == nil { + b.backoffTimes = make(map[string]int) + } + b.backoffTimes[cfg.name]++ + + stmtExec := b.ctx.Value(util.ExecDetailsKey) + if stmtExec != nil { + detail := stmtExec.(*util.ExecDetails) + atomic.AddInt64(&detail.BackoffDuration, int64(realSleep)*int64(time.Millisecond)) + atomic.AddInt64(&detail.BackoffCount, 1) + } + + err2 := b.checkKilled() + if err2 != nil { + return err2 + } + + var startTs interface{} + if ts := b.ctx.Value(TxnStartKey); ts != nil { + startTs = ts + } + logutil.Logger(b.ctx).Debug( + "retry later", + zap.Error(err), + zap.Int("totalSleep", b.totalSleep), + zap.Int("excludedSleep", b.excludedSleep), + zap.Int("maxSleep", b.maxSleep), + zap.Stringer("type", cfg), + zap.Reflect("txnStartTS", startTs), + ) + return nil +} + +func (b *Backoffer) checkKilled() error { + if b.vars != nil && b.vars.Killed != nil { + killed := atomic.LoadUint32(b.vars.Killed) + if killed != 0 { + logutil.BgLogger().Info( + "backoff stops because a killed signal is received", + zap.Uint32("signal", killed), + ) + return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: killed}) + } + } + return nil +} + +func (b *Backoffer) String() string { + if b.totalSleep == 0 { + return "" + } + return fmt.Sprintf(" backoff(%dms %v)", b.totalSleep, b.configs) +} + +// copyMapWithoutRecursive is only used to deep copy map fields in the Backoffer type. +func copyMapWithoutRecursive(srcMap map[string]int) map[string]int { + result := map[string]int{} + for k, v := range srcMap { + result[k] = v + } + return result +} + +// Clone creates a new Backoffer which keeps current Backoffer's sleep time and errors, and shares +// current Backoffer's context. +// Some fields like `configs` and `vars` are concurrently used by all the backoffers in different threads, +// try not to modify the referenced content directly. +func (b *Backoffer) Clone() *Backoffer { + return &Backoffer{ + ctx: b.ctx, + maxSleep: b.maxSleep, + totalSleep: b.totalSleep, + excludedSleep: b.excludedSleep, + vars: b.vars, + errors: append([]error{}, b.errors...), + configs: append([]*Config{}, b.configs...), + backoffSleepMS: copyMapWithoutRecursive(b.backoffSleepMS), + backoffTimes: copyMapWithoutRecursive(b.backoffTimes), + parent: b.parent, + } +} + +// Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors, and holds +// a child context of current Backoffer's context. +// Some fields like `configs` and `vars` are concurrently used by all the backoffers in different threads, +// try not to modify the referenced content directly. +func (b *Backoffer) Fork() (*Backoffer, context.CancelFunc) { + ctx, cancel := context.WithCancel(b.ctx) + return &Backoffer{ + ctx: ctx, + maxSleep: b.maxSleep, + totalSleep: b.totalSleep, + excludedSleep: b.excludedSleep, + errors: append([]error{}, b.errors...), + configs: append([]*Config{}, b.configs...), + backoffSleepMS: copyMapWithoutRecursive(b.backoffSleepMS), + backoffTimes: copyMapWithoutRecursive(b.backoffTimes), + vars: b.vars, + parent: b, + }, cancel +} + +// GetVars returns the binded vars. +func (b *Backoffer) GetVars() *kv.Variables { + return b.vars +} + +// GetTotalSleep returns total sleep time. +func (b *Backoffer) GetTotalSleep() int { + return b.totalSleep +} + +// GetTypes returns type list of this backoff and all its ancestors. +func (b *Backoffer) GetTypes() []string { + typs := make([]string, 0, len(b.configs)) + for b != nil { + for _, cfg := range b.configs { + typs = append(typs, cfg.String()) + } + b = b.parent + } + return typs +} + +// GetCtx returns the binded context. +func (b *Backoffer) GetCtx() context.Context { + return b.ctx +} + +// SetCtx sets the binded context to ctx. +func (b *Backoffer) SetCtx(ctx context.Context) { + b.ctx = ctx +} + +// GetBackoffTimes returns a map contains backoff time count by type. +func (b *Backoffer) GetBackoffTimes() map[string]int { + return b.backoffTimes +} + +// GetTotalBackoffTimes returns the total backoff times of the backoffer. +func (b *Backoffer) GetTotalBackoffTimes() int { + total := 0 + for _, time := range b.backoffTimes { + total += time + } + return total +} + +// GetBackoffSleepMS returns a map contains backoff sleep time by type. +func (b *Backoffer) GetBackoffSleepMS() map[string]int { + return b.backoffSleepMS +} + +// ErrorsNum returns the number of errors. +func (b *Backoffer) ErrorsNum() int { + return len(b.errors) +} + +// Reset resets the sleep state of the backoffer, so that following backoff +// can sleep shorter. The reason why we don't create a new backoffer is that +// backoffer is similar to context and it records some metrics that we +// want to record for an entire process which is composed of serveral stages. +func (b *Backoffer) Reset() { + b.fn = nil + b.totalSleep = 0 + b.excludedSleep = 0 +} + +// ResetMaxSleep resets the sleep state and max sleep limit of the backoffer. +// It's used when switches to the next stage of the process. +func (b *Backoffer) ResetMaxSleep(maxSleep int) { + b.Reset() + b.maxSleep = maxSleep + b.withVars(b.vars) +} + +func (b *Backoffer) longestSleepCfg() (*Config, int) { + candidate := "" + maxSleep := 0 + for cfgName, sleepTime := range b.backoffSleepMS { + if _, ok := isSleepExcluded[cfgName]; sleepTime > maxSleep && !ok { + maxSleep = sleepTime + candidate = cfgName + } + } + for _, cfg := range b.configs { + if cfg.name == candidate { + return cfg, maxSleep + } + } + return nil, 0 +} diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go index 9f6dcdd9a2..2ad3362028 100644 --- a/internal/retry/backoff.go +++ b/internal/retry/backoff.go @@ -383,7 +383,7 @@ func (b *Backoffer) CheckKilled() error { "backoff stops because a killed signal is received", zap.Uint32("signal", killed), ) - return errors.WithStack(tikverr.ErrQueryInterrupted) + return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: killed}) } } return nil diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index c7d99333bf..b7471f87a2 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1048,7 +1048,27 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action } // doActionOnBatches does action to batches in parallel. -func (c *twoPhaseCommitter) doActionOnBatches(bo *retry.Backoffer, action twoPhaseCommitAction, batches []batchMutations) error { +func (c *twoPhaseCommitter) doActionOnBatches( + bo *retry.Backoffer, action twoPhaseCommitAction, + batches []batchMutations, +) error { + // killSignal should never be nil for TiDB + if c.txn != nil && c.txn.vars != nil && c.txn.vars.Killed != nil { + // Do not reset the killed flag here. Let the upper layer reset the flag. + // Before it resets, any request is considered valid to be killed. + status := atomic.LoadUint32(c.txn.vars.Killed) + if status != 0 { + logutil.BgLogger().Info( + "query is killed", zap.Uint32( + "signal", + status, + ), + ) + // TODO: There might be various signals besides a query interruption, + // but we are unable to differentiate them, because the definition is in TiDB. + return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: status}) + } + } if len(batches) == 0 { return nil }