Merge #73725

73725: kv: add TestTxnReadWithinUncertaintyInterval r=lidorcarmel a=nvanbenschoten

Extracted from #72121, because that PR is on-hold.

The first commit adds a new test called `TestTxnReadWithinUncertaintyInterval` which tests cases where a transaction observes a committed value below its global uncertainty limit while performing a read. In one variant, the transaction does not have an observed timestamp, so it hits a ReadWithinUncertaintyIntervalError. In a second variant, it does have an observed timestamp, so it avoids the error.
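
To make the two variants concrete, here is a rough, self-contained sketch of the uncertainty check they exercise (the helper below is illustrative and uses plain nanosecond integers, not the real KV types or APIs):

```go
package main

import "fmt"

// uncertain reports whether a value written at valueTS must be treated as
// uncertain by a transaction reading at readTS. The limit starts at the
// transaction's global uncertainty limit and, if the txn has an observed
// timestamp from the serving node, is clamped down to that observed timestamp.
// Illustrative sketch only; boundary semantics are simplified.
func uncertain(readTS, globalLimit, valueTS, observedTS int64, haveObserved bool) bool {
	localLimit := globalLimit
	if haveObserved && observedTS < localLimit {
		localLimit = observedTS
	}
	return readTS < valueTS && valueTS <= localLimit
}

func main() {
	// Value written 200ns after the txn's read timestamp; global limit 500ns out.
	fmt.Println(uncertain(100, 600, 300, 0, false))  // true: no observed timestamp, value is uncertain
	fmt.Println(uncertain(100, 600, 300, 150, true)) // false: observed timestamp sits below the value
}
```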

This is very old functionality, but I could not find a functional test like this in `pkg/kv/kvserver`, so I figured I'd add one. We have tests below this level (in `pkg/storage`) and above this level (Jepsen, and I hope elsewhere as well), but I couldn't find a similar test operating at this exact level.

The second commit adds an additional variant of this test which exercises a case where a transaction observes a committed value in its uncertainty interval that was written under a previous leaseholder. In the test, the transaction does collect an observed timestamp from the KV node that eventually serves the read, but this observed timestamp is not allowed to be used to constrain its local uncertainty limit below the lease start time and avoid the uncertainty error. As a result, it observes the committed value in its uncertain future and receives a ReadWithinUncertaintyIntervalError, which avoids a stale read.
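
To make the lease-transfer interaction concrete, here is a rough standalone sketch of the clamping rule involved (the helper name and exact boundary semantics are illustrative, not the real uncertainty code):

```go
// localUncertaintyLimit sketches the rule the second commit exercises: an
// observed timestamp can shrink the uncertainty interval, but never below the
// start of the current lease, and the result never exceeds the global
// uncertainty limit. A value written under the previous leaseholder
// (observedTS < valueTS < leaseStart <= globalLimit) therefore stays inside
// the interval and still triggers ReadWithinUncertaintyIntervalError.
func localUncertaintyLimit(globalLimit, observedTS, leaseStart int64) int64 {
	limit := observedTS
	if limit < leaseStart {
		limit = leaseStart // the new lease's start time floors the limit
	}
	if limit > globalLimit {
		limit = globalLimit // never exceed the global uncertainty limit
	}
	return limit
}
```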

Co-authored-by: Nathan VanBenschoten <[email protected]>
craig[bot] and nvanbenschoten committed Dec 13, 2021
2 parents 22df0a6 + 26eed0b commit d565b79
Showing 1 changed file with 200 additions and 0 deletions.
pkg/kv/kvserver/client_replica_test.go: 200 additions & 0 deletions
@@ -499,6 +499,206 @@ func TestTxnPutOutOfOrder(t *testing.T) {
}
}

// TestTxnReadWithinUncertaintyInterval tests cases where a transaction observes
// a committed value below its global uncertainty limit while performing a read.
//
// In one variant of the test, the transaction does not have an observed
// timestamp from the KV node serving the read, so its uncertainty interval
// during its read extends all the way to its global uncertainty limit. As a
// result, it observes the committed value in its uncertain future and receives
// a ReadWithinUncertaintyIntervalError.
//
// In a second variant of the test, the transaction does have an observed
// timestamp from the KV node serving the read, so its uncertainty interval
// during its read extends only up to this observed timestamp. This observed
// timestamp is below the committed value's timestamp. As a result, the
// transaction observes the committed value in its certain future, allowing it
// to avoid the ReadWithinUncertaintyIntervalError.
func TestTxnReadWithinUncertaintyInterval(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	testutils.RunTrueAndFalse(t, "observedTS", func(t *testing.T, observedTS bool) {
		ctx := context.Background()
		manual := hlc.NewHybridManualClock()
		srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
			Knobs: base.TestingKnobs{
				Server: &server.TestingKnobs{
					ClockSource: manual.UnixNano,
				},
			},
		})
		s := srv.(*server.TestServer)
		defer s.Stopper().Stop(ctx)
		store, err := s.Stores().GetStore(s.GetFirstStoreID())
		require.NoError(t, err)

		// Split off a scratch range.
		key, err := s.ScratchRange()
		require.NoError(t, err)

		// Pause the server's clocks going forward.
		manual.Pause()

		// Create a new transaction.
		now := s.Clock().Now()
		maxOffset := s.Clock().MaxOffset().Nanoseconds()
		require.NotZero(t, maxOffset)
		txn := roachpb.MakeTransaction("test", key, 1, now, maxOffset)
		require.True(t, txn.ReadTimestamp.Less(txn.GlobalUncertaintyLimit))
		require.Len(t, txn.ObservedTimestamps, 0)

		// If the test variant wants an observed timestamp from the server at a
		// timestamp below the value, collect one now.
		if observedTS {
			get := getArgs(key)
			resp, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: &txn}, get)
			require.Nil(t, pErr)
			txn.Update(resp.Header().Txn)
			require.Len(t, txn.ObservedTimestamps, 1)
		}

		// Perform a non-txn write. This will grab a timestamp from the clock.
		put := putArgs(key, []byte("val"))
		_, pErr := kv.SendWrapped(ctx, store.TestSender(), put)
		require.Nil(t, pErr)

		// Perform another read on the same key. Depending on whether or not the txn
		// had collected an observed timestamp, it may or may not observe the value
		// in its uncertainty interval and throw an error.
		get := getArgs(key)
		_, pErr = kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: &txn}, get)
		if observedTS {
			require.Nil(t, pErr)
		} else {
			require.NotNil(t, pErr)
			require.IsType(t, &roachpb.ReadWithinUncertaintyIntervalError{}, pErr.GetDetail())
		}
	})
}

// TestTxnReadWithinUncertaintyIntervalAfterLeaseTransfer tests a case where a
// transaction observes a committed value in its uncertainty interval that was
// written under a previous leaseholder. In the test, the transaction does
// collect an observed timestamp from the KV node that eventually serves the
// read, but this observed timestamp is not allowed to be used to constrain its
// local uncertainty limit below the lease start time and avoid the uncertainty
// error. As a result, it observes the committed value in its uncertain future
// and receives a ReadWithinUncertaintyIntervalError, which avoids a stale read.
//
// See TestRangeLocalUncertaintyLimitAfterNewLease for a similar test.
func TestTxnReadWithinUncertaintyIntervalAfterLeaseTransfer(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	const numNodes = 2
	var manuals []*hlc.HybridManualClock
	var clocks []*hlc.Clock
	for i := 0; i < numNodes; i++ {
		manuals = append(manuals, hlc.NewHybridManualClock())
	}
	serverArgs := make(map[int]base.TestServerArgs)
	for i := 0; i < numNodes; i++ {
		serverArgs[i] = base.TestServerArgs{
			Knobs: base.TestingKnobs{
				Server: &server.TestingKnobs{
					ClockSource: manuals[i].UnixNano,
				},
			},
		}
	}
	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
		ReplicationMode:   base.ReplicationManual,
		ServerArgsPerNode: serverArgs,
	})
	defer tc.Stopper().Stop(ctx)

	// Split off two scratch ranges.
	keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
	tc.SplitRangeOrFatal(t, keyA)
	keyADesc, keyBDesc := tc.SplitRangeOrFatal(t, keyB)
	// Place key A's sole replica on node 1 and key B's sole replica on node 2.
	tc.AddVotersOrFatal(t, keyB, tc.Target(1))
	tc.TransferRangeLeaseOrFatal(t, keyBDesc, tc.Target(1))
	tc.RemoveVotersOrFatal(t, keyB, tc.Target(0))

	// Pause the servers' clocks going forward.
	var maxNanos int64
	for i, m := range manuals {
		m.Pause()
		if cur := m.UnixNano(); cur > maxNanos {
			maxNanos = cur
		}
		clocks = append(clocks, tc.Servers[i].Clock())
	}
	// After doing so, perfectly synchronize them.
	for _, m := range manuals {
		m.Increment(maxNanos - m.UnixNano())
	}

	// Create a new transaction using the second node as the gateway.
	now := clocks[1].Now()
	maxOffset := clocks[1].MaxOffset().Nanoseconds()
	require.NotZero(t, maxOffset)
	txn := roachpb.MakeTransaction("test", keyB, 1, now, maxOffset)
	require.True(t, txn.ReadTimestamp.Less(txn.GlobalUncertaintyLimit))
	require.Len(t, txn.ObservedTimestamps, 0)

	// Collect an observed timestamp in that transaction from node 2.
	getB := getArgs(keyB)
	resp, pErr := kv.SendWrappedWith(ctx, tc.Servers[1].DistSender(), roachpb.Header{Txn: &txn}, getB)
	require.Nil(t, pErr)
	txn.Update(resp.Header().Txn)
	require.Len(t, txn.ObservedTimestamps, 1)

	// Advance the clock on the first node.
	manuals[0].Increment(100)

	// Perform a non-txn write on node 1. This will grab a timestamp from node 1's
	// clock, which leads the clock on node 2.
	//
	// NOTE: we perform the clock increment and write _after_ creating the
	// transaction and collecting an observed timestamp. Ideally, we would write
	// this test such that we did this before beginning the transaction on node 2,
	// so that the absence of an uncertainty error would be a true "stale read".
	// However, doing so causes the test to be flaky because background operations
	// can leak the clock signal from node 1 to node 2 between the time that we
	// write and the time that the transaction begins. If we had a way to disable
	// all best-effort HLC clock stabilization channels and only propagate clock
	// signals when strictly necessary then it's possible that we could avoid
	// flakiness. For now, we just re-order the operations and assert that we
	// receive an uncertainty error even though its absence would not be a true
	// stale read.
	ba := roachpb.BatchRequest{}
	ba.Add(putArgs(keyA, []byte("val")))
	br, pErr := tc.Servers[0].DistSender().Send(ctx, ba)
	require.Nil(t, pErr)
	writeTs := br.Timestamp

	// The transaction has a read timestamp beneath the write's commit timestamp
	// but a global uncertainty limit above the write's commit timestamp. The
	// observed timestamp collected is also beneath the write's commit timestamp.
	require.True(t, txn.ReadTimestamp.Less(writeTs))
	require.True(t, writeTs.Less(txn.GlobalUncertaintyLimit))
	require.True(t, txn.ObservedTimestamps[0].Timestamp.ToTimestamp().Less(writeTs))

	// Add a replica for key A's range to node 2. Transfer the lease.
	tc.AddVotersOrFatal(t, keyA, tc.Target(1))
	tc.TransferRangeLeaseOrFatal(t, keyADesc, tc.Target(1))

	// Perform another read in the transaction, this time on key A. This will be
	// routed to node 2, because it now holds the lease for key A. Even though the
	// transaction has collected an observed timestamp from node 2, it cannot use
	// it to constrain its local uncertainty limit below the lease start time and
	// avoid the uncertainty error. This is a good thing, as doing so would allow
	// for a stale read.
	getA := getArgs(keyA)
	_, pErr = kv.SendWrappedWith(ctx, tc.Servers[1].DistSender(), roachpb.Header{Txn: &txn}, getA)
	require.NotNil(t, pErr)
	require.IsType(t, &roachpb.ReadWithinUncertaintyIntervalError{}, pErr.GetDetail())
}

// TestRangeLookupUseReverse tests whether the results and the results count
// are correct when scanning in reverse order.
func TestRangeLookupUseReverse(t *testing.T) {