diff --git a/pkg/ccl/storageccl/engineccl/mvcc.go b/pkg/ccl/storageccl/engineccl/mvcc.go index 8fc662602b88..4d9beda35289 100644 --- a/pkg/ccl/storageccl/engineccl/mvcc.go +++ b/pkg/ccl/storageccl/engineccl/mvcc.go @@ -81,6 +81,17 @@ func NewMVCCIncrementalIterator(e engine.Reader, opts IterOptions) *MVCCIncremen return &MVCCIncrementalIterator{ e: e, upperBound: opts.UpperBound, + // It is necessary for correctness that sanityIter be created before + // iter. This is because the provided Reader may not be a consistent + // snapshot, so the two could end up observing different information. + // The hack around sanityCheckMetadataKey only works properly if all + // possible discrepancies between the two iterators lead to intents + // and values falling outside of the timestamp range **from iter's + // perspective**. This allows us to simply ignore discrepancies that + // we notice in advance(). See #34819. + sanityIter: e.NewIterator(engine.IterOptions{ + UpperBound: opts.UpperBound, + }), iter: e.NewIterator(engine.IterOptions{ // The call to startTime.Next() converts our exclusive start bound into // the inclusive start bound that MinTimestampHint expects. This is @@ -214,11 +225,6 @@ func (i *MVCCIncrementalIterator) advance() { // sees that exact key. Otherwise, it returns false. It's used in the workaround // in `advance` for a time-bound iterator bug. func (i *MVCCIncrementalIterator) sanityCheckMetadataKey() ([]byte, bool, error) { - if i.sanityIter == nil { - // The common case is that we'll won't need the sanityIter for a given - // MVCCIncrementalIterator, so create it lazily. - i.sanityIter = i.e.NewIterator(engine.IterOptions{UpperBound: i.upperBound}) - } unsafeKey := i.iter.UnsafeKey() i.sanityIter.Seek(unsafeKey) if ok, err := i.sanityIter.Valid(); err != nil { diff --git a/pkg/ccl/storageccl/engineccl/mvcc_test.go b/pkg/ccl/storageccl/engineccl/mvcc_test.go index 1bbb5e634d9e..aea032e64761 100644 --- a/pkg/ccl/storageccl/engineccl/mvcc_test.go +++ b/pkg/ccl/storageccl/engineccl/mvcc_test.go @@ -24,7 +24,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/pkg/errors" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func iterateExpectErr( @@ -459,6 +461,28 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) { } } +func slurpKVsInTimeRange( + e engine.Reader, prefix roachpb.Key, startTime, endTime hlc.Timestamp, +) ([]engine.MVCCKeyValue, error) { + endKey := prefix.PrefixEnd() + iter := NewMVCCIncrementalIterator(e, IterOptions{ + StartTime: startTime, + EndTime: endTime, + UpperBound: endKey, + }) + defer iter.Close() + var kvs []engine.MVCCKeyValue + for iter.Seek(engine.MakeMVCCMetadataKey(prefix)); ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + return nil, err + } else if !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { + break + } + kvs = append(kvs, engine.MVCCKeyValue{Key: iter.Key(), Value: iter.Value()}) + } + return kvs, nil +} + // TestMVCCIncrementalIteratorIntentDeletion checks a workaround in // MVCCIncrementalIterator for a bug in time-bound iterators, where an intent // has been deleted, but the time-bound iterator doesn't see the deletion. @@ -482,27 +506,6 @@ func TestMVCCIncrementalIteratorIntentDeletion(t *testing.T) { Status: roachpb.COMMITTED, } } - slurpKVs := func( - e engine.Reader, prefix roachpb.Key, startTime, endTime hlc.Timestamp, - ) ([]engine.MVCCKeyValue, error) { - endKey := prefix.PrefixEnd() - iter := NewMVCCIncrementalIterator(e, IterOptions{ - StartTime: startTime, - EndTime: endTime, - UpperBound: endKey, - }) - defer iter.Close() - var kvs []engine.MVCCKeyValue - for iter.Seek(engine.MakeMVCCMetadataKey(prefix)); ; iter.Next() { - if ok, err := iter.Valid(); err != nil { - return nil, err - } else if !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { - break - } - kvs = append(kvs, engine.MVCCKeyValue{Key: iter.Key(), Value: iter.Value()}) - } - return kvs, nil - } ctx := context.Background() kA := roachpb.Key("kA") @@ -564,32 +567,102 @@ func TestMVCCIncrementalIteratorIntentDeletion(t *testing.T) { // The kA ts1 intent has been resolved. There's now a new intent on kA, but // the timestamp (ts3) is too new so it should be ignored. - kvs, err := slurpKVs(db, kA, ts0, ts1) + kvs, err := slurpKVsInTimeRange(db, kA, ts0, ts1) require.NoError(t, err) require.Equal(t, []engine.MVCCKeyValue{ {Key: engine.MVCCKey{Key: kA, Timestamp: ts1}, Value: vA1.RawBytes}, }, kvs) // kA has a value at ts2. Again the intent is too new (ts3), so ignore. - kvs, err = slurpKVs(db, kA, ts0, ts2) + kvs, err = slurpKVsInTimeRange(db, kA, ts0, ts2) require.NoError(t, err) require.Equal(t, []engine.MVCCKeyValue{ {Key: engine.MVCCKey{Key: kA, Timestamp: ts2}, Value: vA2.RawBytes}, {Key: engine.MVCCKey{Key: kA, Timestamp: ts1}, Value: vA1.RawBytes}, }, kvs) // At ts3, we should see the new intent - _, err = slurpKVs(db, kA, ts0, ts3) + _, err = slurpKVsInTimeRange(db, kA, ts0, ts3) require.EqualError(t, err, `conflicting intents on "kA"`) // Similar to the kA ts1 check, but there is no newer intent. We expect to // pick up the intent deletion and it should cancel out the intent, leaving // only the value at ts1. - kvs, err = slurpKVs(db, kB, ts0, ts1) + kvs, err = slurpKVsInTimeRange(db, kB, ts0, ts1) require.NoError(t, err) require.Equal(t, []engine.MVCCKeyValue{ {Key: engine.MVCCKey{Key: kB, Timestamp: ts1}, Value: vB1.RawBytes}, }, kvs) // Sanity check that we see the still unresolved intent for kC ts1. - _, err = slurpKVs(db, kC, ts0, ts1) + _, err = slurpKVsInTimeRange(db, kC, ts0, ts1) require.EqualError(t, err, `conflicting intents on "kC"`) } + +// TestMVCCIncrementalIteratorIntentRewrittenConcurrently verifies that the +// workaround in MVCCIncrementalIterator to double-check for deleted intents +// properly handles cases where an intent originally in a time-bound iterator's +// time range is rewritten at a timestamp outside of its time range. +func TestMVCCIncrementalIteratorIntentRewrittenConcurrently(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Create a DB containing a single intent. + ctx := context.Background() + db := engine.NewInMem(roachpb.Attributes{}, 10<<20 /* 10 MB */) + defer db.Close() + + kA := roachpb.Key("kA") + vA1 := roachpb.MakeValueFromString("vA1") + vA2 := roachpb.MakeValueFromString("vA2") + ts0 := hlc.Timestamp{WallTime: 0} + ts1 := hlc.Timestamp{WallTime: 1} + ts2 := hlc.Timestamp{WallTime: 2} + ts3 := hlc.Timestamp{WallTime: 3} + txn := &roachpb.Transaction{ + TxnMeta: enginepb.TxnMeta{ + Key: roachpb.Key("b"), + ID: uuid.MakeV4(), + Epoch: 1, + Timestamp: ts1, + Sequence: 1, + }, + OrigTimestamp: ts1, + } + if err := engine.MVCCPut(ctx, db, nil, kA, ts1, vA1, txn); err != nil { + t.Fatal(err) + } + + // Concurrently iterate over the intent using a time-bound iterator and move + // the intent out of the time-bound iterator's time range by writing to it + // again at a higher timestamp. + g, _ := errgroup.WithContext(ctx) + g.Go(func() error { + // Re-write the intent with a higher timestamp. + txn.Timestamp = ts3 + txn.Sequence = 2 + return engine.MVCCPut(ctx, db, nil, kA, ts1, vA2, txn) + }) + g.Go(func() error { + // Iterate with a time range that includes the initial intent but does + // not include the new intent. + kvs, err := slurpKVsInTimeRange(db, kA, ts0, ts2) + + // There are two permissible outcomes from the scan. If the iteration + // wins the race with the put that moves the intent then it should + // observe the intent and return a write intent error. If the iteration + // loses the race with the put that moves the intent then it should + // observe and return nothing because there will be no committed or + // provisional keys in its time range. + if err != nil { + if !testutils.IsError(err, `conflicting intents on "kA"`) { + return err + } + } else { + if len(kvs) != 0 { + return errors.Errorf(`unexpected kvs: %v`, kvs) + } + } + return nil + }) + if err := g.Wait(); err != nil { + t.Fatal(err) + } +}