Skip to content

Commit

Permalink
colexec: implement function concat_agg for vectorized engine
Browse files Browse the repository at this point in the history
This commit implements aggregate function "concat_agg" for vectorized engine

Release note (sql change): vectorized engine now supports aggregate function "concat_agg"
  • Loading branch information
yongyanglai committed Jul 12, 2020
1 parent 8e38c32 commit dd8049d
Show file tree
Hide file tree
Showing 11 changed files with 744 additions and 138 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/hash_any_not_null_agg.eg.go \
pkg/sql/colexec/hash_avg_agg.eg.go \
pkg/sql/colexec/hash_bool_and_or_agg.eg.go \
pkg/sql/colexec/hash_concat_agg.eg.go \
pkg/sql/colexec/hash_count_agg.eg.go \
pkg/sql/colexec/hash_min_max_agg.eg.go \
pkg/sql/colexec/hash_sum_agg.eg.go \
Expand All @@ -879,6 +880,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/ordered_any_not_null_agg.eg.go \
pkg/sql/colexec/ordered_avg_agg.eg.go \
pkg/sql/colexec/ordered_bool_and_or_agg.eg.go \
pkg/sql/colexec/ordered_concat_agg.eg.go \
pkg/sql/colexec/ordered_count_agg.eg.go \
pkg/sql/colexec/ordered_min_max_agg.eg.go \
pkg/sql/colexec/ordered_sum_agg.eg.go \
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/colexec/aggregate_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var SupportedAggFns = []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_AVG,
execinfrapb.AggregatorSpec_SUM,
execinfrapb.AggregatorSpec_SUM_INT,
execinfrapb.AggregatorSpec_CONCAT_AGG,
execinfrapb.AggregatorSpec_COUNT_ROWS,
execinfrapb.AggregatorSpec_COUNT,
execinfrapb.AggregatorSpec_MIN,
Expand Down Expand Up @@ -147,6 +148,12 @@ func newAggregateFuncsAlloc(
} else {
funcAllocs[i], err = newSumIntOrderedAggAlloc(allocator, aggTyps[i][0], allocSize)
}
case execinfrapb.AggregatorSpec_CONCAT_AGG:
if isHashAgg {
funcAllocs[i] = newConcatHashAggAlloc(allocator, allocSize)
} else {
funcAllocs[i] = newConcatOrderedAggAlloc(allocator, allocSize)
}
case execinfrapb.AggregatorSpec_COUNT_ROWS:
if isHashAgg {
funcAllocs[i] = newCountRowsHashAggAlloc(allocator, allocSize)
Expand Down
66 changes: 49 additions & 17 deletions pkg/sql/colexec/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,31 @@ func TestAggregatorMultiFunc(t *testing.T) {
aggCols: [][]uint32{{1}, {2}, {3}, {4}, {5}, {6}},
name: "AVG on all types",
},
{
input: tuples{
{1, "1"},
{1, "2"},
{1, "3"},
{2, nil},
{2, "1"},
{2, "2"},
{3, "1"},
{3, nil},
{3, "2"},
{4, nil},
{4, nil},
},
expected: tuples{
{"123"},
{"12"},
{"12"},
{nil},
},
typs: []*types.T{types.Int, types.Bytes},
aggFns: []execinfrapb.AggregatorSpec_Func{execinfrapb.AggregatorSpec_CONCAT_AGG},
groupCols: []uint32{0},
aggCols: [][]uint32{{1}},
},
}

for _, agg := range aggTypes {
Expand Down Expand Up @@ -559,8 +584,9 @@ func TestAggregatorAllFunctions(t *testing.T) {
execinfrapb.AggregatorSpec_MAX,
execinfrapb.AggregatorSpec_BOOL_AND,
execinfrapb.AggregatorSpec_BOOL_OR,
execinfrapb.AggregatorSpec_CONCAT_AGG,
},
aggCols: [][]uint32{{0}, {4}, {1}, {}, {1}, {1}, {2}, {2}, {2}, {3}, {3}},
aggCols: [][]uint32{{0}, {4}, {1}, {}, {1}, {1}, {2}, {2}, {2}, {3}, {3}, {4}},
typs: []*types.T{types.Int, types.Decimal, types.Int, types.Bool, types.Bytes},
input: tuples{
{0, 3.1, 2, true, "zero"},
Expand All @@ -572,10 +598,10 @@ func TestAggregatorAllFunctions(t *testing.T) {
{3, 5.1, 0, true, "three"},
},
expected: tuples{
{0, "zero", 2.1, 2, 2, 4.2, 5, 2, 3, false, true},
{1, "one", 2.6, 2, 2, 5.2, 1, 0, 1, false, false},
{2, "two", 1.1, 1, 1, 1.1, 1, 1, 1, true, true},
{3, "three", 4.6, 2, 2, 9.2, 0, 0, 0, false, true},
{0, "zero", 2.1, 2, 2, 4.2, 5, 2, 3, false, true, "zerozero"},
{1, "one", 2.6, 2, 2, 5.2, 1, 0, 1, false, false, "oneone"},
{2, "two", 1.1, 1, 1, 1.1, 1, 1, 1, true, true, "two"},
{3, "three", 4.6, 2, 2, 9.2, 0, 0, 0, false, true, "threethree"},
},
convToDecimal: true,
},
Expand All @@ -594,20 +620,21 @@ func TestAggregatorAllFunctions(t *testing.T) {
execinfrapb.AggregatorSpec_AVG,
execinfrapb.AggregatorSpec_BOOL_AND,
execinfrapb.AggregatorSpec_BOOL_OR,
execinfrapb.AggregatorSpec_CONCAT_AGG,
},
aggCols: [][]uint32{{0}, {1}, {}, {1}, {1}, {2}, {2}, {2}, {1}, {3}, {3}},
typs: []*types.T{types.Int, types.Decimal, types.Int, types.Bool},
aggCols: [][]uint32{{0}, {1}, {}, {1}, {1}, {2}, {2}, {2}, {1}, {3}, {3}, {4}},
typs: []*types.T{types.Int, types.Decimal, types.Int, types.Bool, types.Bytes},
input: tuples{
{nil, 1.1, 4, true},
{0, nil, nil, nil},
{0, 3.1, 5, nil},
{1, nil, nil, nil},
{1, nil, nil, false},
{nil, 1.1, 4, true, "a"},
{0, nil, nil, nil, nil},
{0, 3.1, 5, nil, "b"},
{1, nil, nil, nil, nil},
{1, nil, nil, false, nil},
},
expected: tuples{
{nil, 1.1, 1, 1, 1.1, 4, 4, 4, 1.1, true, true},
{0, 3.1, 2, 1, 3.1, 5, 5, 5, 3.1, nil, nil},
{1, nil, 2, 0, nil, nil, nil, nil, nil, false, false},
{nil, 1.1, 1, 1, 1.1, 4, 4, 4, 1.1, true, true, "a"},
{0, 3.1, 2, 1, 3.1, 5, 5, 5, 3.1, nil, nil, "b"},
{1, nil, 2, 0, nil, nil, nil, nil, nil, false, false, nil},
},
convToDecimal: true,
},
Expand Down Expand Up @@ -867,9 +894,14 @@ func BenchmarkAggregator(b *testing.B) {
func BenchmarkAllAggregateFunctions(b *testing.B) {
for _, aggFn := range SupportedAggFns {
for _, agg := range aggTypes {
typ := types.Int
if aggFn == execinfrapb.AggregatorSpec_BOOL_AND || aggFn == execinfrapb.AggregatorSpec_BOOL_OR {
var typ *types.T
switch aggFn {
case execinfrapb.AggregatorSpec_BOOL_AND, execinfrapb.AggregatorSpec_BOOL_OR:
typ = types.Bool
case execinfrapb.AggregatorSpec_CONCAT_AGG:
typ = types.Bytes
default:
typ = types.Int
}
for _, groupSize := range []int{1, coldata.BatchSize()} {
benchmarkAggregateFunction(b, agg, aggFn, typ, groupSize, nullProbability)
Expand Down
200 changes: 200 additions & 0 deletions pkg/sql/colexec/concat_agg_tmpl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// {{/*
// +build execgen_template
//
// This file is the execgen template for concat_agg.eg.go. It's formatted in a
// special way, so it's both valid Go and a valid text/template input. This
// permits editing this file with editor support.
//
// */}}

package colexec

import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
)

func (a *concat_AGGKINDAgg) Init(groups []bool, vec coldata.Vec) {
// {{if eq "_AGGKIND" "Ordered"}}
a.groups = groups
// {{end}}
a.vec = vec
a.col = vec.Bytes()
a.nulls = vec.Nulls()
a.Reset()
}

func (a *concat_AGGKINDAgg) Reset() {
a.curIdx = 0
a.foundNonNullForCurrentGroup = false
a.curAgg = zeroBytesValue
a.nulls.UnsetNulls()
}

func (a *concat_AGGKINDAgg) CurrentOutputIndex() int {
return a.curIdx
}

func (a *concat_AGGKINDAgg) SetOutputIndex(idx int) {
a.curIdx = idx
}

func (a *concat_AGGKINDAgg) Compute(batch coldata.Batch, inputIdxs []uint32) {
inputLen := batch.Length()
vec, sel := batch.ColVec(int(inputIdxs[0])), batch.Selection()
col, nulls := vec.Bytes(), vec.Nulls()
a.allocator.PerformOperation(
[]coldata.Vec{a.vec},
func() {
previousAggValMemoryUsage := a.aggValMemoryUsage()
if nulls.MaybeHasNulls() {
if sel != nil {
sel = sel[:inputLen]
for _, i := range sel {
_ACCUMULATE_CONCAT(a, nulls, i, true)
}
} else {
for i := 0; i < inputLen; i++ {
_ACCUMULATE_CONCAT(a, nulls, i, true)
}
}
} else {
if sel != nil {
sel = sel[:inputLen]
for _, i := range sel {
_ACCUMULATE_CONCAT(a, nulls, i, false)
}
} else {
for i := 0; i < inputLen; i++ {
_ACCUMULATE_CONCAT(a, nulls, i, false)
}
}
}
currentAggValMemoryUsage := a.aggValMemoryUsage()
a.allocator.AdjustMemoryUsage(currentAggValMemoryUsage - previousAggValMemoryUsage)
},
)
}

func (a *concat_AGGKINDAgg) Flush() {
a.allocator.PerformOperation(
[]coldata.Vec{a.vec}, func() {
if !a.foundNonNullForCurrentGroup {
a.nulls.SetNull(a.curIdx)
} else {
a.col.Set(a.curIdx, a.curAgg)
}
a.allocator.AdjustMemoryUsage(-a.aggValMemoryUsage())
// release reference to curAgg eagerly
a.curAgg = nil
a.curIdx++
})
}

func (a *concat_AGGKINDAgg) HandleEmptyInputScalar() {
a.nulls.SetNull(0)
}

func (a *concat_AGGKINDAgg) aggValMemoryUsage() int64 {
return int64(len(a.curAgg))
}

func (a *concat_AGGKINDAggAlloc) newAggFunc() aggregateFunc {
if len(a.aggFuncs) == 0 {
a.allocator.AdjustMemoryUsage(sizeOfConcat_AGGKINDAgg * a.allocSize)
a.aggFuncs = make([]concat_AGGKINDAgg, a.allocSize)
for i := range a.aggFuncs {
a.aggFuncs[i].allocator = a.allocator
}
}
f := &a.aggFuncs[0]
a.aggFuncs = a.aggFuncs[1:]
return f
}

const sizeOfConcat_AGGKINDAgg = int64(unsafe.Sizeof(concat_AGGKINDAgg{}))

func newConcat_AGGKINDAggAlloc(allocator *colmem.Allocator, allocSize int64) aggregateFuncAlloc {
return &concat_AGGKINDAggAlloc{aggAllocBase: aggAllocBase{
allocator: allocator,
allocSize: allocSize,
}}
}

type concat_AGGKINDAggAlloc struct {
aggAllocBase
aggFuncs []concat_AGGKINDAgg
}

type concat_AGGKINDAgg struct {
// allocator is the allocator used to create this aggregateFunc
// memory usage of output vector and curAgg varies during aggregation,
// we need the allocator to monitor this change.
allocator *colmem.Allocator
// {{if eq "_AGGKIND" "Ordered"}}
groups []bool
// {{end}}
curIdx int
// curAgg holds the running total.
curAgg []byte
// col points to the output vector we are updating.
col *coldata.Bytes
// vec is the same as col before conversion from coldata.Vec.
vec coldata.Vec
// nulls points to the output null vector that we are updating.
nulls *coldata.Nulls
// foundNonNullForCurrentGroup tracks if we have seen any non-null values
// for the group that is currently being aggregated.
foundNonNullForCurrentGroup bool
}

// {{/*
func _ACCUMULATE_CONCAT(a *concat_AGGKINDAgg, nulls *coldata.Nulls, i int, _HAS_NULLS bool) { // */}}
// {{define "accumulateConcat" }}
// {{if eq "_AGGKIND" "Ordered"}}
if a.groups[i] {
// If we encounter a new group, and we haven't found any non-nulls for the
// current group, the output for this group should be null.
if !a.foundNonNullForCurrentGroup {
a.nulls.SetNull(a.curIdx)
} else {
a.col.Set(a.curIdx, a.curAgg)
}
a.curIdx++
a.curAgg = zeroBytesValue

// {{/*
// We only need to reset this flag if there are nulls. If there are no
// nulls, this will be updated unconditionally below.
// */}}
// {{if .HasNulls}}
a.foundNonNullForCurrentGroup = false
// {{end}}
}
// {{end}}

var isNull bool
// {{if .HasNulls}}
isNull = nulls.NullAt(i)
// {{else}}
isNull = false
// {{end}}
if !isNull {
a.curAgg = append(a.curAgg, col.Get(i)...)
a.foundNonNullForCurrentGroup = true
}
// {{end}}
// {{/*
} // */}}
34 changes: 34 additions & 0 deletions pkg/sql/colexec/execgen/cmd/execgen/concat_agg_gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
"io"
"text/template"
)

const concatAggTmpl = "pkg/sql/colexec/concat_agg_tmpl.go"

func genConcatAgg(inputFileContents string, wr io.Writer) error {

accumulateConcatRe := makeFunctionRegex("_ACCUMULATE_CONCAT", 4)
s := accumulateConcatRe.ReplaceAllString(inputFileContents, `{{template "accumulateConcat" buildDict "HasNulls" $4}}`)

tmpl, err := template.New("concat_agg").Funcs(template.FuncMap{"buildDict": buildDict}).Parse(s)
if err != nil {
return err
}
return tmpl.Execute(wr, struct{}{})
}

func init() {
registerAggGenerator(genConcatAgg, "concat_agg.eg.go", concatAggTmpl)
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
// these utility test methods for datum-backed types.
// */}}
{{if and (not (eq .Left.VecMethod "Datum")) (not (eq .Right.VecMethod "Datum"))}}
{{if and (not (eq .Left.VecMethod "Bytes")) (not (eq .Right.VecMethod "Bytes"))}}
func {{template "opName" .}}(a {{.Left.GoType}}, b {{.Right.GoType}}) {{.Right.RetGoType}} {
var r {{.Right.RetGoType}}
Expand All @@ -52,6 +53,7 @@ func {{template "opName" .}}(a {{.Left.GoType}}, b {{.Right.GoType}}) {{.Right.R
return r
}
{{end}}
{{end}}
{{end}}
`
Expand Down
Loading

0 comments on commit dd8049d

Please sign in to comment.