Skip to content

Commit

Permalink
engineccl/mvcc: fix time-bound iterator's interaction with moving int…
Browse files Browse the repository at this point in the history
…ents

Fixes cockroachdb#34819.

349ff61 introduced a workaround for cockroachdb#28358 into MVCCIncrementalIterator.
This workaround created a second (non-time-bound) iterator to verify
possibly-phantom MVCCMetadata keys during iteration.

We found in cockroachdb#34819 that it is necessary for correctness that sanityIter
be created before the original iter. This is because the provided Reader
that both iterators are created from may not be a consistent snapshot, so
the two iterators could end up observing different information. The hack
around sanityCheckMetadataKey only works properly if all possible
discrepancies between the two iterators lead to intents and values falling
outside of the timestamp range **from the time-bound iterators perspective**.
This allows us to simply ignore discrepancies that we notice in advance().

This commit makes this change. It also adds a test that failed regularly
before the fix under stress and no longer fails after the fix.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 20, 2019
1 parent deb08aa commit ab04e70
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 31 deletions.
16 changes: 11 additions & 5 deletions pkg/ccl/storageccl/engineccl/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ func NewMVCCIncrementalIterator(e engine.Reader, opts IterOptions) *MVCCIncremen
return &MVCCIncrementalIterator{
e: e,
upperBound: opts.UpperBound,
// It is necessary for correctness that sanityIter be created before
// iter. This is because the provided Reader may not be a consistent
// snapshot, so the two could end up observing different information.
// The hack around sanityCheckMetadataKey only works properly if all
// possible discrepancies between the two iterators lead to intents
// and values falling outside of the timestamp range **from iter's
// perspective**. This allows us to simply ignore discrepancies that
// we notice in advance(). See #34819.
sanityIter: e.NewIterator(engine.IterOptions{
UpperBound: opts.UpperBound,
}),
iter: e.NewIterator(engine.IterOptions{
// The call to startTime.Next() converts our exclusive start bound into
// the inclusive start bound that MinTimestampHint expects. This is
Expand Down Expand Up @@ -214,11 +225,6 @@ func (i *MVCCIncrementalIterator) advance() {
// sees that exact key. Otherwise, it returns false. It's used in the workaround
// in `advance` for a time-bound iterator bug.
func (i *MVCCIncrementalIterator) sanityCheckMetadataKey() ([]byte, bool, error) {
if i.sanityIter == nil {
// The common case is that we'll won't need the sanityIter for a given
// MVCCIncrementalIterator, so create it lazily.
i.sanityIter = i.e.NewIterator(engine.IterOptions{UpperBound: i.upperBound})
}
unsafeKey := i.iter.UnsafeKey()
i.sanityIter.Seek(unsafeKey)
if ok, err := i.sanityIter.Valid(); err != nil {
Expand Down
125 changes: 99 additions & 26 deletions pkg/ccl/storageccl/engineccl/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func iterateExpectErr(
Expand Down Expand Up @@ -459,6 +461,28 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) {
}
}

func slurpKVsInTimeRange(
e engine.Reader, prefix roachpb.Key, startTime, endTime hlc.Timestamp,
) ([]engine.MVCCKeyValue, error) {
endKey := prefix.PrefixEnd()
iter := NewMVCCIncrementalIterator(e, IterOptions{
StartTime: startTime,
EndTime: endTime,
UpperBound: endKey,
})
defer iter.Close()
var kvs []engine.MVCCKeyValue
for iter.Seek(engine.MakeMVCCMetadataKey(prefix)); ; iter.Next() {
if ok, err := iter.Valid(); err != nil {
return nil, err
} else if !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 {
break
}
kvs = append(kvs, engine.MVCCKeyValue{Key: iter.Key(), Value: iter.Value()})
}
return kvs, nil
}

// TestMVCCIncrementalIteratorIntentDeletion checks a workaround in
// MVCCIncrementalIterator for a bug in time-bound iterators, where an intent
// has been deleted, but the time-bound iterator doesn't see the deletion.
Expand All @@ -482,27 +506,6 @@ func TestMVCCIncrementalIteratorIntentDeletion(t *testing.T) {
Status: roachpb.COMMITTED,
}
}
slurpKVs := func(
e engine.Reader, prefix roachpb.Key, startTime, endTime hlc.Timestamp,
) ([]engine.MVCCKeyValue, error) {
endKey := prefix.PrefixEnd()
iter := NewMVCCIncrementalIterator(e, IterOptions{
StartTime: startTime,
EndTime: endTime,
UpperBound: endKey,
})
defer iter.Close()
var kvs []engine.MVCCKeyValue
for iter.Seek(engine.MakeMVCCMetadataKey(prefix)); ; iter.Next() {
if ok, err := iter.Valid(); err != nil {
return nil, err
} else if !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 {
break
}
kvs = append(kvs, engine.MVCCKeyValue{Key: iter.Key(), Value: iter.Value()})
}
return kvs, nil
}

ctx := context.Background()
kA := roachpb.Key("kA")
Expand Down Expand Up @@ -564,32 +567,102 @@ func TestMVCCIncrementalIteratorIntentDeletion(t *testing.T) {

// The kA ts1 intent has been resolved. There's now a new intent on kA, but
// the timestamp (ts3) is too new so it should be ignored.
kvs, err := slurpKVs(db, kA, ts0, ts1)
kvs, err := slurpKVsInTimeRange(db, kA, ts0, ts1)
require.NoError(t, err)
require.Equal(t, []engine.MVCCKeyValue{
{Key: engine.MVCCKey{Key: kA, Timestamp: ts1}, Value: vA1.RawBytes},
}, kvs)
// kA has a value at ts2. Again the intent is too new (ts3), so ignore.
kvs, err = slurpKVs(db, kA, ts0, ts2)
kvs, err = slurpKVsInTimeRange(db, kA, ts0, ts2)
require.NoError(t, err)
require.Equal(t, []engine.MVCCKeyValue{
{Key: engine.MVCCKey{Key: kA, Timestamp: ts2}, Value: vA2.RawBytes},
{Key: engine.MVCCKey{Key: kA, Timestamp: ts1}, Value: vA1.RawBytes},
}, kvs)
// At ts3, we should see the new intent
_, err = slurpKVs(db, kA, ts0, ts3)
_, err = slurpKVsInTimeRange(db, kA, ts0, ts3)
require.EqualError(t, err, `conflicting intents on "kA"`)

// Similar to the kA ts1 check, but there is no newer intent. We expect to
// pick up the intent deletion and it should cancel out the intent, leaving
// only the value at ts1.
kvs, err = slurpKVs(db, kB, ts0, ts1)
kvs, err = slurpKVsInTimeRange(db, kB, ts0, ts1)
require.NoError(t, err)
require.Equal(t, []engine.MVCCKeyValue{
{Key: engine.MVCCKey{Key: kB, Timestamp: ts1}, Value: vB1.RawBytes},
}, kvs)

// Sanity check that we see the still unresolved intent for kC ts1.
_, err = slurpKVs(db, kC, ts0, ts1)
_, err = slurpKVsInTimeRange(db, kC, ts0, ts1)
require.EqualError(t, err, `conflicting intents on "kC"`)
}

// TestMVCCIncrementalIteratorIntentRewrittenConcurrently verifies that the
// workaround in MVCCIncrementalIterator to double-check for deleted intents
// properly handles cases where an intent originally in a time-bound iterator's
// time range is rewritten at a timestamp outside of its time range.
func TestMVCCIncrementalIteratorIntentRewrittenConcurrently(t *testing.T) {
defer leaktest.AfterTest(t)()

// Create a DB containing a single intent.
ctx := context.Background()
db := engine.NewInMem(roachpb.Attributes{}, 10<<20 /* 10 MB */)
defer db.Close()

kA := roachpb.Key("kA")
vA1 := roachpb.MakeValueFromString("vA1")
vA2 := roachpb.MakeValueFromString("vA2")
ts0 := hlc.Timestamp{WallTime: 0}
ts1 := hlc.Timestamp{WallTime: 1}
ts2 := hlc.Timestamp{WallTime: 2}
ts3 := hlc.Timestamp{WallTime: 3}
txn := &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
Key: roachpb.Key("b"),
ID: uuid.MakeV4(),
Epoch: 1,
Timestamp: ts1,
Sequence: 1,
},
OrigTimestamp: ts1,
}
if err := engine.MVCCPut(ctx, db, nil, kA, ts1, vA1, txn); err != nil {
t.Fatal(err)
}

// Concurrently iterate over the intent using a time-bound iterator and move
// the intent out of the time-bound iterator's time range by writing to it
// again at a higher timestamp.
g, _ := errgroup.WithContext(ctx)
g.Go(func() error {
// Re-write the intent with a higher timestamp.
txn.Timestamp = ts3
txn.Sequence = 2
return engine.MVCCPut(ctx, db, nil, kA, ts1, vA2, txn)
})
g.Go(func() error {
// Iterate with a time range that includes the initial intent but does
// not include the new intent.
kvs, err := slurpKVsInTimeRange(db, kA, ts0, ts2)

// There are two permissible outcomes from the scan. If the iteration
// wins the race with the put that moves the intent then it should
// observe the intent and return a write intent error. If the iteration
// loses the race with the put that moves the intent then it should
// observe and return nothing because there will be no committed or
// provisional keys in its time range.
if err != nil {
if !testutils.IsError(err, `conflicting intents on "kA"`) {
return err
}
} else {
if len(kvs) != 0 {
return errors.Errorf(`unexpected kvs: %v`, kvs)
}
}
return nil
})
if err := g.Wait(); err != nil {
t.Fatal(err)
}
}

0 comments on commit ab04e70

Please sign in to comment.