Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
52799: colexec: implement resizing in the hash table r=yuzefovich a=yuzefovich

**colexec: improve benchmarks of hash aggregation and some cleanup**

Previously, we would populate groups in ordered fashion for both ordered
and hash aggregators, but this is not very realistic in case of the hash
aggregation. This commit makes the groups distribution random (but still
adhering to the target group sizes). It also adds variation of number of
input batches to the benchmark of the aggregators.

This commit also removes an unused parameter from the tuple distributor.

Release note: None

**colexec: clean up mixed-type equality checks in the joins**

We currently only support the cases of same-type as well as integers of
mixed widths in the equality conditions (all other allowed mixed-type
comparisons are pushed into the ON condition, see 43060), so we will
now generate the code only for same-type comparisons and for integer
ones in the hash table.

This limitation became apparent when the resizing of the hash table was
put in place, and TestHashJoinerAgainstProcessor became flaky. Note that
the test could flake before, but it was impossibly unlikely - we had
a fixed large number of hash buckets, so the chance of a hash collision
was a lot lower.

This commit updates the test to deflake it (to generate comparable types
only of the cases that could occur in non-test environment) as well as
updates the hash table template to not generate some dead code.

Release note: None

**colexec: implement resizing in the hash table**

This commit implements the resizing logic in the vectorized hash table.
Previously, the number of hash buckets has been fixed at 2^16 and now
it'll start out at coldata.BatchSize() and will grow exponentitally while
maintaining the target load factor (the average number of tuples per
bucket).

Note that we don't have a maximum number of buckets because this commit
also adds memory accounting for the internal auxiliary slices of the hash
table. Not having a maximum is acceptable because every bucket takes up
16 bytes, and if we are going to hit the memory limit, mostly likely
it'll be because of the tuples stored in `ht.vals` and
`buildScratch.next` slice (which is of the same length as `ht.vals`).

All users of the hash table (hash joiner, hash aggregator, and unordered
distinct) have been benchmarked using microbenchmarks and TPCH queries
and the load factor has been chosen separately. I also tried out different
initial numbers of buckets, but using coldata.BatchSize() showed the most
stable performance. The reasoning for such choice:
- on one hand, we make several other allocations that have to be at
least coldata.BatchSize() in size, so we don't win much in the case of
the input with small number of tuples;
- on the other hand, if we start out with a larger number, we won't be
using vast of majority of the buckets on the input with small number of
tuples (a downside) while not gaining much in the case of the input with
large number of tuples.

#Fixes: #52257.

Release note: None

52817: kvcoord: key range cache entries by regular keys, not meta keys r=andreimatei a=andreimatei

Before this patch, entries in the range cache were keyed by
RangeMetaKey(desc.EndKey). There were no good reasons why either
the RangeMetaKey() transformation was used, or why the EndKey was used
instead of the StartKey. Presumably this has all been done in order to
match how descriptors are stored in the meta ranges, but this was
misguided since it's confusing.

This patch makes cache entries simply be keyed on desc.Start. This fixes
a problem with querying the cache with RKeyMin.Next(). Before this
patch, this query was broken since it was failing to find the cached
Meta1 range: the range was [RKeyMin, Meta2Start), and so it was keyed at
RangeMetaKey(Meta2Start) = Meta1/Meta2Start. The cache lookup was using
RangeMetaKey(RKeyMin.Next), which results in /Meta2/<something> (which
is too high) because of arguably a bug in the implementation of
RangeMetaKey. Fixing that bug proves to not be quite straigh-forward
(#52786). This patch fixes the problem by not needing to apply neither
.Next() nor RangeMetaKey() when performing cache lookups.

This patch fixes the issues referenced below; what the restores in those
issues were experiencing was a spectacular consequence of the failure to
lookup the cache described above: evicting the Meta1 descriptor from the
cache was always failing (because the lookup was failing) which, in
turn, was causing the DistSender to believe that it was perpetually
receiving old lease information and so it would loop around contacting
the same replica over and over. There's something to improve in the
DistSender, coming separately.
While the underlying RangeMetaKey() bug is old, I believe this bug was
exposed more by the recent range cache changes. It used to be that we
would generally evict the Meta1 descriptor from the cache based on the
key that caused that descriptor to be inserted in the cache. Now we
generally evict the descriptor based on the range's start key (which is
RKeyMin, and suffers from the RKeyMin.Next() issue).

Fixes #52468
Fixes #52391
Fixes #50987

Release note: None

53105: intentresolver: misc cleanups r=andreimatei a=andreimatei

See individual commits.

53134: tree: remove pointer definition for DBox2D r=sumeerbhola a=otan

Box2D should never actually be nil inside, so remove that requirement.

Resolves #53013.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
4 people committed Aug 20, 2020
5 parents 979127c + 1397b60 + 5ea819f + 75dbf3d + 6a9db4a commit 8bc4417
Show file tree
Hide file tree
Showing 53 changed files with 2,677 additions and 25,261 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
n1.QueryRow(t, `SELECT id from system.namespace2 WHERE name='test'`).Scan(&tableID)
tablePrefix := keys.MustAddr(keys.SystemSQLCodec.TablePrefix(tableID))
n4Cache := tc.Server(3).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry := n4Cache.GetCached(tablePrefix, false /* inverted */)
entry := n4Cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
Expand All @@ -291,7 +291,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
rec := <-recCh
require.False(t, kv.OnlyFollowerReads(rec), "query was not served through follower reads: %s", rec)
// Check that the cache was properly updated.
entry = n4Cache.GetCached(tablePrefix, false /* inverted */)
entry = n4Cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func runDebugGCCmd(cmd *cobra.Command, args []string) error {
now, thresh, policy,
gc.NoopGCer{},
func(_ context.Context, _ []roachpb.Intent) error { return nil },
func(_ context.Context, _ *roachpb.Transaction, _ []roachpb.LockUpdate) error { return nil },
func(_ context.Context, _ *roachpb.Transaction) error { return nil },
)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/geo/bbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func (b *CartesianBoundingBox) Repr() string {
}

// ParseCartesianBoundingBox parses a box2d string into a bounding box.
func ParseCartesianBoundingBox(s string) (*CartesianBoundingBox, error) {
b := &CartesianBoundingBox{}
func ParseCartesianBoundingBox(s string) (CartesianBoundingBox, error) {
b := CartesianBoundingBox{}
var prefix string
numScanned, err := fmt.Sscanf(s, "%3s(%f %f,%f %f)", &prefix, &b.LoX, &b.LoY, &b.HiX, &b.HiY)
if err != nil {
return nil, errors.Wrapf(err, "error parsing box2d")
return b, errors.Wrapf(err, "error parsing box2d")
}
if numScanned != 5 || strings.ToLower(prefix) != "box" {
return nil, errors.Newf("expected format 'box(min_x min_y,max_x max_y)'")
return b, errors.Newf("expected format 'box(min_x min_y,max_x max_y)'")
}
return b, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/geo/bbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
func TestParseCartesianBoundingBox(t *testing.T) {
testCases := []struct {
s string
expected *CartesianBoundingBox
expected CartesianBoundingBox
expectedError bool
}{
{
s: "box(1 2,3 4)",
expected: &CartesianBoundingBox{
expected: CartesianBoundingBox{
BoundingBox: geopb.BoundingBox{
LoX: 1,
LoY: 2,
Expand All @@ -39,7 +39,7 @@ func TestParseCartesianBoundingBox(t *testing.T) {
},
{
s: "BOX(1 2,3 4)",
expected: &CartesianBoundingBox{
expected: CartesianBoundingBox{
BoundingBox: geopb.BoundingBox{
LoX: 1,
LoY: 2,
Expand Down
6 changes: 1 addition & 5 deletions pkg/geo/geomfn/topology_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package geomfn
import (
"github.com/cockroachdb/cockroach/pkg/geo"
"github.com/cockroachdb/cockroach/pkg/geo/geos"
"github.com/cockroachdb/errors"
)

// Centroid returns the Centroid of a given Geometry.
Expand All @@ -26,13 +25,10 @@ func Centroid(g *geo.Geometry) (*geo.Geometry, error) {
}

// ClipByRect clips a given Geometry by the given BoundingBox.
func ClipByRect(g *geo.Geometry, b *geo.CartesianBoundingBox) (*geo.Geometry, error) {
func ClipByRect(g *geo.Geometry, b geo.CartesianBoundingBox) (*geo.Geometry, error) {
if g.Empty() {
return g, nil
}
if b == nil {
return nil, errors.Newf("expected not null bounding box")
}
clipByRectEWKB, err := geos.ClipByRect(g.EWKB(), b.LoX, b.LoY, b.HiX, b.HiY)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
if k, err := keys.Addr(nextKey); err != nil {
log.Warningf(ctx, "failed to get RKey for flush key lookup")
} else {
r := b.rc.GetCached(k, false /* inverted */)
r := b.rc.GetCached(ctx, k, false /* inverted */)
if r != nil {
b.flushKey = r.Desc().EndKey.AsRawKey()
log.VEventf(ctx, 3, "building sstable that will flush before %v", b.flushKey)
Expand Down
28 changes: 14 additions & 14 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ func TestImmutableBatchArgs(t *testing.T) {
func TestRetryOnNotLeaseHolderError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().Voters()[1]
unrecognizedLeaseHolder := roachpb.ReplicaDescriptor{
Expand Down Expand Up @@ -632,7 +633,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
defer stopper.Stop(ctx)

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
Expand Down Expand Up @@ -678,13 +679,13 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
ds := NewDistSender(cfg)
v := roachpb.MakeValueFromString("value")
put := roachpb.NewPut(roachpb.Key("a"), v)
if _, pErr := kv.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") {
if _, pErr := kv.SendWrapped(ctx, ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("unexpected error: %v", pErr)
}
if first {
t.Fatal("the request did not retry")
}
rng := ds.rangeCache.GetCached(testUserRangeDescriptor.StartKey, false /* inverted */)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.NotNil(t, rng)

if tc.expLeaseholder != nil {
Expand Down Expand Up @@ -892,7 +893,7 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
t.Errorf("contacted n1: %t, contacted n2: %t", contacted1, contacted2)
}

rng := ds.rangeCache.GetCached(testUserRangeDescriptor.StartKey, false /* inverted */)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.Equal(t, roachpb.StoreID(2), rng.Lease().Replica.StoreID)
}

Expand Down Expand Up @@ -1162,7 +1163,7 @@ func TestEvictCacheOnError(t *testing.T) {
if _, pErr := kv.SendWrapped(ctx, ds, put); pErr != nil && !testutils.IsPError(pErr, errString) && !testutils.IsError(pErr.GoError(), ctx.Err().Error()) {
t.Errorf("put encountered unexpected error: %s", pErr)
}
rng := ds.rangeCache.GetCached(testUserRangeDescriptor.StartKey, false /* inverted */)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
if tc.shouldClearReplica {
require.Nil(t, rng)
} else if tc.shouldClearLeaseHolder {
Expand Down Expand Up @@ -1674,8 +1675,7 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) {
// Check that the cache has the updated descriptor returned by the RPC.
for _, ri := range tc {
rk := ri.Desc.StartKey
entry, err := ds.rangeCache.Lookup(ctx, rk)
require.NoError(t, err)
entry := ds.rangeCache.GetCached(ctx, rk, false /* inverted */)
require.NotNil(t, entry)
require.Equal(t, &ri.Desc, entry.Desc())
if ri.Lease.Empty() {
Expand Down Expand Up @@ -1746,7 +1746,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) {
if len(seen) == 1 {
// Pretend that this replica is the leaseholder in the cache to verify
// that the response evicts it.
rng := ds.rangeCache.GetCached(descriptor.StartKey, false /* inverse */)
rng := ds.rangeCache.GetCached(ctx, descriptor.StartKey, false /* inverse */)
ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: *rng.Desc(),
Lease: roachpb.Lease{Replica: ba.Replica},
Expand Down Expand Up @@ -1783,7 +1783,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) {
t.Fatal(err)
}

rng := ds.rangeCache.GetCached(descriptor.StartKey, false /* inverted */)
rng := ds.rangeCache.GetCached(ctx, descriptor.StartKey, false /* inverted */)
require.NotNil(t, rng)
require.Equal(t, leaseholderStoreID, rng.Lease().Replica.StoreID)
}
Expand Down Expand Up @@ -3406,7 +3406,7 @@ func TestCanSendToFollower(t *testing.T) {
// we've had where we were always updating the leaseholder on successful
// RPCs because we erroneously assumed that a success must come from the
// leaseholder.
rng := ds.rangeCache.GetCached(testUserRangeDescriptor.StartKey, false /* inverted */)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.NotNil(t, rng)
require.NotNil(t, rng.Lease())
require.Equal(t, roachpb.StoreID(2), rng.Lease().Replica.StoreID)
Expand Down Expand Up @@ -3571,7 +3571,7 @@ func TestEvictMetaRange(t *testing.T) {
}

// Verify that there is one meta2 cached range.
cachedRange := ds.rangeCache.GetCached(keys.RangeMetaKey(roachpb.RKey("a")), false)
cachedRange := ds.rangeCache.GetCached(ctx, keys.RangeMetaKey(roachpb.RKey("a")), false)
if !cachedRange.Desc().StartKey.Equal(keys.Meta2Prefix) || !cachedRange.Desc().EndKey.Equal(testMetaEndKey) {
t.Fatalf("expected cached meta2 range to be [%s, %s), actual [%s, %s)",
keys.Meta2Prefix, testMetaEndKey, cachedRange.Desc().StartKey, cachedRange.Desc().EndKey)
Expand All @@ -3586,12 +3586,12 @@ func TestEvictMetaRange(t *testing.T) {
}

// Verify that there are two meta2 cached ranges.
cachedRange = ds.rangeCache.GetCached(keys.RangeMetaKey(roachpb.RKey("a")), false)
cachedRange = ds.rangeCache.GetCached(ctx, keys.RangeMetaKey(roachpb.RKey("a")), false)
if !cachedRange.Desc().StartKey.Equal(keys.Meta2Prefix) || !cachedRange.Desc().EndKey.Equal(splitKey) {
t.Fatalf("expected cached meta2 range to be [%s, %s), actual [%s, %s)",
keys.Meta2Prefix, splitKey, cachedRange.Desc().StartKey, cachedRange.Desc().EndKey)
}
cachedRange = ds.rangeCache.GetCached(keys.RangeMetaKey(roachpb.RKey("b")), false)
cachedRange = ds.rangeCache.GetCached(ctx, keys.RangeMetaKey(roachpb.RKey("b")), false)
if !cachedRange.Desc().StartKey.Equal(splitKey) || !cachedRange.Desc().EndKey.Equal(testMetaEndKey) {
t.Fatalf("expected cached meta2 range to be [%s, %s), actual [%s, %s)",
splitKey, testMetaEndKey, cachedRange.Desc().StartKey, cachedRange.Desc().EndKey)
Expand Down Expand Up @@ -4139,7 +4139,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
_, err = ds.sendToReplicas(ctx, ba, tok, false /* withCommit */)
require.IsType(t, sendError{}, err)
require.Regexp(t, "NotLeaseHolderError", err)
cached := rc.GetCached(desc.StartKey, false /* inverted */)
cached := rc.GetCached(ctx, desc.StartKey, false /* inverted */)
if tc.expLeaseholder == 0 {
// Check that the descriptor was removed from the cache.
require.Nil(t, cached)
Expand Down
Loading

0 comments on commit 8bc4417

Please sign in to comment.