From 08b282af5438eebc716c1fbecd0b3a5e6c4bb2dc Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 14 Aug 2020 13:50:52 -0700 Subject: [PATCH] colexec: add support for DISTINCT and FILTER hash aggregation This commit adds the support of DISTINCT and FILTERing hash aggregation. The approach is as follows: - to handle FILTER we run a selection operator on the input state - to handle DISTINCT we encode aggregation columns, one tuple at a time, and update the selection vector to include tuples we haven't yet seen - then we run the aggregation on the remaining selected tuples - and then restore the state with the original length and selection vector. Such handling of FILTER clause sounds good to me, but the handling of DISTINCT is somewhat unfortunate: we perform encoding one tuple at a time. Other approaches have been prototyped but showed worse performance: - using the vectorized hash table - the benefit of such approach is that we don't reduce ourselves to one tuple at a time (because we would be hashing the full columns at once), but the big disadvantage is that the full tuples are stored in the hash table (instead of an encoded representation) - using a single global map for a particular aggregate function that is shared among all aggregation groups - the benefit of such approach is that we only have a handful of map, but it turned out that such global map grows a lot bigger and has worse performance. Release note (sql change): Vectorized execution engine now natively supports DISTINCT and FILTERing hash aggregation. --- pkg/sql/colexec/aggregate_funcs.go | 6 +- pkg/sql/colexec/aggregators_test.go | 235 ++++++++++- pkg/sql/colexec/colbuilder/execplan.go | 51 ++- pkg/sql/colexec/hash_aggregator.go | 69 ++-- pkg/sql/colexec/hash_aggregator_util.go | 434 +++++++++++++++++++++ pkg/sql/colexec/ordered_aggregator.go | 8 + pkg/sql/colexec/vec_to_datum.eg.go | 20 +- pkg/sql/colexec/vec_to_datum_tmpl.go | 20 +- pkg/sql/distsql/columnar_operators_test.go | 283 ++++++++------ 9 files changed, 936 insertions(+), 190 deletions(-) create mode 100644 pkg/sql/colexec/hash_aggregator_util.go diff --git a/pkg/sql/colexec/aggregate_funcs.go b/pkg/sql/colexec/aggregate_funcs.go index 107b9e333fb4..ce0be3345717 100644 --- a/pkg/sql/colexec/aggregate_funcs.go +++ b/pkg/sql/colexec/aggregate_funcs.go @@ -308,8 +308,8 @@ func newAggregateFuncsAlloc( } // sizeOfAggregateFunc is the size of some aggregateFunc implementation. -// countAgg was chosen arbitrarily, but it's important that we use a pointer to -// the aggregate function struct. +// countHashAgg was chosen arbitrarily, but it's important that we use a +// pointer to the aggregate function struct. const sizeOfAggregateFunc = int64(unsafe.Sizeof(&countHashAgg{})) func (a *aggregateFuncsAlloc) makeAggregateFuncs() []aggregateFunc { @@ -319,7 +319,7 @@ func (a *aggregateFuncsAlloc) makeAggregateFuncs() []aggregateFunc { // of 'allocSize x number of funcs in schema' length. Every // aggFuncAlloc will allocate allocSize of objects on the newAggFunc // call below. - a.allocator.AdjustMemoryUsage(sizeOfAggregateFunc * a.allocSize) + a.allocator.AdjustMemoryUsage(sizeOfAggregateFunc * int64(len(a.aggFuncAllocs)) * a.allocSize) a.returnFuncs = make([]aggregateFunc, len(a.aggFuncAllocs)*int(a.allocSize)) } funcs := a.returnFuncs[:len(a.aggFuncAllocs)] diff --git a/pkg/sql/colexec/aggregators_test.go b/pkg/sql/colexec/aggregators_test.go index 54a1eddb4e53..38a6e568eea9 100644 --- a/pkg/sql/colexec/aggregators_test.go +++ b/pkg/sql/colexec/aggregators_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -51,6 +52,8 @@ type aggregatorTestCase struct { constArguments [][]execinfrapb.Expression // spec will be populated during init(). spec *execinfrapb.AggregatorSpec + aggDistinct []bool + aggFilter []int input tuples unorderedInput bool expected tuples @@ -95,7 +98,7 @@ var aggTypes = []aggType{ outputTypes []*types.T, _ bool, ) (colexecbase.Operator, error) { - return NewHashAggregator(allocator, input, inputTypes, spec, evalCtx, constructors, constArguments, outputTypes) + return NewHashAggregator(allocator, testMemAcc, input, inputTypes, spec, evalCtx, constructors, constArguments, outputTypes) }, name: "hash", }, @@ -154,6 +157,13 @@ func (tc *aggregatorTestCase) init() error { if tc.constArguments != nil { aggregations[i].Arguments = tc.constArguments[i] } + if tc.aggDistinct != nil { + aggregations[i].Distinct = tc.aggDistinct[i] + } + if tc.aggFilter != nil && tc.aggFilter[i] != tree.NoColumnIdx { + filterColIdx := uint32(tc.aggFilter[i]) + aggregations[i].FilterColIdx = &filterColIdx + } } tc.spec = &execinfrapb.AggregatorSpec{ GroupCols: tc.groupCols, @@ -679,12 +689,133 @@ func TestAggregatorAllFunctions(t *testing.T) { }, convToDecimal: true, }, + + // Test DISTINCT aggregation. + { + aggFns: []execinfrapb.AggregatorSpec_Func{ + execinfrapb.AggregatorSpec_ANY_NOT_NULL, + execinfrapb.AggregatorSpec_COUNT, + execinfrapb.AggregatorSpec_COUNT, + execinfrapb.AggregatorSpec_SUM_INT, + execinfrapb.AggregatorSpec_SUM_INT, + }, + aggCols: [][]uint32{{0}, {1}, {1}, {1}, {1}}, + aggDistinct: []bool{false, false, true, false, true}, + typs: []*types.T{types.Int, types.Int}, + input: tuples{ + {0, 1}, + {0, 2}, + {0, 2}, + {0, nil}, + {0, 1}, + {0, nil}, + {1, 1}, + {1, 2}, + {1, 2}, + }, + expected: tuples{ + {0, 4, 2, 6, 3}, + {1, 3, 2, 5, 3}, + }, + }, + + // Test aggregation with FILTERs. + { + aggFns: []execinfrapb.AggregatorSpec_Func{ + execinfrapb.AggregatorSpec_ANY_NOT_NULL, + execinfrapb.AggregatorSpec_COUNT_ROWS, + execinfrapb.AggregatorSpec_SUM_INT, + }, + aggCols: [][]uint32{{0}, {}, {1}}, + aggFilter: []int{tree.NoColumnIdx, 2, 2}, + typs: []*types.T{types.Int, types.Int, types.Bool}, + input: tuples{ + {0, 1, false}, + {0, 2, true}, + {0, 2, true}, + {0, nil, nil}, + {0, 1, nil}, + {0, nil, true}, + {1, 1, true}, + {1, 2, nil}, + {1, 2, true}, + }, + expected: tuples{ + {0, 3, 4}, + {1, 2, 3}, + }, + }, + + // Test aggregation with FILTERs when the whole groups are filtered out. + { + aggFns: []execinfrapb.AggregatorSpec_Func{ + execinfrapb.AggregatorSpec_ANY_NOT_NULL, + execinfrapb.AggregatorSpec_COUNT_ROWS, + execinfrapb.AggregatorSpec_SUM_INT, + }, + aggCols: [][]uint32{{0}, {}, {1}}, + aggFilter: []int{tree.NoColumnIdx, 2, 2}, + typs: []*types.T{types.Int, types.Int, types.Bool}, + input: tuples{ + {0, 1, false}, + {0, nil, nil}, + {0, 2, false}, + {1, 1, true}, + {1, 2, nil}, + {1, 2, true}, + {2, 1, false}, + {2, nil, nil}, + {2, 2, nil}, + }, + expected: tuples{ + {0, 0, nil}, + {1, 2, 3}, + {2, 0, nil}, + }, + }, + + // Test aggregation with FILTERs and DISTINCTs intertwined. + { + aggFns: []execinfrapb.AggregatorSpec_Func{ + execinfrapb.AggregatorSpec_ANY_NOT_NULL, + execinfrapb.AggregatorSpec_COUNT, + execinfrapb.AggregatorSpec_COUNT, + execinfrapb.AggregatorSpec_COUNT, + execinfrapb.AggregatorSpec_SUM_INT, + execinfrapb.AggregatorSpec_SUM_INT, + execinfrapb.AggregatorSpec_SUM_INT, + }, + aggCols: [][]uint32{{0}, {1}, {1}, {1}, {1}, {1}, {1}}, + aggDistinct: []bool{false, false, true, true, false, true, true}, + aggFilter: []int{tree.NoColumnIdx, 2, tree.NoColumnIdx, 2, 2, tree.NoColumnIdx, 2}, + typs: []*types.T{types.Int, types.Int, types.Bool}, + input: tuples{ + {0, 1, false}, + {0, 2, true}, + {0, 2, true}, + {0, nil, nil}, + {0, 1, nil}, + {0, nil, true}, + {1, 1, true}, + {1, 2, nil}, + {1, 2, true}, + }, + expected: tuples{ + {0, 2, 2, 1, 4, 3, 2}, + {1, 2, 2, 2, 3, 3, 3}, + }, + }, } evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) defer evalCtx.Stop(context.Background()) for _, agg := range aggTypes { for i, tc := range testCases { + if agg.name != "hash" && (tc.aggDistinct != nil || tc.aggFilter != nil) { + // Distinct or filtering aggregation is only supported with + // hash aggregator. + continue + } log.Infof(context.Background(), "%s/%d", agg.name, i) if err := tc.init(); err != nil { t.Fatal(err) @@ -993,6 +1124,104 @@ func BenchmarkAllOptimizedAggregateFunctions(b *testing.B) { } } +func BenchmarkDistinctAggregation(b *testing.B) { + rng, _ := randutil.NewPseudoRand() + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + evalCtx := tree.MakeTestingEvalContext(st) + defer evalCtx.Stop(ctx) + flowCtx := &execinfra.FlowCtx{ + EvalCtx: &evalCtx, + Cfg: &execinfra.ServerConfig{ + Settings: st, + }, + } + + typs := []*types.T{types.Int, types.Int} + aggFn := execinfrapb.AggregatorSpec_COUNT + aggSpec := &execinfrapb.AggregatorSpec{ + Type: execinfrapb.AggregatorSpec_NON_SCALAR, + GroupCols: []uint32{0}, + // TODO(yuzefovich): adjust the spec once we support distinct ordered + // aggregation. + Aggregations: []execinfrapb.AggregatorSpec_Aggregation{{ + Func: aggFn, + Distinct: true, + ColIdx: []uint32{1}, + }}, + } + spec := &execinfrapb.ProcessorSpec{ + Input: []execinfrapb.InputSyncSpec{{ColumnTypes: typs}}, + Core: execinfrapb.ProcessorCoreUnion{ + Aggregator: aggSpec, + }, + } + args := &NewColOperatorArgs{ + Spec: spec, + StreamingMemAccount: testMemAcc, + } + args.TestingKnobs.UseStreamingMemAccountForBuffering = true + + for _, groupSize := range []int{1, 2, 32, 128, coldata.BatchSize() / 2, coldata.BatchSize()} { + for _, distinctProbability := range []float64{0.01, 0.1, 1.0} { + distinctModulo := int(1.0 / distinctProbability) + if (groupSize == 1 && distinctProbability != 1.0) || float64(groupSize)/float64(distinctModulo) < 0.1 { + // We have a such combination of groupSize and + // distinctProbability parameters that we will be very + // unlikely to satisfy them (for example, with groupSize=1 + // and distinctProbability=0.01, every value will be + // distinct within the group), so we skip such + // configuration. + continue + } + for _, hasNulls := range []bool{false, true} { + for _, numInputBatches := range []int{64} { + // TODO(yuzefovich): refactor benchmarkAggregateFunction to + // be more configurable and reuse it here. + b.Run(fmt.Sprintf("%s/groupSize=%d/distinctProb=%.2f/nulls=%t", + aggFn, groupSize, distinctProbability, hasNulls), + func(b *testing.B) { + nTuples := numInputBatches * coldata.BatchSize() + cols := []coldata.Vec{ + testAllocator.NewMemColumn(typs[0], nTuples), + testAllocator.NewMemColumn(typs[1], nTuples), + } + groups := cols[0].Int64() + vals := cols[1].Int64() + nGroups := nTuples / groupSize + for i := 0; i < nTuples; i++ { + groups[i] = int64(rng.Intn(nGroups)) + vals[i] = int64(rng.Intn(distinctModulo)) + if hasNulls && rng.Float64() < nullProbability { + cols[1].Nulls().SetNull(i) + } + } + source := newChunkingBatchSource(typs, cols, nTuples) + args.Inputs = []colexecbase.Operator{source} + r, err := TestNewColOperator(ctx, flowCtx, args) + if err != nil { + b.Fatal(err) + } + + a := r.Op + a.Init() + b.ResetTimer() + // Only count the aggregation column. + b.SetBytes(int64(8 * nTuples)) + for i := 0; i < b.N; i++ { + // Exhaust aggregator until all batches have been read. + for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) { + } + a.(ResettableOperator).reset(ctx) + } + }, + ) + } + } + } + } +} + func TestHashAggregator(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1116,8 +1345,8 @@ func TestHashAggregator(t *testing.T) { log.Infof(context.Background(), "numOfHashBuckets=%d", numOfHashBuckets) runTests(t, []tuples{tc.input}, tc.expected, unorderedVerifier, func(sources []colexecbase.Operator) (colexecbase.Operator, error) { a, err := NewHashAggregator( - testAllocator, sources[0], tc.typs, tc.spec, &evalCtx, - constructors, constArguments, outputTypes, + testAllocator, testMemAcc, sources[0], tc.typs, tc.spec, + &evalCtx, constructors, constArguments, outputTypes, ) a.(*hashAggregator).testingKnobs.numOfHashBuckets = uint64(numOfHashBuckets) return a, err diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 89739fc49b90..f1b66d66bcfa 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -127,6 +127,23 @@ func (r *opResult) resetToState(ctx context.Context, arg colexec.NewColOperatorR *r.NewColOperatorResult = arg } +func needHashAggregator(aggSpec *execinfrapb.AggregatorSpec) (bool, error) { + var groupCols, orderedCols util.FastIntSet + for _, col := range aggSpec.OrderedGroupCols { + orderedCols.Add(int(col)) + } + for _, col := range aggSpec.GroupCols { + if !orderedCols.Contains(int(col)) { + return true, nil + } + groupCols.Add(int(col)) + } + if !orderedCols.SubsetOf(groupCols) { + return false, errors.AssertionFailedf("ordered cols must be a subset of grouping cols") + } + return false, nil +} + // isSupported checks whether we have a columnar operator equivalent to a // processor described by spec. Note that it doesn't perform any other checks // (like validity of the number of inputs). @@ -153,12 +170,16 @@ func isSupported(mode sessiondata.VectorizeExecMode, spec *execinfrapb.Processor case core.Aggregator != nil: aggSpec := core.Aggregator + needHash, err := needHashAggregator(aggSpec) + if err != nil { + return err + } for _, agg := range aggSpec.Aggregations { - if agg.Distinct { - return errors.Newf("distinct aggregation not supported") + if agg.Distinct && !needHash { + return errors.Newf("distinct ordered aggregation not supported") } - if agg.FilterColIdx != nil { - return errors.Newf("filtering aggregation not supported") + if agg.FilterColIdx != nil && !needHash { + return errors.Newf("filtering ordered aggregation not supported") } } return nil @@ -649,21 +670,11 @@ func NewColOperator( break } - var groupCols, orderedCols util.FastIntSet - for _, col := range aggSpec.OrderedGroupCols { - orderedCols.Add(int(col)) - } - needHash := false - for _, col := range aggSpec.GroupCols { - if !orderedCols.Contains(int(col)) { - needHash = true - } - groupCols.Add(int(col)) - } - if !orderedCols.SubsetOf(groupCols) { - return r, errors.AssertionFailedf("ordered cols must be a subset of grouping cols") + var needHash bool + needHash, err = needHashAggregator(aggSpec) + if err != nil { + return r, err } - inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes)) copy(inputTypes, spec.Input[0].ColumnTypes) evalCtx := flowCtx.NewEvalCtx() @@ -691,8 +702,8 @@ func NewColOperator( evalCtx.SingleDatumAggMemAccount = hashAggregatorMemAccount result.Op, err = colexec.NewHashAggregator( colmem.NewAllocator(ctx, hashAggregatorMemAccount, factory), - inputs[0], inputTypes, aggSpec, evalCtx, constructors, - constArguments, result.ColumnTypes, + hashAggregatorMemAccount, inputs[0], inputTypes, aggSpec, + evalCtx, constructors, constArguments, result.ColumnTypes, ) } else { evalCtx.SingleDatumAggMemAccount = streamingMemAccount diff --git a/pkg/sql/colexec/hash_aggregator.go b/pkg/sql/colexec/hash_aggregator.go index 84c7e3bc9fdd..caf6bd42f92f 100644 --- a/pkg/sql/colexec/hash_aggregator.go +++ b/pkg/sql/colexec/hash_aggregator.go @@ -20,7 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/mon" ) // hashAggregatorState represents the state of the hash aggregator operator. @@ -57,6 +59,7 @@ type hashAggregator struct { allocator *colmem.Allocator spec *execinfrapb.AggregatorSpec + aggHelper hashAggregatorHelper inputTypes []*types.T outputTypes []*types.T inputArgsConverter *vecToDatumConverter @@ -64,7 +67,7 @@ type hashAggregator struct { // buckets contains all aggregation groups that we have so far. There is // 1-to-1 mapping between buckets[i] and ht.vals[i]. Once the output from // the buckets has been flushed, buckets will be sliced up accordingly. - buckets []*hashAggFuncs + buckets []*hashAggBucket // ht stores tuples that are "heads" of the corresponding aggregation // groups ("head" here means the tuple that was first seen from the group). ht *hashTable @@ -96,7 +99,8 @@ type hashAggregator struct { } aggFnsAlloc *aggregateFuncsAlloc - hashAlloc hashAggFuncsAlloc + hashAlloc hashAggBucketAlloc + datumAlloc sqlbase.DatumAlloc toClose Closers } @@ -111,8 +115,11 @@ const hashAggregatorAllocSize = 128 // NewHashAggregator creates a hash aggregator on the given grouping columns. // The input specifications to this function are the same as that of the // NewOrderedAggregator function. +// memAccount should be the same as the one used by allocator and will be used +// by hashAggregatorHelper to handle DISTINCT clause. func NewHashAggregator( allocator *colmem.Allocator, + memAccount *mon.BoundAccount, input colexecbase.Operator, inputTypes []*types.T, spec *execinfrapb.AggregatorSpec, @@ -135,8 +142,10 @@ func NewHashAggregator( inputArgsConverter: inputArgsConverter, toClose: toClose, aggFnsAlloc: aggFnsAlloc, - hashAlloc: hashAggFuncsAlloc{allocator: allocator}, + hashAlloc: hashAggBucketAlloc{allocator: allocator}, } + hashAgg.datumAlloc.AllocSize = hashAggregatorAllocSize + hashAgg.aggHelper = newHashAggregatorHelper(allocator, memAccount, inputTypes, spec, &hashAgg.datumAlloc) return hashAgg, err } @@ -308,7 +317,7 @@ func (op *hashAggregator) onlineAgg(ctx context.Context, b coldata.Batch) { // group. eqChain := op.scratch.eqChains[eqChainsSlot] bucket := op.buckets[headID-1] - bucket.compute(inputVecs, op.spec, len(eqChain), eqChain) + op.aggHelper.performAggregation(ctx, inputVecs, len(eqChain), eqChain, bucket) // We have fully processed this equality chain, so we need to // reset its length. op.scratch.eqChains[eqChainsSlot] = op.scratch.eqChains[eqChainsSlot][:0] @@ -329,13 +338,10 @@ func (op *hashAggregator) onlineAgg(ctx context.Context, b coldata.Batch) { // so we'll create a new bucket and make sure that the head of this // equality chain is appended to the hash table in the // corresponding position. - bucket := op.hashAlloc.newHashAggFuncs() - bucket.fns = op.aggFnsAlloc.makeAggregateFuncs() + bucket := op.hashAlloc.newHashAggBucket() op.buckets = append(op.buckets, bucket) - // bucket knows that all selected tuples in b belong to the same - // single group, so we can pass 'nil' for the first argument. - bucket.init(nil /* group */, op.output) - bucket.compute(inputVecs, op.spec, len(eqChain), eqChain) + bucket.init(op.output, op.aggFnsAlloc.makeAggregateFuncs(), op.aggHelper.makeSeenMaps()) + op.aggHelper.performAggregation(ctx, inputVecs, len(eqChain), eqChain, bucket) newGroupsHeadsSel = append(newGroupsHeadsSel, eqChainsHeads[eqChainSlot]) // We need to compact the hash buffer according to the new groups // head tuples selection vector we're building. @@ -371,39 +377,40 @@ func (op *hashAggregator) Close(ctx context.Context) error { return op.toClose.Close(ctx) } -// hashAggFuncs stores the aggregation functions for the corresponding -// aggregation group. -type hashAggFuncs struct { +// hashAggBucket stores the aggregation functions for the corresponding +// aggregation group as well as other utility information. +type hashAggBucket struct { fns []aggregateFunc + // seen is a slice of maps used to handle distinct aggregation. A + // corresponding entry in the slice is nil if the function doesn't have a + // DISTINCT clause. The slice itself will be nil whenever no aggregate + // function has a DISTINCT clause. + seen []map[string]struct{} } -const sizeOfHashAggFuncs = unsafe.Sizeof(hashAggFuncs{}) +const sizeOfHashAggBucket = unsafe.Sizeof(hashAggBucket{}) -func (v *hashAggFuncs) init(group []bool, b coldata.Batch) { +func (v *hashAggBucket) init(b coldata.Batch, fns []aggregateFunc, seen []map[string]struct{}) { + v.fns = fns for fnIdx, fn := range v.fns { - fn.Init(group, b.ColVec(fnIdx)) + // We know that all selected tuples in b belong to the same single + // group, so we can pass 'nil' for the first argument. + fn.Init(nil /* groups */, b.ColVec(fnIdx)) } + v.seen = seen } -func (v *hashAggFuncs) compute( - vecs []coldata.Vec, spec *execinfrapb.AggregatorSpec, inputLen int, sel []int, -) { - for fnIdx, fn := range v.fns { - fn.Compute(vecs, spec.Aggregations[fnIdx].ColIdx, inputLen, sel) - } -} - -// hashAggFuncsAlloc is a utility struct that batches allocations of -// hashAggFuncs. -type hashAggFuncsAlloc struct { +// hashAggBucketAlloc is a utility struct that batches allocations of +// hashAggBuckets. +type hashAggBucketAlloc struct { allocator *colmem.Allocator - buf []hashAggFuncs + buf []hashAggBucket } -func (a *hashAggFuncsAlloc) newHashAggFuncs() *hashAggFuncs { +func (a *hashAggBucketAlloc) newHashAggBucket() *hashAggBucket { if len(a.buf) == 0 { - a.allocator.AdjustMemoryUsage(int64(hashAggregatorAllocSize * sizeOfHashAggFuncs)) - a.buf = make([]hashAggFuncs, hashAggregatorAllocSize) + a.allocator.AdjustMemoryUsage(int64(hashAggregatorAllocSize * sizeOfHashAggBucket)) + a.buf = make([]hashAggBucket, hashAggregatorAllocSize) } ret := &a.buf[0] a.buf = a.buf[1:] diff --git a/pkg/sql/colexec/hash_aggregator_util.go b/pkg/sql/colexec/hash_aggregator_util.go new file mode 100644 index 000000000000..6695b9a3a995 --- /dev/null +++ b/pkg/sql/colexec/hash_aggregator_util.go @@ -0,0 +1,434 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexec + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/stringarena" +) + +// hashAggregatorHelper is a helper for the hash aggregator that facilitates +// the selection of tuples on which to perform the aggregation. +type hashAggregatorHelper interface { + // makeSeenMaps returns a slice of maps used to handle distinct aggregation + // of a single aggregation bucket. A corresponding entry in the slice is + // nil if the function doesn't have a DISTINCT clause. The slice itself + // will be nil whenever no aggregate function has a DISTINCT clause. + makeSeenMaps() []map[string]struct{} + // performAggregation performs aggregation of all functions in bucket on + // tuples in vecs that are relevant for each function (meaning that only + // tuples that pass the criteria - like DISTINCT and/or FILTER will be + // aggregated). + performAggregation(ctx context.Context, vecs []coldata.Vec, inputLen int, sel []int, bucket *hashAggBucket) +} + +// newHashAggregatorHelper creates a new hashAggregatorHelper based on the +// provided aggregator specification. If there are no functions that perform +// either DISTINCT or FILTER aggregation, then the defaultHashAggregatorHelper +// is returned which has negligible performance overhead. +func newHashAggregatorHelper( + allocator *colmem.Allocator, + memAccount *mon.BoundAccount, + inputTypes []*types.T, + spec *execinfrapb.AggregatorSpec, + datumAlloc *sqlbase.DatumAlloc, +) hashAggregatorHelper { + hasDistinct, hasFilterAgg := false, false + aggFilter := make([]int, len(spec.Aggregations)) + for i, aggFn := range spec.Aggregations { + if aggFn.Distinct { + hasDistinct = true + } + if aggFn.FilterColIdx != nil { + aggFilter[i] = int(*aggFn.FilterColIdx) + hasFilterAgg = true + } else { + aggFilter[i] = tree.NoColumnIdx + } + } + + if !hasDistinct && !hasFilterAgg { + return newDefaultHashAggregatorHelper(spec) + } + filters := make([]*filteringSingleFunctionHelper, len(spec.Aggregations)) + for i, filterIdx := range aggFilter { + filters[i] = newFilteringHashAggHelper(allocator, inputTypes, filterIdx) + } + if !hasDistinct { + return newFilteringHashAggregatorHelper(spec, filters) + } + return newFilteringDistinctHashAggregatorHelper(memAccount, inputTypes, spec, filters, datumAlloc) +} + +// defaultHashAggregatorHelper is the default hashAggregatorHelper for the case +// when no aggregate function is performing DISTINCT or FILTERing aggregation. +type defaultHashAggregatorHelper struct { + spec *execinfrapb.AggregatorSpec +} + +var _ hashAggregatorHelper = &defaultHashAggregatorHelper{} + +func newDefaultHashAggregatorHelper(spec *execinfrapb.AggregatorSpec) hashAggregatorHelper { + return &defaultHashAggregatorHelper{spec: spec} +} + +func (h *defaultHashAggregatorHelper) makeSeenMaps() []map[string]struct{} { + return nil +} + +func (h *defaultHashAggregatorHelper) performAggregation( + _ context.Context, vecs []coldata.Vec, inputLen int, sel []int, bucket *hashAggBucket, +) { + for fnIdx, fn := range bucket.fns { + fn.Compute(vecs, h.spec.Aggregations[fnIdx].ColIdx, inputLen, sel) + } +} + +// hashAggregatorHelperBase is a utility struct that provides non-default +// hashAggregatorHelpers with the logic necessary for saving/restoring the +// input state. +type hashAggregatorHelperBase struct { + spec *execinfrapb.AggregatorSpec + + vecs []coldata.Vec + usesSel bool + origSel []int + origLen int +} + +func newAggregatorHelperBase(spec *execinfrapb.AggregatorSpec) *hashAggregatorHelperBase { + b := &hashAggregatorHelperBase{spec: spec} + b.origSel = make([]int, coldata.BatchSize()) + return b +} + +func (h *hashAggregatorHelperBase) saveState(vecs []coldata.Vec, origLen int, origSel []int) { + h.vecs = vecs + h.origLen = origLen + h.usesSel = origSel != nil + if h.usesSel { + copy(h.origSel[:h.origLen], origSel[:h.origLen]) + } +} + +func (h *hashAggregatorHelperBase) restoreState() ([]coldata.Vec, int, []int) { + sel := h.origSel + if !h.usesSel { + sel = nil + } + return h.vecs, h.origLen, sel +} + +// filteringSingleFunctionHelper is a utility struct that helps with handling +// of a FILTER clause of a single aggregate function. +type filteringSingleFunctionHelper struct { + filter colexecbase.Operator + filterInput *singleBatchOperator +} + +var noFilterHashAggHelper = &filteringSingleFunctionHelper{} + +// newFilteringHashAggHelper returns a new filteringSingleFunctionHelper. +// tree.NoColumnIdx index can be used to indicate that there is no FILTER +// clause for the aggregate function. +func newFilteringHashAggHelper( + allocator *colmem.Allocator, typs []*types.T, filterIdx int, +) *filteringSingleFunctionHelper { + if filterIdx == tree.NoColumnIdx { + return noFilterHashAggHelper + } + filterInput := newSingleBatchOperator(allocator, typs) + h := &filteringSingleFunctionHelper{ + filter: newBoolVecToSelOp(filterInput, filterIdx), + filterInput: filterInput, + } + return h +} + +// applyFilter returns the updated selection vector that includes only tuples +// for which filtering column has 'true' value set. It also returns whether +// state might have been modified. +func (h *filteringSingleFunctionHelper) applyFilter( + ctx context.Context, vecs []coldata.Vec, inputLen int, sel []int, +) (_ []coldata.Vec, _ int, _ []int, maybeModified bool) { + if h.filter == nil { + return vecs, inputLen, sel, false + } + h.filterInput.reset(vecs, inputLen, sel) + newBatch := h.filter.Next(ctx) + return newBatch.ColVecs(), newBatch.Length(), newBatch.Selection(), true +} + +// filteringHashAggregatorHelper is a hashAggregatorHelper that handles the +// aggregate functions which have at least one FILTER clause but no DISTINCT +// clauses. +type filteringHashAggregatorHelper struct { + *hashAggregatorHelperBase + + filters []*filteringSingleFunctionHelper +} + +var _ hashAggregatorHelper = &filteringHashAggregatorHelper{} + +func newFilteringHashAggregatorHelper( + spec *execinfrapb.AggregatorSpec, filters []*filteringSingleFunctionHelper, +) hashAggregatorHelper { + h := &filteringHashAggregatorHelper{ + hashAggregatorHelperBase: newAggregatorHelperBase(spec), + filters: filters, + } + return h +} + +func (h *filteringHashAggregatorHelper) makeSeenMaps() []map[string]struct{} { + return nil +} + +func (h *filteringHashAggregatorHelper) performAggregation( + ctx context.Context, vecs []coldata.Vec, inputLen int, sel []int, bucket *hashAggBucket, +) { + h.saveState(vecs, inputLen, sel) + for fnIdx, fn := range bucket.fns { + var maybeModified bool + vecs, inputLen, sel, maybeModified = h.filters[fnIdx].applyFilter(ctx, vecs, inputLen, sel) + if inputLen > 0 { + // It is possible that all tuples to aggregate have been filtered + // out, so we need to check the length. + fn.Compute(vecs, h.spec.Aggregations[fnIdx].ColIdx, inputLen, sel) + } + if maybeModified { + // Restore the state so that the next iteration sees the input with + // the original selection vector and length. + vecs, inputLen, sel = h.restoreState() + } + } +} + +// filteringDistinctHashAggregatorHelper is a hashAggregatorHelper that handles +// the aggregate functions with any number of DISTINCT and/or FILTER clauses. +// The helper should be shared among all groups for aggregation. The filtering +// is delegated to filteringHashAggHelpers, and this struct handles the +// "distinctness" of aggregation. +// Note that the "distinctness" of tuples is handled by encoding aggregation +// columns of a tuple (one tuple at a time) and storing it in a seen map that +// is separate for each aggregation group and for each aggregate function with +// DISTINCT clause. +// Other approaches have been prototyped but showed worse performance: +// - using the vectorized hash table - the benefit of such approach is that we +// don't reduce ourselves to one tuple at a time (because we would be hashing +// the full columns at once), but the big disadvantage is that the full tuples +// are stored in the hash table (instead of an encoded representation). +// TODO(yuzefovich): reevaluate the vectorized hash table once it is +// dynamically resizable and can store only a subset of columns from the input. +// - using a single global map for a particular aggregate function that is +// shared among all aggregation groups - the benefit of such approach is that +// we only have a handful of map, but it turned out that such global map grows +// a lot bigger and has worse performance. +type filteringDistinctHashAggregatorHelper struct { + *hashAggregatorHelperBase + + inputTypes []*types.T + filters []*filteringSingleFunctionHelper + aggColsConverter *vecToDatumConverter + arena stringarena.Arena + datumAlloc *sqlbase.DatumAlloc + scratch struct { + ed sqlbase.EncDatum + encoded []byte + // converted is a scratch space for converting a single element. + converted []tree.Datum + sel []int + } +} + +var _ hashAggregatorHelper = &filteringDistinctHashAggregatorHelper{} + +func newFilteringDistinctHashAggregatorHelper( + memAccount *mon.BoundAccount, + inputTypes []*types.T, + spec *execinfrapb.AggregatorSpec, + filters []*filteringSingleFunctionHelper, + datumAlloc *sqlbase.DatumAlloc, +) hashAggregatorHelper { + h := &filteringDistinctHashAggregatorHelper{ + hashAggregatorHelperBase: newAggregatorHelperBase(spec), + inputTypes: inputTypes, + arena: stringarena.Make(memAccount), + datumAlloc: datumAlloc, + filters: filters, + } + var vecIdxsToConvert []int + for _, aggFn := range spec.Aggregations { + if aggFn.Distinct { + for _, aggCol := range aggFn.ColIdx { + found := false + for _, vecIdx := range vecIdxsToConvert { + if vecIdx == int(aggCol) { + found = true + break + } + } + if !found { + vecIdxsToConvert = append(vecIdxsToConvert, int(aggCol)) + } + } + } + } + h.aggColsConverter = newVecToDatumConverter(len(inputTypes), vecIdxsToConvert) + h.scratch.converted = []tree.Datum{nil} + h.scratch.sel = make([]int, coldata.BatchSize()) + return h +} + +func (h *filteringDistinctHashAggregatorHelper) makeSeenMaps() []map[string]struct{} { + // Note that we consciously don't account for the memory used under seen + // maps because that memory is likely noticeably smaller than the memory + // used (and accounted for) in other parts of the hash aggregation (the + // vectorized hash table and the aggregate functions). + seen := make([]map[string]struct{}, len(h.spec.Aggregations)) + for i, aggFn := range h.spec.Aggregations { + if aggFn.Distinct { + seen[i] = make(map[string]struct{}) + } + } + return seen +} + +// selectDistinctTuples returns new selection vector that contains only tuples +// that haven't been seen by the aggregate function yet when the function +// performs DISTINCT aggregation. aggColsConverter must have already done the +// conversion of the relevant aggregate columns *without* deselection. This +// function assumes that seen map is non-nil and is the same that is used for +// all batches from the same aggregation group. +func (h *filteringDistinctHashAggregatorHelper) selectDistinctTuples( + ctx context.Context, inputLen int, sel []int, inputIdxs []uint32, seen map[string]struct{}, +) (newLen int, newSel []int) { + newSel = h.scratch.sel + var ( + tupleIdx int + err error + s string + ) + for idx := 0; idx < inputLen; idx++ { + h.scratch.encoded = h.scratch.encoded[:0] + tupleIdx = idx + if sel != nil { + tupleIdx = sel[idx] + } + for _, colIdx := range inputIdxs { + h.scratch.ed.Datum = h.aggColsConverter.getDatumColumn(int(colIdx))[tupleIdx] + h.scratch.encoded, err = h.scratch.ed.Fingerprint( + h.inputTypes[colIdx], h.datumAlloc, h.scratch.encoded, + ) + if err != nil { + colexecerror.InternalError(err) + } + } + if _, seenPreviously := seen[string(h.scratch.encoded)]; !seenPreviously { + s, err = h.arena.AllocBytes(ctx, h.scratch.encoded) + if err != nil { + colexecerror.InternalError(err) + } + seen[s] = struct{}{} + newSel[newLen] = tupleIdx + newLen++ + } + } + return +} + +// performAggregation executes Compute on all fns paying attention to distinct +// tuples if the corresponding function performs DISTINCT aggregation (as well +// as to any present FILTER clauses). For such functions the approach is as +// follows: +// 1. Store the input state because we will be modifying some of it. +// 2. Convert all aggregate columns of functions that perform DISTINCT +// aggregation. +// 3. For every function: +// 1) Apply the filter to the selection vector of the input. +// 2) Update the (possibly updated) selection vector to include only tuples +// we haven't yet seen making sure to remember that new tuples we have +// just seen. +// 3) Execute Compute on the updated state. +// 4) Restore the state to the original state (if it might have been +// modified). +func (h *filteringDistinctHashAggregatorHelper) performAggregation( + ctx context.Context, vecs []coldata.Vec, inputLen int, sel []int, bucket *hashAggBucket, +) { + h.saveState(vecs, inputLen, sel) + h.aggColsConverter.convertVecs(vecs, inputLen, sel) + var maybeModified bool + for aggFnIdx, aggFn := range h.spec.Aggregations { + vecs, inputLen, sel, maybeModified = h.filters[aggFnIdx].applyFilter(ctx, vecs, inputLen, sel) + if inputLen > 0 && aggFn.Distinct { + inputLen, sel = h.selectDistinctTuples(ctx, inputLen, sel, aggFn.ColIdx, bucket.seen[aggFnIdx]) + maybeModified = true + } + if inputLen > 0 { + bucket.fns[aggFnIdx].Compute(vecs, aggFn.ColIdx, inputLen, sel) + } + if maybeModified { + vecs, inputLen, sel = h.restoreState() + } + } +} + +// singleBatchOperator is a helper colexecbase.Operator that returns the +// provided vectors as a batch on the first call to Next() and zero batch on +// all consequent calls (until it is reset). It must be reset before it can be +// used for the first time. +type singleBatchOperator struct { + colexecbase.ZeroInputNode + NonExplainable + + nexted bool + batch coldata.Batch +} + +var _ colexecbase.Operator = &singleBatchOperator{} + +func newSingleBatchOperator(allocator *colmem.Allocator, typs []*types.T) *singleBatchOperator { + return &singleBatchOperator{ + batch: allocator.NewMemBatchNoCols(typs, coldata.BatchSize()), + } +} + +func (o *singleBatchOperator) Init() {} + +func (o *singleBatchOperator) Next(context.Context) coldata.Batch { + if o.nexted { + return coldata.ZeroBatch + } + o.nexted = true + return o.batch +} + +func (o *singleBatchOperator) reset(vecs []coldata.Vec, inputLen int, sel []int) { + o.nexted = false + for i, vec := range vecs { + o.batch.ReplaceCol(vec, i) + } + o.batch.SetLength(inputLen) + o.batch.SetSelection(sel != nil) + if sel != nil { + copy(o.batch.Selection(), sel[:inputLen]) + } +} diff --git a/pkg/sql/colexec/ordered_aggregator.go b/pkg/sql/colexec/ordered_aggregator.go index 1a0d627509fb..0ab9f15e83ed 100644 --- a/pkg/sql/colexec/ordered_aggregator.go +++ b/pkg/sql/colexec/ordered_aggregator.go @@ -102,6 +102,14 @@ func NewOrderedAggregator( outputTypes []*types.T, isScalar bool, ) (colexecbase.Operator, error) { + for _, aggFn := range spec.Aggregations { + if aggFn.Distinct { + return nil, errors.AssertionFailedf("distinct ordered aggregation is not supported") + } + if aggFn.FilterColIdx != nil { + return nil, errors.AssertionFailedf("filtering ordered aggregation is not supported") + } + } op, groupCol, err := OrderedDistinctColsToOperators(input, spec.GroupCols, inputTypes) if err != nil { return nil, err diff --git a/pkg/sql/colexec/vec_to_datum.eg.go b/pkg/sql/colexec/vec_to_datum.eg.go index 1f2ef66b2fbb..c6ef9a0e8a54 100644 --- a/pkg/sql/colexec/vec_to_datum.eg.go +++ b/pkg/sql/colexec/vec_to_datum.eg.go @@ -94,19 +94,26 @@ func (c *vecToDatumConverter) convertBatchAndDeselect(batch coldata.Batch) { // convertBatch converts the selected vectors from the batch *without* // performing a deselection step. // NOTE: converted columns are "sparse" in regards to the selection vector - if -// there was a selection vector on the batch, only elements that were selected -// are converted, but the results are put at position sel[tupleIdx], so use +// there was a selection vector, only elements that were selected are +// converted, but the results are put at position sel[tupleIdx], so use // getDatumColumn(colIdx)[sel[tupleIdx]] and *NOT* // getDatumColumn(colIdx)[tupleIdx]. func (c *vecToDatumConverter) convertBatch(batch coldata.Batch) { + c.convertVecs(batch.ColVecs(), batch.Length(), batch.Selection()) +} + +// convertVecs converts the selected vectors from vecs *without* performing a +// deselection step. +// Note that this method is equivalent to convertBatch with the only difference +// being the fact that it takes in a "disassembled" batch and not coldata.Batch. +// Consider whether you should be using convertBatch instead. +func (c *vecToDatumConverter) convertVecs(vecs []coldata.Vec, inputLen int, sel []int) { if len(c.vecIdxsToConvert) == 0 { // No vectors were selected for conversion, so there is nothing to do. return } - batchLength := batch.Length() - sel := batch.Selection() // Ensure that convertedVecs are of sufficient length. - requiredLength := batchLength + requiredLength := inputLen if sel != nil { // When sel is non-nil, it might be something like sel = [1023], so we // need to allocate up to the full coldata.BatchSize(), regardless of @@ -125,10 +132,9 @@ func (c *vecToDatumConverter) convertBatch(batch coldata.Batch) { c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] } } - vecs := batch.ColVecs() for _, vecIdx := range c.vecIdxsToConvert { ColVecToDatum( - c.convertedVecs[vecIdx], vecs[vecIdx], batchLength, sel, &c.da, + c.convertedVecs[vecIdx], vecs[vecIdx], inputLen, sel, &c.da, ) } } diff --git a/pkg/sql/colexec/vec_to_datum_tmpl.go b/pkg/sql/colexec/vec_to_datum_tmpl.go index f305f3bb8b39..45c9e8953d07 100644 --- a/pkg/sql/colexec/vec_to_datum_tmpl.go +++ b/pkg/sql/colexec/vec_to_datum_tmpl.go @@ -97,19 +97,26 @@ func (c *vecToDatumConverter) convertBatchAndDeselect(batch coldata.Batch) { // convertBatch converts the selected vectors from the batch *without* // performing a deselection step. // NOTE: converted columns are "sparse" in regards to the selection vector - if -// there was a selection vector on the batch, only elements that were selected -// are converted, but the results are put at position sel[tupleIdx], so use +// there was a selection vector, only elements that were selected are +// converted, but the results are put at position sel[tupleIdx], so use // getDatumColumn(colIdx)[sel[tupleIdx]] and *NOT* // getDatumColumn(colIdx)[tupleIdx]. func (c *vecToDatumConverter) convertBatch(batch coldata.Batch) { + c.convertVecs(batch.ColVecs(), batch.Length(), batch.Selection()) +} + +// convertVecs converts the selected vectors from vecs *without* performing a +// deselection step. +// Note that this method is equivalent to convertBatch with the only difference +// being the fact that it takes in a "disassembled" batch and not coldata.Batch. +// Consider whether you should be using convertBatch instead. +func (c *vecToDatumConverter) convertVecs(vecs []coldata.Vec, inputLen int, sel []int) { if len(c.vecIdxsToConvert) == 0 { // No vectors were selected for conversion, so there is nothing to do. return } - batchLength := batch.Length() - sel := batch.Selection() // Ensure that convertedVecs are of sufficient length. - requiredLength := batchLength + requiredLength := inputLen if sel != nil { // When sel is non-nil, it might be something like sel = [1023], so we // need to allocate up to the full coldata.BatchSize(), regardless of @@ -128,10 +135,9 @@ func (c *vecToDatumConverter) convertBatch(batch coldata.Batch) { c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] } } - vecs := batch.ColVecs() for _, vecIdx := range c.vecIdxsToConvert { ColVecToDatum( - c.convertedVecs[vecIdx], vecs[vecIdx], batchLength, sel, &c.da, + c.convertedVecs[vecIdx], vecs[vecIdx], inputLen, sel, &c.da, ) } } diff --git a/pkg/sql/distsql/columnar_operators_test.go b/pkg/sql/distsql/columnar_operators_test.go index 15d07a76b838..607dca667d5e 100644 --- a/pkg/sql/distsql/columnar_operators_test.go +++ b/pkg/sql/distsql/columnar_operators_test.go @@ -132,140 +132,185 @@ func TestAggregatorAgainstProcessor(t *testing.T) { aggregations = append(aggregations, execinfrapb.AggregatorSpec_Aggregation{Func: aggFn}) } for _, hashAgg := range []bool{false, true} { - for numGroupingCols := 1; numGroupingCols <= maxNumGroupingCols; numGroupingCols++ { - // We will be grouping based on the first numGroupingCols columns - // (which will be of INT types) with the values for the columns set - // manually below. - inputTypes := make([]*types.T, 0, numGroupingCols+len(aggregations)) - for i := 0; i < numGroupingCols; i++ { - inputTypes = append(inputTypes, types.Int) + filteringAggOptions := []bool{false} + if hashAgg { + // We currently support filtering aggregation only for hash + // aggregator. + filteringAggOptions = []bool{false, true} + } + for _, filteringAgg := range filteringAggOptions { + numFilteringCols := 0 + if filteringAgg { + numFilteringCols = 1 } - // After all grouping columns, we will have input columns for each - // of the aggregate functions. Here, we will set up the column - // indices, and the types will be regenerated below - numColsSoFar := numGroupingCols - for i := range aggregations { - numArguments := aggregateFuncToNumArguments[aggregations[i].Func] - aggregations[i].ColIdx = make([]uint32, numArguments) - for j := range aggregations[i].ColIdx { - aggregations[i].ColIdx[j] = uint32(numColsSoFar) - numColsSoFar++ + for numGroupingCols := 1; numGroupingCols <= maxNumGroupingCols; numGroupingCols++ { + // We will be grouping based on the first numGroupingCols columns + // (which will be of INT types) with the values for the columns set + // manually below. + numUtilityCols := numGroupingCols + numFilteringCols + inputTypes := make([]*types.T, 0, numUtilityCols+len(aggregations)) + for i := 0; i < numGroupingCols; i++ { + inputTypes = append(inputTypes, types.Int) } - } - outputTypes := make([]*types.T, len(aggregations)) - - for run := 0; run < nRuns; run++ { - inputTypes = inputTypes[:numGroupingCols] - var rows sqlbase.EncDatumRows + // Check whether we want to add a column for FILTER clause. + var filteringColIdx uint32 + if filteringAgg { + filteringColIdx = uint32(len(inputTypes)) + inputTypes = append(inputTypes, types.Bool) + } + // After all utility columns, we will have input columns for each + // of the aggregate functions. Here, we will set up the column + // indices, and the types will be generated below. + numColsSoFar := numUtilityCols for i := range aggregations { - aggFn := aggregations[i].Func - aggFnInputTypes := make([]*types.T, len(aggregations[i].ColIdx)) - for { - for j := range aggFnInputTypes { - aggFnInputTypes[j] = sqlbase.RandType(rng) - } - // There is a special case for concat_agg, string_agg, st_extent - // and st_makeline when at least one argument is a - // tuple. Such cases pass GetAggregateInfo check below, - // but they are actually invalid, and during normal - // execution it is caught during type-checking. - // However, we don't want to do fully-fledged type - // checking, so we hard-code an exception here. - invalid := false - switch aggFn { - case execinfrapb.AggregatorSpec_CONCAT_AGG, - execinfrapb.AggregatorSpec_STRING_AGG, - execinfrapb.AggregatorSpec_ST_MAKELINE, - execinfrapb.AggregatorSpec_ST_EXTENT: - for _, typ := range aggFnInputTypes { - if typ.Family() == types.TupleFamily { - invalid = true - break + numArguments := aggregateFuncToNumArguments[aggregations[i].Func] + aggregations[i].ColIdx = make([]uint32, numArguments) + for j := range aggregations[i].ColIdx { + aggregations[i].ColIdx[j] = uint32(numColsSoFar) + numColsSoFar++ + } + } + outputTypes := make([]*types.T, len(aggregations)) + + for run := 0; run < nRuns; run++ { + inputTypes = inputTypes[:numUtilityCols] + var rows sqlbase.EncDatumRows + hasJSONColumn := false + for i := range aggregations { + aggFn := aggregations[i].Func + aggFnInputTypes := make([]*types.T, len(aggregations[i].ColIdx)) + for { + for j := range aggFnInputTypes { + aggFnInputTypes[j] = sqlbase.RandType(rng) + } + // There is a special case for concat_agg, string_agg, + // st_makeline, and st_extent when at least one argument is a + // tuple. Such cases pass GetAggregateInfo check below, + // but they are actually invalid, and during normal + // execution it is caught during type-checking. + // However, we don't want to do fully-fledged type + // checking, so we hard-code an exception here. + invalid := false + switch aggFn { + case execinfrapb.AggregatorSpec_CONCAT_AGG, + execinfrapb.AggregatorSpec_STRING_AGG, + execinfrapb.AggregatorSpec_ST_MAKELINE, + execinfrapb.AggregatorSpec_ST_EXTENT: + for _, typ := range aggFnInputTypes { + if typ.Family() == types.TupleFamily { + invalid = true + break + } } } + if invalid { + continue + } + for _, typ := range aggFnInputTypes { + hasJSONColumn = hasJSONColumn || typ.Family() == types.JsonFamily + } + if _, outputType, err := execinfrapb.GetAggregateInfo(aggFn, aggFnInputTypes...); err == nil { + outputTypes[i] = outputType + break + } } - if invalid { - continue - } - if _, outputType, err := execinfrapb.GetAggregateInfo(aggFn, aggFnInputTypes...); err == nil { - outputTypes[i] = outputType - break - } + inputTypes = append(inputTypes, aggFnInputTypes...) } - inputTypes = append(inputTypes, aggFnInputTypes...) - } - rows = sqlbase.RandEncDatumRowsOfTypes(rng, nRows, inputTypes) - groupIdx := 0 - for _, row := range rows { - for i := 0; i < numGroupingCols; i++ { - if rng.Float64() < nullProbability { - row[i] = sqlbase.EncDatum{Datum: tree.DNull} - } else { - row[i] = sqlbase.EncDatum{Datum: tree.NewDInt(tree.DInt(groupIdx))} - if rng.Float64() < nextGroupProb { - groupIdx++ + rows = sqlbase.RandEncDatumRowsOfTypes(rng, nRows, inputTypes) + groupIdx := 0 + for _, row := range rows { + for i := 0; i < numGroupingCols; i++ { + if rng.Float64() < nullProbability { + row[i] = sqlbase.EncDatum{Datum: tree.DNull} + } else { + row[i] = sqlbase.EncDatum{Datum: tree.NewDInt(tree.DInt(groupIdx))} + if rng.Float64() < nextGroupProb { + groupIdx++ + } } } } - } - aggregatorSpec := &execinfrapb.AggregatorSpec{ - Type: execinfrapb.AggregatorSpec_NON_SCALAR, - GroupCols: groupingCols[:numGroupingCols], - Aggregations: aggregations, - } - if hashAgg { - // Let's shuffle the rows for the hash aggregator. - rand.Shuffle(nRows, func(i, j int) { - rows[i], rows[j] = rows[j], rows[i] - }) - } else { - aggregatorSpec.OrderedGroupCols = groupingCols[:numGroupingCols] - orderedCols := execinfrapb.ConvertToColumnOrdering( - execinfrapb.Ordering{Columns: orderingCols[:numGroupingCols]}, - ) - // Although we build the input rows in "non-decreasing" order, it is - // possible that some NULL values are present here and there, so we - // need to sort the rows to satisfy the ordering conditions. - sort.Slice(rows, func(i, j int) bool { - cmp, err := rows[i].Compare(inputTypes, &da, orderedCols, &evalCtx, rows[j]) - if err != nil { - t.Fatal(err) + // Update the specifications of aggregate functions to + // possibly include DISTINCT and/or FILTER clauses. + for _, aggFn := range aggregations { + distinctProb := 0.5 + if !hashAgg { + // We currently support distinct aggregation only + // for hash aggregator. + distinctProb = 0 + } + if hasJSONColumn { + // We currently cannot encode json columns, so we + // don't support distinct aggregation in both + // row-by-row and vectorized engines. + distinctProb = 0 + } + aggFn.Distinct = rng.Float64() < distinctProb + if filteringAgg { + aggFn.FilterColIdx = &filteringColIdx + } else { + aggFn.FilterColIdx = nil } - return cmp < 0 - }) - } - pspec := &execinfrapb.ProcessorSpec{ - Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}}, - Core: execinfrapb.ProcessorCoreUnion{Aggregator: aggregatorSpec}, - } - args := verifyColOperatorArgs{ - anyOrder: hashAgg, - inputTypes: [][]*types.T{inputTypes}, - inputs: []sqlbase.EncDatumRows{rows}, - outputTypes: outputTypes, - pspec: pspec, - } - if err := verifyColOperator(args); err != nil { - if strings.Contains(err.Error(), "different errors returned") { - // Columnar and row-based aggregators are likely to hit - // different errors, and we will swallow those and move - // on. - continue } - fmt.Printf("--- seed = %d run = %d hash = %t ---\n", - seed, run, hashAgg) - var aggFnNames string - for i, agg := range aggregations { - if i > 0 { - aggFnNames += " " + aggregatorSpec := &execinfrapb.AggregatorSpec{ + Type: execinfrapb.AggregatorSpec_NON_SCALAR, + GroupCols: groupingCols[:numGroupingCols], + Aggregations: aggregations, + } + if hashAgg { + // Let's shuffle the rows for the hash aggregator. + rand.Shuffle(nRows, func(i, j int) { + rows[i], rows[j] = rows[j], rows[i] + }) + } else { + aggregatorSpec.OrderedGroupCols = groupingCols[:numGroupingCols] + orderedCols := execinfrapb.ConvertToColumnOrdering( + execinfrapb.Ordering{Columns: orderingCols[:numGroupingCols]}, + ) + // Although we build the input rows in "non-decreasing" order, it is + // possible that some NULL values are present here and there, so we + // need to sort the rows to satisfy the ordering conditions. + sort.Slice(rows, func(i, j int) bool { + cmp, err := rows[i].Compare(inputTypes, &da, orderedCols, &evalCtx, rows[j]) + if err != nil { + t.Fatal(err) + } + return cmp < 0 + }) + } + pspec := &execinfrapb.ProcessorSpec{ + Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}}, + Core: execinfrapb.ProcessorCoreUnion{Aggregator: aggregatorSpec}, + } + args := verifyColOperatorArgs{ + anyOrder: hashAgg, + inputTypes: [][]*types.T{inputTypes}, + inputs: []sqlbase.EncDatumRows{rows}, + outputTypes: outputTypes, + pspec: pspec, + } + if err := verifyColOperator(args); err != nil { + if strings.Contains(err.Error(), "different errors returned") { + // Columnar and row-based aggregators are likely to hit + // different errors, and we will swallow those and move + // on. + continue + } + fmt.Printf("--- seed = %d run = %d filter = %t hash = %t ---\n", + seed, run, filteringAgg, hashAgg) + var aggFnNames string + for i, agg := range aggregations { + if i > 0 { + aggFnNames += " " + } + aggFnNames += agg.Func.String() } - aggFnNames += agg.Func.String() + fmt.Printf("--- %s ---\n", aggFnNames) + prettyPrintTypes(inputTypes, "t" /* tableName */) + prettyPrintInput(rows, inputTypes, "t" /* tableName */) + t.Fatal(err) } - fmt.Printf("--- %s ---\n", aggFnNames) - prettyPrintTypes(inputTypes, "t" /* tableName */) - prettyPrintInput(rows, inputTypes, "t" /* tableName */) - t.Fatal(err) } } }