Skip to content

Commit

Permalink
Merge #60567
Browse files Browse the repository at this point in the history
60567: kv/kvserver: allow the merge transaction to be pushed r=nvanbenschoten a=ajwerner

Historically we have not allowed the merge transaction to be pushed. The
reason we disabled this was because of the hazard due to attempting to
refresh reads on the RHS of the merge after the SubsumeRequest has been
sent. The `SubsumeRequest` effectively freezes the RHS until the merge
commits or aborts. In order to side-step this hazard, this change ensures
that nothing should prevent the merge transaction from either committing
or aborting.

### kv/kvclient: add ManualRefresh support for transactions

This commit adds support for client-initiated refreshes of transactions.
The implementation is somewhat simplistic in that it hijacks existing
logic that occurs during the sending of a request. This makes the
implementation more uniform with the rest of the client library at
the cost of being somewhat awkward and implicit from a code-reading
perspective.

The motivation for this change is to provide the necessary tools to allow
the merge transaction to get pushed. The adoption follows in the next
commit.

Fixes #59308.

Release note: None


Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Feb 16, 2021
2 parents 1e1f915 + 3ff8b35 commit 8f5dc42
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 53 deletions.
21 changes: 21 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,3 +1143,24 @@ func (tc *TxnCoordSender) GetSteppingMode(ctx context.Context) (curMode kv.Stepp
}
return curMode
}

// ManualRefresh is part of the TxnSender interface.
func (tc *TxnCoordSender) ManualRefresh(ctx context.Context) error {
tc.mu.Lock()
defer tc.mu.Unlock()

// Hijack the pre-emptive refresh code path to perform the refresh but
// provide the force flag to ensure that the refresh occurs unconditionally.
var ba roachpb.BatchRequest
ba.Txn = tc.mu.txn.Clone()
const force = true
ba, pErr := tc.interceptorAlloc.txnSpanRefresher.maybeRefreshPreemptivelyLocked(ctx, ba, force)
if pErr != nil {
pErr = tc.updateStateLocked(ctx, ba, nil, pErr)
} else {
var br roachpb.BatchResponse
br.Txn = ba.Txn
pErr = tc.updateStateLocked(ctx, ba, &br, pErr)
}
return pErr.GoError()
}
167 changes: 167 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2400,3 +2400,170 @@ func TestPutsInStagingTxn(t *testing.T) {
// seen a batch with the STAGING status.
require.True(t, putInStagingSeen)
}

// TestTxnManualRefresh verifies that TxnCoordSender's ManualRefresh method
// works as expected.
func TestTxnManualRefresh(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Create some machinery to mock out the kvserver and allow the test to
// launch some requests from the client and then pass control flow of handling
// those requests back to the test.
type resp struct {
br *roachpb.BatchResponse
pErr *roachpb.Error
}
type req struct {
ba roachpb.BatchRequest
respCh chan resp
}
type testCase struct {
name string
run func(
ctx context.Context,
t *testing.T,
db *kv.DB,
clock *hlc.ManualClock,
reqCh <-chan req,
)
}
var cases = []testCase{
{
name: "no-op",
run: func(
ctx context.Context, t *testing.T, db *kv.DB,
clock *hlc.ManualClock, reqCh <-chan req,
) {
txn := db.NewTxn(ctx, "test")
errCh := make(chan error)
go func() {
_, err := txn.Get(ctx, "foo")
errCh <- err
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Get)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn
br.Add(&roachpb.GetResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

// Now a refresh should be a no-op which is indicated by the fact that
// this call does not block to send requests.
require.NoError(t, txn.ManualRefresh(ctx))
require.NoError(t, txn.Commit(ctx))
},
},
{
name: "refresh occurs due to read",
run: func(
ctx context.Context, t *testing.T, db *kv.DB,
clock *hlc.ManualClock, reqCh <-chan req,
) {
txn := db.NewTxn(ctx, "test")
errCh := make(chan error)
go func() {
_, err := txn.Get(ctx, "foo")
errCh <- err
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Get)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn
br.Add(&roachpb.GetResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

go func() {
errCh <- txn.Put(ctx, "bar", "baz")
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Put)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn.Clone()
// Push the WriteTimestamp simulating an interaction with the
// timestamp cache.
br.Txn.WriteTimestamp =
br.Txn.WriteTimestamp.Add(time.Millisecond.Nanoseconds(), 0)
br.Add(&roachpb.PutResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

go func() {
errCh <- txn.ManualRefresh(ctx)
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Refresh)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn.Clone()
br.Add(&roachpb.RefreshResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

// Now a refresh should be a no-op which is indicated by the fact that
// this call does not block to send requests.
require.NoError(t, txn.ManualRefresh(ctx))
},
},
}
run := func(t *testing.T, tc testCase) {
stopper := stop.NewStopper()
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
ctx := context.Background()
defer stopper.Stop(ctx)

reqCh := make(chan req)
var senderFn kv.SenderFunc = func(_ context.Context, ba roachpb.BatchRequest) (
*roachpb.BatchResponse, *roachpb.Error) {
r := req{
ba: ba,
respCh: make(chan resp),
}
select {
case reqCh <- r:
case <-ctx.Done():
return nil, roachpb.NewError(ctx.Err())
}
select {
case rr := <-r.respCh:
return rr.br, rr.pErr
case <-ctx.Done():
return nil, roachpb.NewError(ctx.Err())
}
}
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
tsf := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Clock: clock,
Stopper: stopper,
HeartbeatInterval: time.Hour,
},
senderFn,
)
db := kv.NewDB(ambient, tsf, clock, stopper)

cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
tc.run(cancelCtx, t, db, manual, reqCh)
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
})
}
}
12 changes: 7 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (sr *txnSpanRefresher) SendLocked(
}

// Attempt a refresh before sending the batch.
ba, pErr := sr.maybeRefreshPreemptively(ctx, ba)
ba, pErr := sr.maybeRefreshPreemptivelyLocked(ctx, ba, false)
if pErr != nil {
return nil, pErr
}
Expand Down Expand Up @@ -406,13 +406,15 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend(
return br, nil
}

// maybeRefreshPreemptively attempts to refresh a transaction's read timestamp
// maybeRefreshPreemptivelyLocked attempts to refresh a transaction's read timestamp
// eagerly. Doing so can take advantage of opportunities where the refresh is
// free or can avoid wasting work issuing a batch containing an EndTxn that will
// necessarily throw a serializable error. The method returns a batch with an
// updated transaction if the refresh is successful, or a retry error if not.
func (sr *txnSpanRefresher) maybeRefreshPreemptively(
ctx context.Context, ba roachpb.BatchRequest,
// If the force flag is true, the refresh will be attempted even if a refresh
// is not inevitable.
func (sr *txnSpanRefresher) maybeRefreshPreemptivelyLocked(
ctx context.Context, ba roachpb.BatchRequest, force bool,
) (roachpb.BatchRequest, *roachpb.Error) {
// If we know that the transaction will need a refresh at some point because
// its write timestamp has diverged from its read timestamp, consider doing
Expand Down Expand Up @@ -466,7 +468,7 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptively(
refreshInevitable := hasET && args.(*roachpb.EndTxnRequest).Commit

// If neither condition is true, defer the refresh.
if !refreshFree && !refreshInevitable {
if !refreshFree && !refreshInevitable && !force {
return ba, nil
}

Expand Down
69 changes: 32 additions & 37 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,37 +1241,10 @@ func TestStoreRangeMergeSplitRace_MergeWins(t *testing.T) {
// transaction's only intent so far is on P's local range descriptor, and so the
// split transaction can happily commit.
//
// The merge transaction then continues, writing an intent on Q's local
// descriptor. Since the merge transaction is executing at an earlier timestamp
// than the split transaction, the intent is written "under" the updated
// descriptor written by the split transaction.
//
// In the past, the merge transaction would simply push its commit timestamp
// forward and proceed, even though, upon committing, it would discover that it
// was forbidden from committing with a pushed timestamp and abort instead. (For
// why merge transactions cannot forward their commit timestamps, see the
// discussion on the retry loop within AdminMerge.) This was problematic. Before
// the doomed merge transaction attempted to commit, it would send a Subsume
// request, launching a merge watcher goroutine on Q. This watcher goroutine
// could incorrectly think that the merge transaction committed. Why? To
// determine whether a merge has truly aborted, the watcher goroutine sends a
// Get(/Meta2/QEndKey) request with a read uncommitted isolation level. If the
// Get request returns either nil or a descriptor for a different range, the
// merge is assumed to have committed. In this case, unfortunately, QEndKey is
// the Q's end key post-split. After all, the split has committed and updated
// Q's in-memory descriptor. The split transactions intents are cleaned up
// asynchronously, however, and since the watcher goroutine is not performing a
// consistent read it will not wait for the intents to be cleaned up. So
// Get(/Meta2/QEndKey) might return nil, in which case the watcher goroutine
// will incorrectly infer that the merge committed. (Note that the watcher
// goroutine can't perform a consistent read, as that would look up the
// transaction record on Q and deadlock, since Q is blocked for merging.)
//
// The bug was fixed by updating Q's local descriptor with a conditional put
// instead of a put. This forces the merge transaction to fail early if writing
// the intent would require forwarding the commit timestamp. In other words,
// this ensures that the merge watcher goroutine is never launched if the RHS
// local descriptor is updated while the merge transaction is executing.
// The merge transaction then continues, reading and writing an intent on Q's
// local descriptor. The locking nature of the read request to Q's local
// descriptor ensures that the merge transaction will observe the post-split
// value for Q.
func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -1280,20 +1253,34 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {

var distSender *kvcoord.DistSender
var lhsDescKey atomic.Value
var lhsStartKey atomic.Value
var launchSplit int64
var mergeRetries int64
var mergePreSplit atomic.Value
var splitCommit atomic.Value
var mergeEndTxnTimestamp atomic.Value
testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
for _, req := range ba.Requests {
if get := req.GetGet(); get != nil && get.KeyLocking != lock.None {
if v := lhsDescKey.Load(); v != nil && v.(roachpb.Key).Equal(get.Key) {
// If this is the first merge attempt, launch the split
// before the merge's first locking read succeeds.
if atomic.CompareAndSwapInt64(&launchSplit, 1, 0) {
mergePreSplit.Store(ba.Txn.ReadTimestamp)
_, pErr := kv.SendWrapped(ctx, distSender, adminSplitArgs(roachpb.Key("c")))
return pErr
}
// Otherwise, record that the merge retried and proceed.
atomic.AddInt64(&mergeRetries, 1)
// Otherwise, proceed.
}
}
if split := req.GetAdminSplit(); split != nil && split.Key.Equal(roachpb.Key("c")) {
splitCommit.Store(ba.Timestamp)
}
if endTxn := req.GetEndTxn(); endTxn != nil {
ct := endTxn.InternalCommitTrigger
startKey, _ := lhsStartKey.Load().(roachpb.RKey)
if ct != nil && ct.MergeTrigger != nil && startKey != nil &&
startKey.Equal(ct.MergeTrigger.LeftDesc.StartKey) {
mergeEndTxnTimestamp.Store(ba.Txn.ReadTimestamp)
}
}
}
Expand Down Expand Up @@ -1321,13 +1308,21 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
}
lhsDescKey.Store(keys.RangeDescriptorKey(lhsDesc.StartKey))
atomic.StoreInt64(&launchSplit, 1)
lhsStartKey.Store(lhsDesc.StartKey)

mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
if _, pErr := kv.SendWrapped(ctx, distSender, mergeArgs); pErr != nil {
_, pErr := kv.SendWrapped(ctx, distSender, mergeArgs)
if pErr != nil {
t.Fatal(pErr)
}
if atomic.LoadInt64(&mergeRetries) == 0 {
t.Fatal("expected merge to retry at least once due to concurrent split")
mergePreSplitTS := mergePreSplit.Load().(hlc.Timestamp)
splitTS := splitCommit.Load().(hlc.Timestamp)
mergePostSplitTS := mergeEndTxnTimestamp.Load().(hlc.Timestamp)
if splitTS.LessEq(mergePreSplitTS) {
t.Fatalf("expected merge to start before concurrent split, %v <= %v", splitTS, mergePreSplitTS)
}
if mergePostSplitTS.LessEq(splitTS) {
t.Fatalf("expected merge to finish after concurrent split, %v <= %v", mergePostSplitTS, splitTS)
}
}

Expand Down
24 changes: 16 additions & 8 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,12 +564,6 @@ func (r *Replica) AdminMerge(
log.Event(ctx, "merge txn begins")
txn.SetDebugName(mergeTxnName)

// Observe the commit timestamp to force a client-side retry. See the
// comment on the retry loop after this closure for details.
//
// TODO(benesch): expose a proper API for preventing the fast path.
_ = txn.CommitTimestamp()

// Pipelining might send QueryIntent requests to the RHS after the RHS has
// noticed the merge and started blocking all traffic. This causes the merge
// transaction to deadlock. Just turn pipelining off; the structure of the
Expand Down Expand Up @@ -708,6 +702,18 @@ func (r *Replica) AdminMerge(
return err
}

// Refresh the transaction so that the transaction won't try to refresh
// its reads on the RHS after it is frozen.
if err := txn.ManualRefresh(ctx); err != nil {
return err
}

// Freeze the commit timestamp of the transaction to prevent future pushes
// due to high-priority reads from other transactions. Any attempt to
// refresh reads on the RHS would result in a stalled merge because the
// RHS will be frozen after the Subsume is sent.
_ = txn.CommitTimestamp()

// Intents have been placed, so the merge is now in its critical phase. Get
// a consistent view of the data from the right-hand range. If the merge
// commits, we'll write this data to the left-hand range in the merge
Expand Down Expand Up @@ -761,8 +767,10 @@ func (r *Replica) AdminMerge(
// we'll unlock the right-hand range, giving the next, fresh transaction a
// chance to succeed.
//
// Note that client.DB.Txn performs retries using the same transaction, so we
// have to use our own retry loop.
// A second reason to eschew kv.DB.Txn() is that the API to disable pipelining
// is finicky and only allows disabling pipelining before any operations have
// been sent, even in prior epochs. Calling DisablePipelining() on a restarted
// transaction yields an error.
for {
txn := kv.NewTxn(ctx, r.store.DB(), r.NodeID())
err := runMergeTxn(txn)
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ func (m *MockTransactionalSender) GetSteppingMode(context.Context) SteppingMode
return SteppingDisabled
}

// ManualRefresh is part of the TxnSender interface.
func (m *MockTransactionalSender) ManualRefresh(ctx context.Context) error {
panic("unimplemented")
}

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) (
Expand Down
Loading

0 comments on commit 8f5dc42

Please sign in to comment.