Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: provide option to request eager txn record creation #33674

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ type TxnSender interface {
// like the transaction that merges ranges together.
DisablePipelining() error

// EagerRecord instructs the transaction write its transaction record as soon as
// possible, instead of waiting for the transaction's first heartbeat or for the
// end of the transaction to write it.
//
// TODO(nvanbenschoten): Fix up flaky tests to allow us to get rid of this.
EagerRecord() error

// OrigTimestamp returns the transaction's starting timestamp.
// Note a transaction can be internally pushed forward in time before
// committing so this is not guaranteed to be the commit timestamp.
Expand Down Expand Up @@ -361,6 +368,9 @@ func (m *MockTransactionalSender) UpdateStateOnRemoteRetryableErr(
// DisablePipelining is part of the client.TxnSender interface.
func (m *MockTransactionalSender) DisablePipelining() error { return nil }

// EagerRecord is part of the client.TxnSender interface.
func (m *MockTransactionalSender) EagerRecord() error { return nil }

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) (
Expand Down
12 changes: 12 additions & 0 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,18 @@ func (txn *Txn) DisablePipelining() error {
return txn.mu.sender.DisablePipelining()
}

// EagerRecord instructs the transaction write its transaction record as soon as
// possible, instead of waiting for the transaction's first heartbeat or for the
// end of the transaction to write it.
//
// EagerRecord must be called before any operations are performed on the
// transaction.
func (txn *Txn) EagerRecord() error {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.EagerRecord()
}

// NewBatch creates and returns a new empty batch object for use with the Txn.
func (txn *Txn) NewBatch() *Batch {
return &Batch{txn: txn}
Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,17 @@ func (tc *TxnCoordSender) DisablePipelining() error {
return nil
}

// EagerRecord is part of the client.TxnSender interface.
func (tc *TxnCoordSender) EagerRecord() error {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.mu.active {
return errors.Errorf("cannot request an eager transaction record write on a running transaction")
}
tc.interceptorAlloc.txnHeartbeat.eagerRecord = true
return nil
}

// commitReadOnlyTxnLocked "commits" a read-only txn. It is equivalent, but
// cheaper than, sending an EndTransactionRequest. A read-only txn doesn't have
// a transaction record, so there's no need to send any request to the server.
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/txn_interceptor_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ type txnHeartbeat struct {
// is to notify the TxnCoordSender to shut itself down.
asyncAbortCallbackLocked func(context.Context)

// When set to true, the transaction will always send a BeginTxn request to
// lay down a transaction record as early as possible.
eagerRecord bool

// mu contains state protected by the TxnCoordSender's mutex.
mu struct {
sync.Locker
Expand Down Expand Up @@ -178,7 +182,7 @@ func (h *txnHeartbeat) SendLocked(
ba.Txn.Key = anchor
}

if !h.st.Version.IsActive(cluster.VersionLazyTxnRecord) {
if h.eagerRecord || !h.st.Version.IsActive(cluster.VersionLazyTxnRecord) {
addedBeginTxn = true

// Set the key in the begin transaction request to the txn's anchor key.
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,14 @@ func (r *Replica) AdminMerge(
// TODO(benesch): expose a proper API for preventing the fast path.
_ = txn.CommitTimestamp()

// Multiple merge tests rely on a transaction record being written as
// early as possible in the lifecycle of a transaction. They aren't
// prepared for an untimely lease transfer to cause a restart.
// TODO(nvanbenschoten): Remove this once the tests are fixed.
if err := txn.EagerRecord(); err != nil {
return err
}

// Pipelining might send QueryIntent requests to the RHS after the RHS has
// noticed the merge and started blocking all traffic. This causes the merge
// transaction to deadlock. Just turn pipelining off; the structure of the
Expand Down