colexec: add support for DISTINCT and FILTER hash aggregation #50721

Merged · 1 commit · Aug 20, 2020
6 changes: 3 additions & 3 deletions pkg/sql/colexec/aggregate_funcs.go
@@ -308,8 +308,8 @@ func newAggregateFuncsAlloc(
}

// sizeOfAggregateFunc is the size of some aggregateFunc implementation.
-// countAgg was chosen arbitrarily, but it's important that we use a pointer to
-// the aggregate function struct.
+// countHashAgg was chosen arbitrarily, but it's important that we use a
+// pointer to the aggregate function struct.
const sizeOfAggregateFunc = int64(unsafe.Sizeof(&countHashAgg{}))
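// A clarifying aside (not part of this diff): unsafe.Sizeof of a pointer
// measures the pointer itself (8 bytes on 64-bit platforms), never the
// pointed-to struct, which is why the comment above stresses that the
// pointer is taken deliberately:
//
//   unsafe.Sizeof(&countHashAgg{}) // pointer size, e.g. 8
//   unsafe.Sizeof(countHashAgg{})  // size of the struct itself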

func (a *aggregateFuncsAlloc) makeAggregateFuncs() []aggregateFunc {
@@ -319,7 +319,7 @@ func (a *aggregateFuncsAlloc) makeAggregateFuncs() []aggregateFunc {
// of 'allocSize x number of funcs in schema' length. Every
// aggFuncAlloc will allocate allocSize of objects on the newAggFunc
// call below.
-a.allocator.AdjustMemoryUsage(sizeOfAggregateFunc * a.allocSize)
+a.allocator.AdjustMemoryUsage(sizeOfAggregateFunc * int64(len(a.aggFuncAllocs)) * a.allocSize)
a.returnFuncs = make([]aggregateFunc, len(a.aggFuncAllocs)*int(a.allocSize))
}
funcs := a.returnFuncs[:len(a.aggFuncAllocs)]
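The corrected AdjustMemoryUsage call matters because returnFuncs has len(a.aggFuncAllocs) * allocSize elements, so the old accounting undercounted by a factor of the number of aggregate functions in the schema. As a rough worked example (assuming 64-bit pointers): with 4 aggregate functions and allocSize = 64, the old call registered 8 * 64 = 512 bytes, while the fixed call registers 8 * 4 * 64 = 2048 bytes — one pointer-sized slot per element of returnFuncs.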
235 changes: 232 additions & 3 deletions pkg/sql/colexec/aggregators_test.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -51,6 +51,8 @@ type aggregatorTestCase struct {
constArguments [][]execinfrapb.Expression
// spec will be populated during init().
spec *execinfrapb.AggregatorSpec
aggDistinct []bool
aggFilter []int
input tuples
unorderedInput bool
expected tuples
@@ -95,7 +98,7 @@ var aggTypes = []aggType{
outputTypes []*types.T,
_ bool,
) (colexecbase.Operator, error) {
-return NewHashAggregator(allocator, input, inputTypes, spec, evalCtx, constructors, constArguments, outputTypes)
+return NewHashAggregator(allocator, testMemAcc, input, inputTypes, spec, evalCtx, constructors, constArguments, outputTypes)
},
name: "hash",
},
@@ -154,6 +157,13 @@ func (tc *aggregatorTestCase) init() error {
if tc.constArguments != nil {
aggregations[i].Arguments = tc.constArguments[i]
}
if tc.aggDistinct != nil {
aggregations[i].Distinct = tc.aggDistinct[i]
}
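// FilterColIdx is an optional (pointer) field in the proto, so a
// present-but-zero column index can be distinguished from "no FILTER";
// tree.NoColumnIdx marks the latter in the test definition.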
if tc.aggFilter != nil && tc.aggFilter[i] != tree.NoColumnIdx {
filterColIdx := uint32(tc.aggFilter[i])
aggregations[i].FilterColIdx = &filterColIdx
}
}
tc.spec = &execinfrapb.AggregatorSpec{
GroupCols: tc.groupCols,
@@ -679,12 +689,133 @@ func TestAggregatorAllFunctions(t *testing.T) {
},
convToDecimal: true,
},

// Test DISTINCT aggregation.
{
aggFns: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_ANY_NOT_NULL,
execinfrapb.AggregatorSpec_COUNT,
execinfrapb.AggregatorSpec_COUNT,
execinfrapb.AggregatorSpec_SUM_INT,
execinfrapb.AggregatorSpec_SUM_INT,
},
aggCols: [][]uint32{{0}, {1}, {1}, {1}, {1}},
aggDistinct: []bool{false, false, true, false, true},
typs: []*types.T{types.Int, types.Int},
input: tuples{
{0, 1},
{0, 2},
{0, 2},
{0, nil},
{0, 1},
{0, nil},
{1, 1},
{1, 2},
{1, 2},
},
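// Group 0 has non-NULL values {1, 2, 2, 1}: plain COUNT = 4 and
// SUM = 6, while the DISTINCT variants see {1, 2}, giving
// COUNT(DISTINCT) = 2 and SUM(DISTINCT) = 3. Group 1 works out
// analogously from {1, 2, 2}.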
expected: tuples{
{0, 4, 2, 6, 3},
{1, 3, 2, 5, 3},
},
},

// Test aggregation with FILTERs.
{
aggFns: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_ANY_NOT_NULL,
execinfrapb.AggregatorSpec_COUNT_ROWS,
execinfrapb.AggregatorSpec_SUM_INT,
},
aggCols: [][]uint32{{0}, {}, {1}},
aggFilter: []int{tree.NoColumnIdx, 2, 2},
typs: []*types.T{types.Int, types.Int, types.Bool},
input: tuples{
{0, 1, false},
{0, 2, true},
{0, 2, true},
{0, nil, nil},
{0, 1, nil},
{0, nil, true},
{1, 1, true},
{1, 2, nil},
{1, 2, true},
},
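// Only rows whose filter column (column 2) is true are
// aggregated: group 0 passes values {2, 2, nil}, so
// COUNT_ROWS = 3 (NULL values still count as rows) and
// SUM_INT = 4; group 1 passes {1, 2}, giving 2 and 3.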
expected: tuples{
{0, 3, 4},
{1, 2, 3},
},
},

// Test aggregation with FILTERs when the whole groups are filtered out.
{
aggFns: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_ANY_NOT_NULL,
execinfrapb.AggregatorSpec_COUNT_ROWS,
execinfrapb.AggregatorSpec_SUM_INT,
},
aggCols: [][]uint32{{0}, {}, {1}},
aggFilter: []int{tree.NoColumnIdx, 2, 2},
typs: []*types.T{types.Int, types.Int, types.Bool},
input: tuples{
{0, 1, false},
{0, nil, nil},
{0, 2, false},
{1, 1, true},
{1, 2, nil},
{1, 2, true},
{2, 1, false},
{2, nil, nil},
{2, 2, nil},
},
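// Groups 0 and 2 have no rows with a true filter column, so
// COUNT_ROWS is 0 and SUM_INT is NULL for them.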
expected: tuples{
{0, 0, nil},
{1, 2, 3},
{2, 0, nil},
},
},

// Test aggregation with FILTERs and DISTINCTs intertwined.
{
aggFns: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_ANY_NOT_NULL,
execinfrapb.AggregatorSpec_COUNT,
execinfrapb.AggregatorSpec_COUNT,
execinfrapb.AggregatorSpec_COUNT,
execinfrapb.AggregatorSpec_SUM_INT,
execinfrapb.AggregatorSpec_SUM_INT,
execinfrapb.AggregatorSpec_SUM_INT,
},
aggCols: [][]uint32{{0}, {1}, {1}, {1}, {1}, {1}, {1}},
aggDistinct: []bool{false, false, true, true, false, true, true},
aggFilter: []int{tree.NoColumnIdx, 2, tree.NoColumnIdx, 2, 2, tree.NoColumnIdx, 2},
typs: []*types.T{types.Int, types.Int, types.Bool},
input: tuples{
{0, 1, false},
{0, 2, true},
{0, 2, true},
{0, nil, nil},
{0, 1, nil},
{0, nil, true},
{1, 1, true},
{1, 2, nil},
{1, 2, true},
},
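// For group 0 the filter (column 2 = true) passes values
// {2, 2, nil}: filtered COUNT = 2 and filtered SUM = 4, and their
// DISTINCT variants see only {2} (COUNT = 1, SUM = 2), while the
// unfiltered DISTINCT aggregates see {1, 2} (COUNT = 2, SUM = 3).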
expected: tuples{
{0, 2, 2, 1, 4, 3, 2},
{1, 2, 2, 2, 3, 3, 3},
},
},
}

evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())
defer evalCtx.Stop(context.Background())
for _, agg := range aggTypes {
for i, tc := range testCases {
if agg.name != "hash" && (tc.aggDistinct != nil || tc.aggFilter != nil) {
// DISTINCT and FILTER aggregation is currently supported only
// by the hash aggregator.
continue
}
log.Infof(context.Background(), "%s/%d", agg.name, i)
if err := tc.init(); err != nil {
t.Fatal(err)
@@ -993,6 +1124,104 @@ func BenchmarkAllOptimizedAggregateFunctions(b *testing.B) {
}
}

func BenchmarkDistinctAggregation(b *testing.B) {
rng, _ := randutil.NewPseudoRand()
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
}

typs := []*types.T{types.Int, types.Int}
aggFn := execinfrapb.AggregatorSpec_COUNT
aggSpec := &execinfrapb.AggregatorSpec{
Type: execinfrapb.AggregatorSpec_NON_SCALAR,
GroupCols: []uint32{0},
// TODO(yuzefovich): adjust the spec once we support distinct ordered
// aggregation.
Aggregations: []execinfrapb.AggregatorSpec_Aggregation{{
Func: aggFn,
Distinct: true,
ColIdx: []uint32{1},
}},
}
spec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: typs}},
Core: execinfrapb.ProcessorCoreUnion{
Aggregator: aggSpec,
},
}
args := &NewColOperatorArgs{
Spec: spec,
StreamingMemAccount: testMemAcc,
}
args.TestingKnobs.UseStreamingMemAccountForBuffering = true

for _, groupSize := range []int{1, 2, 32, 128, coldata.BatchSize() / 2, coldata.BatchSize()} {
for _, distinctProbability := range []float64{0.01, 0.1, 1.0} {
distinctModulo := int(1.0 / distinctProbability)
if (groupSize == 1 && distinctProbability != 1.0) || float64(groupSize)/float64(distinctModulo) < 0.1 {
// This combination of groupSize and distinctProbability
// cannot be meaningfully satisfied (for example, with
// groupSize=1 every value is necessarily distinct within
// its group, so distinctProbability=0.01 is unattainable),
// so we skip such configurations.
}
for _, hasNulls := range []bool{false, true} {
for _, numInputBatches := range []int{64} {
// TODO(yuzefovich): refactor benchmarkAggregateFunction to
// be more configurable and reuse it here.
b.Run(fmt.Sprintf("%s/groupSize=%d/distinctProb=%.2f/nulls=%t",
aggFn, groupSize, distinctProbability, hasNulls),
func(b *testing.B) {
nTuples := numInputBatches * coldata.BatchSize()
cols := []coldata.Vec{
testAllocator.NewMemColumn(typs[0], nTuples),
testAllocator.NewMemColumn(typs[1], nTuples),
}
groups := cols[0].Int64()
vals := cols[1].Int64()
nGroups := nTuples / groupSize
for i := 0; i < nTuples; i++ {
groups[i] = int64(rng.Intn(nGroups))
vals[i] = int64(rng.Intn(distinctModulo))
if hasNulls && rng.Float64() < nullProbability {
cols[1].Nulls().SetNull(i)
}
}
source := newChunkingBatchSource(typs, cols, nTuples)
args.Inputs = []colexecbase.Operator{source}
r, err := TestNewColOperator(ctx, flowCtx, args)
if err != nil {
b.Fatal(err)
}

a := r.Op
a.Init()
b.ResetTimer()
// Only count the aggregation column.
b.SetBytes(int64(8 * nTuples))
for i := 0; i < b.N; i++ {
// Exhaust aggregator until all batches have been read.
for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) {
}
a.(ResettableOperator).reset(ctx)
}
},
)
}
}
}
}
}
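// To run just this benchmark locally, something along these lines should
// work (package path assumed from this diff):
//
//   go test ./pkg/sql/colexec -run=^$ -bench=BenchmarkDistinctAggregation -benchtime=1x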

func TestHashAggregator(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -1116,8 +1345,8 @@ func TestHashAggregator(t *testing.T) {
log.Infof(context.Background(), "numOfHashBuckets=%d", numOfHashBuckets)
runTests(t, []tuples{tc.input}, tc.expected, unorderedVerifier, func(sources []colexecbase.Operator) (colexecbase.Operator, error) {
a, err := NewHashAggregator(
-testAllocator, sources[0], tc.typs, tc.spec, &evalCtx,
-constructors, constArguments, outputTypes,
+testAllocator, testMemAcc, sources[0], tc.typs, tc.spec,
+&evalCtx, constructors, constArguments, outputTypes,
)
a.(*hashAggregator).testingKnobs.numOfHashBuckets = uint64(numOfHashBuckets)
return a, err
51 changes: 31 additions & 20 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -127,6 +127,23 @@ func (r *opResult) resetToState(ctx context.Context, arg colexec.NewColOperatorR
*r.NewColOperatorResult = arg
}

func needHashAggregator(aggSpec *execinfrapb.AggregatorSpec) (bool, error) {
var groupCols, orderedCols util.FastIntSet
for _, col := range aggSpec.OrderedGroupCols {
orderedCols.Add(int(col))
}
for _, col := range aggSpec.GroupCols {
if !orderedCols.Contains(int(col)) {
return true, nil
}
groupCols.Add(int(col))
}
if !orderedCols.SubsetOf(groupCols) {
return false, errors.AssertionFailedf("ordered cols must be a subset of grouping cols")
}
return false, nil
}
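// As a hypothetical illustration of the helper's contract (this snippet is
// not part of the PR; the spec literal is made up):
//
//   spec := &execinfrapb.AggregatorSpec{
//       GroupCols:        []uint32{0, 1},
//       OrderedGroupCols: []uint32{0},
//   }
//   needHash, err := needHashAggregator(spec) // needHash == true, err == nil
//
// Column 1 is grouped on but not known to be ordered, so the hash
// aggregator is required; only a spec whose GroupCols are all ordered
// keeps the ordered aggregator eligible.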

// isSupported checks whether we have a columnar operator equivalent to a
// processor described by spec. Note that it doesn't perform any other checks
// (like validity of the number of inputs).
@@ -153,12 +170,16 @@ func isSupported(mode sessiondata.VectorizeExecMode, spec *execinfrapb.Processor

case core.Aggregator != nil:
aggSpec := core.Aggregator
needHash, err := needHashAggregator(aggSpec)
if err != nil {
return err
}
for _, agg := range aggSpec.Aggregations {
-if agg.Distinct {
-return errors.Newf("distinct aggregation not supported")
+if agg.Distinct && !needHash {
+return errors.Newf("distinct ordered aggregation not supported")
}
-if agg.FilterColIdx != nil {
-return errors.Newf("filtering aggregation not supported")
+if agg.FilterColIdx != nil && !needHash {
+return errors.Newf("filtering ordered aggregation not supported")
}
}
return nil
@@ -649,21 +670,11 @@ func NewColOperator(
break
}

-var groupCols, orderedCols util.FastIntSet
-for _, col := range aggSpec.OrderedGroupCols {
-orderedCols.Add(int(col))
-}
-needHash := false
-for _, col := range aggSpec.GroupCols {
-if !orderedCols.Contains(int(col)) {
-needHash = true
-}
-groupCols.Add(int(col))
-}
-if !orderedCols.SubsetOf(groupCols) {
-return r, errors.AssertionFailedf("ordered cols must be a subset of grouping cols")
+var needHash bool
+needHash, err = needHashAggregator(aggSpec)
+if err != nil {
+return r, err
}

inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes))
copy(inputTypes, spec.Input[0].ColumnTypes)
evalCtx := flowCtx.NewEvalCtx()
@@ -691,8 +702,8 @@
evalCtx.SingleDatumAggMemAccount = hashAggregatorMemAccount
result.Op, err = colexec.NewHashAggregator(
colmem.NewAllocator(ctx, hashAggregatorMemAccount, factory),
-inputs[0], inputTypes, aggSpec, evalCtx, constructors,
-constArguments, result.ColumnTypes,
+hashAggregatorMemAccount, inputs[0], inputTypes, aggSpec,
+evalCtx, constructors, constArguments, result.ColumnTypes,
)
} else {
evalCtx.SingleDatumAggMemAccount = streamingMemAccount