Merge #106750
106750: kvserver: fully adopt useReproposalsV2 r=erikgrinaker a=tbg

This hard-codes useReproposalsV2 to true (prior to this PR, it was metamorphic with a default of true) and applies the resulting simplifications across the codebase.

These include simplified control flow and an overall simplified architecture.

Closes #105625.

Epic: CRDB-25287
Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
craig[bot] and tbg committed Jul 19, 2023
2 parents 06b5ba7 + d795e6d commit b2ba2e5
Showing 7 changed files with 138 additions and 580 deletions.
88 changes: 8 additions & 80 deletions pkg/kv/kvserver/replica_application_decoder.go
@@ -59,7 +59,7 @@ func (d *replicaDecoder) DecodeAndBind(ctx context.Context, ents []raftpb.Entry)
if err := d.decode(ctx, ents); err != nil {
return false, err
}
anyLocal := d.retrieveLocalProposals(ctx)
anyLocal := d.retrieveLocalProposals()
d.createTracingSpans(ctx)
return anyLocal, nil
}
@@ -75,15 +75,13 @@ func (d *replicaDecoder) decode(ctx context.Context, ents []raftpb.Entry) error
return nil
}

// retrieveLocalProposalsV2 is used with useReproposalsV2, replacing a call
// to retrieveLocalProposals. The V2 implementation is simpler because a log
// entry that comes up for a local proposal can always consume that proposal
// from the map because V2 never mutates the MaxLeaseIndex for the same proposal.
// In contrast, with V1, we can only remove the proposal from the map once we
// have found a log entry that had a matching MaxLeaseIndex. This led to the
// complexity of having multiple entries associated with the same proposal during
// application.
func (d *replicaDecoder) retrieveLocalProposalsV2() (anyLocal bool) {
// retrieveLocalProposals removes all proposals which have a log entry pending
// immediate application from the proposals map. The entries which are paired up
// with a proposal in that way are considered "local", meaning a client is
// waiting on their result, and may be reproposed (as a new proposal) with a new
// lease index in case they apply with an illegal lease index (see
// tryReproposeWithNewLeaseIndex).
func (d *replicaDecoder) retrieveLocalProposals() (anyLocal bool) {
d.r.mu.Lock()
defer d.r.mu.Unlock()

@@ -137,76 +135,6 @@ func (d *replicaDecoder) retrieveLocalProposalsV2() (anyLocal bool) {
return anyLocal
}

// retrieveLocalProposals binds each of the decoder's commands to their local
// proposals if they were proposed locally. The method also sets the ctx fields
// on all commands.
func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal bool) {
if useReproposalsV2 {
// NB: we *must* use this new code for correctness, since we have an invariant
// described within.
return d.retrieveLocalProposalsV2()
}
d.r.mu.Lock()
defer d.r.mu.Unlock()
// Assign all the local proposals first then delete all of them from the map
// in a second pass. This ensures that we retrieve all proposals correctly
// even if the applier has multiple entries for the same proposal, in which
// case the proposal was reproposed (either under its original or a new
// MaxLeaseIndex) which we handle in a second pass below.
var it replicatedCmdBufSlice
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
cmd.proposal = d.r.mu.proposals[cmd.ID]
anyLocal = anyLocal || cmd.IsLocal()
}
if !anyLocal && d.r.mu.proposalQuota == nil {
// Fast-path.
return false
}
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
var toRelease *quotapool.IntAlloc
shouldRemove := cmd.IsLocal() &&
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't remove the
// proposal from the map. We expect this entry to be rejected by
// checkForcedErr.
//
// Note that lease proposals always use a MaxLeaseIndex of zero (since
// they have their own replay protection), so they always meet this
// criterion. While such proposals can be reproposed, only the first
// instance that gets applied matters and so removing the command is
// always what we want to happen.
cmd.Cmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
if shouldRemove {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
// lease index, meaning that they will all be rejected after this entry
// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
// picks up the proposal on failure, it will re-add the proposal to the
// proposal map, but this won't affect this replicaApplier.
//
// While here, add the proposal's quota size to the quota release queue.
// We check the proposal map again first to avoid double free-ing quota
// when reproposals from the same proposal end up in the same entry
// application batch.
delete(d.r.mu.proposals, cmd.ID)
toRelease = cmd.proposal.quotaAlloc
cmd.proposal.quotaAlloc = nil
}
// At this point we're not guaranteed to have proposalQuota initialized,
// the same is true for quotaReleaseQueues. Only queue the proposal's
// quota for release if the proposalQuota is initialized.
if d.r.mu.proposalQuota != nil {
d.r.mu.quotaReleaseQueue = append(d.r.mu.quotaReleaseQueue, toRelease)
}
}
return anyLocal
}

// createTracingSpans creates and assigns a new tracing span for each decoded
// command. If a command was proposed locally, it will be given a tracing span
// that follows from its proposal's span.
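As a quick illustration of the consume-from-the-map pattern that the new retrieveLocalProposals comment above describes, here is a minimal, self-contained Go sketch. The types and the map are hypothetical stand-ins, not the actual kvserver structures; the point is only that each decoded entry can take its proposal out of the map unconditionally, because the proposal's MaxLeaseIndex is never rewritten.

package main

import "fmt"

// Hypothetical, simplified stand-ins for the real kvserver types.
type cmdID string

type proposal struct {
    id            cmdID
    maxLeaseIndex uint64
}

type decodedCmd struct {
    id       cmdID
    proposal *proposal // non-nil once bound to a local (in-flight) proposal
}

// retrieveLocalProposals mirrors the simplified idea: any decoded entry with a
// matching in-flight proposal consumes it from the map, with no MaxLeaseIndex
// comparison, because at most one entry can ever pair with a given proposal.
func retrieveLocalProposals(proposals map[cmdID]*proposal, cmds []decodedCmd) (anyLocal bool) {
    for i := range cmds {
        cmd := &cmds[i]
        if p, ok := proposals[cmd.id]; ok {
            cmd.proposal = p
            delete(proposals, cmd.id) // consume unconditionally
            anyLocal = true
        }
    }
    return anyLocal
}

func main() {
    proposals := map[cmdID]*proposal{"a": {id: "a", maxLeaseIndex: 7}}
    cmds := []decodedCmd{{id: "a"}, {id: "b"}}
    fmt.Println(retrieveLocalProposals(proposals, cmds), len(proposals)) // true 0
}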
144 changes: 41 additions & 103 deletions pkg/kv/kvserver/replica_application_result.go
@@ -20,23 +20,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"go.etcd.io/raft/v3"
)

// useReproposalsV2 activates prototype code that instead of reproposing using a
// modified lease index makes a new proposal (different CmdID), transfers the
// waiting caller (if any) to it, and proposes that. With this strategy, the
// *RaftCommand associated to a proposal becomes immutable, which simplifies the
// mental model and allows various simplifications in the proposal pipeline. For
// now, the old and new behavior coexist, and we want to keep exercising both.
// Once we have confidence, we'll hard-code true and remove all old code paths.
var useReproposalsV2 = util.ConstantWithMetamorphicTestBool("reproposals-v2", true)

// replica_application_*.go files provide concrete implementations of
// the interfaces defined in the storage/apply package:
//
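For readers unfamiliar with the metamorphic constants referenced in the commit message and in the deleted comment above, the sketch below shows the shape of the gate this PR removes. The helper here is a hypothetical stand-in for util.ConstantWithMetamorphicTestBool, not the real implementation: under metamorphic testing the constant may flip so both paths stay exercised, while hard-coding it to true leaves a single code path.

package main

import "fmt"

// constantWithMetamorphicTestBool stands in for util.ConstantWithMetamorphicTestBool:
// in production builds it returns the default, while metamorphic test builds may
// randomly pick the other value so that both branches keep getting exercised.
func constantWithMetamorphicTestBool(name string, defaultValue bool) bool {
    return defaultValue // production behavior only, for the sake of the sketch
}

var useReproposalsV2 = constantWithMetamorphicTestBool("reproposals-v2", true)

func retrieveLocalProposalsV1() { fmt.Println("old path: match entries on MaxLeaseIndex") }
func retrieveLocalProposalsV2() { fmt.Println("new path: consume each proposal unconditionally") }

func main() {
    // Pre-PR shape: a runtime branch on the metamorphic constant.
    if useReproposalsV2 {
        retrieveLocalProposalsV2()
    } else {
        retrieveLocalProposalsV1()
    }
    // Post-PR shape: the constant and the else branch are gone.
    retrieveLocalProposalsV2()
}

Once the constant is hard-coded to true, the V1 body and every "if useReproposalsV2" branch become dead code, which is what the deletions in this file reflect.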
@@ -112,20 +102,10 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
if pErr != nil {
// A forced error was set (i.e. we did not apply the proposal,
// for instance due to its log position).
cmd.response.Err = pErr
switch cmd.Rejection {
case kvserverbase.ProposalRejectionPermanent:
cmd.response.Err = pErr
case kvserverbase.ProposalRejectionIllegalLeaseIndex:
if useReproposalsV2 {
// If we're using V2 reproposals, this proposal is actually going to
// be fully rejected, but the client won't be listening to it at that
// point any more. But we should set the error. (This ends up being
// inconsequential but it's the right thing to do).
//
// TODO(tbg): once useReproposalsV2 is baked in, set the error unconditionally
// above the `switch`.
cmd.response.Err = pErr
}
// Reset the error as it's now going to be determined by the outcome of
// reproposing (or not); note that tryReproposeWithNewLeaseIndex will
// return `nil` if the entry is not eligible for reproposals.
@@ -192,17 +172,13 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
}
}
if pErr == nil { // since we might have injected an error
if useReproposalsV2 {
pErr = kvpb.NewError(r.tryReproposeWithNewLeaseIndexV2(ctx, cmd))
if pErr == nil {
// Avoid falling through below. We managed to repropose, but this
// proposal is still erroring out. We don't want to assign to
// localResult. If there is an error though, we do fall through into
// the existing tangle of correct but unreadable handling below.
return
}
} else {
pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
pErr = kvpb.NewError(r.tryReproposeWithNewLeaseIndex(ctx, cmd))
if pErr == nil {
// Avoid falling through below. We managed to repropose, but this
// proposal is still erroring out. We don't want to assign to
// localResult. If there is an error though, we do fall through into
// the existing tangle of correct but unreadable handling below.
return
}
}

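To summarize the simplified control flow in prepareLocalResult after this change, here is a condensed, hypothetical sketch (illustrative names and types, not the real kvserver API): the forced error is recorded on the response up front, and an illegal-lease-index rejection triggers a single reproposal attempt; if that succeeds, the function returns early so the stale copy fails quietly while the waiting client is carried over to the new proposal.

package main

import (
    "errors"
    "fmt"
)

type rejection int

const (
    rejectionPermanent rejection = iota
    rejectionIllegalLeaseIndex
)

type cmd struct {
    rejection   rejection
    responseErr error
}

// tryRepropose stands in for tryReproposeWithNewLeaseIndex: it returns nil when
// a fresh proposal (with a new command ID) was successfully submitted.
func tryRepropose(c *cmd) error { return nil }

// handleForcedError mirrors the simplified shape of the rejection handling.
func handleForcedError(c *cmd, forcedErr error) {
    // The forced error is now recorded unconditionally, before the switch.
    c.responseErr = forcedErr
    switch c.rejection {
    case rejectionPermanent:
        // Nothing more to do; the client sees forcedErr.
    case rejectionIllegalLeaseIndex:
        if err := tryRepropose(c); err != nil {
            // Reproposal failed; surface that error instead.
            c.responseErr = err
            return
        }
        // Reproposed successfully: the waiting client is now attached to the new
        // proposal, so this stale copy simply errors out without anyone listening.
    }
}

func main() {
    c := &cmd{rejection: rejectionIllegalLeaseIndex}
    handleForcedError(c, errors.New("illegal lease index"))
    fmt.Println(c.responseErr) // illegal lease index
}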
@@ -241,19 +217,14 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
// For proposed simplifications, see:
// https://github.com/cockroachdb/cockroach/issues/97633
log.Infof(ctx, "failed to repropose %s at idx %d with new lease index: %s", cmd.ID, cmd.Index(), pErr)
// TODO(repl): we're replacing an error (illegal LAI) here with another error.
// A pattern where the error is assigned exactly once would be simpler to
// reason about. In particular, we want to make sure we never replace an
// ambiguous error with an unambiguous one (at least if the resulting
// error will reach the proposer, such as can be the case here, though
// the error we're replacing is also definite so it's ok).
cmd.response.Err = pErr
// Fall through.
} else if !useReproposalsV2 {
// Unbind the entry's local proposal because we just succeeded
// in reproposing it and we don't want to acknowledge the client
// yet.
//
// NB: in v2, reproposing already moved the waiting caller over to a new
// proposal, and by design we don't change the "Localness" of the old
// proposal mid-application but instead let it fail as a local proposal
// (which signals into a throwaway channel).
cmd.proposal = nil
return
}
default:
panic("unexpected")
Expand All @@ -271,7 +242,22 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
// TODO(tbg): it doesn't make sense to assign to `cmd.response` unconditionally.
// We're returning an error; the response should be nil. The error tracking in
// this method should be cleaned up.
// TODO(tbg): we should have an invariant about `cmd.response`: it is
// initially nil (in particular, a command that evaluates with an error - say
// a TransactionRetryError - must not enter the replication pipeline; we could
// relax this if we ever want erroring commands to be able to mutate the state
// machine but safe to say we don't have this now) and is only written once
// (in this method).
// Also, if a caller gets signaled early (ambiguous result, etc.), this does not
// affect `response.Err`.
cmd.response.EncounteredIntents = cmd.proposal.Local.DetachEncounteredIntents()
// TODO(tbg): this seems wrong. the "Always" (pErr != nil) flavor of intents is
// for transaction aborts that still "know" about the ultimate fate of an intent.
// But since we're never reaching this code for a proposal that evaluated to an
// error, we can only reach it for illegal lease errors and the like, and even
// though it might be "correct" to surface the "always" intents in the response
// in that case, it would seem prudent not to take advantage of that. In other
// words, the line below this comment should be conditional on `pErr == nil`.
cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil)
if pErr == nil {
cmd.localResult = cmd.proposal.Local
@@ -280,9 +266,7 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
}
}

func (r *Replica) tryReproposeWithNewLeaseIndexV2(
ctx context.Context, origCmd *replicatedCmd,
) error {
func (r *Replica) tryReproposeWithNewLeaseIndex(ctx context.Context, origCmd *replicatedCmd) error {
// NB: `origCmd` remains "Local". It's just not going to signal anyone
// or release any latches.

@@ -301,7 +285,7 @@ func (r *Replica) tryReproposeWithNewLeaseIndexV2(
// non-local now invites logic bugs) but not bound to the caller.

// NB: quotaAlloc is always nil here, because we already
// released the quota unconditionally in retrieveLocalProposalsV2.
// released the quota unconditionally in retrieveLocalProposals.
// So the below is a no-op.
//
// TODO(tbg): if we shifted the release of proposal quota to *after*
Expand Down Expand Up @@ -395,62 +379,15 @@ func (r *Replica) tryReproposeWithNewLeaseIndexV2(
}
}()

if err := r.tryReproposeWithNewLeaseIndexShared(ctx, newProposal).GoError(); err != nil {
return err
}

success = true
return nil
}

// tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose
// commands that have gotten an illegal lease index error, and that we know
// could not have applied while their lease index was valid (that is, we
// observed all applied entries between proposal and the lease index becoming
// invalid, as opposed to skipping some of them by applying a snapshot).
//
// It is not intended for use elsewhere and is only a top-level function so that
// it can avoid the below_raft_protos check. Returns a nil error if the command
// has already been successfully applied or has been reproposed here or by a
// different entry for the same proposal that hit an illegal lease index error.
func (r *Replica) tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd,
) *kvpb.Error {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
p := cmd.proposal
if p.applied || cmd.Cmd.MaxLeaseIndex != p.command.MaxLeaseIndex {
// If the command associated with this rejected raft entry already
// applied then we don't want to repropose it. Doing so could lead
// to duplicate application of the same proposal.
//
// Similarly, if the command associated with this rejected raft
// entry has a different (larger) MaxLeaseIndex than the one we
// decoded from the entry itself, the command must have already
// been reproposed (this can happen if there are multiple copies
// of the command in the logs; see TestReplicaRefreshMultiple).
// We must not create multiple copies with multiple lease indexes,
// so don't repropose it again. This ensures that at any time,
// there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
return nil
}
return r.tryReproposeWithNewLeaseIndexShared(ctx, cmd.proposal)
}

func (r *Replica) tryReproposeWithNewLeaseIndexShared(
ctx context.Context, p *ProposalData,
) *kvpb.Error {
// We need to track the request again in order to protect its timestamp until
// it gets reproposed.
// TODO(andrei): Only track if the request consults the ts cache. Some
// requests (e.g. EndTxn) don't care about closed timestamps.
minTS, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, p.Request.WriteTimestamp())
minTS, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, newProposal.Request.WriteTimestamp())
defer tok.DoneIfNotMoved(ctx)

// NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp.
if p.Request.AppliesTimestampCache() && p.Request.WriteTimestamp().LessEq(minTS) {
// NB: newProposal.Request.Timestamp reflects the action of ba.SetActiveTimestamp.
if newProposal.Request.AppliesTimestampCache() && newProposal.Request.WriteTimestamp().LessEq(minTS) {
// The tracker wants us to forward the request timestamp, but we can't
// do that without re-evaluating, so give up. The error returned here
// will go back to DistSender, so send something it can digest.
@@ -460,20 +397,21 @@ func (r *Replica) tryReproposeWithNewLeaseIndexShared(
r.mu.state.Desc,
"reproposal failed due to closed timestamp",
)
return kvpb.NewError(err)
return err
}
// Some tests check for this log message in the trace.
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")

// See I7 from kvflowcontrol/doc.go: we don't re-deduct flow tokens on
// reproposals.
p.raftAdmissionMeta = nil
newProposal.raftAdmissionMeta = nil

pErr := r.propose(ctx, p, tok.Move(ctx))
if pErr != nil {
return pErr
if pErr := r.propose(ctx, newProposal, tok.Move(ctx)); pErr != nil {
return pErr.GoError()
}
log.VEventf(ctx, 2, "reproposed command %x", p.idKey)
log.VEventf(ctx, 2, "reproposed command %x", newProposal.idKey)

success = true
return nil
}

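Finally, a compact, self-contained sketch of the reproposal flow that remains in tryReproposeWithNewLeaseIndex, combining the eligibility reasoning from the deleted doc comment with the closed-timestamp guard retained above. All helpers and types here are simplified stand-ins rather than the actual CockroachDB APIs.

package main

import (
    "errors"
    "fmt"
)

type timestamp uint64

type proposal struct {
    id             string
    writeTimestamp timestamp
    applied        bool
    superseded     bool // a newer copy of the command is already in flight
}

// closedTimestamp stands in for the minimum timestamp obtained via
// TrackEvaluatingRequest: writes at or below it can no longer be proposed.
var closedTimestamp timestamp = 10

// propose stands in for Replica.propose.
func propose(p *proposal) error {
    fmt.Printf("reproposed %s as a brand-new command\n", p.id)
    return nil
}

func tryReproposeWithNewLeaseIndex(orig *proposal) error {
    if orig.applied || orig.superseded {
        // Already applied, or already reproposed: proposing again could apply the
        // command twice, so silently do nothing.
        return nil
    }
    if orig.writeTimestamp <= closedTimestamp {
        // The closed timestamp has overtaken the request; it would need to be
        // re-evaluated at a higher timestamp, so report an error to the caller.
        return errors.New("reproposal failed due to closed timestamp")
    }
    // Build a new proposal (new ID, same request) and hand the waiting caller
    // over to it; the original RaftCommand is never mutated.
    newProposal := &proposal{id: orig.id + "-retry", writeTimestamp: orig.writeTimestamp}
    return propose(newProposal)
}

func main() {
    fmt.Println(tryReproposeWithNewLeaseIndex(&proposal{id: "cmd1", writeTimestamp: 42})) // reproposes
    fmt.Println(tryReproposeWithNewLeaseIndex(&proposal{id: "cmd2", writeTimestamp: 5}))  // closed timestamp error
}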