diff --git a/pkg/internal/client/sender.go b/pkg/internal/client/sender.go index 1c9ba4cb2d7c..d3947ff96ba3 100644 --- a/pkg/internal/client/sender.go +++ b/pkg/internal/client/sender.go @@ -170,6 +170,13 @@ type TxnSender interface { // like the transaction that merges ranges together. DisablePipelining() error + // EagerRecord instructs the transaction write its transaction record as soon as + // possible, instead of waiting for the transaction's first heartbeat or for the + // end of the transaction to write it. + // + // TODO(nvanbenschoten): Fix up flaky tests to allow us to get rid of this. + EagerRecord() error + // OrigTimestamp returns the transaction's starting timestamp. // Note a transaction can be internally pushed forward in time before // committing so this is not guaranteed to be the commit timestamp. @@ -361,6 +368,9 @@ func (m *MockTransactionalSender) UpdateStateOnRemoteRetryableErr( // DisablePipelining is part of the client.TxnSender interface. func (m *MockTransactionalSender) DisablePipelining() error { return nil } +// EagerRecord is part of the client.TxnSender interface. +func (m *MockTransactionalSender) EagerRecord() error { return nil } + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index bc0e10d778af..2adff023b97a 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -292,6 +292,18 @@ func (txn *Txn) DisablePipelining() error { return txn.mu.sender.DisablePipelining() } +// EagerRecord instructs the transaction write its transaction record as soon as +// possible, instead of waiting for the transaction's first heartbeat or for the +// end of the transaction to write it. +// +// EagerRecord must be called before any operations are performed on the +// transaction. +func (txn *Txn) EagerRecord() error { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.EagerRecord() +} + // NewBatch creates and returns a new empty batch object for use with the Txn. func (txn *Txn) NewBatch() *Batch { return &Batch{txn: txn} diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index cb5f8cc9e0b2..b1ad7d798ca9 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -544,6 +544,17 @@ func (tc *TxnCoordSender) DisablePipelining() error { return nil } +// EagerRecord is part of the client.TxnSender interface. +func (tc *TxnCoordSender) EagerRecord() error { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.active { + return errors.Errorf("cannot request an eager transaction record write on a running transaction") + } + tc.interceptorAlloc.txnHeartbeat.eagerRecord = true + return nil +} + // commitReadOnlyTxnLocked "commits" a read-only txn. It is equivalent, but // cheaper than, sending an EndTransactionRequest. A read-only txn doesn't have // a transaction record, so there's no need to send any request to the server. diff --git a/pkg/kv/txn_interceptor_heartbeat.go b/pkg/kv/txn_interceptor_heartbeat.go index 2b133cc3a1cb..d676dcc77e88 100644 --- a/pkg/kv/txn_interceptor_heartbeat.go +++ b/pkg/kv/txn_interceptor_heartbeat.go @@ -65,6 +65,10 @@ type txnHeartbeat struct { // is to notify the TxnCoordSender to shut itself down. asyncAbortCallbackLocked func(context.Context) + // When set to true, the transaction will always send a BeginTxn request to + // lay down a transaction record as early as possible. + eagerRecord bool + // mu contains state protected by the TxnCoordSender's mutex. mu struct { sync.Locker @@ -178,7 +182,7 @@ func (h *txnHeartbeat) SendLocked( ba.Txn.Key = anchor } - if !h.st.Version.IsActive(cluster.VersionLazyTxnRecord) { + if h.eagerRecord || !h.st.Version.IsActive(cluster.VersionLazyTxnRecord) { addedBeginTxn = true // Set the key in the begin transaction request to the txn's anchor key. diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 78d7e42f5157..d7b9c8619a5d 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -502,6 +502,14 @@ func (r *Replica) AdminMerge( // TODO(benesch): expose a proper API for preventing the fast path. _ = txn.CommitTimestamp() + // Multiple merge tests rely on a transaction record being written as + // early as possible in the lifecycle of a transaction. They aren't + // prepared for an untimely lease transfer to cause a restart. + // TODO(nvanbenschoten): Remove this once the tests are fixed. + if err := txn.EagerRecord(); err != nil { + return err + } + // Pipelining might send QueryIntent requests to the RHS after the RHS has // noticed the merge and started blocking all traffic. This causes the merge // transaction to deadlock. Just turn pipelining off; the structure of the