Merge #51337

51337: colexec: refactor hash aggregator r=yuzefovich a=yuzefovich

**colexec: minor cleanup of the hash table**

This commit improves some of the resetting behavior in the hash table
(we now copy over whole slices rather than resetting one element at
a time) and introduces nicer, shorter-named variables for some slice
accesses. It also moves one non-templated method out of the template
into a regular file.
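
As a rough, runnable sketch of the resetting change (the `visited` slice
and the `zeroBoolColumn` helper are illustrative assumptions, not
necessarily the exact identifiers in the hash table code):

```go
package main

import "fmt"

// zeroBoolColumn plays the role of a preallocated all-false slice that
// the hash table keeps around for resetting.
var zeroBoolColumn = make([]bool, 1024)

func main() {
	visited := []bool{true, false, true, true}

	// Before: reset one element at a time.
	for i := range visited {
		visited[i] = false
	}

	// After: a single copy call over the whole slice (one memmove under
	// the hood) instead of a per-element loop.
	visited[0], visited[2] = true, true // dirty the slice again for the demo
	copy(visited, zeroBoolColumn[:len(visited)])

	fmt.Println(visited) // [false false false false]
}
```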

Release note: None

**colexec: refactor hash aggregator**

This commit refactors the hash aggregator to use the vectorized hash
table instead of Go's map and tightly couples the hash table with the
actual aggregation. The hash table is used in a mode similar to how the
unordered distinct uses it, with some crucial differences. The algorithm
for online aggregation is now as follows:
1. read batch from the input
2. group all tuples from the batch into "equality chains" (this is done
by "probing" the batch against itself - similar to the unordered distinct
- but instead of inserting the "head" tuples into the hash table, we
populate special "equality" selection vectors as well as a separate
selection vector that contains the heads of the equality chains)
3. probe the heads of the equality chains against already existing
buckets in the hash table
4. if there are any matches, all tuples in the corresponding equality
chains belong to existing groups, so they are aggregated into those
groups' buckets.
5. all unmatched equality chains form new groups, so we create
a separate bucket for each and aggregate the tuples into it.

The crucial observation here is that we maintain a 1-to-1 mapping between
the "heads" of the aggregation groups that are stored in the hash table
and the buckets of aggregate functions in the `buckets` slice, as the
sketch below illustrates.
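
As a toy, map-based sketch of steps 2-5 and of that 1-to-1 mapping (the
real implementation uses the vectorized hash table instead of a Go map,
and `aggBucket` stands in for one group's set of aggregate functions):

```go
package main

import "fmt"

// aggBucket stands in for one group's aggregate functions; the toy
// aggregate here is COUNT.
type aggBucket struct {
	count int64
}

func main() {
	batch := []int{7, 3, 7, 7, 3, 9} // a single grouping column

	// heads maps a group's key to its bucket's index in buckets,
	// mirroring the 1-to-1 mapping between the "heads" stored in the
	// hash table and the buckets slice.
	heads := map[int]int{}
	var buckets []*aggBucket

	// Step 2: group the batch's tuples into equality chains.
	chains := map[int][]int{} // group key -> positions of equal tuples
	for i, k := range batch {
		chains[k] = append(chains[k], i)
	}

	// Steps 3-5: probe each chain's head against the existing buckets;
	// matched chains are aggregated into their group's bucket, and each
	// unmatched chain gets a fresh bucket.
	for k, chain := range chains {
		idx, ok := heads[k]
		if !ok {
			idx = len(buckets)
			buckets = append(buckets, &aggBucket{})
			heads[k] = idx
		}
		buckets[idx].count += int64(len(chain))
	}

	for k, idx := range heads {
		fmt.Printf("group %d -> bucket %d, count %d\n", k, idx, buckets[idx].count)
	}
}
```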

This fundamental change to the hash aggregator's algorithm shows 4-5x
speedups when the group sizes are small and takes a tolerable 30-40% hit
when the group sizes are big. Such a tradeoff is acceptable since the
absolute speed in the latter case is still very high.

```
Aggregator/MIN/hash/int/groupSize=1/hasNulls=false/numInputBatches=64-24     4.64MB/s ± 1%  24.34MB/s ± 1%  +424.66%  (p=0.000 n=10+10)
Aggregator/MIN/hash/int/groupSize=1/hasNulls=true/numInputBatches=64-24      4.64MB/s ± 1%  23.67MB/s ± 1%  +410.00%  (p=0.000 n=10+10)
Aggregator/MIN/hash/int/groupSize=2/hasNulls=false/numInputBatches=64-24     9.59MB/s ± 1%  45.78MB/s ± 1%  +377.31%  (p=0.000 n=10+10)
Aggregator/MIN/hash/int/groupSize=2/hasNulls=true/numInputBatches=64-24      9.60MB/s ± 1%  44.73MB/s ± 1%  +365.88%  (p=0.000 n=10+10)
Aggregator/MIN/hash/int/groupSize=32/hasNulls=false/numInputBatches=64-24     131MB/s ± 1%    211MB/s ± 0%   +61.13%  (p=0.000 n=10+10)
Aggregator/MIN/hash/int/groupSize=32/hasNulls=true/numInputBatches=64-24      124MB/s ± 1%    197MB/s ± 0%   +58.65%  (p=0.000 n=10+10)
Aggregator/MIN/hash/int/groupSize=128/hasNulls=false/numInputBatches=64-24    314MB/s ± 0%    266MB/s ± 0%   -15.28%  (p=0.000 n=10+10)
Aggregator/MIN/hash/int/groupSize=128/hasNulls=true/numInputBatches=64-24     280MB/s ± 0%    242MB/s ± 0%   -13.50%  (p=0.000 n=10+10)
Aggregator/MIN/hash/int/groupSize=512/hasNulls=false/numInputBatches=64-24    451MB/s ± 0%    282MB/s ± 0%   -37.51%  (p=0.000 n=9+10)
Aggregator/MIN/hash/int/groupSize=512/hasNulls=true/numInputBatches=64-24     382MB/s ± 1%    255MB/s ± 0%   -33.06%  (p=0.000 n=10+10)
Aggregator/MIN/hash/int/groupSize=1024/hasNulls=false/numInputBatches=64-24   471MB/s ± 1%    280MB/s ± 0%   -40.61%  (p=0.000 n=9+10)
Aggregator/MIN/hash/int/groupSize=1024/hasNulls=true/numInputBatches=64-24    400MB/s ± 0%    254MB/s ± 0%   -36.53%  (p=0.000 n=9+10)
```

Release note: None

**colexec: clean up aggregate functions**

This commit does the following:
1. changes the signature of the `Flush` method to take in an `outputIdx`
argument, which is used by the hash aggregate functions to know where to
write their output (the argument is ignored by the ordered aggregate
functions). This allows us to remove one `int` from the hash aggregate
functions, which can be noticeable in case of many groups.
2. changes the signature of the `Compute` method to take in
a "disassembled" batch (separate vectors, the input length, and the
selection vector). This allows us to avoid copying the equality chains
into the batch's selection vector in the hash aggregator (see the
signature sketch after this list).
3. extracts base structs that implement the common functionality of
aggregate functions.
4. retunes `hashAggregatorAllocSize`: it is now 128 instead of the
previous 64.
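
Concretely, items 1 and 2 amount to the following change, abridged from
the `aggregateFunc` interface diff in
`pkg/sql/colexec/aggregate_funcs.go` below:

```go
// Before: Compute took a whole coldata.Batch, and Flush wrote at an
// output index tracked by the aggregate function itself.
Compute(batch coldata.Batch, inputIdxs []uint32)
Flush()

// After: Compute takes the batch already "disassembled" into its
// vectors, length, and selection vector, and Flush is told where to
// write (outputIdx is ignored by the ordered variants, which still
// maintain their own output index).
Compute(vecs []coldata.Vec, inputIdxs []uint32, inputLen int, sel []int)
Flush(outputIdx int)
```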

Note that I prototyped the introduction of an `aggregateFuncBase`
interface that would be implemented by the `orderedAggregateFuncBase` and
`hashAggregateFuncBase` structs for step 3 above, but it showed worse
performance (slower speed, slightly more allocations), so that prototype
was discarded.

This commit also moves a couple of cancellation checks outside of for
loops and cleans up a few logic test files.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Aug 6, 2020
2 parents d8352cf + 4e2c55b commit 29591de
Showing 39 changed files with 3,739 additions and 9,299 deletions.
71 changes: 66 additions & 5 deletions pkg/sql/colexec/aggregate_funcs.go
@@ -14,6 +14,7 @@ import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -69,22 +70,82 @@ type aggregateFunc interface {
// Compute computes the aggregation on the input batch.
// Note: the implementations should be careful to account for their memory
// usage.
Compute(batch coldata.Batch, inputIdxs []uint32)
Compute(vecs []coldata.Vec, inputIdxs []uint32, inputLen int, sel []int)

// Flush flushes the result of aggregation on the last group. It should be
// called once after input batches have been Compute()'d.
// called once after input batches have been Compute()'d. outputIdx is only
// used in case of hash aggregation - for ordered aggregation the aggregate
// function itself should maintain the output index to write to.
// Note: the implementations are free to not account for the memory used
// for the result of aggregation of the last group.
Flush()
Flush(outputIdx int)

// HandleEmptyInputScalar populates the output for a case of an empty input
// when the aggregate function is in scalar context. The output must always
// be a single value (either null or zero, depending on the function).
// TODO(yuzefovich): we can pull scratch field of aggregates into a shared
// aggregator and implement this method once on the shared base.
HandleEmptyInputScalar()
}

type orderedAggregateFuncBase struct {
groups []bool
// curIdx tracks the current output index of this function.
curIdx int
// nulls is the nulls vector of the output vector of this function.
nulls *coldata.Nulls
}

func (o *orderedAggregateFuncBase) Init(groups []bool, vec coldata.Vec) {
o.groups = groups
o.nulls = vec.Nulls()
}

func (o *orderedAggregateFuncBase) Reset() {
o.curIdx = 0
o.nulls.UnsetNulls()
}

func (o *orderedAggregateFuncBase) CurrentOutputIndex() int {
return o.curIdx
}

func (o *orderedAggregateFuncBase) SetOutputIndex(idx int) {
o.curIdx = idx
}

func (o *orderedAggregateFuncBase) HandleEmptyInputScalar() {
// Most aggregate functions return a single NULL value on an empty input
// in the scalar context (the exceptions are COUNT aggregates which need
// to overwrite this method).
o.nulls.SetNull(0)
}

type hashAggregateFuncBase struct {
// nulls is the nulls vector of the output vector of this function.
nulls *coldata.Nulls
}

func (h *hashAggregateFuncBase) Init(_ []bool, vec coldata.Vec) {
h.nulls = vec.Nulls()
}

func (h *hashAggregateFuncBase) Reset() {
h.nulls.UnsetNulls()
}

func (h *hashAggregateFuncBase) CurrentOutputIndex() int {
colexecerror.InternalError("CurrentOutputIndex called with hash aggregation")
// This code is unreachable, but the compiler cannot infer that.
return 0
}

func (h *hashAggregateFuncBase) SetOutputIndex(int) {
colexecerror.InternalError("SetOutputIndex called with hash aggregation")
}

func (h *hashAggregateFuncBase) HandleEmptyInputScalar() {
colexecerror.InternalError("HandleEmptyInputScalar called with hash aggregation")
}

// aggregateFuncAlloc is an aggregate function allocator that pools allocations
// of the structs of the same statically-typed aggregate function.
type aggregateFuncAlloc interface {
31 changes: 31 additions & 0 deletions pkg/sql/colexec/aggregators_test.go
@@ -292,6 +292,37 @@ func TestAggregatorOneFunc(t *testing.T) {
name: "UnorderedWithNullsInGroupingCol",
unorderedInput: true,
},
{
aggFns: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_ANY_NOT_NULL,
execinfrapb.AggregatorSpec_COUNT_ROWS,
},
aggCols: [][]uint32{{0}, {}},
typs: []*types.T{types.Int},
unorderedInput: true,
input: tuples{
{1},
{2},
{1},
{nil},
{3},
{1},
{3},
{4},
{1},
{nil},
{2},
{4},
{2},
},
expected: tuples{
{nil, 2},
{1, 4},
{2, 3},
{3, 2},
{4, 2},
},
},
}

// Run tests with deliberate batch sizes and no selection vectors.
55 changes: 28 additions & 27 deletions pkg/sql/colexec/any_not_null_agg_tmpl.go
@@ -69,14 +69,14 @@ func newAnyNotNull_AGGKINDAggAlloc(
// anyNotNull_TYPE_AGGKINDAgg implements the ANY_NOT_NULL aggregate, returning the
// first non-null value in the input column.
type anyNotNull_TYPE_AGGKINDAgg struct {
allocator *colmem.Allocator
// {{if eq "_AGGKIND" "Ordered"}}
groups []bool
orderedAggregateFuncBase
// {{else}}
hashAggregateFuncBase
// {{end}}
allocator *colmem.Allocator
vec coldata.Vec
col _GOTYPESLICE
nulls *coldata.Nulls
curIdx int
curAgg _GOTYPE
foundNonNullForCurrentGroup bool
}
@@ -87,29 +87,27 @@ const sizeOfAnyNotNull_TYPE_AGGKINDAgg = int64(unsafe.Sizeof(anyNotNull_TYPE_AGG

func (a *anyNotNull_TYPE_AGGKINDAgg) Init(groups []bool, vec coldata.Vec) {
// {{if eq "_AGGKIND" "Ordered"}}
a.groups = groups
a.orderedAggregateFuncBase.Init(groups, vec)
// {{else}}
a.hashAggregateFuncBase.Init(groups, vec)
// {{end}}
a.vec = vec
a.col = vec.TemplateType()
a.nulls = vec.Nulls()
a.Reset()
}

func (a *anyNotNull_TYPE_AGGKINDAgg) Reset() {
a.curIdx = 0
// {{if eq "_AGGKIND" "Ordered"}}
a.orderedAggregateFuncBase.Reset()
// {{else}}
a.hashAggregateFuncBase.Reset()
// {{end}}
a.foundNonNullForCurrentGroup = false
a.nulls.UnsetNulls()
}

func (a *anyNotNull_TYPE_AGGKINDAgg) CurrentOutputIndex() int {
return a.curIdx
}

func (a *anyNotNull_TYPE_AGGKINDAgg) SetOutputIndex(idx int) {
a.curIdx = idx
}

func (a *anyNotNull_TYPE_AGGKINDAgg) Compute(b coldata.Batch, inputIdxs []uint32) {
func (a *anyNotNull_TYPE_AGGKINDAgg) Compute(
vecs []coldata.Vec, inputIdxs []uint32, inputLen int, sel []int,
) {
// {{if eq "_AGGKIND" "Hash"}}
if a.foundNonNullForCurrentGroup {
// We have already seen non-null for the current group, and since there
@@ -119,8 +117,7 @@ func (a *anyNotNull_TYPE_AGGKINDAgg) Compute(b coldata.Batch, inputIdxs []uint32
}
// {{end}}

inputLen := b.Length()
vec, sel := b.ColVec(int(inputIdxs[0])), b.Selection()
vec := vecs[inputIdxs[0]]
col, nulls := vec.TemplateType(), vec.Nulls()

a.allocator.PerformOperation(
@@ -166,19 +163,23 @@
)
}

func (a *anyNotNull_TYPE_AGGKINDAgg) Flush() {
func (a *anyNotNull_TYPE_AGGKINDAgg) Flush(outputIdx int) {
// If we haven't found any non-nulls for this group so far, the output for
// this group should be null.
// {{if eq "_AGGKIND" "Ordered"}}
// Go around "argument overwritten before first use" linter error.
_ = outputIdx
outputIdx = a.curIdx
a.curIdx++
// {{end}}
if !a.foundNonNullForCurrentGroup {
a.nulls.SetNull(a.curIdx)
a.nulls.SetNull(outputIdx)
} else {
execgen.SET(a.col, a.curIdx, a.curAgg)
// TODO(yuzefovich): think about whether it is ok for this SET call to
// not be registered with the allocator on types with variable sizes
// (e.g. Bytes).
execgen.SET(a.col, outputIdx, a.curAgg)
}
a.curIdx++
}

func (a *anyNotNull_TYPE_AGGKINDAgg) HandleEmptyInputScalar() {
a.nulls.SetNull(0)
}

type anyNotNull_TYPE_AGGKINDAggAlloc struct {
61 changes: 29 additions & 32 deletions pkg/sql/colexec/avg_agg_tmpl.go
@@ -76,10 +76,11 @@ func newAvg_AGGKINDAggAlloc(

type avg_TYPE_AGGKINDAgg struct {
// {{if eq "_AGGKIND" "Ordered"}}
groups []bool
orderedAggregateFuncBase
// {{else}}
hashAggregateFuncBase
// {{end}}
scratch struct {
curIdx int
// curSum keeps track of the sum of elements belonging to the current group,
// so we can index into the slice once per group, instead of on each
// iteration.
@@ -89,8 +90,6 @@ type avg_TYPE_AGGKINDAgg struct {
curCount int64
// vec points to the output vector.
vec []_RET_GOTYPE
// nulls points to the output null vector that we are updating.
nulls *coldata.Nulls
// foundNonNullForCurrentGroup tracks if we have seen any non-null values
// for the group that is currently being aggregated.
foundNonNullForCurrentGroup bool
@@ -109,32 +108,30 @@ var _ aggregateFunc = &avg_TYPE_AGGKINDAgg{}

const sizeOfAvg_TYPE_AGGKINDAgg = int64(unsafe.Sizeof(avg_TYPE_AGGKINDAgg{}))

func (a *avg_TYPE_AGGKINDAgg) Init(groups []bool, v coldata.Vec) {
func (a *avg_TYPE_AGGKINDAgg) Init(groups []bool, vec coldata.Vec) {
// {{if eq "_AGGKIND" "Ordered"}}
a.groups = groups
a.orderedAggregateFuncBase.Init(groups, vec)
// {{else}}
a.hashAggregateFuncBase.Init(groups, vec)
// {{end}}
a.scratch.vec = v._RET_TYPE()
a.scratch.nulls = v.Nulls()
a.scratch.vec = vec._RET_TYPE()
a.Reset()
}

func (a *avg_TYPE_AGGKINDAgg) Reset() {
a.scratch.curIdx = 0
// {{if eq "_AGGKIND" "Ordered"}}
a.orderedAggregateFuncBase.Reset()
// {{else}}
a.hashAggregateFuncBase.Reset()
// {{end}}
a.scratch.curSum = zero_RET_TYPEValue
a.scratch.curCount = 0
a.scratch.foundNonNullForCurrentGroup = false
a.scratch.nulls.UnsetNulls()
}

func (a *avg_TYPE_AGGKINDAgg) CurrentOutputIndex() int {
return a.scratch.curIdx
}

func (a *avg_TYPE_AGGKINDAgg) SetOutputIndex(idx int) {
a.scratch.curIdx = idx
}

func (a *avg_TYPE_AGGKINDAgg) Compute(b coldata.Batch, inputIdxs []uint32) {
func (a *avg_TYPE_AGGKINDAgg) Compute(
vecs []coldata.Vec, inputIdxs []uint32, inputLen int, sel []int,
) {
// {{if .NeedsHelper}}
// {{/*
// overloadHelper is used only when we perform the summation of integers
@@ -145,8 +142,7 @@ func (a *avg_TYPE_AGGKINDAgg) Compute(b coldata.Batch, inputIdxs []uint32) {
// "_overloadHelper" local variable of type "overloadHelper".
_overloadHelper := a.overloadHelper
// {{end}}
inputLen := b.Length()
vec, sel := b.ColVec(int(inputIdxs[0])), b.Selection()
vec := vecs[inputIdxs[0]]
col, nulls := vec.TemplateType(), vec.Nulls()
if nulls.MaybeHasNulls() {
if sel != nil {
@@ -175,20 +171,21 @@
}
}

func (a *avg_TYPE_AGGKINDAgg) Flush() {
func (a *avg_TYPE_AGGKINDAgg) Flush(outputIdx int) {
// The aggregation is finished. Flush the last value. If we haven't found
// any non-nulls for this group so far, the output for this group should be
// NULL.
// {{if eq "_AGGKIND" "Ordered"}}
// Go around "argument overwritten before first use" linter error.
_ = outputIdx
outputIdx = a.curIdx
a.curIdx++
// {{end}}
if !a.scratch.foundNonNullForCurrentGroup {
a.scratch.nulls.SetNull(a.scratch.curIdx)
a.nulls.SetNull(outputIdx)
} else {
_ASSIGN_DIV_INT64(a.scratch.vec[a.scratch.curIdx], a.scratch.curSum, a.scratch.curCount, a.scratch.vec, _, _)
_ASSIGN_DIV_INT64(a.scratch.vec[outputIdx], a.scratch.curSum, a.scratch.curCount, a.scratch.vec, _, _)
}
a.scratch.curIdx++
}

func (a *avg_TYPE_AGGKINDAgg) HandleEmptyInputScalar() {
a.scratch.nulls.SetNull(0)
}

type avg_TYPE_AGGKINDAggAlloc struct {
@@ -223,13 +220,13 @@ func _ACCUMULATE_AVG(a *_AGG_TYPE_AGGKINDAgg, nulls *coldata.Nulls, i int, _HAS_
// If we encounter a new group, and we haven't found any non-nulls for the
// current group, the output for this group should be null.
if !a.scratch.foundNonNullForCurrentGroup {
a.scratch.nulls.SetNull(a.scratch.curIdx)
a.nulls.SetNull(a.curIdx)
} else {
// {{with .Global}}
_ASSIGN_DIV_INT64(a.scratch.vec[a.scratch.curIdx], a.scratch.curSum, a.scratch.curCount, a.scratch.vec, _, _)
_ASSIGN_DIV_INT64(a.scratch.vec[a.curIdx], a.scratch.curSum, a.scratch.curCount, a.scratch.vec, _, _)
// {{end}}
}
a.scratch.curIdx++
a.curIdx++
// {{with .Global}}
a.scratch.curSum = zero_RET_TYPEValue
// {{end}}