Skip to content

Commit

Permalink
kvclient: buffer blind writes on the client
Browse files Browse the repository at this point in the history
This patch adds logic to buffer blind writes (Puts or Deletes) in the
txnWriteBuffer. Interceptors and layers above remain oblivious to the
txnWriteBuffer's decision to buffer any writes.

All buffered writes are flushed at commit time, in the same batch as
the EndTxn request. This flushing at commit time is also hidden from
interceptors above the txnWriteBuffer by stripping responses on the
return path.

The code structure here allows us to split a batch request into
different bits, where some portion is evaluated locally and the rest
is sent to the KV layer. It's written with a future where we split
read-write requests (e.g. CPuts) into separate read/write halves, where
the read portion needs to be evaluated at the leaseholder, but the write
needs to be buffered.

Closes #139053

Release note: None
  • Loading branch information
arulajmani committed Feb 7, 2025
1 parent a3ce908 commit 9db05de
Show file tree
Hide file tree
Showing 6 changed files with 919 additions and 11 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ go_test(
"txn_interceptor_pipeliner_test.go",
"txn_interceptor_seq_num_allocator_test.go",
"txn_interceptor_span_refresher_test.go",
"txn_interceptor_write_buffer_test.go",
"txn_test.go",
":bufferedwrite_interval_btree_test.go", # keep
":mock_kvcoord", # keep
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,8 @@ func (tc *TxnCoordSender) Send(
return nil, pErr
}

if ba.IsSingleEndTxnRequest() && !tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
if ba.IsSingleEndTxnRequest() && !tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() &&
!tc.interceptorAlloc.txnWriteBuffer.hasBufferedWrites() {
return nil, tc.finalizeNonLockingTxnLocked(ctx, ba)
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
// to SendLocked will return the default successful response.
type mockLockedSender struct {
mockFn func(*kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error)
// numCalled is the number of times the mock function has been called.
numCalled int
}

func (m *mockLockedSender) SendLocked(
Expand All @@ -45,6 +47,7 @@ func (m *mockLockedSender) SendLocked(
br.Txn = ba.Txn
return br, nil
}
m.numCalled++
return m.mockFn(ba)
}

Expand All @@ -55,6 +58,11 @@ func (m *mockLockedSender) MockSend(
m.mockFn = fn
}

// NumCalled returns the number of times the mock function has been called.
func (m *mockLockedSender) NumCalled() int {
return m.numCalled
}

// ChainMockSend sets a series of mocking functions on the mockLockedSender.
// The provided mocking functions are set in the order that they are provided
// and a given mocking function is set after the previous one has been called.
Expand Down
Loading

0 comments on commit 9db05de

Please sign in to comment.