Skip to content

Commit

Permalink
storage: simplify error handling in evaluateProposalInner
Browse files Browse the repository at this point in the history
Proposals that return errors but still want to go through Raft to
lay down intents were refactored away in cockroachdb#21140. This allows us to
simplify the error handling in `evaluateProposalInner`.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 15, 2019
1 parent 01ce10d commit 643e31e
Showing 1 changed file with 16 additions and 24 deletions.
40 changes: 16 additions & 24 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -5214,54 +5214,46 @@ func (r *Replica) evaluateProposalInner(
) result.Result {
// Evaluate the commands. If this returns without an error, the batch should
// be committed.
var result result.Result
var batch engine.Batch
{
// TODO(tschottdorf): absorb all returned values in `pd` below this point
// in the call stack as well.
var pErr *roachpb.Error
var ms enginepb.MVCCStats
var br *roachpb.BatchResponse
batch, ms, br, result, pErr = r.evaluateWriteBatch(ctx, idKey, ba, spans)
if r.store.cfg.Settings.Version.IsActive(cluster.VersionMVCCNetworkStats) {
result.Replicated.Delta = ms.ToNetworkStats()
} else {
result.Replicated.DeprecatedDelta = &ms
}
result.Local.Reply = br
result.Local.Err = pErr
}

if result.Local.Err != nil && ba.IsWrite() {
// TODO(tschottdorf): absorb all returned values in `res` below this point
// in the call stack as well.
batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, spans)
if pErr != nil {
// TODO(tschottdorf): make `nil` acceptable. Corresponds to
// roachpb.Response{With->Or}Error.
result.Local.Reply = &roachpb.BatchResponse{}
res.Local.Reply = &roachpb.BatchResponse{}
res.Local.Err = pErr
// Reset the batch to clear out partial execution. Don't set
// a WriteBatch to signal to the caller that we fail-fast this
// proposal.
batch.Close()
batch = nil
// Restore the original txn's Writing bool if the error specifies a
// transaction.
if txn := result.Local.Err.GetTxn(); txn != nil {
if txn := res.Local.Err.GetTxn(); txn != nil {
if ba.Txn == nil {
log.Fatalf(ctx, "error had a txn but batch is non-transactional. Err txn: %s", txn)
}
if txn.ID == ba.Txn.ID {
txn.Writing = ba.Txn.Writing
}
}
return result
return res
}

result.WriteBatch = &storagebase.WriteBatch{
res.Local.Reply = br
if r.store.cfg.Settings.Version.IsActive(cluster.VersionMVCCNetworkStats) {
res.Replicated.Delta = ms.ToNetworkStats()
} else {
res.Replicated.DeprecatedDelta = &ms
}
res.WriteBatch = &storagebase.WriteBatch{
Data: batch.Repr(),
}
// Note: reusing the proposer's batch when applying the command on the
// proposer was explored as an optimization but resulted in no performance
// benefit.
batch.Close()
return result
return res
}

// checkIfTxnAborted checks the txn AbortSpan for the given
Expand Down

0 comments on commit 643e31e

Please sign in to comment.