Skip to content

Commit

Permalink
storage: tolerate missing transaction records when pushing
Browse files Browse the repository at this point in the history
Informs cockroachdb#25437.
Informs cockroachdb#32971.

This is the second part of addressing cockroachdb#32971. The final part will be updating
the txn client to stop sending BeginTxn requests.

The change closely resembles what is laid out in the corresponding RFC sections
(`PushTxn` and `QueryTxn / txnwait.Queue`). Both `PushTxn` requests and `QueryTxn`
requests are adjusted to tolerate missing transaction records and to synthesize
them based on the information pulled from intents if necessary. By hiding these
details behind the request API abstraction, we don't actually need to modify
the `txnwait.Queue` at all!

The change does diverge from the RFC in two distinct ways. The first is that it
introduces a new invariant about transaction record creation. Transaction can
now only ever be created by requests originating from their own coordinator
(`BeginTxn`, `HeartbeatTxn`, and `EndTxn`). They can never be created in any
state (not even ABORTED) by concurrent actors. This is actually a much stronger
invariant than what previously existed (and even what was proposed in the RFC),
and it simplifies a lot of logic by dramatically refining the transaction state
machine. We now know that for a transaction record to exist, it must have been
created by its own coordinator and it must have checked with `CanCreateTxnRecord`.

Making this new invariant work required the second major divergence from the
RFC. The RFC suggested that we use the read timestamp cache for successful
transaction timestamp pushes before a transaction record was written. This PR
takes this a step further and uses the write timestamp cache for successful
transaction aborts before a transaction record was written. In doing this, we
emerge with a very sane timestamp cache policy - the read timestamp cache is
used to push transaction timestamps and the write timestamp cache is used to
abort them entirely (which it was already essentially being used for). The txnID
marker on these timestamp cache entries then becomes the transaction that
initiated the push/abort. Transactions then consult both of these sources before
creating their transaction record in `CanCreateTxnRecord`. In doing this, we
avoid an entire class of situations where concurrent transactions created and
abandoned transaction records for another transaction, requiring eventual GC.

Release note: None
  • Loading branch information
nvanbenschoten committed Jan 8, 2019
1 parent 600bdd5 commit 6a9ef2e
Show file tree
Hide file tree
Showing 19 changed files with 786 additions and 520 deletions.
2 changes: 1 addition & 1 deletion docs/RFCS/20181209_lazy_txn_record_creation.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
- Feature Name: Lazy Transaction Record Creation (a.k.a Deprecate BeginTransaction)
- Status: draft
- Status: in-progress
- Start Date: 2018-12-09
- Authors: Nathan VanBenschoten
- RFC PR: #32971
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ func (*AdminChangeReplicasRequest) flags() int { return isAdmin | isAlone }
func (*AdminRelocateRangeRequest) flags() int { return isAdmin | isAlone }
func (*HeartbeatTxnRequest) flags() int { return isWrite | isTxn }
func (*GCRequest) flags() int { return isWrite | isRange }
func (*PushTxnRequest) flags() int { return isWrite | isAlone }
func (*PushTxnRequest) flags() int { return isWrite | isAlone | updatesReadTSCache }
func (*QueryTxnRequest) flags() int { return isRead | isAlone }

// QueryIntent only updates the read timestamp cache when attempting
Expand Down
240 changes: 120 additions & 120 deletions pkg/roachpb/api.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,8 @@ message PushTxnRequest {
// this to PUSH_TOUCH to determine whether the pushee can be aborted
// due to inactivity (based on the now field).
PushTxnType push_type = 6;
// Forces the push by overriding the normal checks in PushTxn to
// either abort or push the timestamp.
// Forces the push by overriding the normal expiration and priority checks
// in PushTxn to either abort or push the timestamp.
bool force = 7;

reserved 8;
Expand Down
108 changes: 55 additions & 53 deletions pkg/roachpb/errors.pb.go

Large diffs are not rendered by default.

28 changes: 15 additions & 13 deletions pkg/roachpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ enum TransactionAbortedReason {
// For backwards compatibility.
ABORT_REASON_UNKNOWN = 0;

// A BeginTransaction or EndTransaction(commit=true) request found an aborted
// transaction record. Another txn must have written this record - that other
// txn probably ran into one of our intents written before BeginTransaction
// (on a different range) and pushed it successfully. Or, a high-priority
// transaction simply pushed us, or we failed to heartbeat for a while and
// another txn (of any priority) considered us abandoned and pushed us.
// A BeginTransaction, HeartbeatTxn, or EndTransaction(commit=true) request
// found an aborted transaction record. Another txn must have written this
// record - that other txn probably ran into one of our intents written (on a
// different range) before our transaction record was written and pushed it
// successfully. Or, a high-priority transaction simply pushed us, or we
// failed to heartbeat for a while and another txn (of any priority)
// considered us abandoned and pushed us.
ABORT_REASON_ABORTED_RECORD_FOUND = 1;

// The request attempting to create a transaction record has a timestamp below
Expand All @@ -152,13 +153,14 @@ enum TransactionAbortedReason {
// meantime because the transaction was aborted.
ABORT_REASON_ABORT_SPAN = 5;

// An EndTransaction encountered a timestamp cache entry for the txn key, and
// the entry identifies this transaction. This means that the transaction
// definitely committed or rolled back before.
// So, this EndTransaction is either a delayed replay of some sort, or it
// raced with an async abort and lost. If a client gets this
// TransactionAbortedError (without it being wrapped in an ambiguous error),
// it must be the latter case, and the transaction can be retried.
// A requests attempting to create a transaction record encountered a
// timestamp cache entry for the txn key, and the entry identifies this
// transaction. This means that the transaction definitely committed or rolled
// back before.
// So, this request is either a delayed replay of some sort, or it raced with
// an async abort and lost. If a client gets this TransactionAbortedError
// (without it being wrapped in an ambiguous error), it must be the latter
// case, and the transaction can be retried.
ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY = 6;

// Like the above, except the timestamp cache doesn't have a txn id in it.
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/batcheval/cmd_begin_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func BeginTransaction(
}

// Verify that it is safe to create the transaction record.
if ok, reason := cArgs.EvalCtx.CanCreateTxnRecord(reply.Txn); !ok {
return result.Result{}, roachpb.NewTransactionAbortedError(reason)
if err := CanCreateTxnRecord(cArgs.EvalCtx, reply.Txn); err != nil {
return result.Result{}, err
}

// Write the txn record.
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ func evalEndTransaction(
// to perform this verification for commits. Rollbacks can always write
// an aborted txn record.
if args.Commit {
if ok, reason := cArgs.EvalCtx.CanCreateTxnRecord(reply.Txn); !ok {
return result.Result{}, roachpb.NewTransactionAbortedError(reason)
if err := CanCreateTxnRecord(cArgs.EvalCtx, reply.Txn); err != nil {
return result.Result{}, err
}
}
} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/batcheval/cmd_heartbeat_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func HeartbeatTxn(
}

// Verify that it is safe to create the transaction record.
if ok, reason := cArgs.EvalCtx.CanCreateTxnRecord(&txn); !ok {
return result.Result{}, roachpb.NewTransactionAbortedError(reason)
if err := CanCreateTxnRecord(cArgs.EvalCtx, &txn); err != nil {
return result.Result{}, err
}
}

Expand Down
206 changes: 125 additions & 81 deletions pkg/storage/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,58 +39,69 @@ func declareKeysPushTransaction(
pr := req.(*roachpb.PushTxnRequest)
spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.TransactionKey(pr.PusheeTxn.Key, pr.PusheeTxn.ID)})
spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, pr.PusheeTxn.ID)})
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeTxnSpanGCThresholdKey(header.RangeID)})
}

// PushTxn resolves conflicts between concurrent txns (or
// between a non-transactional reader or writer and a txn) in several
// ways depending on the statuses and priorities of the conflicting
// transactions. The PushTxn operation is invoked by a
// "pusher" (the writer trying to abort a conflicting txn or the
// reader trying to push a conflicting txn's commit timestamp
// PushTxn resolves conflicts between concurrent txns (or between
// a non-transactional reader or writer and a txn) in several ways,
// depending on the statuses and priorities of the conflicting
// transactions. The PushTxn operation is invoked by a "pusher"
// (args.PusherTxn -- the writer trying to abort a conflicting txn
// or the reader trying to push a conflicting txn's commit timestamp
// forward), who attempts to resolve a conflict with a "pushee"
// (args.PushTxn -- the pushee txn whose intent(s) caused the
// (args.PusheeTxn -- the pushee txn whose intent(s) caused the
// conflict). A pusher is either transactional, in which case
// PushTxn is completely initialized, or not, in which case the
// PushTxn has only the priority set.
// PusherTxn is completely initialized, or not, in which case the
// PusherTxn has only the priority set.
//
// Txn already committed/aborted: If pushee txn is committed or
// aborted return success.
// The request arrives and immediately tries to determine the current
// disposition of the pushee transaction by reading its transaction
// record. If it finds one, it continues with the push. If not, it
// uses knowledge from the existence of the conflicting intent to
// determine the current state of the pushee. It's possible that the
// transaction record is missing either because it hasn't been written
// yet or because it has already been GCed after being finalized. Once
// the request determines which case its in, it decides whether to
// continue with the push. There are a number of different outcomes
// that a push can result in, based on the state that the pushee's
// transaction record is found in:
//
// Txn Timeout: If pushee txn entry isn't present or its LastHeartbeat
// timestamp isn't set, use its as LastHeartbeat. If current time -
// LastHeartbeat > 2 * DefaultHeartbeatInterval, then the pushee txn
// should be either pushed forward, aborted, or confirmed not pending,
// depending on value of Request.PushType.
// Txn already committed/aborted: If the pushee txn is committed or
// aborted return success.
//
// Old Txn Epoch: If persisted pushee txn entry has a newer Epoch than
// PushTxn.Epoch, return success, as older epoch may be removed.
// Txn record expired: If the pushee txn is pending, its last
// heartbeat timestamp is observed to determine the latest client
// activity. This heartbeat is forwarded by the conflicting intent's
// timestamp because that timestamp also indicates definitive client
// activity (NB: the intent timestamp used is not updated on intent
// pushes). This time of "last activity" is compared against the
// current time to determine whether the transaction has expired.
// If so, it is aborted.
//
// Lower Txn Priority: If pushee txn has a lower priority than pusher,
// adjust pushee's persisted txn depending on value of
// args.PushType. If args.PushType is PUSH_ABORT, set txn.Status to
// ABORTED, and priority to one less than the pusher's priority and
// return success. If args.PushType is PUSH_TIMESTAMP, set
// txn.Timestamp to just after PushTo.
// Txn record not expired: If the pushee txn is not expired, its
// priority is compared against the pusher's (see CanPushWithPriority).
//
// Higher Txn Priority: If pushee txn has a higher priority than
// pusher, return TransactionPushError. Transaction will be retried
// with priority one less than the pushee's higher priority.
// Push cannot proceed: a TransactionPushError is returned.
//
// If the pusher is non-transactional, args.PusherTxn is an empty
// proto with only the priority set.
// Push can proceed: the pushee's transaction record is modified and
// rewritten, based on the value of args.PushType. If args.PushType
// is PUSH_ABORT, txn.Status is set to ABORTED. If args.PushType is
// PUSH_TIMESTAMP, txn.Timestamp is set to just after args.PushTo.
//
// TODO(nvanbenschoten): I don't understand this below...
// If the pushee is aborted, its timestamp will be forwarded to match its last
// client activity timestamp (i.e. last heartbeat), if available. This is done
// so that the updated timestamp populates the AbortSpan, allowing the GC
// queue to purge entries for which the transaction coordinator must have found
// queue to purge records for which the transaction coordinator must have found
// out via its heartbeats that the transaction has failed.
func PushTxn(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*roachpb.PushTxnRequest)
h := cArgs.Header
reply := resp.(*roachpb.PushTxnResponse)

if cArgs.Header.Txn != nil {
if h.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
}
if args.Now == (hlc.Timestamp{}) {
Expand All @@ -107,47 +118,55 @@ func PushTxn(
ok, err := engine.MVCCGetProto(ctx, batch, key, hlc.Timestamp{}, existTxn, engine.MVCCGetOptions{})
if err != nil {
return result.Result{}, err
} else if !ok {
// There are three cases in which there is no transaction record:
//
// * the pushee is still active but its transaction record has not
// been written yet. This is fairly common because transactions
// do not eagerly write their transaction record before writing
// intents, which another reader or writer might stumble upon and
// be forced to push.
// * the pushee resolved its intents synchronously on successful commit;
// in this case, the transaction record of the pushee is also removed.
// Note that in this case, the intent which prompted this PushTxn
// doesn't exist any more.
// * the pushee timed out or was aborted and the intent not cleaned up,
// but the transaction record was garbage collected.
//
// To determine which case we're in, we check whether the transaction could
// ever write a transaction record. We do this by using the metadata from
// the intent and attempting to synthesize a transaction record (TODO?). If
// a transaction record for the transaction could be written in the future
// then we must be in the first case. If one could not be written then we
// know we're in either the second or the third case.
//
// Performing this detection could have false positives where we determine
// that a record could still be written and conclude that we're in the
// first case. However, it cannot have false negatives where we determine
// that a record can not be written and conclude that we're in the second
// or third case. This is important, because it means that we may end up
// failing to push a finalized transaction but will never determine that
// a transaction is finalized when it still could end up committing.
reply.PusheeTxn = SynthesizeTxnFromMeta(cArgs.EvalCtx, args.PusheeTxn)
if reply.PusheeTxn.Status == roachpb.ABORTED {
// If the transaction is uncommittable, we don't even need to
// persist an ABORTED transaction record, we can just consider it
// aborted. This is good because it allows us to obey the invariant
// that only the transaction's own coordinator can create its
// transaction record.
result := result.Result{}
result.Local.UpdatedTxns = &[]*roachpb.Transaction{&reply.PusheeTxn}
return result, nil
}
} else {
// Start with the persisted transaction record.
reply.PusheeTxn = existTxn.Clone()

// Forward the last heartbeat time of the transaction record by
// the timestamp of the intent. This is another indication of
// client activity.
reply.PusheeTxn.LastHeartbeat.Forward(args.PusheeTxn.Timestamp)
}
// There are three cases in which there is no transaction entry:
//
// * the pushee is still active but the BeginTransaction was delayed
// for long enough that a write intent from this txn to another
// range is causing another reader or writer to push.
// * the pushee resolved its intents synchronously on successful commit;
// in this case, the transaction record of the pushee is also removed.
// Note that in this case, the intent which prompted this PushTxn
// doesn't exist any more.
// * the pushee timed out or was aborted and the intent not cleaned up,
// but the transaction record was garbage collected.
//
// We currently make no attempt at guessing which one it is, though we
// could (see #1939). Instead, a new aborted entry is always written.
//
// TODO(tschottdorf): we should actually improve this when we
// garbage-collect aborted transactions, or we run the risk of a push
// recreating a GC'ed transaction as PENDING, which is an error if it
// has open intents (which is likely if someone pushes it).
if !ok {
// The transaction doesn't exist on disk; we're allowed to abort it.
// TODO(tschottdorf): especially for SNAPSHOT transactions, there's
// something to win here by not aborting, but instead pushing the
// timestamp. For SERIALIZABLE it's less important, but still better
// to have them restart than abort. See #3344.
// TODO(tschottdorf): double-check for problems emanating from
// using a trivial Transaction proto here. Maybe some fields ought
// to receive dummy values.
reply.PusheeTxn.Status = roachpb.ABORTED
reply.PusheeTxn.TxnMeta = args.PusheeTxn
reply.PusheeTxn.Timestamp = args.Now // see method comment
// Setting OrigTimestamp bumps LastActive(); see #9265.
reply.PusheeTxn.OrigTimestamp = args.Now
result := result.Result{}
result.Local.UpdatedTxns = &[]*roachpb.Transaction{&reply.PusheeTxn}
txnRecord := reply.PusheeTxn.AsRecord()
return result, engine.MVCCPutProto(ctx, batch, cArgs.Stats, key, hlc.Timestamp{}, nil, &txnRecord)
}
// Start with the persisted transaction record as final transaction.
reply.PusheeTxn = existTxn.Clone()

// If already committed or aborted, return success.
if reply.PusheeTxn.Status != roachpb.PENDING {
Expand Down Expand Up @@ -212,23 +231,48 @@ func PushTxn(
// Upgrade priority of pushed transaction to one less than pusher's.
reply.PusheeTxn.UpgradePriority(args.PusherTxn.Priority - 1)

// If aborting transaction, set new status and return success.
if args.PushType == roachpb.PUSH_ABORT {
// Determine what to do with the pushee, based on the push type.
switch args.PushType {
case roachpb.PUSH_ABORT:
// If aborting the transaction, set the new status.
reply.PusheeTxn.Status = roachpb.ABORTED
// Forward the timestamp to accommodate AbortSpan GC. See method
// comment for details.
reply.PusheeTxn.Timestamp.Forward(reply.PusheeTxn.LastActive())
} else if args.PushType == roachpb.PUSH_TIMESTAMP {
// Otherwise, update timestamp to be one greater than the request's timestamp.
reply.PusheeTxn.Timestamp = args.PushTo
reply.PusheeTxn.Timestamp.Logical++
case roachpb.PUSH_TIMESTAMP:
// Otherwise, update timestamp to be one greater than the request's
// timestamp. This new timestamp will be use to update the read
// timestamp cache. If the transaction record was not already present
// then we rely on the read timestamp cache to prevent the record from
// ever being written with a timestamp beneath this timestamp.
//
// TODO(nvanbenschoten): Remove this comment - the migration path for
// this is subtle. We rely on the read timestamp cache for timestamp
// pushes of missing transaction records. However, in mixed version
// clusters we can't be sure that a future leaseholder will consult the
// read timestamp cache. Luckily, we can be sure that it will consult
// its write timestamp cache, whose low water mark will necessarily be
// equal to or greater than this timestamp. If it finds a transaction
// with an original timestamp below this time, it will abort it.
reply.PusheeTxn.Timestamp = args.PushTo.Add(0, 1)
default:
return result.Result{}, errors.Errorf("unexpected push type: %v", args.PushType)
}

// Persist the pushed transaction using zero timestamp for inline value.
txnRecord := reply.PusheeTxn.AsRecord()
if err := engine.MVCCPutProto(ctx, batch, cArgs.Stats, key, hlc.Timestamp{}, nil, &txnRecord); err != nil {
return result.Result{}, err
// If the transaction record was already present, persist the updates to it.
// If not, then we don't want to create it. This could allow for finalized
// transaction to be revived. Instead, we obey the invariant that only the
// transaction's own coordinator can issue requests that create its
// transaction record. To ensure that a timestamp push or an abort is
// respected for transactions without transaction records, we rely on the
// read and write timestamp cache, respectively.
if ok {
txnRecord := reply.PusheeTxn.AsRecord()
if err := engine.MVCCPutProto(ctx, batch, cArgs.Stats, key, hlc.Timestamp{}, nil, &txnRecord); err != nil {
return result.Result{}, err
}
}

result := result.Result{}
result.Local.UpdatedTxns = &[]*roachpb.Transaction{&reply.PusheeTxn}
return result, nil
Expand Down
13 changes: 9 additions & 4 deletions pkg/storage/batcheval/cmd_query_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func declareKeysQueryTransaction(
) {
qr := req.(*roachpb.QueryTxnRequest)
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.TransactionKey(qr.Txn.Key, qr.Txn.ID)})
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeTxnSpanGCThresholdKey(header.RangeID)})
}

// QueryTxn fetches the current state of a transaction.
Expand All @@ -59,11 +60,15 @@ func QueryTxn(
}
key := keys.TransactionKey(args.Txn.Key, args.Txn.ID)

// Fetch transaction record; if missing, return empty txn.
ok, err := engine.MVCCGetProto(ctx, batch, key, hlc.Timestamp{}, &reply.QueriedTxn,
engine.MVCCGetOptions{})
if err != nil || !ok {
// Fetch transaction record; if missing, attempt to synthesize one.
if ok, err := engine.MVCCGetProto(
ctx, batch, key, hlc.Timestamp{}, &reply.QueriedTxn, engine.MVCCGetOptions{},
); err != nil {
return result.Result{}, err
} else if !ok {
// The transaction hasn't written a transaction record yet.
// Attempt to synthesize it from the provided TxnMeta.
reply.QueriedTxn = SynthesizeTxnFromMeta(cArgs.EvalCtx, args.Txn)
}
// Get the list of txns waiting on this txn.
reply.WaitingTxns = cArgs.EvalCtx.GetTxnWaitQueue().GetDependents(args.Txn.ID)
Expand Down
Loading

0 comments on commit 6a9ef2e

Please sign in to comment.