Skip to content

Commit

Permalink
Merge pull request cockroachdb#22965 from spencerkimball/fix-22543
Browse files Browse the repository at this point in the history
storage: refresh spans on InitPut; heed tombstones on Refresh / Refre…
  • Loading branch information
spencerkimball authored Feb 23, 2018
2 parents eadf651 + 8faa7bb commit 43e20cc
Show file tree
Hide file tree
Showing 20 changed files with 154 additions and 46 deletions.
4 changes: 2 additions & 2 deletions c-deps/libroach/include/libroach.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ typedef struct {
} DBScanResults;

DBScanResults MVCCGet(DBIterator* iter, DBSlice key, DBTimestamp timestamp, DBTxn txn,
bool consistent);
bool consistent, bool tombstones);
DBScanResults MVCCScan(DBIterator* iter, DBSlice start, DBSlice end, DBTimestamp timestamp,
int64_t max_keys, DBTxn txn, bool consistent, bool reverse);
int64_t max_keys, DBTxn txn, bool consistent, bool reverse, bool tombstones);

// DBStatsResult contains various runtime stats for RocksDB.
typedef struct {
Expand Down
10 changes: 5 additions & 5 deletions c-deps/libroach/mvcc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ DBStatus MVCCFindSplitKey(DBIterator* iter, DBKey start, DBKey end, DBKey min_sp
}

DBScanResults MVCCGet(DBIterator* iter, DBSlice key, DBTimestamp timestamp, DBTxn txn,
bool consistent) {
bool consistent, bool tombstones) {
// Get is implemented as a scan where we retrieve a single key. Note
// that the semantics of max_keys is that we retrieve one more key
// than is specified in order to maintain the existing semantics of
Expand All @@ -270,17 +270,17 @@ DBScanResults MVCCGet(DBIterator* iter, DBSlice key, DBTimestamp timestamp, DBTx
// don't retrieve a key different than the start key. This is a bit
// of a hack.
const DBSlice end = {0, 0};
mvccForwardScanner scanner(iter, key, end, timestamp, 0 /* max_keys */, txn, consistent);
mvccForwardScanner scanner(iter, key, end, timestamp, 0 /* max_keys */, txn, consistent, tombstones);
return scanner.get();
}

DBScanResults MVCCScan(DBIterator* iter, DBSlice start, DBSlice end, DBTimestamp timestamp,
int64_t max_keys, DBTxn txn, bool consistent, bool reverse) {
int64_t max_keys, DBTxn txn, bool consistent, bool reverse, bool tombstones) {
if (reverse) {
mvccReverseScanner scanner(iter, end, start, timestamp, max_keys, txn, consistent);
mvccReverseScanner scanner(iter, end, start, timestamp, max_keys, txn, consistent, tombstones);
return scanner.scan();
} else {
mvccForwardScanner scanner(iter, start, end, timestamp, max_keys, txn, consistent);
mvccForwardScanner scanner(iter, start, end, timestamp, max_keys, txn, consistent, tombstones);
return scanner.scan();
}
}
8 changes: 6 additions & 2 deletions c-deps/libroach/mvcc.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ static const int kMaxItersBeforeSeek = 10;
template <bool reverse> class mvccScanner {
public:
mvccScanner(DBIterator* iter, DBSlice start, DBSlice end, DBTimestamp timestamp, int64_t max_keys,
DBTxn txn, bool consistent)
DBTxn txn, bool consistent, bool tombstones)
: iter_(iter),
iter_rep_(iter->rep.get()),
start_key_(ToSlice(start)),
Expand All @@ -61,6 +61,7 @@ template <bool reverse> class mvccScanner {
txn_epoch_(txn.epoch),
txn_max_timestamp_(txn.max_timestamp),
consistent_(consistent),
tombstones_(tombstones),
check_uncertainty_(timestamp < txn.max_timestamp),
kvs_(new chunkedBuffer),
intents_(new rocksdb::WriteBatch),
Expand Down Expand Up @@ -418,7 +419,9 @@ template <bool reverse> class mvccScanner {
}

bool addAndAdvance(const rocksdb::Slice& value) {
if (value.size() > 0) {
// Don't include deleted versions (value.size() == 0), unless we've been
// instructed to include tombstones in the results.
if (value.size() > 0 || tombstones_) {
kvs_->Put(cur_raw_key_, value);
if (kvs_->Count() > max_keys_) {
return false;
Expand Down Expand Up @@ -605,6 +608,7 @@ template <bool reverse> class mvccScanner {
const uint32_t txn_epoch_;
const DBTimestamp txn_max_timestamp_;
const bool consistent_;
const bool tombstones_;
const bool check_uncertainty_;
DBScanResults results_;
std::unique_ptr<chunkedBuffer> kvs_;
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func runDebugGCCmd(cmd *cobra.Command, args []string) error {
var descs []roachpb.RangeDescriptor

if _, err := engine.MVCCIterate(context.Background(), db, start, end, hlc.MaxTimestamp,
false /* consistent */, nil, /* txn */
false /* consistent */, false /* tombstones */, nil, /* txn */
false /* reverse */, func(kv roachpb.KeyValue) (bool, error) {
var desc roachpb.RangeDescriptor
_, suffix, _, err := keys.DecodeRangeKey(kv.Key)
Expand Down Expand Up @@ -705,7 +705,7 @@ func runDebugCheckStoreRaft(ctx context.Context, db *engine.RocksDB) error {
}

if _, err := engine.MVCCIterate(ctx, db, start, end, hlc.MaxTimestamp,
false /* consistent */, nil, /* txn */
false /* consistent */, false /* tombstones */, nil, /* txn */
false /* reverse */, func(kv roachpb.KeyValue) (bool, error) {
rangeID, _, suffix, detail, err := keys.DecodeRangeIDKey(kv.Key)
if err != nil {
Expand Down
33 changes: 32 additions & 1 deletion pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2146,7 +2146,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
expFailure: "unexpected value", // the failure we get is a condition failed error
},
{
name: "write too old with initput failing on tombstone",
name: "write too old with initput failing on tombstone before",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Del(ctx, "iput")
},
Expand All @@ -2158,6 +2158,37 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
expFailure: "unexpected value", // condition failed error when failing on tombstones
},
{
name: "write too old with initput failing on tombstone after",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "iput", "put")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
// The first time through, this will be a noop, as put is the existing value.
if err := txn.InitPut(ctx, "iput", "put", true); err != nil {
return err
}
// Create an out-of-band tombstone on the iput, which must be refreshed
// when the put below experiences a write-too-old error.
if err := txn.DB().Del(ctx, "iput"); err != nil {
return err
}
// Write the version of "a" which triggers write-too-old
// *after* the tombstone at the "iput" key, to ensure we see the
// tombstone when refreshing the iput span.
if err := txn.DB().Put(ctx, "a", "value"); err != nil {
return err
}
// This command will get a write too old and refresh the init
// put, forcing a client-retry. On the retry, the init put
// will fail with a condition failed error.
return txn.Put(ctx, "a", "value")
},
clientRetry: true,
// Would get a condition failed error when failing on
// tombstones, but the retryable is not re-executed in the
// test fixture.
},
{
name: "write too old with put in batch commit",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
Expand Down
8 changes: 5 additions & 3 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,10 +966,12 @@ func (*ConditionalPutRequest) flags() int {

// InitPut, like ConditionalPut, effectively reads and may not write.
// It also may return the actual data read on ConditionFailedErrors,
// so must update the timestamp cache on errors. Like CPut, InitPuts
// do not require a refresh because they return errors on write-too-old.
// so must update the timestamp cache on errors. Unlike CPut, InitPuts
// require a refresh because they may execute successfully without
// leaving an intent (i.e., when the existing value is equal to the
// proposed value).
func (*InitPutRequest) flags() int {
return isRead | isWrite | isTxn | isTxnWrite | updatesReadTSCache | updatesTSCacheOnError | consultsTSCache
return isRead | isWrite | isTxn | isTxnWrite | updatesReadTSCache | updatesTSCacheOnError | needsRefresh | consultsTSCache
}

// Increment reads the existing value, but always leaves an intent so
Expand Down
3 changes: 2 additions & 1 deletion pkg/roachpb/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func TestRefreshSpanIterate(t *testing.T) {
}
ba.RefreshSpanIterate(&br, fn)
// Only the conditional put isn't considered a read span.
expReadSpans := []Span{testCases[4].span, testCases[5].span, testCases[6].span}
expReadSpans := []Span{testCases[2].span, testCases[4].span, testCases[5].span, testCases[6].span}
expWriteSpans := []Span{testCases[7].span}
if !reflect.DeepEqual(expReadSpans, readSpans) {
t.Fatalf("unexpected read spans: expected %+v, found = %+v", expReadSpans, readSpans)
Expand All @@ -284,6 +284,7 @@ func TestRefreshSpanIterate(t *testing.T) {
writeSpans = []Span{}
ba.RefreshSpanIterate(&br, fn)
expReadSpans = []Span{
{Key: Key("a-initput")},
{Key("a"), Key("b")},
{Key: Key("b")},
{Key("e"), Key("f")},
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/abortspan/abortspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (sc *AbortSpan) Iterate(
ctx context.Context, e engine.Reader, f func([]byte, roachpb.AbortSpanEntry),
) {
_, _ = engine.MVCCIterate(ctx, e, sc.min(), sc.max(), hlc.Timestamp{},
true /* consistent */, nil /* txn */, false, /* reverse */
true /* consistent */, false /* tombstones */, nil /* txn */, false, /* reverse */
func(kv roachpb.KeyValue) (bool, error) {
var entry roachpb.AbortSpanEntry
if _, err := keys.DecodeAbortSpanKey(kv.Key, nil); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/batcheval/cmd_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ func Refresh(
}

// Get the most recent committed value and return any intent by
// specifying consistent=false.
// specifying consistent=false. Note that we include tombstones,
// which must be considered as updates on refresh.
log.VEventf(ctx, 2, "refresh %s @[%s-%s]", args.Header(), h.Txn.OrigTimestamp, h.Txn.Timestamp)
val, intents, err := engine.MVCCGet(
val, intents, err := engine.MVCCGetWithTombstone(
ctx, batch, args.Key, h.Txn.Timestamp, false /* consistent */, nil, /* txn */
)

if err != nil {
return result.Result{}, err
} else if val != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/batcheval/cmd_refresh_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func RefreshRange(
// timestamp. Note that we do not iterate using the txn and the
// iteration is done with consistent=false. This reads only
// committed values and returns all intents, including those from
// the txn itself.
// the txn itself. Note that we include tombstones, which must be
// considered as updates on refresh.
log.VEventf(ctx, 2, "refresh %s @[%s-%s]", args.Header(), h.Txn.OrigTimestamp, h.Txn.Timestamp)
intents, err := engine.MVCCIterateUsingIter(
ctx,
Expand All @@ -60,6 +61,7 @@ func RefreshRange(
args.EndKey,
h.Txn.Timestamp,
false, /* consistent */
true, /* tombstones */
nil, /* txn */
false, /* reverse */
iter,
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestStoreRangeMergeMetadataCleanup(t *testing.T) {
store := createTestStoreWithConfig(t, stopper, storeCfg)

scan := func(f func(roachpb.KeyValue) (bool, error)) {
if _, err := engine.MVCCIterate(context.Background(), store.Engine(), roachpb.KeyMin, roachpb.KeyMax, hlc.Timestamp{}, true, nil, false, f); err != nil {
if _, err := engine.MVCCIterate(context.Background(), store.Engine(), roachpb.KeyMin, roachpb.KeyMax, hlc.Timestamp{}, true, false, nil, false, f); err != nil {
t.Fatal(err)
}
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,19 @@ type Iterator interface {
// value is returned in batch repr format with the key being present as the
// empty string. If an intent exists at the specified key, it will be
// returned in batch repr format in the separate intent return value.
// Specify true for tombstones to return a value if the key has been
// deleted (Value.RawBytes will be empty).
MVCCGet(key roachpb.Key, timestamp hlc.Timestamp,
txn *roachpb.Transaction, consistent bool,
txn *roachpb.Transaction, consistent, tombstones bool,
) (*roachpb.Value, []roachpb.Intent, error)
// MVCCScan scans the underlying engine from start to end keys and returns
// key/value pairs which have a timestamp less than or equal to the supplied
// timestamp, up to a max rows. The key/value pairs are returned as a buffer
// of varint-prefixed slices, alternating from key to value, numKvs pairs.
// Specify true for tombstones to return deleted values (the value portion
// will be empty).
MVCCScan(start, end roachpb.Key, max int64, timestamp hlc.Timestamp,
txn *roachpb.Transaction, consistent, reverse bool,
txn *roachpb.Transaction, consistent, reverse, tombstone bool,
) (kvs []byte, numKvs int64, intents []byte, err error)
}

Expand Down
34 changes: 27 additions & 7 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ func (b *getBuffer) release() {
// MVCCGet returns the value for the key specified in the request,
// while satisfying the given timestamp condition. The key may contain
// arbitrary bytes. If no value for the key exists, or it has been
// deleted, returns nil for value.
// deleted, returns nil for value.
//
// The values of multiple versions for the given key should
// be organized as follows:
Expand All @@ -697,7 +697,24 @@ func MVCCGet(
txn *roachpb.Transaction,
) (*roachpb.Value, []roachpb.Intent, error) {
iter := engine.NewIterator(true)
value, intents, err := iter.MVCCGet(key, timestamp, txn, consistent)
value, intents, err := iter.MVCCGet(key, timestamp, txn, consistent, false /* tombstones */)
iter.Close()
return value, intents, err
}

// MVCCGetWithTombstone is like MVCCGet (see comments above), except
// that deletions are surfaced to the caller: if the key has been
// deleted, it returns a non-nil value whose RawBytes is empty instead
// of a nil value. Intents and errors are returned exactly as reported
// by the underlying iterator's MVCCGet.
func MVCCGetWithTombstone(
	ctx context.Context,
	engine Reader,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	consistent bool,
	txn *roachpb.Transaction,
) (*roachpb.Value, []roachpb.Intent, error) {
	iter := engine.NewIterator(true)
	// Defer the close (as MVCCIterate does) so the iterator is released
	// on every exit path, including a panic inside the lookup.
	defer iter.Close()
	return iter.MVCCGet(key, timestamp, txn, consistent, true /* tombstones */)
}
Expand Down Expand Up @@ -1627,6 +1644,7 @@ func mvccScanInternal(
max int64,
timestamp hlc.Timestamp,
consistent bool,
tombstones bool,
txn *roachpb.Transaction,
reverse bool,
) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) {
Expand All @@ -1640,7 +1658,7 @@ func mvccScanInternal(
ownIter = true
}
kvData, numKvs, intentData, err := iter.MVCCScan(
key, endKey, max, timestamp, txn, consistent, reverse)
key, endKey, max, timestamp, txn, consistent, reverse, tombstones)
if ownIter {
iter.Close()
}
Expand Down Expand Up @@ -1797,7 +1815,7 @@ func MVCCScan(
txn *roachpb.Transaction,
) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) {
return mvccScanInternal(ctx, engine, nil, key, endKey, max, timestamp,
consistent, txn, false /* reverse */)
consistent, false /* tombstones */, txn, false /* reverse */)
}

// MVCCReverseScan scans the key range [start,end) key up to some maximum
Expand All @@ -1814,7 +1832,7 @@ func MVCCReverseScan(
txn *roachpb.Transaction,
) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) {
return mvccScanInternal(ctx, engine, nil, key, endKey, max, timestamp,
consistent, txn, true /* reverse */)
consistent, false /* tombstones */, txn, true /* reverse */)
}

// MVCCIterate iterates over the key range [start,end). At each step of the
Expand All @@ -1827,6 +1845,7 @@ func MVCCIterate(
startKey, endKey roachpb.Key,
timestamp hlc.Timestamp,
consistent bool,
tombstones bool,
txn *roachpb.Transaction,
reverse bool,
f func(roachpb.KeyValue) (bool, error),
Expand All @@ -1836,7 +1855,7 @@ func MVCCIterate(
defer iter.Close()

return MVCCIterateUsingIter(
ctx, engine, startKey, endKey, timestamp, consistent, txn, reverse, iter, f,
ctx, engine, startKey, endKey, timestamp, consistent, tombstones, txn, reverse, iter, f,
)
}

Expand All @@ -1848,6 +1867,7 @@ func MVCCIterateUsingIter(
startKey, endKey roachpb.Key,
timestamp hlc.Timestamp,
consistent bool,
tombstones bool,
txn *roachpb.Transaction,
reverse bool,
iter Iterator,
Expand All @@ -1859,7 +1879,7 @@ func MVCCIterateUsingIter(
for {
const maxKeysPerScan = 1000
kvs, resume, newIntents, err := mvccScanInternal(
ctx, engine, iter, startKey, endKey, maxKeysPerScan, timestamp, consistent, txn, reverse)
ctx, engine, iter, startKey, endKey, maxKeysPerScan, timestamp, consistent, tombstones, txn, reverse)
if err != nil {
switch tErr := err.(type) {
case *roachpb.WriteIntentError:
Expand Down
Loading

0 comments on commit 43e20cc

Please sign in to comment.