From 900cfc02b0bcb726fb6ba8eefbcab2104875d212 Mon Sep 17 00:00:00 2001 From: Yongyang Lai Date: Sat, 11 Jul 2020 20:09:18 +0800 Subject: [PATCH] colexec: implement function concat_agg for vectorized engine This commit implements aggregate function "concat_agg" for vectorized engine Release note (sql change): vectorized engine now supports aggregate function "concat_agg" --- Makefile | 2 + pkg/sql/colexec/aggregate_funcs.go | 7 + pkg/sql/colexec/aggregators_test.go | 66 ++++-- pkg/sql/colexec/concat_agg_tmpl.go | 190 ++++++++++++++++ .../execgen/cmd/execgen/concat_agg_gen.go | 34 +++ .../cmd/execgen/overloads_test_utils_gen.go | 2 + pkg/sql/colexec/hash_concat_agg.eg.go | 159 +++++++++++++ pkg/sql/colexec/ordered_concat_agg.eg.go | 215 ++++++++++++++++++ pkg/sql/colexec/overloads_test_utils.eg.go | 121 ---------- pkg/sql/colexec/utils.go | 1 + .../testdata/logic_test/vectorize_agg | 55 +++++ 11 files changed, 714 insertions(+), 138 deletions(-) create mode 100644 pkg/sql/colexec/concat_agg_tmpl.go create mode 100644 pkg/sql/colexec/execgen/cmd/execgen/concat_agg_gen.go create mode 100644 pkg/sql/colexec/hash_concat_agg.eg.go create mode 100644 pkg/sql/colexec/ordered_concat_agg.eg.go diff --git a/Makefile b/Makefile index e5b4efdb6065..7c13bafa29b9 100644 --- a/Makefile +++ b/Makefile @@ -861,6 +861,7 @@ EXECGEN_TARGETS = \ pkg/sql/colexec/hash_any_not_null_agg.eg.go \ pkg/sql/colexec/hash_avg_agg.eg.go \ pkg/sql/colexec/hash_bool_and_or_agg.eg.go \ + pkg/sql/colexec/hash_concat_agg.eg.go \ pkg/sql/colexec/hash_count_agg.eg.go \ pkg/sql/colexec/hash_min_max_agg.eg.go \ pkg/sql/colexec/hash_sum_agg.eg.go \ @@ -879,6 +880,7 @@ EXECGEN_TARGETS = \ pkg/sql/colexec/ordered_any_not_null_agg.eg.go \ pkg/sql/colexec/ordered_avg_agg.eg.go \ pkg/sql/colexec/ordered_bool_and_or_agg.eg.go \ + pkg/sql/colexec/ordered_concat_agg.eg.go \ pkg/sql/colexec/ordered_count_agg.eg.go \ pkg/sql/colexec/ordered_min_max_agg.eg.go \ pkg/sql/colexec/ordered_sum_agg.eg.go \ diff --git a/pkg/sql/colexec/aggregate_funcs.go b/pkg/sql/colexec/aggregate_funcs.go index f0c70e8b8445..01f5c6c28f21 100644 --- a/pkg/sql/colexec/aggregate_funcs.go +++ b/pkg/sql/colexec/aggregate_funcs.go @@ -27,6 +27,7 @@ var SupportedAggFns = []execinfrapb.AggregatorSpec_Func{ execinfrapb.AggregatorSpec_AVG, execinfrapb.AggregatorSpec_SUM, execinfrapb.AggregatorSpec_SUM_INT, + execinfrapb.AggregatorSpec_CONCAT_AGG, execinfrapb.AggregatorSpec_COUNT_ROWS, execinfrapb.AggregatorSpec_COUNT, execinfrapb.AggregatorSpec_MIN, @@ -147,6 +148,12 @@ func newAggregateFuncsAlloc( } else { funcAllocs[i], err = newSumIntOrderedAggAlloc(allocator, aggTyps[i][0], allocSize) } + case execinfrapb.AggregatorSpec_CONCAT_AGG: + if isHashAgg { + funcAllocs[i] = newConcatHashAggAlloc(allocator, allocSize) + } else { + funcAllocs[i] = newConcatOrderedAggAlloc(allocator, allocSize) + } case execinfrapb.AggregatorSpec_COUNT_ROWS: if isHashAgg { funcAllocs[i] = newCountRowsHashAggAlloc(allocator, allocSize) diff --git a/pkg/sql/colexec/aggregators_test.go b/pkg/sql/colexec/aggregators_test.go index c4934dae8ec5..df5c9ecc251d 100644 --- a/pkg/sql/colexec/aggregators_test.go +++ b/pkg/sql/colexec/aggregators_test.go @@ -525,6 +525,31 @@ func TestAggregatorMultiFunc(t *testing.T) { aggCols: [][]uint32{{1}, {2}, {3}, {4}, {5}, {6}}, name: "AVG on all types", }, + { + input: tuples{ + {1, "1"}, + {1, "2"}, + {1, "3"}, + {2, nil}, + {2, "1"}, + {2, "2"}, + {3, "1"}, + {3, nil}, + {3, "2"}, + {4, nil}, + {4, nil}, + }, + expected: tuples{ + {"123"}, + {"12"}, + {"12"}, + {nil}, + }, + typs: []*types.T{types.Int, types.Bytes}, + aggFns: []execinfrapb.AggregatorSpec_Func{execinfrapb.AggregatorSpec_CONCAT_AGG}, + groupCols: []uint32{0}, + aggCols: [][]uint32{{1}}, + }, } for _, agg := range aggTypes { @@ -559,8 +584,9 @@ func TestAggregatorAllFunctions(t *testing.T) { execinfrapb.AggregatorSpec_MAX, execinfrapb.AggregatorSpec_BOOL_AND, execinfrapb.AggregatorSpec_BOOL_OR, + execinfrapb.AggregatorSpec_CONCAT_AGG, }, - aggCols: [][]uint32{{0}, {4}, {1}, {}, {1}, {1}, {2}, {2}, {2}, {3}, {3}}, + aggCols: [][]uint32{{0}, {4}, {1}, {}, {1}, {1}, {2}, {2}, {2}, {3}, {3}, {4}}, typs: []*types.T{types.Int, types.Decimal, types.Int, types.Bool, types.Bytes}, input: tuples{ {0, 3.1, 2, true, "zero"}, @@ -572,10 +598,10 @@ func TestAggregatorAllFunctions(t *testing.T) { {3, 5.1, 0, true, "three"}, }, expected: tuples{ - {0, "zero", 2.1, 2, 2, 4.2, 5, 2, 3, false, true}, - {1, "one", 2.6, 2, 2, 5.2, 1, 0, 1, false, false}, - {2, "two", 1.1, 1, 1, 1.1, 1, 1, 1, true, true}, - {3, "three", 4.6, 2, 2, 9.2, 0, 0, 0, false, true}, + {0, "zero", 2.1, 2, 2, 4.2, 5, 2, 3, false, true, "zerozero"}, + {1, "one", 2.6, 2, 2, 5.2, 1, 0, 1, false, false, "oneone"}, + {2, "two", 1.1, 1, 1, 1.1, 1, 1, 1, true, true, "two"}, + {3, "three", 4.6, 2, 2, 9.2, 0, 0, 0, false, true, "threethree"}, }, convToDecimal: true, }, @@ -594,20 +620,21 @@ func TestAggregatorAllFunctions(t *testing.T) { execinfrapb.AggregatorSpec_AVG, execinfrapb.AggregatorSpec_BOOL_AND, execinfrapb.AggregatorSpec_BOOL_OR, + execinfrapb.AggregatorSpec_CONCAT_AGG, }, - aggCols: [][]uint32{{0}, {1}, {}, {1}, {1}, {2}, {2}, {2}, {1}, {3}, {3}}, - typs: []*types.T{types.Int, types.Decimal, types.Int, types.Bool}, + aggCols: [][]uint32{{0}, {1}, {}, {1}, {1}, {2}, {2}, {2}, {1}, {3}, {3}, {4}}, + typs: []*types.T{types.Int, types.Decimal, types.Int, types.Bool, types.Bytes}, input: tuples{ - {nil, 1.1, 4, true}, - {0, nil, nil, nil}, - {0, 3.1, 5, nil}, - {1, nil, nil, nil}, - {1, nil, nil, false}, + {nil, 1.1, 4, true, "a"}, + {0, nil, nil, nil, nil}, + {0, 3.1, 5, nil, "b"}, + {1, nil, nil, nil, nil}, + {1, nil, nil, false, nil}, }, expected: tuples{ - {nil, 1.1, 1, 1, 1.1, 4, 4, 4, 1.1, true, true}, - {0, 3.1, 2, 1, 3.1, 5, 5, 5, 3.1, nil, nil}, - {1, nil, 2, 0, nil, nil, nil, nil, nil, false, false}, + {nil, 1.1, 1, 1, 1.1, 4, 4, 4, 1.1, true, true, "a"}, + {0, 3.1, 2, 1, 3.1, 5, 5, 5, 3.1, nil, nil, "b"}, + {1, nil, 2, 0, nil, nil, nil, nil, nil, false, false, nil}, }, convToDecimal: true, }, @@ -867,9 +894,14 @@ func BenchmarkAggregator(b *testing.B) { func BenchmarkAllAggregateFunctions(b *testing.B) { for _, aggFn := range SupportedAggFns { for _, agg := range aggTypes { - typ := types.Int - if aggFn == execinfrapb.AggregatorSpec_BOOL_AND || aggFn == execinfrapb.AggregatorSpec_BOOL_OR { + var typ *types.T + switch aggFn { + case execinfrapb.AggregatorSpec_BOOL_AND, execinfrapb.AggregatorSpec_BOOL_OR: typ = types.Bool + case execinfrapb.AggregatorSpec_CONCAT_AGG: + typ = types.Bytes + default: + typ = types.Int } for _, groupSize := range []int{1, coldata.BatchSize()} { benchmarkAggregateFunction(b, agg, aggFn, typ, groupSize, nullProbability) diff --git a/pkg/sql/colexec/concat_agg_tmpl.go b/pkg/sql/colexec/concat_agg_tmpl.go new file mode 100644 index 000000000000..efab591009fc --- /dev/null +++ b/pkg/sql/colexec/concat_agg_tmpl.go @@ -0,0 +1,190 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// {{/* +// +build execgen_template +// +// This file is the execgen template for concat_agg.eg.go. It's formatted in a +// special way, so it's both valid Go and a valid text/template input. This +// permits editing this file with editor support. +// +// */}} + +package colexec + +import ( + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" +) + +func (a *concat_AGGKINDAgg) Init(groups []bool, vec coldata.Vec) { + // {{if eq "_AGGKIND" "Ordered"}} + a.groups = groups + // {{end}} + a.vec = vec + a.col = vec.Bytes() + a.nulls = vec.Nulls() + a.Reset() +} + +func (a *concat_AGGKINDAgg) Reset() { + a.curIdx = 0 + a.foundNonNullForCurrentGroup = false + a.curAgg = zeroBytesValue + a.nulls.UnsetNulls() +} + +func (a *concat_AGGKINDAgg) CurrentOutputIndex() int { + return a.curIdx +} + +func (a *concat_AGGKINDAgg) SetOutputIndex(idx int) { + a.curIdx = idx +} + +func (a *concat_AGGKINDAgg) Compute(batch coldata.Batch, inputIdxs []uint32) { + inputLen := batch.Length() + vec, sel := batch.ColVec(int(inputIdxs[0])), batch.Selection() + col, nulls := vec.Bytes(), vec.Nulls() + a.allocator.PerformOperation( + []coldata.Vec{a.vec}, + func() { + if nulls.MaybeHasNulls() { + if sel != nil { + sel = sel[:inputLen] + for _, i := range sel { + _ACCUMULATE_CONCAT(a, nulls, i, true) + } + } else { + for i := 0; i < inputLen; i++ { + _ACCUMULATE_CONCAT(a, nulls, i, true) + } + } + } else { + if sel != nil { + sel = sel[:inputLen] + for _, i := range sel { + _ACCUMULATE_CONCAT(a, nulls, i, false) + } + } else { + for i := 0; i < inputLen; i++ { + _ACCUMULATE_CONCAT(a, nulls, i, false) + } + } + } + }, + ) +} + +func (a *concat_AGGKINDAgg) Flush() { + a.allocator.PerformOperation( + []coldata.Vec{a.vec}, func() { + if !a.foundNonNullForCurrentGroup { + a.nulls.SetNull(a.curIdx) + } else { + a.col.Set(a.curIdx, a.curAgg) + } + a.curIdx++ + }) +} + +func (a *concat_AGGKINDAgg) HandleEmptyInputScalar() { + a.nulls.SetNull(0) +} + +func (a *concat_AGGKINDAggAlloc) newAggFunc() aggregateFunc { + if len(a.aggFuncs) == 0 { + a.allocator.AdjustMemoryUsage(sizeOfConcat_AGGKINDAgg * a.allocSize) + a.aggFuncs = make([]concat_AGGKINDAgg, a.allocSize) + for i := range a.aggFuncs { + a.aggFuncs[i].allocator = a.allocator + } + } + f := &a.aggFuncs[0] + a.aggFuncs = a.aggFuncs[1:] + return f +} + +const sizeOfConcat_AGGKINDAgg = int64(unsafe.Sizeof(concat_AGGKINDAgg{})) + +func newConcat_AGGKINDAggAlloc(allocator *colmem.Allocator, allocSize int64) aggregateFuncAlloc { + return &concat_AGGKINDAggAlloc{aggAllocBase: aggAllocBase{ + allocator: allocator, + allocSize: allocSize, + }} +} + +type concat_AGGKINDAggAlloc struct { + aggAllocBase + aggFuncs []concat_AGGKINDAgg +} + +type concat_AGGKINDAgg struct { + // allocator is the allocator used to create this aggregateFunc + // memory usage of output vector varies during aggregation, we + // need the allocator to monitor this change. + allocator *colmem.Allocator + // {{if eq "_AGGKIND" "Ordered"}} + groups []bool + // {{end}} + curIdx int + // curAgg holds the running total. + curAgg []byte + // col points to the output vector we are updating. + col *coldata.Bytes + // vec is the same as col before conversion from coldata.Vec. + vec coldata.Vec + // nulls points to the output null vector that we are updating. + nulls *coldata.Nulls + // foundNonNullForCurrentGroup tracks if we have seen any non-null values + // for the group that is currently being aggregated. + foundNonNullForCurrentGroup bool +} + +// {{/* +func _ACCUMULATE_CONCAT(a *concat_AGGKINDAgg, nulls *coldata.Nulls, i int, _HAS_NULLS bool) { // */}} + // {{define "accumulateConcat" }} + // {{if eq "_AGGKIND" "Ordered"}} + if a.groups[i] { + // If we encounter a new group, and we haven't found any non-nulls for the + // current group, the output for this group should be null. + if !a.foundNonNullForCurrentGroup { + a.nulls.SetNull(a.curIdx) + } else { + a.col.Set(a.curIdx, a.curAgg) + } + a.curIdx++ + a.curAgg = zeroBytesValue + + // {{/* + // We only need to reset this flag if there are nulls. If there are no + // nulls, this will be updated unconditionally below. + // */}} + // {{if .HasNulls}} + a.foundNonNullForCurrentGroup = false + // {{end}} + } + // {{end}} + + var isNull bool + // {{if .HasNulls}} + isNull = nulls.NullAt(i) + // {{else}} + isNull = false + // {{end}} + if !isNull { + a.curAgg = append(a.curAgg, col.Get(i)...) + a.foundNonNullForCurrentGroup = true + } + // {{end}} + // {{/* +} // */}} diff --git a/pkg/sql/colexec/execgen/cmd/execgen/concat_agg_gen.go b/pkg/sql/colexec/execgen/cmd/execgen/concat_agg_gen.go new file mode 100644 index 000000000000..81811700a637 --- /dev/null +++ b/pkg/sql/colexec/execgen/cmd/execgen/concat_agg_gen.go @@ -0,0 +1,34 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package main + +import ( + "io" + "text/template" +) + +const concatAggTmpl = "pkg/sql/colexec/concat_agg_tmpl.go" + +func genConcatAgg(inputFileContents string, wr io.Writer) error { + + accumulateConcatRe := makeFunctionRegex("_ACCUMULATE_CONCAT", 4) + s := accumulateConcatRe.ReplaceAllString(inputFileContents, `{{template "accumulateConcat" buildDict "HasNulls" $4}}`) + + tmpl, err := template.New("concat_agg").Funcs(template.FuncMap{"buildDict": buildDict}).Parse(s) + if err != nil { + return err + } + return tmpl.Execute(wr, struct{}{}) +} + +func init() { + registerAggGenerator(genConcatAgg, "concat_agg.eg.go", concatAggTmpl) +} diff --git a/pkg/sql/colexec/execgen/cmd/execgen/overloads_test_utils_gen.go b/pkg/sql/colexec/execgen/cmd/execgen/overloads_test_utils_gen.go index 38c81e2dcc67..2f0e030819ab 100644 --- a/pkg/sql/colexec/execgen/cmd/execgen/overloads_test_utils_gen.go +++ b/pkg/sql/colexec/execgen/cmd/execgen/overloads_test_utils_gen.go @@ -39,6 +39,7 @@ import ( // these utility test methods for datum-backed types. // */}} {{if and (not (eq .Left.VecMethod "Datum")) (not (eq .Right.VecMethod "Datum"))}} +{{if and (not (eq .Left.VecMethod "Bytes")) (not (eq .Right.VecMethod "Bytes"))}} func {{template "opName" .}}(a {{.Left.GoType}}, b {{.Right.GoType}}) {{.Right.RetGoType}} { var r {{.Right.RetGoType}} @@ -52,6 +53,7 @@ func {{template "opName" .}}(a {{.Left.GoType}}, b {{.Right.GoType}}) {{.Right.R return r } +{{end}} {{end}} {{end}} ` diff --git a/pkg/sql/colexec/hash_concat_agg.eg.go b/pkg/sql/colexec/hash_concat_agg.eg.go new file mode 100644 index 000000000000..d01569f45075 --- /dev/null +++ b/pkg/sql/colexec/hash_concat_agg.eg.go @@ -0,0 +1,159 @@ +// Code generated by execgen; DO NOT EDIT. +// Copyright 2020 The Cockroach Authors. +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexec + +import ( + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" +) + +func (a *concatHashAgg) Init(groups []bool, vec coldata.Vec) { + a.vec = vec + a.col = vec.Bytes() + a.nulls = vec.Nulls() + a.Reset() +} + +func (a *concatHashAgg) Reset() { + a.curIdx = 0 + a.foundNonNullForCurrentGroup = false + a.curAgg = zeroBytesValue + a.nulls.UnsetNulls() +} + +func (a *concatHashAgg) CurrentOutputIndex() int { + return a.curIdx +} + +func (a *concatHashAgg) SetOutputIndex(idx int) { + a.curIdx = idx +} + +func (a *concatHashAgg) Compute(batch coldata.Batch, inputIdxs []uint32) { + inputLen := batch.Length() + vec, sel := batch.ColVec(int(inputIdxs[0])), batch.Selection() + col, nulls := vec.Bytes(), vec.Nulls() + a.allocator.PerformOperation( + []coldata.Vec{a.vec}, + func() { + if nulls.MaybeHasNulls() { + if sel != nil { + sel = sel[:inputLen] + for _, i := range sel { + + var isNull bool + isNull = nulls.NullAt(i) + if !isNull { + a.curAgg = append(a.curAgg, col.Get(i)...) + a.foundNonNullForCurrentGroup = true + } + } + } else { + for i := 0; i < inputLen; i++ { + + var isNull bool + isNull = nulls.NullAt(i) + if !isNull { + a.curAgg = append(a.curAgg, col.Get(i)...) + a.foundNonNullForCurrentGroup = true + } + } + } + } else { + if sel != nil { + sel = sel[:inputLen] + for _, i := range sel { + + var isNull bool + isNull = false + if !isNull { + a.curAgg = append(a.curAgg, col.Get(i)...) + a.foundNonNullForCurrentGroup = true + } + } + } else { + for i := 0; i < inputLen; i++ { + + var isNull bool + isNull = false + if !isNull { + a.curAgg = append(a.curAgg, col.Get(i)...) + a.foundNonNullForCurrentGroup = true + } + } + } + } + }, + ) +} + +func (a *concatHashAgg) Flush() { + a.allocator.PerformOperation( + []coldata.Vec{a.vec}, func() { + if !a.foundNonNullForCurrentGroup { + a.nulls.SetNull(a.curIdx) + } else { + a.col.Set(a.curIdx, a.curAgg) + } + a.curIdx++ + }) +} + +func (a *concatHashAgg) HandleEmptyInputScalar() { + a.nulls.SetNull(0) +} + +func (a *concatHashAggAlloc) newAggFunc() aggregateFunc { + if len(a.aggFuncs) == 0 { + a.allocator.AdjustMemoryUsage(sizeOfConcatHashAgg * a.allocSize) + a.aggFuncs = make([]concatHashAgg, a.allocSize) + for i := range a.aggFuncs { + a.aggFuncs[i].allocator = a.allocator + } + } + f := &a.aggFuncs[0] + a.aggFuncs = a.aggFuncs[1:] + return f +} + +const sizeOfConcatHashAgg = int64(unsafe.Sizeof(concatHashAgg{})) + +func newConcatHashAggAlloc(allocator *colmem.Allocator, allocSize int64) aggregateFuncAlloc { + return &concatHashAggAlloc{aggAllocBase: aggAllocBase{ + allocator: allocator, + allocSize: allocSize, + }} +} + +type concatHashAggAlloc struct { + aggAllocBase + aggFuncs []concatHashAgg +} + +type concatHashAgg struct { + // allocator is the allocator used to create this aggregateFunc + // memory usage of output vector varies during aggregation, we + // need the allocator to monitor this change. + allocator *colmem.Allocator + curIdx int + // curAgg holds the running total. + curAgg []byte + // col points to the output vector we are updating. + col *coldata.Bytes + // vec is the same as col before conversion from coldata.Vec. + vec coldata.Vec + // nulls points to the output null vector that we are updating. + nulls *coldata.Nulls + // foundNonNullForCurrentGroup tracks if we have seen any non-null values + // for the group that is currently being aggregated. + foundNonNullForCurrentGroup bool +} diff --git a/pkg/sql/colexec/ordered_concat_agg.eg.go b/pkg/sql/colexec/ordered_concat_agg.eg.go new file mode 100644 index 000000000000..f69606648bb4 --- /dev/null +++ b/pkg/sql/colexec/ordered_concat_agg.eg.go @@ -0,0 +1,215 @@ +// Code generated by execgen; DO NOT EDIT. +// Copyright 2020 The Cockroach Authors. +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexec + +import ( + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" +) + +func (a *concatOrderedAgg) Init(groups []bool, vec coldata.Vec) { + a.groups = groups + a.vec = vec + a.col = vec.Bytes() + a.nulls = vec.Nulls() + a.Reset() +} + +func (a *concatOrderedAgg) Reset() { + a.curIdx = 0 + a.foundNonNullForCurrentGroup = false + a.curAgg = zeroBytesValue + a.nulls.UnsetNulls() +} + +func (a *concatOrderedAgg) CurrentOutputIndex() int { + return a.curIdx +} + +func (a *concatOrderedAgg) SetOutputIndex(idx int) { + a.curIdx = idx +} + +func (a *concatOrderedAgg) Compute(batch coldata.Batch, inputIdxs []uint32) { + inputLen := batch.Length() + vec, sel := batch.ColVec(int(inputIdxs[0])), batch.Selection() + col, nulls := vec.Bytes(), vec.Nulls() + a.allocator.PerformOperation( + []coldata.Vec{a.vec}, + func() { + if nulls.MaybeHasNulls() { + if sel != nil { + sel = sel[:inputLen] + for _, i := range sel { + + if a.groups[i] { + // If we encounter a new group, and we haven't found any non-nulls for the + // current group, the output for this group should be null. + if !a.foundNonNullForCurrentGroup { + a.nulls.SetNull(a.curIdx) + } else { + a.col.Set(a.curIdx, a.curAgg) + } + a.curIdx++ + a.curAgg = zeroBytesValue + + a.foundNonNullForCurrentGroup = false + } + + var isNull bool + isNull = nulls.NullAt(i) + if !isNull { + a.curAgg = append(a.curAgg, col.Get(i)...) + a.foundNonNullForCurrentGroup = true + } + } + } else { + for i := 0; i < inputLen; i++ { + + if a.groups[i] { + // If we encounter a new group, and we haven't found any non-nulls for the + // current group, the output for this group should be null. + if !a.foundNonNullForCurrentGroup { + a.nulls.SetNull(a.curIdx) + } else { + a.col.Set(a.curIdx, a.curAgg) + } + a.curIdx++ + a.curAgg = zeroBytesValue + + a.foundNonNullForCurrentGroup = false + } + + var isNull bool + isNull = nulls.NullAt(i) + if !isNull { + a.curAgg = append(a.curAgg, col.Get(i)...) + a.foundNonNullForCurrentGroup = true + } + } + } + } else { + if sel != nil { + sel = sel[:inputLen] + for _, i := range sel { + + if a.groups[i] { + // If we encounter a new group, and we haven't found any non-nulls for the + // current group, the output for this group should be null. + if !a.foundNonNullForCurrentGroup { + a.nulls.SetNull(a.curIdx) + } else { + a.col.Set(a.curIdx, a.curAgg) + } + a.curIdx++ + a.curAgg = zeroBytesValue + + } + + var isNull bool + isNull = false + if !isNull { + a.curAgg = append(a.curAgg, col.Get(i)...) + a.foundNonNullForCurrentGroup = true + } + } + } else { + for i := 0; i < inputLen; i++ { + + if a.groups[i] { + // If we encounter a new group, and we haven't found any non-nulls for the + // current group, the output for this group should be null. + if !a.foundNonNullForCurrentGroup { + a.nulls.SetNull(a.curIdx) + } else { + a.col.Set(a.curIdx, a.curAgg) + } + a.curIdx++ + a.curAgg = zeroBytesValue + + } + + var isNull bool + isNull = false + if !isNull { + a.curAgg = append(a.curAgg, col.Get(i)...) + a.foundNonNullForCurrentGroup = true + } + } + } + } + }, + ) +} + +func (a *concatOrderedAgg) Flush() { + a.allocator.PerformOperation( + []coldata.Vec{a.vec}, func() { + if !a.foundNonNullForCurrentGroup { + a.nulls.SetNull(a.curIdx) + } else { + a.col.Set(a.curIdx, a.curAgg) + } + a.curIdx++ + }) +} + +func (a *concatOrderedAgg) HandleEmptyInputScalar() { + a.nulls.SetNull(0) +} + +func (a *concatOrderedAggAlloc) newAggFunc() aggregateFunc { + if len(a.aggFuncs) == 0 { + a.allocator.AdjustMemoryUsage(sizeOfConcatOrderedAgg * a.allocSize) + a.aggFuncs = make([]concatOrderedAgg, a.allocSize) + for i := range a.aggFuncs { + a.aggFuncs[i].allocator = a.allocator + } + } + f := &a.aggFuncs[0] + a.aggFuncs = a.aggFuncs[1:] + return f +} + +const sizeOfConcatOrderedAgg = int64(unsafe.Sizeof(concatOrderedAgg{})) + +func newConcatOrderedAggAlloc(allocator *colmem.Allocator, allocSize int64) aggregateFuncAlloc { + return &concatOrderedAggAlloc{aggAllocBase: aggAllocBase{ + allocator: allocator, + allocSize: allocSize, + }} +} + +type concatOrderedAggAlloc struct { + aggAllocBase + aggFuncs []concatOrderedAgg +} + +type concatOrderedAgg struct { + // allocator is the allocator used to create this aggregateFunc + // memory usage of output vector varies during aggregation, we + // need the allocator to monitor this change. + allocator *colmem.Allocator + groups []bool + curIdx int + // curAgg holds the running total. + curAgg []byte + // col points to the output vector we are updating. + col *coldata.Bytes + // vec is the same as col before conversion from coldata.Vec. + vec coldata.Vec + // nulls points to the output null vector that we are updating. + nulls *coldata.Nulls + // foundNonNullForCurrentGroup tracks if we have seen any non-null values + // for the group that is currently being aggregated. + foundNonNullForCurrentGroup bool +} diff --git a/pkg/sql/colexec/overloads_test_utils.eg.go b/pkg/sql/colexec/overloads_test_utils.eg.go index 2449a01727b2..f56575cf6fad 100644 --- a/pkg/sql/colexec/overloads_test_utils.eg.go +++ b/pkg/sql/colexec/overloads_test_utils.eg.go @@ -3,7 +3,6 @@ package colexec import ( - "bytes" "math" "time" @@ -3292,18 +3291,6 @@ func performPowFloat64Float64(a float64, b float64) float64 { return r } -func performConcatBytesBytes(a []byte, b []byte) []byte { - var r []byte - // In order to inline the templated code of overloads, we need to have a - // "_overloadHelper" local variable of type "overloadHelper". - var _overloadHelper overloadHelper - // However, the scratch is not used in all of the functions, so we add this - // to go around "unused" error. - _ = _overloadHelper - colexecerror.InternalError("couldn't translate indexing on target element: r") - return r -} - func performLShiftInt16Int16(a int16, b int16) int64 { var r int64 // In order to inline the templated code of overloads, we need to have a @@ -3690,24 +3677,6 @@ func performEQBoolBool(a bool, b bool) bool { return r } -func performEQBytesBytes(a []byte, b []byte) bool { - var r bool - // In order to inline the templated code of overloads, we need to have a - // "_overloadHelper" local variable of type "overloadHelper". - var _overloadHelper overloadHelper - // However, the scratch is not used in all of the functions, so we add this - // to go around "unused" error. - _ = _overloadHelper - - { - var cmpResult int - cmpResult = bytes.Compare(a, b) - r = cmpResult == 0 - } - - return r -} - func performEQDecimalInt16(a apd.Decimal, b int16) bool { var r bool // In order to inline the templated code of overloads, we need to have a @@ -4511,24 +4480,6 @@ func performNEBoolBool(a bool, b bool) bool { return r } -func performNEBytesBytes(a []byte, b []byte) bool { - var r bool - // In order to inline the templated code of overloads, we need to have a - // "_overloadHelper" local variable of type "overloadHelper". - var _overloadHelper overloadHelper - // However, the scratch is not used in all of the functions, so we add this - // to go around "unused" error. - _ = _overloadHelper - - { - var cmpResult int - cmpResult = bytes.Compare(a, b) - r = cmpResult != 0 - } - - return r -} - func performNEDecimalInt16(a apd.Decimal, b int16) bool { var r bool // In order to inline the templated code of overloads, we need to have a @@ -5332,24 +5283,6 @@ func performLTBoolBool(a bool, b bool) bool { return r } -func performLTBytesBytes(a []byte, b []byte) bool { - var r bool - // In order to inline the templated code of overloads, we need to have a - // "_overloadHelper" local variable of type "overloadHelper". - var _overloadHelper overloadHelper - // However, the scratch is not used in all of the functions, so we add this - // to go around "unused" error. - _ = _overloadHelper - - { - var cmpResult int - cmpResult = bytes.Compare(a, b) - r = cmpResult < 0 - } - - return r -} - func performLTDecimalInt16(a apd.Decimal, b int16) bool { var r bool // In order to inline the templated code of overloads, we need to have a @@ -6153,24 +6086,6 @@ func performLEBoolBool(a bool, b bool) bool { return r } -func performLEBytesBytes(a []byte, b []byte) bool { - var r bool - // In order to inline the templated code of overloads, we need to have a - // "_overloadHelper" local variable of type "overloadHelper". - var _overloadHelper overloadHelper - // However, the scratch is not used in all of the functions, so we add this - // to go around "unused" error. - _ = _overloadHelper - - { - var cmpResult int - cmpResult = bytes.Compare(a, b) - r = cmpResult <= 0 - } - - return r -} - func performLEDecimalInt16(a apd.Decimal, b int16) bool { var r bool // In order to inline the templated code of overloads, we need to have a @@ -6974,24 +6889,6 @@ func performGTBoolBool(a bool, b bool) bool { return r } -func performGTBytesBytes(a []byte, b []byte) bool { - var r bool - // In order to inline the templated code of overloads, we need to have a - // "_overloadHelper" local variable of type "overloadHelper". - var _overloadHelper overloadHelper - // However, the scratch is not used in all of the functions, so we add this - // to go around "unused" error. - _ = _overloadHelper - - { - var cmpResult int - cmpResult = bytes.Compare(a, b) - r = cmpResult > 0 - } - - return r -} - func performGTDecimalInt16(a apd.Decimal, b int16) bool { var r bool // In order to inline the templated code of overloads, we need to have a @@ -7795,24 +7692,6 @@ func performGEBoolBool(a bool, b bool) bool { return r } -func performGEBytesBytes(a []byte, b []byte) bool { - var r bool - // In order to inline the templated code of overloads, we need to have a - // "_overloadHelper" local variable of type "overloadHelper". - var _overloadHelper overloadHelper - // However, the scratch is not used in all of the functions, so we add this - // to go around "unused" error. - _ = _overloadHelper - - { - var cmpResult int - cmpResult = bytes.Compare(a, b) - r = cmpResult >= 0 - } - - return r -} - func performGEDecimalInt16(a apd.Decimal, b int16) bool { var r bool // In order to inline the templated code of overloads, we need to have a diff --git a/pkg/sql/colexec/utils.go b/pkg/sql/colexec/utils.go index 28d06d82339d..e1bb2b47bb92 100644 --- a/pkg/sql/colexec/utils.go +++ b/pkg/sql/colexec/utils.go @@ -33,6 +33,7 @@ var ( zeroFloat64Value float64 zeroInt64Value int64 zeroIntervalValue duration.Duration + zeroBytesValue []byte ) // overloadHelper is a utility struct that helps us avoid allocations diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize_agg b/pkg/sql/logictest/testdata/logic_test/vectorize_agg index d29bc6bd459a..347b616e1cfb 100644 --- a/pkg/sql/logictest/testdata/logic_test/vectorize_agg +++ b/pkg/sql/logictest/testdata/logic_test/vectorize_agg @@ -1,3 +1,5 @@ +# LogicTest: local + statement ok CREATE TABLE bools (a INT, b BOOL) @@ -28,3 +30,56 @@ false false false true true true false false + +statement ok +CREATE TABLE bytes_string(_group INT, _bytes BYTES, _string STRING) + +query T +EXPLAIN (VEC) SELECT concat_agg(_bytes), concat_agg(_string) FROM bytes_string GROUP BY _group +---- +│ +└ Node 1 + └ *colexec.hashAggregator + └ *colfetcher.colBatchScan + +query T +EXPLAIN (VEC) SELECT concat_agg(_bytes), concat_agg(_string) FROM bytes_string +---- +│ +└ Node 1 + └ *colexec.orderedAggregator + └ *colexec.distinctChainOps + └ *colfetcher.colBatchScan + +statement ok +SET vectorize=experimental_always + +query TT +SELECT concat_agg(_bytes), concat_agg(_string) FROM bytes_string +---- +NULL NULL + +query TT +SELECT concat_agg(_bytes), concat_agg(_string) FROM bytes_string GROUP BY _group +---- + +statement ok +RESET vectorize + +statement ok +INSERT INTO bytes_string VALUES +(0, NULL, NULL), +(1, b'1', '1'), +(2, b'2', '2'), (2, b'2', '2'), +(3, b'3', '3'), (3, NULL, NULL), (3, b'3', '3') + +statement ok +SET vectorize=experimental_always + +query TT +SELECT concat_agg(_bytes), concat_agg(_string) FROM bytes_string GROUP BY _group ORDER BY _group +---- +NULL NULL +1 1 +22 22 +33 33