Skip to content

Commit

Permalink
kvserver: move nondeterministic failures to kvserverbase
Browse files Browse the repository at this point in the history
They need to be in a leaf package, since we need to return them during
standalone log application.

Release note: None
  • Loading branch information
tbg committed Nov 15, 2022
1 parent b57ea75 commit a1d7605
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 70 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"bulk_adder.go",
"forced_error.go",
"knobs.go",
"nondeterministic_error.go",
"raftversion.go",
"stores.go",
],
Expand Down
71 changes: 71 additions & 0 deletions pkg/kv/kvserver/kvserverbase/nondeterministic_failure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//

package kvserverbase

import (
"fmt"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// NonDeterministicError is an error type that indicates that a state machine
// transition failed due to an unexpected error. Failure to perform a state
// transition is a form of non-determinism, so it can't be permitted for any
// reason during the application phase of state machine replication. The only
// acceptable recourse is to signal that the replica has become corrupted.
//
// Many errors returned by apply.Decoder and apply.StateMachine implementations
// will be instances of this type.
//
// Instances are created via NonDeterministicErrorf and
// NonDeterministicErrorWrapf.
type NonDeterministicError struct {
	// wrapped is the underlying error. It is what Cause and Unwrap return and
	// what Error appends to its fixed prefix.
	wrapped error
	// expl is the explanation built with redact.Sprintf from the format string
	// and arguments given to the constructor. It is returned, after redaction,
	// by GetRedactedNonDeterministicFailureExplanation.
	expl redact.RedactableString
}

// NonDeterministicErrorf creates a NonDeterministicError.
func NonDeterministicErrorf(format string, args ...interface{}) error {
err := errors.AssertionFailedWithDepthf(1, format, args...)
return &NonDeterministicError{
wrapped: err,
expl: redact.Sprintf(format, args...),
}
}

// NonDeterministicErrorWrapf wraps the provided error with a NonDeterministicError.
func NonDeterministicErrorWrapf(err error, format string, args ...interface{}) error {
return &NonDeterministicError{
wrapped: errors.Wrapf(err, format, args...),
expl: redact.Sprintf(format, args...),
}
}

// Error implements the error interface. The message is the wrapped error's
// message prefixed with "non-deterministic failure: ".
func (e *NonDeterministicError) Error() string {
	inner := e.wrapped.Error()
	return fmt.Sprintf("non-deterministic failure: %s", inner)
}

// Cause returns the wrapped error, satisfying the causer interface of
// github.com/pkg/errors.
func (e *NonDeterministicError) Cause() error {
	return e.wrapped
}

// Unwrap returns the wrapped error. This is the Unwrap method convention
// introduced by github.com/golang/xerrors and part of the standard library's
// errors package since Go 1.13, which lets errors.Is and errors.As traverse
// through a NonDeterministicError to the error it wraps.
func (e *NonDeterministicError) Unwrap() error {
	return e.wrapped
}

// GetRedactedNonDeterministicFailureExplanation looks through err's chain for
// the first *NonDeterministicError and returns its explanation. The returned
// message is *redacted*, i.e. contains no sensitive information.
//
// If err does not contain a *NonDeterministicError, the placeholder "???" is
// returned instead.
func GetRedactedNonDeterministicFailureExplanation(err error) redact.RedactableString {
	var nd *NonDeterministicError
	if !errors.As(err, &nd) {
		// No non-deterministic failure in the chain; nothing to explain.
		return "???"
	}
	return nd.expl.Redact()
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/raftlog/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "raftlog",
srcs = [
"command.go",
"entry.go",
"iterator.go",
],
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *ReplicatedCmdBase) Decode(e *raftpb.Entry) error {
var err error
c.Entry, err = raftlog.NewEntry(*e)
if err != nil {
return wrapWithNonDeterministicFailure(err, "while decoding raft entry")
return kvserverbase.WrapWithNonDeterministicFailuref(err, "while decoding raft entry")
}
return nil
}
Expand Down
67 changes: 12 additions & 55 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package kvserver

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
Expand Down Expand Up @@ -58,48 +57,6 @@ type applyCommittedEntriesStats struct {
followerStoreWriteBytes kvadmission.FollowerStoreWriteBytes
}

// nonDeterministicFailure is an error type that indicates that a state machine
// transition failed due to an unexpected error. Failure to perform a state
// transition is a form of non-determinism, so it can't be permitted for any
// reason during the application phase of state machine replication. The only
// acceptable recourse is to signal that the replica has become corrupted.
//
// All errors returned by replicaDecoder and replicaStateMachine will be instances
// of this type.
type nonDeterministicFailure struct {
wrapped error
safeExpl string
}

// The provided format string should be safe for reporting.
func makeNonDeterministicFailure(format string, args ...interface{}) error {
err := errors.AssertionFailedWithDepthf(1, format, args...)
return &nonDeterministicFailure{
wrapped: err,
safeExpl: err.Error(),
}
}

// The provided msg should be safe for reporting.
func wrapWithNonDeterministicFailure(err error, format string, args ...interface{}) error {
return &nonDeterministicFailure{
wrapped: errors.Wrapf(err, format, args...),
safeExpl: fmt.Sprintf(format, args...),
}
}

// Error implements the error interface.
func (e *nonDeterministicFailure) Error() string {
return fmt.Sprintf("non-deterministic failure: %s", e.wrapped.Error())
}

// Cause implements the github.com/pkg/errors.causer interface.
func (e *nonDeterministicFailure) Cause() error { return e.wrapped }

// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is
// planned to be moved to the stdlib in go 1.13.
func (e *nonDeterministicFailure) Unwrap() error { return e.wrapped }

// replicaStateMachine implements the apply.StateMachine interface.
//
// The structure coordinates state transitions within the Replica state machine
Expand Down Expand Up @@ -259,12 +216,12 @@ func (b *replicaAppBatch) Stage(
) (apply.CheckedCommand, error) {
cmd := cmdI.(*replicatedCmd)
if cmd.Index() == 0 {
return nil, makeNonDeterministicFailure("processRaftCommand requires a non-zero index")
return nil, kvserverbase.NonDeterministicErrorf("processRaftCommand requires a non-zero index")
}
if idx, applied := cmd.Index(), b.state.RaftAppliedIndex; idx != applied+1 {
// If we have an out of order index, there's corruption. No sense in
// trying to update anything or running the command. Simply return.
return nil, makeNonDeterministicFailure("applied index jumped from %d to %d", applied, idx)
return nil, kvserverbase.NonDeterministicErrorf("applied index jumped from %d to %d", applied, idx)
}
if log.V(4) {
log.Infof(ctx, "processing command %x: raftIndex=%d maxLeaseIndex=%d closedts=%s",
Expand Down Expand Up @@ -302,9 +259,9 @@ func (b *replicaAppBatch) Stage(
// way, it would become less of a one-off.
if splitMergeUnlock, err := b.r.maybeAcquireSplitMergeLock(ctx, cmd.Cmd); err != nil {
if cmd.Cmd.ReplicatedEvalResult.Split != nil {
err = wrapWithNonDeterministicFailure(err, "unable to acquire split lock")
err = kvserverbase.NonDeterministicErrorWrapf(err, "unable to acquire split lock")
} else {
err = wrapWithNonDeterministicFailure(err, "unable to acquire merge lock")
err = kvserverbase.NonDeterministicErrorWrapf(err, "unable to acquire merge lock")
}
return nil, err
} else if splitMergeUnlock != nil {
Expand Down Expand Up @@ -388,7 +345,7 @@ func (b *replicaAppBatch) stageWriteBatch(ctx context.Context, cmd *replicatedCm
b.mutations += mutations
}
if err := b.batch.ApplyBatchRepr(wb.Data, false); err != nil {
return wrapWithNonDeterministicFailure(err, "unable to apply WriteBatch")
return kvserverbase.NonDeterministicErrorWrapf(err, "unable to apply WriteBatch")
}
return nil
}
Expand Down Expand Up @@ -503,7 +460,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
// An initialized replica is always contained in its descriptor.
rhsRepl, err := b.r.store.GetReplica(merge.RightDesc.RangeID)
if err != nil {
return wrapWithNonDeterministicFailure(err, "unable to get replica for merge")
return kvserverbase.NonDeterministicErrorWrapf(err, "unable to get replica for merge")
}
// We should already have acquired the raftMu for the rhsRepl and now hold
// its unlock method in cmd.splitMergeUnlock.
Expand All @@ -530,7 +487,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
if err := rhsRepl.preDestroyRaftMuLocked(
ctx, b.batch, b.batch, mergedTombstoneReplicaID, clearRangeIDLocalOnly, mustClearRange,
); err != nil {
return wrapWithNonDeterministicFailure(err, "unable to destroy replica before merge")
return kvserverbase.NonDeterministicErrorWrapf(err, "unable to destroy replica before merge")
}

// Shut down rangefeed processors on either side of the merge.
Expand Down Expand Up @@ -603,7 +560,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
if apply, err = handleTruncatedStateBelowRaftPreApply(
ctx, b.state.TruncatedState, res.State.TruncatedState, b.r.raftMu.stateLoader, b.batch,
); err != nil {
return wrapWithNonDeterministicFailure(err, "unable to handle truncated state")
return kvserverbase.NonDeterministicErrorWrapf(err, "unable to handle truncated state")
}
} else {
b.r.store.raftTruncator.addPendingTruncation(
Expand Down Expand Up @@ -673,7 +630,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
false, /* clearRangeIDLocalOnly */
false, /* mustUseClearRange */
); err != nil {
return wrapWithNonDeterministicFailure(err, "unable to destroy replica before removal")
return kvserverbase.NonDeterministicErrorWrapf(err, "unable to destroy replica before removal")
}
}

Expand Down Expand Up @@ -757,7 +714,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
// See handleChangeReplicasResult().
sync := b.changeRemovesReplica
if err := b.batch.Commit(sync); err != nil {
return wrapWithNonDeterministicFailure(err, "unable to commit Raft entry batch")
return kvserverbase.NonDeterministicErrorWrapf(err, "unable to commit Raft entry batch")
}
b.batch.Close()
b.batch = nil
Expand Down Expand Up @@ -883,7 +840,7 @@ func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd)
} else {
req.SafeString("request unknown; not leaseholder")
}
return wrapWithNonDeterministicFailure(errors.AssertionFailedf(
return kvserverbase.NonDeterministicErrorWrapf(errors.AssertionFailedf(
"command writing below closed timestamp; cmd: %x, write ts: %s, "+
"batch state closed: %s, command closed: %s, request: %s, lease: %s.\n"+
"This assertion will fire again on restart; to ignore run with env var\n"+
Expand Down Expand Up @@ -1029,7 +986,7 @@ func (sm *replicaStateMachine) ApplySideEffects(

// On ConfChange entries, inform the raft.RawNode.
if err := sm.maybeApplyConfChange(ctx, cmd); err != nil {
return nil, wrapWithNonDeterministicFailure(err, "unable to apply conf change")
return nil, kvserverbase.NonDeterministicErrorWrapf(err, "unable to apply conf change")
}

// Mark the command as applied and return it as an apply.AppliedCommand.
Expand Down
18 changes: 6 additions & 12 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"context"
"fmt"
"math/rand"
"sort"
"strings"
Expand Down Expand Up @@ -832,11 +833,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// If we didn't expect Raft to have a snapshot but it has one
// regardless, that is unexpected and indicates a programming
// error.
err := makeNonDeterministicFailure(
err := kvserverbase.NonDeterministicErrorf(
"have inSnap=nil, but raft has a snapshot %s",
raft.DescribeSnapshot(rd.Snapshot),
)
return stats, getNonDeterministicFailureExplanation(err), err
return stats, fmt.Sprint(kvserverbase.GetRedactedNonDeterministicFailureExplanation(err)), err
}

// If the ready struct includes entries that have been committed, these
Expand All @@ -863,11 +864,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize)
defer appTask.Close()
if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil {
return stats, getNonDeterministicFailureExplanation(err), err
return stats, fmt.Sprint(kvserverbase.GetRedactedNonDeterministicFailureExplanation(err)), err
}
if knobs := r.store.TestingKnobs(); knobs == nil || !knobs.DisableCanAckBeforeApplication {
if err := appTask.AckCommittedEntriesBeforeApplication(ctx, state.lastIndex); err != nil {
return stats, getNonDeterministicFailureExplanation(err), err
return stats, fmt.Sprint(kvserverbase.GetRedactedNonDeterministicFailureExplanation(err)), err
}
}

Expand Down Expand Up @@ -1009,7 +1010,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// will be processed by this Replica.
return stats, "", err
} else if err != nil {
return stats, getNonDeterministicFailureExplanation(err), err
return stats, fmt.Sprint(kvserverbase.GetRedactedNonDeterministicFailureExplanation(err)), err
}
if r.store.cfg.KVAdmissionController != nil &&
stats.apply.followerStoreWriteBytes.NumEntries > 0 {
Expand Down Expand Up @@ -2390,13 +2391,6 @@ func shouldCampaignAfterConfChange(
return false
}

func getNonDeterministicFailureExplanation(err error) string {
if nd := (*nonDeterministicFailure)(nil); errors.As(err, &nd) {
return nd.safeExpl
}
return "???"
}

// printRaftTail pretty-prints the tail of the log and returns it as a string,
// with the same format as `cockroach debug raft-log`. The entries are printed
// from newest to oldest. maxEntries and maxCharsPerEntry control the size of
Expand Down
4 changes: 2 additions & 2 deletions pkg/testutils/lint/passes/fmtsafe/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ var requireConstFmt = map[string]bool{
"(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Fatalf": true,
"(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Panicf": true,

"github.com/cockroachdb/cockroach/pkg/kv/kvserver.makeNonDeterministicFailure": true,
"github.com/cockroachdb/cockroach/pkg/kv/kvserver.wrapWithNonDeterministicFailure": true,
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase.NonDeterministicErrorf": true,
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase.NonDeterministicErrorWrapf": true,

"(go.etcd.io/etcd/raft/v3.Logger).Debugf": true,
"(go.etcd.io/etcd/raft/v3.Logger).Infof": true,
Expand Down

0 comments on commit a1d7605

Please sign in to comment.