diff --git a/pkg/sql/colexec/aggregate_funcs.go b/pkg/sql/colexec/aggregate_funcs.go index 107b9e333fb4..ce0be3345717 100644 --- a/pkg/sql/colexec/aggregate_funcs.go +++ b/pkg/sql/colexec/aggregate_funcs.go @@ -308,8 +308,8 @@ func newAggregateFuncsAlloc( } // sizeOfAggregateFunc is the size of some aggregateFunc implementation. -// countAgg was chosen arbitrarily, but it's important that we use a pointer to -// the aggregate function struct. +// countHashAgg was chosen arbitrarily, but it's important that we use a +// pointer to the aggregate function struct. const sizeOfAggregateFunc = int64(unsafe.Sizeof(&countHashAgg{})) func (a *aggregateFuncsAlloc) makeAggregateFuncs() []aggregateFunc { @@ -319,7 +319,7 @@ func (a *aggregateFuncsAlloc) makeAggregateFuncs() []aggregateFunc { // of 'allocSize x number of funcs in schema' length. Every // aggFuncAlloc will allocate allocSize of objects on the newAggFunc // call below. - a.allocator.AdjustMemoryUsage(sizeOfAggregateFunc * a.allocSize) + a.allocator.AdjustMemoryUsage(sizeOfAggregateFunc * int64(len(a.aggFuncAllocs)) * a.allocSize) a.returnFuncs = make([]aggregateFunc, len(a.aggFuncAllocs)*int(a.allocSize)) } funcs := a.returnFuncs[:len(a.aggFuncAllocs)] diff --git a/pkg/sql/colexec/aggregators_test.go b/pkg/sql/colexec/aggregators_test.go index 64936aeb3cfd..7be572aad8d0 100644 --- a/pkg/sql/colexec/aggregators_test.go +++ b/pkg/sql/colexec/aggregators_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -51,6 +52,8 @@ type aggregatorTestCase struct { constArguments [][]execinfrapb.Expression // spec will be populated during init(). spec *execinfrapb.AggregatorSpec + aggDistinct []bool + aggFilter []int input tuples unorderedInput bool expected tuples @@ -160,6 +163,13 @@ func (tc *aggregatorTestCase) init() error { if tc.constArguments != nil { aggregations[i].Arguments = tc.constArguments[i] } + if tc.aggDistinct != nil { + aggregations[i].Distinct = tc.aggDistinct[i] + } + if tc.aggFilter != nil && tc.aggFilter[i] != tree.NoColumnIdx { + filterColIdx := uint32(tc.aggFilter[i]) + aggregations[i].FilterColIdx = &filterColIdx + } } tc.spec = &execinfrapb.AggregatorSpec{ GroupCols: tc.groupCols, @@ -693,12 +703,133 @@ func TestAggregatorAllFunctions(t *testing.T) { }, convToDecimal: true, }, + + // Test DISTINCT aggregation. + { + aggFns: []execinfrapb.AggregatorSpec_Func{ + execinfrapb.AggregatorSpec_ANY_NOT_NULL, + execinfrapb.AggregatorSpec_COUNT, + execinfrapb.AggregatorSpec_COUNT, + execinfrapb.AggregatorSpec_SUM_INT, + execinfrapb.AggregatorSpec_SUM_INT, + }, + aggCols: [][]uint32{{0}, {1}, {1}, {1}, {1}}, + aggDistinct: []bool{false, false, true, false, true}, + typs: []*types.T{types.Int, types.Int}, + input: tuples{ + {0, 1}, + {0, 2}, + {0, 2}, + {0, nil}, + {0, 1}, + {0, nil}, + {1, 1}, + {1, 2}, + {1, 2}, + }, + expected: tuples{ + {0, 4, 2, 6, 3}, + {1, 3, 2, 5, 3}, + }, + }, + + // Test aggregation with FILTERs. 
+ { + aggFns: []execinfrapb.AggregatorSpec_Func{ + execinfrapb.AggregatorSpec_ANY_NOT_NULL, + execinfrapb.AggregatorSpec_COUNT_ROWS, + execinfrapb.AggregatorSpec_SUM_INT, + }, + aggCols: [][]uint32{{0}, {}, {1}}, + aggFilter: []int{tree.NoColumnIdx, 2, 2}, + typs: []*types.T{types.Int, types.Int, types.Bool}, + input: tuples{ + {0, 1, false}, + {0, 2, true}, + {0, 2, true}, + {0, nil, nil}, + {0, 1, nil}, + {0, nil, true}, + {1, 1, true}, + {1, 2, nil}, + {1, 2, true}, + }, + expected: tuples{ + {0, 3, 4}, + {1, 2, 3}, + }, + }, + + // Test aggregation with FILTERs when the whole groups are filtered out. + { + aggFns: []execinfrapb.AggregatorSpec_Func{ + execinfrapb.AggregatorSpec_ANY_NOT_NULL, + execinfrapb.AggregatorSpec_COUNT_ROWS, + execinfrapb.AggregatorSpec_SUM_INT, + }, + aggCols: [][]uint32{{0}, {}, {1}}, + aggFilter: []int{tree.NoColumnIdx, 2, 2}, + typs: []*types.T{types.Int, types.Int, types.Bool}, + input: tuples{ + {0, 1, false}, + {0, nil, nil}, + {0, 2, false}, + {1, 1, true}, + {1, 2, nil}, + {1, 2, true}, + {2, 1, false}, + {2, nil, nil}, + {2, 2, nil}, + }, + expected: tuples{ + {0, 0, nil}, + {1, 2, 3}, + {2, 0, nil}, + }, + }, + + // Test aggregation with FILTERs and DISTINCTs intertwined. + { + aggFns: []execinfrapb.AggregatorSpec_Func{ + execinfrapb.AggregatorSpec_ANY_NOT_NULL, + execinfrapb.AggregatorSpec_COUNT, + execinfrapb.AggregatorSpec_COUNT, + execinfrapb.AggregatorSpec_COUNT, + execinfrapb.AggregatorSpec_SUM_INT, + execinfrapb.AggregatorSpec_SUM_INT, + execinfrapb.AggregatorSpec_SUM_INT, + }, + aggCols: [][]uint32{{0}, {1}, {1}, {1}, {1}, {1}, {1}}, + aggDistinct: []bool{false, false, true, true, false, true, true}, + aggFilter: []int{tree.NoColumnIdx, 2, tree.NoColumnIdx, 2, 2, tree.NoColumnIdx, 2}, + typs: []*types.T{types.Int, types.Int, types.Bool}, + input: tuples{ + {0, 1, false}, + {0, 2, true}, + {0, 2, true}, + {0, nil, nil}, + {0, 1, nil}, + {0, nil, true}, + {1, 1, true}, + {1, 2, nil}, + {1, 2, true}, + }, + expected: tuples{ + {0, 2, 2, 1, 4, 3, 2}, + {1, 2, 2, 2, 3, 3, 3}, + }, + }, } evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) defer evalCtx.Stop(context.Background()) for _, agg := range aggTypes { for i, tc := range testCases { + if agg.name != "hash" && (tc.aggDistinct != nil || tc.aggFilter != nil) { + // Distinct or filtering aggregation is only supported with + // hash aggregator. + continue + } log.Infof(context.Background(), "%s/%d", agg.name, i) if err := tc.init(); err != nil { t.Fatal(err) @@ -1007,6 +1138,104 @@ func BenchmarkAllOptimizedAggregateFunctions(b *testing.B) { } } +func BenchmarkDistinctAggregation(b *testing.B) { + rng, _ := randutil.NewPseudoRand() + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + evalCtx := tree.MakeTestingEvalContext(st) + defer evalCtx.Stop(ctx) + flowCtx := &execinfra.FlowCtx{ + EvalCtx: &evalCtx, + Cfg: &execinfra.ServerConfig{ + Settings: st, + }, + } + + typs := []*types.T{types.Int, types.Int} + aggFn := execinfrapb.AggregatorSpec_COUNT + aggSpec := &execinfrapb.AggregatorSpec{ + Type: execinfrapb.AggregatorSpec_NON_SCALAR, + GroupCols: []uint32{0}, + // TODO(yuzefovich): adjust the spec once we support distinct ordered + // aggregation. 
+ Aggregations: []execinfrapb.AggregatorSpec_Aggregation{{ + Func: aggFn, + Distinct: true, + ColIdx: []uint32{1}, + }}, + } + spec := &execinfrapb.ProcessorSpec{ + Input: []execinfrapb.InputSyncSpec{{ColumnTypes: typs}}, + Core: execinfrapb.ProcessorCoreUnion{ + Aggregator: aggSpec, + }, + } + args := &NewColOperatorArgs{ + Spec: spec, + StreamingMemAccount: testMemAcc, + } + args.TestingKnobs.UseStreamingMemAccountForBuffering = true + + for _, groupSize := range []int{1, 2, 32, 128, coldata.BatchSize() / 2, coldata.BatchSize()} { + for _, distinctProbability := range []float64{0.01, 0.1, 1.0} { + distinctModulo := int(1.0 / distinctProbability) + if (groupSize == 1 && distinctProbability != 1.0) || float64(groupSize)/float64(distinctModulo) < 0.1 { + // We have a such combination of groupSize and + // distinctProbability parameters that we will be very + // unlikely to satisfy them (for example, with groupSize=1 + // and distinctProbability=0.01, every value will be + // distinct within the group), so we skip such + // configuration. + continue + } + for _, hasNulls := range []bool{false, true} { + for _, numInputBatches := range []int{64} { + // TODO(yuzefovich): refactor benchmarkAggregateFunction to + // be more configurable and reuse it here. + b.Run(fmt.Sprintf("%s/groupSize=%d/distinctProb=%.2f/nulls=%t", + aggFn, groupSize, distinctProbability, hasNulls), + func(b *testing.B) { + nTuples := numInputBatches * coldata.BatchSize() + cols := []coldata.Vec{ + testAllocator.NewMemColumn(typs[0], nTuples), + testAllocator.NewMemColumn(typs[1], nTuples), + } + groups := cols[0].Int64() + vals := cols[1].Int64() + nGroups := nTuples / groupSize + for i := 0; i < nTuples; i++ { + groups[i] = int64(rng.Intn(nGroups)) + vals[i] = int64(rng.Intn(distinctModulo)) + if hasNulls && rng.Float64() < nullProbability { + cols[1].Nulls().SetNull(i) + } + } + source := newChunkingBatchSource(typs, cols, nTuples) + args.Inputs = []colexecbase.Operator{source} + r, err := TestNewColOperator(ctx, flowCtx, args) + if err != nil { + b.Fatal(err) + } + + a := r.Op + a.Init() + b.ResetTimer() + // Only count the aggregation column. + b.SetBytes(int64(8 * nTuples)) + for i := 0; i < b.N; i++ { + // Exhaust aggregator until all batches have been read. + for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) { + } + a.(ResettableOperator).reset(ctx) + } + }, + ) + } + } + } + } +} + func TestHashAggregator(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 89739fc49b90..c319935de11a 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -127,6 +127,23 @@ func (r *opResult) resetToState(ctx context.Context, arg colexec.NewColOperatorR *r.NewColOperatorResult = arg } +func needHashAggregator(aggSpec *execinfrapb.AggregatorSpec) (bool, error) { + var groupCols, orderedCols util.FastIntSet + for _, col := range aggSpec.OrderedGroupCols { + orderedCols.Add(int(col)) + } + for _, col := range aggSpec.GroupCols { + if !orderedCols.Contains(int(col)) { + return true, nil + } + groupCols.Add(int(col)) + } + if !orderedCols.SubsetOf(groupCols) { + return false, errors.AssertionFailedf("ordered cols must be a subset of grouping cols") + } + return false, nil +} + // isSupported checks whether we have a columnar operator equivalent to a // processor described by spec. Note that it doesn't perform any other checks // (like validity of the number of inputs). 
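// Illustrative sketch (not part of the patch above): the rule that the new
// needHashAggregator helper encodes, restated over plain int slices instead of
// util.FastIntSet and execinfrapb.AggregatorSpec. Hash aggregation is needed
// as soon as some grouping column has no guaranteed ordering; an ordered
// column that is not a grouping column indicates a malformed spec. The
// function and variable names below are hypothetical.
package main

import "fmt"

// needsHashAggregation mirrors the decision made by needHashAggregator.
func needsHashAggregation(groupCols, orderedGroupCols []int) (bool, error) {
	ordered := make(map[int]bool, len(orderedGroupCols))
	for _, c := range orderedGroupCols {
		ordered[c] = true
	}
	grouping := make(map[int]bool, len(groupCols))
	needHash := false
	for _, c := range groupCols {
		grouping[c] = true
		if !ordered[c] {
			// At least one grouping column is unordered, so the ordered
			// aggregator cannot be used.
			needHash = true
		}
	}
	for _, c := range orderedGroupCols {
		if !grouping[c] {
			return false, fmt.Errorf("ordered cols must be a subset of grouping cols")
		}
	}
	return needHash, nil
}

func main() {
	// GROUP BY a, b with the input ordered only on a => hash aggregation.
	fmt.Println(needsHashAggregation([]int{0, 1}, []int{0})) // true <nil>
	// GROUP BY a, b with the input ordered on both => ordered aggregation.
	fmt.Println(needsHashAggregation([]int{0, 1}, []int{0, 1})) // false <nil>
}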
@@ -153,12 +170,16 @@ func isSupported(mode sessiondata.VectorizeExecMode, spec *execinfrapb.Processor case core.Aggregator != nil: aggSpec := core.Aggregator + needHash, err := needHashAggregator(aggSpec) + if err != nil { + return err + } for _, agg := range aggSpec.Aggregations { - if agg.Distinct { - return errors.Newf("distinct aggregation not supported") + if agg.Distinct && !needHash { + return errors.Newf("distinct ordered aggregation not supported") } - if agg.FilterColIdx != nil { - return errors.Newf("filtering aggregation not supported") + if agg.FilterColIdx != nil && !needHash { + return errors.Newf("filtering ordered aggregation not supported") } } return nil @@ -649,21 +670,11 @@ func NewColOperator( break } - var groupCols, orderedCols util.FastIntSet - for _, col := range aggSpec.OrderedGroupCols { - orderedCols.Add(int(col)) - } - needHash := false - for _, col := range aggSpec.GroupCols { - if !orderedCols.Contains(int(col)) { - needHash = true - } - groupCols.Add(int(col)) - } - if !orderedCols.SubsetOf(groupCols) { - return r, errors.AssertionFailedf("ordered cols must be a subset of grouping cols") + var needHash bool + needHash, err = needHashAggregator(aggSpec) + if err != nil { + return r, err } - inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes)) copy(inputTypes, spec.Input[0].ColumnTypes) evalCtx := flowCtx.NewEvalCtx() diff --git a/pkg/sql/colexec/hash_aggregator.go b/pkg/sql/colexec/hash_aggregator.go index 4b72d24b06c5..868cf7f39f0c 100644 --- a/pkg/sql/colexec/hash_aggregator.go +++ b/pkg/sql/colexec/hash_aggregator.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" ) @@ -57,6 +58,7 @@ type hashAggregator struct { allocator *colmem.Allocator spec *execinfrapb.AggregatorSpec + aggHelper hashAggregatorHelper inputTypes []*types.T outputTypes []*types.T inputArgsConverter *vecToDatumConverter @@ -97,6 +99,7 @@ type hashAggregator struct { aggFnsAlloc *aggregateFuncsAlloc hashAlloc hashAggFuncsAlloc + datumAlloc sqlbase.DatumAlloc toClose Closers } @@ -137,6 +140,8 @@ func NewHashAggregator( aggFnsAlloc: aggFnsAlloc, hashAlloc: hashAggFuncsAlloc{allocator: allocator}, } + hashAgg.datumAlloc.AllocSize = hashAggregatorAllocSize + hashAgg.aggHelper = newHashAggregatorHelper(allocator, inputTypes, spec, &hashAgg.datumAlloc) return hashAgg, err } @@ -303,7 +308,7 @@ func (op *hashAggregator) onlineAgg(ctx context.Context, b coldata.Batch) { // group. eqChain := op.scratch.eqChains[eqChainsSlot] bucket := op.buckets[headID-1] - bucket.compute(inputVecs, op.spec, len(eqChain), eqChain) + op.aggHelper.performAggregation(ctx, inputVecs, len(eqChain), eqChain, bucket.fns, bucket.seen) // We have fully processed this equality chain, so we need to // reset its length. op.scratch.eqChains[eqChainsSlot] = op.scratch.eqChains[eqChainsSlot][:0] @@ -327,10 +332,8 @@ func (op *hashAggregator) onlineAgg(ctx context.Context, b coldata.Batch) { bucket := op.hashAlloc.newHashAggFuncs() bucket.fns = op.aggFnsAlloc.makeAggregateFuncs() op.buckets = append(op.buckets, bucket) - // bucket knows that all selected tuples in b belong to the same - // single group, so we can pass 'nil' for the first argument. 
- bucket.init(nil /* group */, op.output) - bucket.compute(inputVecs, op.spec, len(eqChain), eqChain) + bucket.init(op.output, op.aggHelper.makeSeenMaps()) + op.aggHelper.performAggregation(ctx, inputVecs, len(eqChain), eqChain, bucket.fns, bucket.seen) newGroupsHeadsSel = append(newGroupsHeadsSel, eqChainsHeads[eqChainSlot]) // We need to compact the hash buffer according to the new groups // head tuples selection vector we're building. @@ -370,22 +373,21 @@ func (op *hashAggregator) Close(ctx context.Context) error { // aggregation group. type hashAggFuncs struct { fns []aggregateFunc + // seen is a dense slice of maps used to handle distinct aggregation (it is + // of the same length as the number of functions with DISTINCT clause). It + // will be nil whenever no aggregate function has a DISTINCT clause. + seen []map[string]struct{} } const sizeOfHashAggFuncs = unsafe.Sizeof(hashAggFuncs{}) -func (v *hashAggFuncs) init(group []bool, b coldata.Batch) { +func (v *hashAggFuncs) init(b coldata.Batch, seen []map[string]struct{}) { for fnIdx, fn := range v.fns { - fn.Init(group, b.ColVec(fnIdx)) - } -} - -func (v *hashAggFuncs) compute( - vecs []coldata.Vec, spec *execinfrapb.AggregatorSpec, inputLen int, sel []int, -) { - for fnIdx, fn := range v.fns { - fn.Compute(vecs, spec.Aggregations[fnIdx].ColIdx, inputLen, sel) + // We know that all selected tuples in b belong to the same single + // group, so we can pass 'nil' for the first argument. + fn.Init(nil /* groups */, b.ColVec(fnIdx)) } + v.seen = seen } // hashAggFuncsAlloc is a utility struct that batches allocations of diff --git a/pkg/sql/colexec/hash_aggregator_util.go b/pkg/sql/colexec/hash_aggregator_util.go new file mode 100644 index 000000000000..7079237a5007 --- /dev/null +++ b/pkg/sql/colexec/hash_aggregator_util.go @@ -0,0 +1,501 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexec + +import ( + "context" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/stringarena" +) + +// hashAggregatorHelper is a helper for the hash aggregator that facilitates +// the selection of tuples on which to perform the aggregation. +type hashAggregatorHelper interface { + // makeSeenMaps returns a dense slice of maps used to handle distinct + // aggregation of a single aggregation group (it is of the same length as + // the number of functions with DISTINCT clause). It will be nil whenever + // no aggregate function has a DISTINCT clause. + makeSeenMaps() []map[string]struct{} + // performAggregation performs aggregation of all functions in fns on + // tuples in vecs that are relevant for each function (meaning that only + // tuples that pass the criteria - like DISTINCT and/or FILTER will be + // aggregated). 
seen is a dense slice of maps used for storing encoded + // tuples that have already been seen by the corresponding group (it can be + // nil when no aggregate function performs distinct aggregation). + performAggregation(ctx context.Context, vecs []coldata.Vec, inputLen int, sel []int, fns []aggregateFunc, seen []map[string]struct{}) +} + +// newHashAggregatorHelper creates a new hashAggregatorHelper based on provided +// aggregator specification. If there are no functions that perform either +// DISTINCT or FILTER aggregation, then the defaultHashAggregatorHelper is +// returned which has negligible performance overhead. +func newHashAggregatorHelper( + allocator *colmem.Allocator, + inputTypes []*types.T, + spec *execinfrapb.AggregatorSpec, + datumAlloc *sqlbase.DatumAlloc, +) hashAggregatorHelper { + // distinctAggIdxs is a dense list of function indices that perform + // distinct aggregation. + var distinctAggIdxs []int + hasFilterAgg := false + aggFilter := make([]int, len(spec.Aggregations)) + for i, aggFn := range spec.Aggregations { + if aggFn.Distinct { + distinctAggIdxs = append(distinctAggIdxs, i) + } + if aggFn.FilterColIdx != nil { + aggFilter[i] = int(*aggFn.FilterColIdx) + hasFilterAgg = true + } else { + aggFilter[i] = tree.NoColumnIdx + } + } + + if len(distinctAggIdxs) == 0 && !hasFilterAgg { + return newDefaultHashAggregatorHelper(spec) + } + filters := make([]*filteringHashAggHelper, len(spec.Aggregations)) + for i, filterIdx := range aggFilter { + filters[i] = newFilteringHashAggHelper(allocator, inputTypes, filterIdx) + } + if len(distinctAggIdxs) == 0 { + return newFilteringHashAggregatorHelper(spec, filters) + } + return newFilteringDistinctHashAggregatorHelper(allocator, inputTypes, spec, distinctAggIdxs, filters, datumAlloc) +} + +// defaultHashAggregatorHelper is the default hashAggregatorHelper for the case +// when no aggregate function is performing DISTINCT or FILTERing aggregation. +type defaultHashAggregatorHelper struct { + spec *execinfrapb.AggregatorSpec +} + +var _ hashAggregatorHelper = &defaultHashAggregatorHelper{} + +func newDefaultHashAggregatorHelper(spec *execinfrapb.AggregatorSpec) hashAggregatorHelper { + return &defaultHashAggregatorHelper{spec: spec} +} + +func (h *defaultHashAggregatorHelper) makeSeenMaps() []map[string]struct{} { + return nil +} + +func (h *defaultHashAggregatorHelper) performAggregation( + _ context.Context, + vecs []coldata.Vec, + inputLen int, + sel []int, + fns []aggregateFunc, + _ []map[string]struct{}, +) { + for fnIdx, fn := range fns { + fn.Compute(vecs, h.spec.Aggregations[fnIdx].ColIdx, inputLen, sel) + } +} + +// hashAggregatorHelperBase is a utility struct that provides non-default +// hashAggregatorHelpers with the logic necessary for saving/restoring the +// input state. 
+type hashAggregatorHelperBase struct { + spec *execinfrapb.AggregatorSpec + + vecs []coldata.Vec + usesSel bool + origSel []int + origLen int +} + +func newAggregatorHelperBase(spec *execinfrapb.AggregatorSpec) *hashAggregatorHelperBase { + b := &hashAggregatorHelperBase{spec: spec} + b.origSel = make([]int, coldata.BatchSize()) + return b +} + +func (h *hashAggregatorHelperBase) saveState(vecs []coldata.Vec, origLen int, origSel []int) { + h.vecs = vecs + h.origLen = origLen + h.usesSel = origSel != nil + if h.usesSel { + copy(h.origSel[:h.origLen], origSel[:h.origLen]) + } +} + +func (h *hashAggregatorHelperBase) restoreState() ([]coldata.Vec, int, []int) { + sel := h.origSel + if !h.usesSel { + sel = nil + } + return h.vecs, h.origLen, sel +} + +// filteringHashAggHelper is a utility struct that helps with handling of a +// FILTER clause of a single aggregate function. +type filteringHashAggHelper struct { + filter colexecbase.Operator + filterInput *singleBatchOperator +} + +var noFilterHashAggHelper = &filteringHashAggHelper{} + +// newFilteringHashAggHelper returns a new filteringHashAggHelper. +// tree.NoColumnIdx index can be used to indicate that there is no FILTER +// clause for the aggregate function. +func newFilteringHashAggHelper( + allocator *colmem.Allocator, typs []*types.T, filterIdx int, +) *filteringHashAggHelper { + if filterIdx == tree.NoColumnIdx { + return noFilterHashAggHelper + } + filterInput := newSingleBatchOperator(allocator, typs) + h := &filteringHashAggHelper{ + filter: newBoolVecToSelOp(filterInput, filterIdx), + filterInput: filterInput, + } + return h +} + +// applyFilter returns the updated selection vector that includes only tuples +// for which filtering column has 'true' value set. It also returns whether +// state might have been modified. +func (h *filteringHashAggHelper) applyFilter( + ctx context.Context, vecs []coldata.Vec, inputLen int, sel []int, +) (_ []coldata.Vec, _ int, _ []int, maybeModified bool) { + if h.filter == nil { + return vecs, inputLen, sel, false + } + h.filterInput.reset(vecs, inputLen, sel) + newBatch := h.filter.Next(ctx) + return newBatch.ColVecs(), newBatch.Length(), newBatch.Selection(), true +} + +// filteringHashAggregatorHelper is a hashAggregatorHelper that handles the +// aggregate functions which have at least one FILTER clause but no DISTINCT +// clauses. +type filteringHashAggregatorHelper struct { + *hashAggregatorHelperBase + + filters []*filteringHashAggHelper +} + +var _ hashAggregatorHelper = &filteringHashAggregatorHelper{} + +func newFilteringHashAggregatorHelper( + spec *execinfrapb.AggregatorSpec, filters []*filteringHashAggHelper, +) hashAggregatorHelper { + h := &filteringHashAggregatorHelper{ + hashAggregatorHelperBase: newAggregatorHelperBase(spec), + filters: filters, + } + return h +} + +func (h *filteringHashAggregatorHelper) makeSeenMaps() []map[string]struct{} { + return nil +} + +func (h *filteringHashAggregatorHelper) performAggregation( + ctx context.Context, + vecs []coldata.Vec, + inputLen int, + sel []int, + fns []aggregateFunc, + _ []map[string]struct{}, +) { + h.saveState(vecs, inputLen, sel) + for fnIdx, fn := range fns { + var maybeModified bool + vecs, inputLen, sel, maybeModified = h.filters[fnIdx].applyFilter(ctx, vecs, inputLen, sel) + if inputLen > 0 { + // It is possible that all tuples to aggregate have been filtered + // out, so we need to check the length. 
+			fn.Compute(vecs, h.spec.Aggregations[fnIdx].ColIdx, inputLen, sel)
+		}
+		if maybeModified {
+			// Restore the state so that the next iteration sees the input with
+			// the original selection vector and length.
+			vecs, inputLen, sel = h.restoreState()
+		}
+	}
+}
+
+// filteringDistinctHashAggregatorHelper is a hashAggregatorHelper that handles
+// the aggregate functions with any number of DISTINCT and/or FILTER clauses.
+// The helper should be shared among all groups for aggregation. The filtering
+// is delegated to filteringHashAggHelpers, and this struct handles the
+// "distinctness" of aggregation.
+// Note that the "distinctness" of tuples is handled by encoding the
+// aggregation columns of a tuple (one tuple at a time) and storing that
+// encoding in a seen map that is separate for each aggregation group and for
+// each aggregate function with a DISTINCT clause.
+// Other approaches have been prototyped but showed worse performance:
+// - using the vectorized hash table - the benefit of such an approach is that
+// we don't reduce ourselves to one tuple at a time (because we would be
+// hashing the full columns at once), but the big disadvantage is that the
+// full tuples are stored in the hash table (instead of an encoded
+// representation)
+// - using a single global map for a particular aggregate function that is
+// shared among all aggregation groups - the benefit of such an approach is
+// that we only have a handful of maps, but it turned out that such a global
+// map grows a lot bigger and has worse performance.
+type filteringDistinctHashAggregatorHelper struct {
+	*hashAggregatorHelperBase
+
+	inputTypes         []*types.T
+	nonDistinctAggIdxs []int
+	distinctAggIdxs    []int
+	filters            []*filteringHashAggHelper
+	aggColsConverter   *vecToDatumConverter
+	seenAlloc          *seenMapsAlloc
+	arena              stringarena.Arena
+	datumAlloc         *sqlbase.DatumAlloc
+	scratch            struct {
+		ed      sqlbase.EncDatum
+		encoded []byte
+		// converted is a scratch space for converting a single element.
+		converted []tree.Datum
+		sel       []int
+	}
+}
+
+var _ hashAggregatorHelper = &filteringDistinctHashAggregatorHelper{}
+
+func newFilteringDistinctHashAggregatorHelper(
+	allocator *colmem.Allocator,
+	inputTypes []*types.T,
+	spec *execinfrapb.AggregatorSpec,
+	distinctAggIdxs []int,
+	filters []*filteringHashAggHelper,
+	datumAlloc *sqlbase.DatumAlloc,
+) hashAggregatorHelper {
+	h := &filteringDistinctHashAggregatorHelper{
+		hashAggregatorHelperBase: newAggregatorHelperBase(spec),
+		inputTypes:               inputTypes,
+		nonDistinctAggIdxs:       make([]int, 0, len(spec.Aggregations)-len(distinctAggIdxs)),
+		distinctAggIdxs:          distinctAggIdxs,
+		seenAlloc:                newSeenMapsAlloc(allocator, len(distinctAggIdxs)),
+		arena:                    stringarena.Make(allocator.GetAccount()),
+		datumAlloc:               datumAlloc,
+		filters:                  filters,
+	}
+	for aggIdx := range spec.Aggregations {
+		isDistinct := false
+		for _, distinctAggIdx := range distinctAggIdxs {
+			if aggIdx == distinctAggIdx {
+				isDistinct = true
+				break
+			}
+		}
+		if !isDistinct {
+			h.nonDistinctAggIdxs = append(h.nonDistinctAggIdxs, aggIdx)
+		}
+	}
+	var vecIdxsToConvert []int
+	for _, aggIdx := range distinctAggIdxs {
+		for _, aggCol := range spec.Aggregations[aggIdx].ColIdx {
+			found := false
+			for _, vecIdx := range vecIdxsToConvert {
+				if vecIdx == int(aggCol) {
+					found = true
+					break
+				}
+			}
+			if !found {
+				vecIdxsToConvert = append(vecIdxsToConvert, int(aggCol))
+			}
+		}
+	}
+	h.aggColsConverter = newVecToDatumConverter(len(inputTypes), vecIdxsToConvert)
+	h.scratch.converted = []tree.Datum{nil}
+	h.scratch.sel = make([]int, coldata.BatchSize())
+	return h
+}
+
+func (h *filteringDistinctHashAggregatorHelper) makeSeenMaps() []map[string]struct{} {
+	seen := h.seenAlloc.newSeenMapsSlice()
+	for i := range h.distinctAggIdxs {
+		seen[i] = make(map[string]struct{})
+	}
+	return seen
+}
+
+// selectDistinctTuples returns a new selection vector that contains only
+// tuples that haven't been seen by the aggregate function yet when the
+// function performs DISTINCT aggregation. aggColsConverter must have already
+// done the conversion of the relevant aggregate columns *without*
+// deselection. This function assumes that the seen map is non-nil and is the
+// same map used for all batches from the same aggregation group.
+func (h *filteringDistinctHashAggregatorHelper) selectDistinctTuples(
+	ctx context.Context, inputLen int, sel []int, aggFnIdx int, seen map[string]struct{},
+) (newLen int, newSel []int) {
+	newSel = h.scratch.sel
+	var (
+		tupleIdx int
+		err      error
+		s        string
+	)
+	for idx := 0; idx < inputLen; idx++ {
+		h.scratch.encoded = h.scratch.encoded[:0]
+		tupleIdx = idx
+		if sel != nil {
+			tupleIdx = sel[idx]
+		}
+		for _, colIdx := range h.spec.Aggregations[aggFnIdx].ColIdx {
+			h.scratch.ed.Datum = h.aggColsConverter.getDatumColumn(int(colIdx))[tupleIdx]
+			h.scratch.encoded, err = h.scratch.ed.Fingerprint(
+				h.inputTypes[colIdx], h.datumAlloc, h.scratch.encoded,
+			)
+			if err != nil {
+				colexecerror.InternalError(err)
+			}
+		}
+		if _, seenPreviously := seen[string(h.scratch.encoded)]; !seenPreviously {
+			s, err = h.arena.AllocBytes(ctx, h.scratch.encoded)
+			if err != nil {
+				colexecerror.InternalError(err)
+			}
+			seen[s] = struct{}{}
+			newSel[newLen] = tupleIdx
+			newLen++
+		}
+	}
+	return
+}
+
+// performAggregation executes Compute on all fns paying attention to distinct
+// tuples if the corresponding function performs DISTINCT aggregation (as well
+// as to any present FILTER clauses). For such functions the approach is as
+// follows:
+// 1. store the input state because we will be modifying some of it
+// 2. convert all aggregate columns of functions that perform DISTINCT
+//    aggregation
+// 3. for every function:
+//    1) apply the filter to the selection vector of the input
+//    2) update the (possibly updated) selection vector to include only tuples
+//       we haven't yet seen, making sure to remember the new tuples we have
+//       just seen
+//    3) execute Compute on the updated state
+//    4) restore the state to the original state (if it might have been
+//       modified).
+func (h *filteringDistinctHashAggregatorHelper) performAggregation(
+	ctx context.Context,
+	vecs []coldata.Vec,
+	inputLen int,
+	sel []int,
+	fns []aggregateFunc,
+	seen []map[string]struct{},
+) {
+	h.saveState(vecs, inputLen, sel)
+	h.aggColsConverter.convertVecs(vecs, inputLen, sel)
+	var maybeModified bool
+	// First compute all non-distinct aggregations.
+	for _, fnIdx := range h.nonDistinctAggIdxs {
+		vecs, inputLen, sel, maybeModified = h.filters[fnIdx].applyFilter(ctx, vecs, inputLen, sel)
+		if inputLen > 0 {
+			fns[fnIdx].Compute(vecs, h.spec.Aggregations[fnIdx].ColIdx, inputLen, sel)
+		}
+		if maybeModified {
+			vecs, inputLen, sel = h.restoreState()
+		}
+	}
+	// Now compute all distinct aggregations, restoring the state after each
+	// one.
+	for distinctAggSlot, fnIdx := range h.distinctAggIdxs {
+		vecs, inputLen, sel, _ = h.filters[fnIdx].applyFilter(ctx, vecs, inputLen, sel)
+		if inputLen > 0 {
+			inputLen, sel = h.selectDistinctTuples(ctx, inputLen, sel, fnIdx, seen[distinctAggSlot])
+			if inputLen > 0 {
+				fns[fnIdx].Compute(vecs, h.spec.Aggregations[fnIdx].ColIdx, inputLen, sel)
+			}
+		}
+		vecs, inputLen, sel = h.restoreState()
+	}
+}
+
+// seenMapsAlloc is a utility struct that batches allocations of seen map
+// slices.
+type seenMapsAlloc struct {
+	allocator         *colmem.Allocator
+	numDistinctAggFns int
+	newAllocCount     int
+	newAllocMemSize   int64
+	buf               []map[string]struct{}
+}
+
+func newSeenMapsAlloc(allocator *colmem.Allocator, numDistinctAggFns int) *seenMapsAlloc {
+	return &seenMapsAlloc{
+		allocator:         allocator,
+		numDistinctAggFns: numDistinctAggFns,
+		newAllocCount:     numDistinctAggFns * hashAggregatorAllocSize,
+		newAllocMemSize:   int64(numDistinctAggFns * hashAggregatorAllocSize * int(sizeOfSeenMap)),
+	}
+}
+
+const sizeOfSeenMap = unsafe.Sizeof(map[string]struct{}{})
+
+func (a *seenMapsAlloc) newSeenMapsSlice() []map[string]struct{} {
+	if len(a.buf) == 0 {
+		a.allocator.AdjustMemoryUsage(a.newAllocMemSize)
+		a.buf = make([]map[string]struct{}, a.newAllocCount)
+	}
+	ret := a.buf[0:a.numDistinctAggFns]
+	a.buf = a.buf[a.numDistinctAggFns:]
+	return ret
+}
+
+// singleBatchOperator is a helper colexecbase.Operator that returns the
+// provided vectors as a batch on the first call to Next() and a zero batch on
+// all subsequent calls (until it is reset). It must be reset before it can be
+// used for the first time.
+type singleBatchOperator struct { + colexecbase.ZeroInputNode + NonExplainable + + nexted bool + batch coldata.Batch +} + +var _ colexecbase.Operator = &singleBatchOperator{} + +func newSingleBatchOperator(allocator *colmem.Allocator, typs []*types.T) *singleBatchOperator { + return &singleBatchOperator{ + batch: allocator.NewMemBatchNoCols(typs, coldata.BatchSize()), + } +} + +func (o *singleBatchOperator) Init() {} + +func (o *singleBatchOperator) Next(context.Context) coldata.Batch { + if o.nexted { + return coldata.ZeroBatch + } + o.nexted = true + return o.batch +} + +func (o *singleBatchOperator) reset(vecs []coldata.Vec, inputLen int, sel []int) { + o.nexted = false + for i, vec := range vecs { + o.batch.ReplaceCol(vec, i) + } + o.batch.SetLength(inputLen) + o.batch.SetSelection(sel != nil) + if sel != nil { + copy(o.batch.Selection(), sel[:inputLen]) + } +} diff --git a/pkg/sql/colexec/ordered_aggregator.go b/pkg/sql/colexec/ordered_aggregator.go index d58e187ba137..54d4ca2ed3a4 100644 --- a/pkg/sql/colexec/ordered_aggregator.go +++ b/pkg/sql/colexec/ordered_aggregator.go @@ -107,6 +107,14 @@ func NewOrderedAggregator( outputTypes []*types.T, isScalar bool, ) (colexecbase.Operator, error) { + for _, aggFn := range spec.Aggregations { + if aggFn.Distinct { + return nil, errors.AssertionFailedf("distinct ordered aggregation is not supported") + } + if aggFn.FilterColIdx != nil { + return nil, errors.AssertionFailedf("filtering ordered aggregation is not supported") + } + } op, groupCol, err := OrderedDistinctColsToOperators(input, spec.GroupCols, inputTypes) if err != nil { return nil, err diff --git a/pkg/sql/colexec/vec_to_datum.eg.go b/pkg/sql/colexec/vec_to_datum.eg.go index 1f2ef66b2fbb..d426c032c68a 100644 --- a/pkg/sql/colexec/vec_to_datum.eg.go +++ b/pkg/sql/colexec/vec_to_datum.eg.go @@ -64,29 +64,32 @@ func newVecToDatumConverter(batchWidth int, vecIdxsToConvert []int) *vecToDatumC // getDatumColumn(colIdx)[tupleIdx] and *NOT* // getDatumColumn(colIdx)[sel[tupleIdx]]. func (c *vecToDatumConverter) convertBatchAndDeselect(batch coldata.Batch) { + c.convertVecsAndDeselect(batch.ColVecs(), batch.Length(), batch.Selection()) +} + +// convertVecsAndDeselect converts the selected vectors from vecs while +// performing a deselection step. +func (c *vecToDatumConverter) convertVecsAndDeselect(vecs []coldata.Vec, inputLen int, sel []int) { if len(c.vecIdxsToConvert) == 0 { // No vectors were selected for conversion, so there is nothing to do. return } - batchLength := batch.Length() // Ensure that convertedVecs are of sufficient length. - if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < batchLength { + if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < inputLen { for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = make([]tree.Datum, batchLength) + c.convertedVecs[vecIdx] = make([]tree.Datum, inputLen) } // Adjust the datum alloc according to the length of the batch since // this batch is the longest we've seen so far. 
- c.da.AllocSize = batchLength + c.da.AllocSize = inputLen } else { for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:batchLength] + c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:inputLen] } } - sel := batch.Selection() - vecs := batch.ColVecs() for _, vecIdx := range c.vecIdxsToConvert { ColVecToDatumAndDeselect( - c.convertedVecs[vecIdx], vecs[vecIdx], batchLength, sel, &c.da, + c.convertedVecs[vecIdx], vecs[vecIdx], inputLen, sel, &c.da, ) } } @@ -94,19 +97,23 @@ func (c *vecToDatumConverter) convertBatchAndDeselect(batch coldata.Batch) { // convertBatch converts the selected vectors from the batch *without* // performing a deselection step. // NOTE: converted columns are "sparse" in regards to the selection vector - if -// there was a selection vector on the batch, only elements that were selected -// are converted, but the results are put at position sel[tupleIdx], so use +// there was a selection vector, only elements that were selected are +// converted, but the results are put at position sel[tupleIdx], so use // getDatumColumn(colIdx)[sel[tupleIdx]] and *NOT* // getDatumColumn(colIdx)[tupleIdx]. func (c *vecToDatumConverter) convertBatch(batch coldata.Batch) { + c.convertVecs(batch.ColVecs(), batch.Length(), batch.Selection()) +} + +// convertVecs converts the selected vectors from vecs *without* performing a +// deselection step. +func (c *vecToDatumConverter) convertVecs(vecs []coldata.Vec, inputLen int, sel []int) { if len(c.vecIdxsToConvert) == 0 { // No vectors were selected for conversion, so there is nothing to do. return } - batchLength := batch.Length() - sel := batch.Selection() // Ensure that convertedVecs are of sufficient length. - requiredLength := batchLength + requiredLength := inputLen if sel != nil { // When sel is non-nil, it might be something like sel = [1023], so we // need to allocate up to the full coldata.BatchSize(), regardless of @@ -125,10 +132,9 @@ func (c *vecToDatumConverter) convertBatch(batch coldata.Batch) { c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] } } - vecs := batch.ColVecs() for _, vecIdx := range c.vecIdxsToConvert { ColVecToDatum( - c.convertedVecs[vecIdx], vecs[vecIdx], batchLength, sel, &c.da, + c.convertedVecs[vecIdx], vecs[vecIdx], inputLen, sel, &c.da, ) } } diff --git a/pkg/sql/colexec/vec_to_datum_tmpl.go b/pkg/sql/colexec/vec_to_datum_tmpl.go index f305f3bb8b39..dd680ab8f5ae 100644 --- a/pkg/sql/colexec/vec_to_datum_tmpl.go +++ b/pkg/sql/colexec/vec_to_datum_tmpl.go @@ -67,29 +67,32 @@ func newVecToDatumConverter(batchWidth int, vecIdxsToConvert []int) *vecToDatumC // getDatumColumn(colIdx)[tupleIdx] and *NOT* // getDatumColumn(colIdx)[sel[tupleIdx]]. func (c *vecToDatumConverter) convertBatchAndDeselect(batch coldata.Batch) { + c.convertVecsAndDeselect(batch.ColVecs(), batch.Length(), batch.Selection()) +} + +// convertVecsAndDeselect converts the selected vectors from vecs while +// performing a deselection step. +func (c *vecToDatumConverter) convertVecsAndDeselect(vecs []coldata.Vec, inputLen int, sel []int) { if len(c.vecIdxsToConvert) == 0 { // No vectors were selected for conversion, so there is nothing to do. return } - batchLength := batch.Length() // Ensure that convertedVecs are of sufficient length. 
- if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < batchLength { + if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < inputLen { for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = make([]tree.Datum, batchLength) + c.convertedVecs[vecIdx] = make([]tree.Datum, inputLen) } // Adjust the datum alloc according to the length of the batch since // this batch is the longest we've seen so far. - c.da.AllocSize = batchLength + c.da.AllocSize = inputLen } else { for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:batchLength] + c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:inputLen] } } - sel := batch.Selection() - vecs := batch.ColVecs() for _, vecIdx := range c.vecIdxsToConvert { ColVecToDatumAndDeselect( - c.convertedVecs[vecIdx], vecs[vecIdx], batchLength, sel, &c.da, + c.convertedVecs[vecIdx], vecs[vecIdx], inputLen, sel, &c.da, ) } } @@ -97,19 +100,23 @@ func (c *vecToDatumConverter) convertBatchAndDeselect(batch coldata.Batch) { // convertBatch converts the selected vectors from the batch *without* // performing a deselection step. // NOTE: converted columns are "sparse" in regards to the selection vector - if -// there was a selection vector on the batch, only elements that were selected -// are converted, but the results are put at position sel[tupleIdx], so use +// there was a selection vector, only elements that were selected are +// converted, but the results are put at position sel[tupleIdx], so use // getDatumColumn(colIdx)[sel[tupleIdx]] and *NOT* // getDatumColumn(colIdx)[tupleIdx]. func (c *vecToDatumConverter) convertBatch(batch coldata.Batch) { + c.convertVecs(batch.ColVecs(), batch.Length(), batch.Selection()) +} + +// convertVecs converts the selected vectors from vecs *without* performing a +// deselection step. +func (c *vecToDatumConverter) convertVecs(vecs []coldata.Vec, inputLen int, sel []int) { if len(c.vecIdxsToConvert) == 0 { // No vectors were selected for conversion, so there is nothing to do. return } - batchLength := batch.Length() - sel := batch.Selection() // Ensure that convertedVecs are of sufficient length. - requiredLength := batchLength + requiredLength := inputLen if sel != nil { // When sel is non-nil, it might be something like sel = [1023], so we // need to allocate up to the full coldata.BatchSize(), regardless of @@ -128,10 +135,9 @@ func (c *vecToDatumConverter) convertBatch(batch coldata.Batch) { c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] } } - vecs := batch.ColVecs() for _, vecIdx := range c.vecIdxsToConvert { ColVecToDatum( - c.convertedVecs[vecIdx], vecs[vecIdx], batchLength, sel, &c.da, + c.convertedVecs[vecIdx], vecs[vecIdx], inputLen, sel, &c.da, ) } } diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index 5ce0a815f653..f72736961682 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -238,6 +238,11 @@ func (a *Allocator) PerformOperation(destVecs []coldata.Vec, operation func()) { a.AdjustMemoryUsage(after - before) } +// GetAccount returns the memory account that this allocator is working with. +func (a *Allocator) GetAccount() *mon.BoundAccount { + return a.acc +} + // Used returns the number of bytes currently allocated through this allocator. 
func (a *Allocator) Used() int64 { return a.acc.Used() diff --git a/pkg/sql/distsql/columnar_operators_test.go b/pkg/sql/distsql/columnar_operators_test.go index f9a9daff6b6a..654e740cb6be 100644 --- a/pkg/sql/distsql/columnar_operators_test.go +++ b/pkg/sql/distsql/columnar_operators_test.go @@ -131,139 +131,184 @@ func TestAggregatorAgainstProcessor(t *testing.T) { aggregations = append(aggregations, execinfrapb.AggregatorSpec_Aggregation{Func: aggFn}) } for _, hashAgg := range []bool{false, true} { - for numGroupingCols := 1; numGroupingCols <= maxNumGroupingCols; numGroupingCols++ { - // We will be grouping based on the first numGroupingCols columns - // (which will be of INT types) with the values for the columns set - // manually below. - inputTypes := make([]*types.T, 0, numGroupingCols+len(aggregations)) - for i := 0; i < numGroupingCols; i++ { - inputTypes = append(inputTypes, types.Int) + filteringAggOptions := []bool{false} + if hashAgg { + // We currently support filtering aggregation only for hash + // aggregator. + filteringAggOptions = []bool{false, true} + } + for _, filteringAgg := range filteringAggOptions { + numFilteringCols := 0 + if filteringAgg { + numFilteringCols = 1 } - // After all grouping columns, we will have input columns for each - // of the aggregate functions. Here, we will set up the column - // indices, and the types will be regenerated below - numColsSoFar := numGroupingCols - for i := range aggregations { - numArguments := aggregateFuncToNumArguments[aggregations[i].Func] - aggregations[i].ColIdx = make([]uint32, numArguments) - for j := range aggregations[i].ColIdx { - aggregations[i].ColIdx[j] = uint32(numColsSoFar) - numColsSoFar++ + for numGroupingCols := 1; numGroupingCols <= maxNumGroupingCols; numGroupingCols++ { + // We will be grouping based on the first numGroupingCols columns + // (which will be of INT types) with the values for the columns set + // manually below. + numUtilityCols := numGroupingCols + numFilteringCols + inputTypes := make([]*types.T, 0, numUtilityCols+len(aggregations)) + for i := 0; i < numGroupingCols; i++ { + inputTypes = append(inputTypes, types.Int) } - } - outputTypes := make([]*types.T, len(aggregations)) - - for run := 0; run < nRuns; run++ { - inputTypes = inputTypes[:numGroupingCols] - var rows sqlbase.EncDatumRows + // Check whether we want to add a column for FILTER clause. + var filteringColIdx uint32 + if filteringAgg { + filteringColIdx = uint32(len(inputTypes)) + inputTypes = append(inputTypes, types.Bool) + } + // After all utility columns, we will have input columns for each + // of the aggregate functions. Here, we will set up the column + // indices, and the types will be generated below. + numColsSoFar := numUtilityCols for i := range aggregations { - aggFn := aggregations[i].Func - aggFnInputTypes := make([]*types.T, len(aggregations[i].ColIdx)) - for { - for j := range aggFnInputTypes { - aggFnInputTypes[j] = sqlbase.RandType(rng) - } - // There is a special case for concat_agg, string_agg, - // and st_makeline when at least one argument is a - // tuple. Such cases pass GetAggregateInfo check below, - // but they are actually invalid, and during normal - // execution it is caught during type-checking. - // However, we don't want to do fully-fledged type - // checking, so we hard-code an exception here. 
- invalid := false - switch aggFn { - case execinfrapb.AggregatorSpec_CONCAT_AGG, - execinfrapb.AggregatorSpec_STRING_AGG, - execinfrapb.AggregatorSpec_ST_MAKELINE: - for _, typ := range aggFnInputTypes { - if typ.Family() == types.TupleFamily { - invalid = true - break + numArguments := aggregateFuncToNumArguments[aggregations[i].Func] + aggregations[i].ColIdx = make([]uint32, numArguments) + for j := range aggregations[i].ColIdx { + aggregations[i].ColIdx[j] = uint32(numColsSoFar) + numColsSoFar++ + } + } + outputTypes := make([]*types.T, len(aggregations)) + + for run := 0; run < nRuns; run++ { + inputTypes = inputTypes[:numUtilityCols] + var rows sqlbase.EncDatumRows + hasJSONColumn := false + for i := range aggregations { + aggFn := aggregations[i].Func + aggFnInputTypes := make([]*types.T, len(aggregations[i].ColIdx)) + for { + for j := range aggFnInputTypes { + aggFnInputTypes[j] = sqlbase.RandType(rng) + } + // There is a special case for concat_agg, string_agg, + // and st_makeline when at least one argument is a + // tuple. Such cases pass GetAggregateInfo check below, + // but they are actually invalid, and during normal + // execution it is caught during type-checking. + // However, we don't want to do fully-fledged type + // checking, so we hard-code an exception here. + invalid := false + switch aggFn { + case execinfrapb.AggregatorSpec_CONCAT_AGG, + execinfrapb.AggregatorSpec_STRING_AGG, + execinfrapb.AggregatorSpec_ST_MAKELINE: + for _, typ := range aggFnInputTypes { + if typ.Family() == types.TupleFamily { + invalid = true + break + } } } + if invalid { + continue + } + for _, typ := range aggFnInputTypes { + hasJSONColumn = hasJSONColumn || typ.Family() == types.JsonFamily + } + if _, outputType, err := execinfrapb.GetAggregateInfo(aggFn, aggFnInputTypes...); err == nil { + outputTypes[i] = outputType + break + } } - if invalid { - continue - } - if _, outputType, err := execinfrapb.GetAggregateInfo(aggFn, aggFnInputTypes...); err == nil { - outputTypes[i] = outputType - break - } + inputTypes = append(inputTypes, aggFnInputTypes...) } - inputTypes = append(inputTypes, aggFnInputTypes...) - } - rows = sqlbase.RandEncDatumRowsOfTypes(rng, nRows, inputTypes) - groupIdx := 0 - for _, row := range rows { - for i := 0; i < numGroupingCols; i++ { - if rng.Float64() < nullProbability { - row[i] = sqlbase.EncDatum{Datum: tree.DNull} - } else { - row[i] = sqlbase.EncDatum{Datum: tree.NewDInt(tree.DInt(groupIdx))} - if rng.Float64() < nextGroupProb { - groupIdx++ + rows = sqlbase.RandEncDatumRowsOfTypes(rng, nRows, inputTypes) + groupIdx := 0 + for _, row := range rows { + for i := 0; i < numGroupingCols; i++ { + if rng.Float64() < nullProbability { + row[i] = sqlbase.EncDatum{Datum: tree.DNull} + } else { + row[i] = sqlbase.EncDatum{Datum: tree.NewDInt(tree.DInt(groupIdx))} + if rng.Float64() < nextGroupProb { + groupIdx++ + } } } } - } - aggregatorSpec := &execinfrapb.AggregatorSpec{ - Type: execinfrapb.AggregatorSpec_NON_SCALAR, - GroupCols: groupingCols[:numGroupingCols], - Aggregations: aggregations, - } - if hashAgg { - // Let's shuffle the rows for the hash aggregator. 
- rand.Shuffle(nRows, func(i, j int) { - rows[i], rows[j] = rows[j], rows[i] - }) - } else { - aggregatorSpec.OrderedGroupCols = groupingCols[:numGroupingCols] - orderedCols := execinfrapb.ConvertToColumnOrdering( - execinfrapb.Ordering{Columns: orderingCols[:numGroupingCols]}, - ) - // Although we build the input rows in "non-decreasing" order, it is - // possible that some NULL values are present here and there, so we - // need to sort the rows to satisfy the ordering conditions. - sort.Slice(rows, func(i, j int) bool { - cmp, err := rows[i].Compare(inputTypes, &da, orderedCols, &evalCtx, rows[j]) - if err != nil { - t.Fatal(err) + // Update the specifications of aggregate functions to + // possibly include DISTINCT and/or FILTER clauses. + for _, aggFn := range aggregations { + distinctProb := 0.5 + if !hashAgg { + // We currently support distinct aggregation only + // for hash aggregator. + distinctProb = 0 + } + if hasJSONColumn { + // We currently cannot encode json columns, so we + // don't support distinct aggregation in both + // row-by-row and vectorized engines. + distinctProb = 0 + } + aggFn.Distinct = rng.Float64() < distinctProb + if filteringAgg { + aggFn.FilterColIdx = &filteringColIdx + } else { + aggFn.FilterColIdx = nil } - return cmp < 0 - }) - } - pspec := &execinfrapb.ProcessorSpec{ - Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}}, - Core: execinfrapb.ProcessorCoreUnion{Aggregator: aggregatorSpec}, - } - args := verifyColOperatorArgs{ - anyOrder: hashAgg, - inputTypes: [][]*types.T{inputTypes}, - inputs: []sqlbase.EncDatumRows{rows}, - outputTypes: outputTypes, - pspec: pspec, - } - if err := verifyColOperator(args); err != nil { - if strings.Contains(err.Error(), "different errors returned") { - // Columnar and row-based aggregators are likely to hit - // different errors, and we will swallow those and move - // on. - continue } - fmt.Printf("--- seed = %d run = %d hash = %t ---\n", - seed, run, hashAgg) - var aggFnNames string - for i, agg := range aggregations { - if i > 0 { - aggFnNames += " " + aggregatorSpec := &execinfrapb.AggregatorSpec{ + Type: execinfrapb.AggregatorSpec_NON_SCALAR, + GroupCols: groupingCols[:numGroupingCols], + Aggregations: aggregations, + } + if hashAgg { + // Let's shuffle the rows for the hash aggregator. + rand.Shuffle(nRows, func(i, j int) { + rows[i], rows[j] = rows[j], rows[i] + }) + } else { + aggregatorSpec.OrderedGroupCols = groupingCols[:numGroupingCols] + orderedCols := execinfrapb.ConvertToColumnOrdering( + execinfrapb.Ordering{Columns: orderingCols[:numGroupingCols]}, + ) + // Although we build the input rows in "non-decreasing" order, it is + // possible that some NULL values are present here and there, so we + // need to sort the rows to satisfy the ordering conditions. 
+ sort.Slice(rows, func(i, j int) bool { + cmp, err := rows[i].Compare(inputTypes, &da, orderedCols, &evalCtx, rows[j]) + if err != nil { + t.Fatal(err) + } + return cmp < 0 + }) + } + pspec := &execinfrapb.ProcessorSpec{ + Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}}, + Core: execinfrapb.ProcessorCoreUnion{Aggregator: aggregatorSpec}, + } + args := verifyColOperatorArgs{ + anyOrder: hashAgg, + inputTypes: [][]*types.T{inputTypes}, + inputs: []sqlbase.EncDatumRows{rows}, + outputTypes: outputTypes, + pspec: pspec, + } + if err := verifyColOperator(args); err != nil { + if strings.Contains(err.Error(), "different errors returned") { + // Columnar and row-based aggregators are likely to hit + // different errors, and we will swallow those and move + // on. + continue + } + fmt.Printf("--- seed = %d run = %d filter = %t hash = %t ---\n", + seed, run, filteringAgg, hashAgg) + var aggFnNames string + for i, agg := range aggregations { + if i > 0 { + aggFnNames += " " + } + aggFnNames += agg.Func.String() } - aggFnNames += agg.Func.String() + fmt.Printf("--- %s ---\n", aggFnNames) + prettyPrintTypes(inputTypes, "t" /* tableName */) + prettyPrintInput(rows, inputTypes, "t" /* tableName */) + t.Fatal(err) } - fmt.Printf("--- %s ---\n", aggFnNames) - prettyPrintTypes(inputTypes, "t" /* tableName */) - prettyPrintInput(rows, inputTypes, "t" /* tableName */) - t.Fatal(err) } } }
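// Illustrative sketch (not part of the patch above): the per-group seen-map
// strategy used by filteringDistinctHashAggregatorHelper, reduced to plain Go.
// Each (group, DISTINCT aggregate) pair keeps its own set of already-seen
// encoded argument values; a row is fed to the aggregate only the first time
// its encoding shows up, and a FILTER column simply drops rows before that
// check. The fmt-based key is a naive stand-in for EncDatum.Fingerprint.
package main

import "fmt"

type row struct {
	group  int
	val    int
	filter bool
}

func main() {
	rows := []row{
		{0, 1, false}, {0, 2, true}, {0, 2, true}, {0, 1, true},
		{1, 1, true}, {1, 2, true}, {1, 2, false},
	}
	// Per-group state for COUNT(DISTINCT val) FILTER (WHERE filter).
	counts := map[int]int{}
	seen := map[int]map[string]struct{}{}
	for _, r := range rows {
		if !r.filter {
			// The FILTER clause removes the row before the distinctness check.
			continue
		}
		if seen[r.group] == nil {
			seen[r.group] = map[string]struct{}{}
		}
		key := fmt.Sprintf("%d", r.val) // stand-in for the datum fingerprint
		if _, ok := seen[r.group][key]; ok {
			continue // an equal value was already aggregated for this group
		}
		seen[r.group][key] = struct{}{}
		counts[r.group]++
	}
	fmt.Println(counts) // map[0:2 1:2]
}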