Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvcoord: key range cache entries by regular keys, not meta keys #52817

Merged
merged 4 commits into from
Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
n1.QueryRow(t, `SELECT id from system.namespace2 WHERE name='test'`).Scan(&tableID)
tablePrefix := keys.MustAddr(keys.SystemSQLCodec.TablePrefix(tableID))
n4Cache := tc.Server(3).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry := n4Cache.GetCached(tablePrefix, false /* inverted */)
entry := n4Cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
Expand All @@ -291,7 +291,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
rec := <-recCh
require.False(t, kv.OnlyFollowerReads(rec), "query was not served through follower reads: %s", rec)
// Check that the cache was properly updated.
entry = n4Cache.GetCached(tablePrefix, false /* inverted */)
entry = n4Cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
if k, err := keys.Addr(nextKey); err != nil {
log.Warningf(ctx, "failed to get RKey for flush key lookup")
} else {
r := b.rc.GetCached(k, false /* inverted */)
r := b.rc.GetCached(ctx, k, false /* inverted */)
if r != nil {
b.flushKey = r.Desc().EndKey.AsRawKey()
log.VEventf(ctx, 3, "building sstable that will flush before %v", b.flushKey)
Expand Down
28 changes: 14 additions & 14 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ func TestImmutableBatchArgs(t *testing.T) {
func TestRetryOnNotLeaseHolderError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().Voters()[1]
unrecognizedLeaseHolder := roachpb.ReplicaDescriptor{
Expand Down Expand Up @@ -632,7 +633,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
defer stopper.Stop(ctx)

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
Expand Down Expand Up @@ -678,13 +679,13 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
ds := NewDistSender(cfg)
v := roachpb.MakeValueFromString("value")
put := roachpb.NewPut(roachpb.Key("a"), v)
if _, pErr := kv.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") {
if _, pErr := kv.SendWrapped(ctx, ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("unexpected error: %v", pErr)
}
if first {
t.Fatal("the request did not retry")
}
rng := ds.rangeCache.GetCached(testUserRangeDescriptor.StartKey, false /* inverted */)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.NotNil(t, rng)

if tc.expLeaseholder != nil {
Expand Down Expand Up @@ -892,7 +893,7 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
t.Errorf("contacted n1: %t, contacted n2: %t", contacted1, contacted2)
}

rng := ds.rangeCache.GetCached(testUserRangeDescriptor.StartKey, false /* inverted */)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.Equal(t, roachpb.StoreID(2), rng.Lease().Replica.StoreID)
}

Expand Down Expand Up @@ -1162,7 +1163,7 @@ func TestEvictCacheOnError(t *testing.T) {
if _, pErr := kv.SendWrapped(ctx, ds, put); pErr != nil && !testutils.IsPError(pErr, errString) && !testutils.IsError(pErr.GoError(), ctx.Err().Error()) {
t.Errorf("put encountered unexpected error: %s", pErr)
}
rng := ds.rangeCache.GetCached(testUserRangeDescriptor.StartKey, false /* inverted */)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
if tc.shouldClearReplica {
require.Nil(t, rng)
} else if tc.shouldClearLeaseHolder {
Expand Down Expand Up @@ -1674,8 +1675,7 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) {
// Check that the cache has the updated descriptor returned by the RPC.
for _, ri := range tc {
rk := ri.Desc.StartKey
entry, err := ds.rangeCache.Lookup(ctx, rk)
require.NoError(t, err)
entry := ds.rangeCache.GetCached(ctx, rk, false /* inverted */)
require.NotNil(t, entry)
require.Equal(t, &ri.Desc, entry.Desc())
if ri.Lease.Empty() {
Expand Down Expand Up @@ -1746,7 +1746,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) {
if len(seen) == 1 {
// Pretend that this replica is the leaseholder in the cache to verify
// that the response evicts it.
rng := ds.rangeCache.GetCached(descriptor.StartKey, false /* inverse */)
rng := ds.rangeCache.GetCached(ctx, descriptor.StartKey, false /* inverse */)
ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: *rng.Desc(),
Lease: roachpb.Lease{Replica: ba.Replica},
Expand Down Expand Up @@ -1783,7 +1783,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) {
t.Fatal(err)
}

rng := ds.rangeCache.GetCached(descriptor.StartKey, false /* inverted */)
rng := ds.rangeCache.GetCached(ctx, descriptor.StartKey, false /* inverted */)
require.NotNil(t, rng)
require.Equal(t, leaseholderStoreID, rng.Lease().Replica.StoreID)
}
Expand Down Expand Up @@ -3406,7 +3406,7 @@ func TestCanSendToFollower(t *testing.T) {
// we've had where we were always updating the leaseholder on successful
// RPCs because we erroneously assumed that a success must come from the
// leaseholder.
rng := ds.rangeCache.GetCached(testUserRangeDescriptor.StartKey, false /* inverted */)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.NotNil(t, rng)
require.NotNil(t, rng.Lease())
require.Equal(t, roachpb.StoreID(2), rng.Lease().Replica.StoreID)
Expand Down Expand Up @@ -3571,7 +3571,7 @@ func TestEvictMetaRange(t *testing.T) {
}

// Verify that there is one meta2 cached range.
cachedRange := ds.rangeCache.GetCached(keys.RangeMetaKey(roachpb.RKey("a")), false)
cachedRange := ds.rangeCache.GetCached(ctx, keys.RangeMetaKey(roachpb.RKey("a")), false)
if !cachedRange.Desc().StartKey.Equal(keys.Meta2Prefix) || !cachedRange.Desc().EndKey.Equal(testMetaEndKey) {
t.Fatalf("expected cached meta2 range to be [%s, %s), actual [%s, %s)",
keys.Meta2Prefix, testMetaEndKey, cachedRange.Desc().StartKey, cachedRange.Desc().EndKey)
Expand All @@ -3586,12 +3586,12 @@ func TestEvictMetaRange(t *testing.T) {
}

// Verify that there are two meta2 cached ranges.
cachedRange = ds.rangeCache.GetCached(keys.RangeMetaKey(roachpb.RKey("a")), false)
cachedRange = ds.rangeCache.GetCached(ctx, keys.RangeMetaKey(roachpb.RKey("a")), false)
if !cachedRange.Desc().StartKey.Equal(keys.Meta2Prefix) || !cachedRange.Desc().EndKey.Equal(splitKey) {
t.Fatalf("expected cached meta2 range to be [%s, %s), actual [%s, %s)",
keys.Meta2Prefix, splitKey, cachedRange.Desc().StartKey, cachedRange.Desc().EndKey)
}
cachedRange = ds.rangeCache.GetCached(keys.RangeMetaKey(roachpb.RKey("b")), false)
cachedRange = ds.rangeCache.GetCached(ctx, keys.RangeMetaKey(roachpb.RKey("b")), false)
if !cachedRange.Desc().StartKey.Equal(splitKey) || !cachedRange.Desc().EndKey.Equal(testMetaEndKey) {
t.Fatalf("expected cached meta2 range to be [%s, %s), actual [%s, %s)",
splitKey, testMetaEndKey, cachedRange.Desc().StartKey, cachedRange.Desc().EndKey)
Expand Down Expand Up @@ -4139,7 +4139,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
_, err = ds.sendToReplicas(ctx, ba, tok, false /* withCommit */)
require.IsType(t, sendError{}, err)
require.Regexp(t, "NotLeaseHolderError", err)
cached := rc.GetCached(desc.StartKey, false /* inverted */)
cached := rc.GetCached(ctx, desc.StartKey, false /* inverted */)
if tc.expLeaseholder == 0 {
// Check that the descriptor was removed from the cache.
require.Nil(t, cached)
Expand Down
87 changes: 54 additions & 33 deletions pkg/kv/kvclient/kvcoord/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
// RangeCache.
type rangeCacheKey roachpb.RKey

var maxCacheKey interface{} = rangeCacheKey(roachpb.RKeyMax)
var minCacheKey interface{} = rangeCacheKey(roachpb.RKeyMin)

func (a rangeCacheKey) String() string {
return roachpb.Key(a).String()
Expand Down Expand Up @@ -535,7 +535,7 @@ func (rdc *RangeDescriptorCache) Lookup(
}

// GetCachedOverlapping returns all the cached entries which overlap a given
// span.
// span [Key, EndKey). The results are sorted in ascending order.
func (rdc *RangeDescriptorCache) GetCachedOverlapping(
ctx context.Context, span roachpb.RSpan,
) []kvbase.RangeCacheEntry {
Expand All @@ -552,23 +552,24 @@ func (rdc *RangeDescriptorCache) GetCachedOverlapping(
func (rdc *RangeDescriptorCache) getCachedOverlappingRLocked(
ctx context.Context, span roachpb.RSpan,
) []*cache.Entry {
start := rangeCacheKey(keys.RangeMetaKey(span.Key).Next())
var res []*cache.Entry
// We iterate from the range meta key after RangeMetaKey(desc.StartKey) to the
// end of the key space and we stop when we hit a descriptor to the right of
// span. Notice the Next() we use for the start key to avoid clearing the
// descriptor that ends where span starts: for example, if we are inserting
// ["b", "c"), we should not evict ["a", "b").
rdc.rangeCache.cache.DoRangeEntry(func(e *cache.Entry) (exit bool) {
cached := rdc.getValue(e)
// Stop when we hit a descriptor to the right of span. The end key is
// exclusive, so if span is [a,b) and we hit [b,c), we stop.
if span.EndKey.Compare(cached.Desc().StartKey) <= 0 {
rdc.rangeCache.cache.DoRangeReverseEntry(func(e *cache.Entry) (exit bool) {
desc := rdc.getValue(e).Desc()
if desc.StartKey.Equal(span.EndKey) {
// Skip over the descriptor starting at the end key, which is supposed to be exclusive.
return false
}
// Stop when we get to a lower range.
if desc.EndKey.Compare(span.Key) <= 0 {
return true
}
res = append(res, e)
return false // continue iterating
}, start, maxCacheKey)
}, rangeCacheKey(span.EndKey), minCacheKey)
// Invert the results so they are sorted in ascending order.
for i, j := 0, len(res)-1; i < j; i, j = i+1, j-1 {
res[i], res[j] = res[j], res[i]
}
return res
}

Expand Down Expand Up @@ -622,7 +623,7 @@ func (rdc *RangeDescriptorCache) tryLookup(
ctx context.Context, key roachpb.RKey, evictToken EvictionToken, useReverseScan bool,
) (EvictionToken, error) {
rdc.rangeCache.RLock()
if entry, _ := rdc.getCachedLocked(key, useReverseScan); entry != nil {
if entry, _ := rdc.getCachedRLocked(ctx, key, useReverseScan); entry != nil {
rdc.rangeCache.RUnlock()
returnToken := rdc.makeEvictionToken(entry, nil /* nextDesc */)
return returnToken, nil
Expand Down Expand Up @@ -817,7 +818,7 @@ func (rdc *RangeDescriptorCache) EvictByKey(ctx context.Context, descKey roachpb
rdc.rangeCache.Lock()
defer rdc.rangeCache.Unlock()

cachedDesc, entry := rdc.getCachedLocked(descKey, false /* inverted */)
cachedDesc, entry := rdc.getCachedRLocked(ctx, descKey, false /* inverted */)
if cachedDesc == nil {
return false
}
Expand All @@ -839,7 +840,7 @@ func (rdc *RangeDescriptorCache) EvictByKey(ctx context.Context, descKey roachpb
func (rdc *RangeDescriptorCache) evictLocked(
ctx context.Context, entry *rangeCacheEntry,
) (ok bool, updatedEntry *rangeCacheEntry) {
cachedEntry, rawEntry := rdc.getCachedLocked(entry.desc.StartKey, false /* inverted */)
cachedEntry, rawEntry := rdc.getCachedRLocked(ctx, entry.desc.StartKey, false /* inverted */)
if cachedEntry != entry {
if cachedEntry != nil && descsCompatible(cachedEntry.Desc(), entry.Desc()) {
return false, cachedEntry
Expand Down Expand Up @@ -868,48 +869,68 @@ func (rdc *RangeDescriptorCache) mustEvictLocked(ctx context.Context, entry *ran
// `inverted` determines the behavior at the range boundary: If set to true
// and `key` is the EndKey and StartKey of two adjacent ranges, the first range
// is returned instead of the second (which technically contains the given key).
func (rdc *RangeDescriptorCache) GetCached(key roachpb.RKey, inverted bool) kvbase.RangeCacheEntry {
func (rdc *RangeDescriptorCache) GetCached(
ctx context.Context, key roachpb.RKey, inverted bool,
) kvbase.RangeCacheEntry {
rdc.rangeCache.RLock()
defer rdc.rangeCache.RUnlock()
entry, _ := rdc.getCachedLocked(key, inverted)
entry, _ := rdc.getCachedRLocked(ctx, key, inverted)
if entry == nil {
// This return avoids boxing a nil into a non-nil iface.
return nil
}
return kvbase.RangeCacheEntry(entry)
}

// getCachedLocked is like GetCached, but it assumes that the caller holds a
// getCachedRLocked is like GetCached, but it assumes that the caller holds a
// read lock on rdc.rangeCache.
//
// In addition to GetCached, it also returns an internal cache Entry that can be
// used for descriptor eviction.
func (rdc *RangeDescriptorCache) getCachedLocked(
key roachpb.RKey, inverted bool,
func (rdc *RangeDescriptorCache) getCachedRLocked(
ctx context.Context, key roachpb.RKey, inverted bool,
) (*rangeCacheEntry, *cache.Entry) {
// The cache is indexed using the end-key of the range, but the
// end-key is non-inverted by default.
var metaKey roachpb.RKey
// rawEntry will be the range containing key, or the first cached entry around
// key, in the direction indicated by inverted.
var rawEntry *cache.Entry
if !inverted {
metaKey = keys.RangeMetaKey(key.Next())
var ok bool
rawEntry, ok = rdc.rangeCache.cache.FloorEntry(rangeCacheKey(key))
if !ok {
return nil, nil
}
} else {
metaKey = keys.RangeMetaKey(key)
rdc.rangeCache.cache.DoRangeReverseEntry(func(e *cache.Entry) bool {
startKey := roachpb.RKey(e.Key.(rangeCacheKey))
if key.Equal(startKey) {
// DoRangeReverseEntry is inclusive on the higher key. We're iterating
// backwards and we got a range that starts at key. We're not interested
// in this range; we're interested in the range before it that ends at
// key.
return false // continue iterating
}
rawEntry = e
return true
}, rangeCacheKey(key), minCacheKey)
// DoRangeReverseEntry is exclusive on the "to" part, so we need to check
// manually if there's an entry for RKeyMin.
if rawEntry == nil {
rawEntry, _ = rdc.rangeCache.cache.FloorEntry(minCacheKey)
}
}

rawEntry, ok := rdc.rangeCache.cache.CeilEntry(rangeCacheKey(metaKey))
if !ok {
if rawEntry == nil {
return nil, nil
}
entry := rdc.getValue(rawEntry)
desc := &entry.desc

containsFn := (*roachpb.RangeDescriptor).ContainsKey
if inverted {
containsFn = (*roachpb.RangeDescriptor).ContainsKeyInverted
}

// Return nil if the key does not belong to the range.
if !containsFn(desc, key) {
if !containsFn(entry.Desc(), key) {
return nil, nil
}
return entry, rawEntry
Expand Down Expand Up @@ -988,9 +1009,9 @@ func (rdc *RangeDescriptorCache) insertLockedInner(
entries[i] = newerEntry
continue
}
rangeKey := keys.RangeMetaKey(ent.Desc().EndKey)
rangeKey := ent.Desc().StartKey
if log.V(2) {
log.Infof(ctx, "adding cache entry: key=%s value=%s", rangeKey, ent)
log.Infof(ctx, "adding cache entry: value=%s", ent)
}
rdc.rangeCache.cache.Add(rangeCacheKey(rangeKey), ent)
entries[i] = ent
Expand Down
Loading