Skip to content

Commit

Permalink
colexec: implement vectorized ntile window function
Browse files Browse the repository at this point in the history
This patch implements the `ntile` window function in the vectorized
engine. `ntile` takes an integer argument `num_buckets` and distributes
the rows of each partition as evenly as possible among that many buckets,
outputting the bucket number for each row.

In the vectorized implementation, batches are buffered until the end
of a partition is reached, because the `ntile` bucket values depend on
the total number of rows in the partition and can only be calculated at
that point. The batches are then emitted in a streaming fashion: as soon
as a batch is fully processed, it is returned, and work is paused until
the next call to `Next()`.

See #37035

Release note (sql change): the vectorized engine now supports the ntile
window function.
  • Loading branch information
Drew Kimball authored and DrewKimball committed May 18, 2021
1 parent a3a3381 commit 4ee8ca0
Show file tree
Hide file tree
Showing 13 changed files with 1,509 additions and 64 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/colexecsel/default_cmp_sel_ops.eg.go \
pkg/sql/colexec/colexecsel/selection_ops.eg.go \
pkg/sql/colexec/colexecsel/sel_like_ops.eg.go \
pkg/sql/colexec/colexecwindow/ntile.eg.go \
pkg/sql/colexec/colexecwindow/rank.eg.go \
pkg/sql/colexec/colexecwindow/relative_rank.eg.go \
pkg/sql/colexec/colexecwindow/row_number.eg.go \
Expand Down
53 changes: 46 additions & 7 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,22 +1180,47 @@ func NewColOperator(
result.ColumnTypes = make([]*types.T, len(spec.Input[0].ColumnTypes))
copy(result.ColumnTypes, spec.Input[0].ColumnTypes)
for _, wf := range core.Windower.WindowFns {
// We allocate the capacity for two extra types because of the
// temporary columns that can be appended below.
typs := make([]*types.T, len(result.ColumnTypes), len(result.ColumnTypes)+2)
// We allocate the capacity for two extra types because of the temporary
// columns that can be appended below. Capacity is also allocated for
// each of the argument types in case casting is necessary.
typs := make([]*types.T, len(result.ColumnTypes), len(result.ColumnTypes)+len(wf.ArgsIdxs)+2)
copy(typs, result.ColumnTypes)
tempColOffset, partitionColIdx := uint32(0), tree.NoColumnIdx

tempColOffset := uint32(0)
argTypes := make([]*types.T, len(wf.ArgsIdxs))
argIdxs := make([]int, len(wf.ArgsIdxs))
for i, idx := range wf.ArgsIdxs {
// Retrieve the type of each argument and perform any necessary casting.
expectedType := colexecwindow.GetWindowFnArgType(*wf.Func.WindowFunc, i)
if !expectedType.Identical(typs[idx]) {
// We must cast to the expected argument type.
castIdx := len(typs)
input, err = colexecbase.GetCastOperator(
streamingAllocator, input, int(idx), castIdx, typs[idx], expectedType,
)
if err != nil {
colexecerror.InternalError(errors.AssertionFailedf(
"failed to cast window function argument to type %v", expectedType))
}
typs = append(typs, expectedType)
idx = uint32(castIdx)
tempColOffset++
}
argTypes[i] = expectedType
argIdxs[i] = int(idx)
}
partitionColIdx := tree.NoColumnIdx
peersColIdx := tree.NoColumnIdx
windowFn := *wf.Func.WindowFunc
if len(core.Windower.PartitionBy) > 0 {
// TODO(yuzefovich): add support for hashing partitioner
// (probably by leveraging hash routers once we can
// distribute). The decision about which kind of partitioner
// to use should come from the optimizer.
partitionColIdx = int(wf.OutputColIdx)
partitionColIdx = int(wf.OutputColIdx + tempColOffset)
input, err = colexecwindow.NewWindowSortingPartitioner(
streamingAllocator, input, typs,
core.Windower.PartitionBy, wf.Ordering.Columns, int(wf.OutputColIdx),
core.Windower.PartitionBy, wf.Ordering.Columns, partitionColIdx,
func(input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column) (colexecop.Operator, error) {
return result.createDiskBackedSort(
ctx, flowCtx, args, input, inputTypes,
Expand Down Expand Up @@ -1262,6 +1287,20 @@ func NewColOperator(
if c, ok := result.Root.(colexecop.Closer); ok {
result.ToClose = append(result.ToClose, c)
}
case execinfrapb.WindowerSpec_NTILE:
// We are using an unlimited memory monitor here because
// the ntile operators themselves are responsible for
// making sure that we stay within the memory limit, and
// they will fall back to disk if necessary.
opName := opNamePrefix + "ntile"
unlimitedAllocator := colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, opName, spec.ProcessorID), factory,
)
diskAcc := result.createDiskAccount(ctx, flowCtx, opName, spec.ProcessorID)
result.Root = colexecwindow.NewNTileOperator(
unlimitedAllocator, execinfra.GetWorkMemLimit(flowCtx), args.DiskQueueCfg,
args.FDSemaphore, input, typs, outputIdx, partitionColIdx, argIdxs[0], diskAcc,
)
default:
return r, errors.AssertionFailedf("window function %s is not supported", wf.String())
}
Expand All @@ -1278,7 +1317,7 @@ func NewColOperator(
result.Root = colexecbase.NewSimpleProjectOp(result.Root, int(wf.OutputColIdx+tempColOffset), projection)
}

_, returnType, err := execinfrapb.GetWindowFunctionInfo(wf.Func, []*types.T{}...)
_, returnType, err := execinfrapb.GetWindowFunctionInfo(wf.Func, argTypes...)
if err != nil {
return r, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecwindow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/sql/colexecop", # keep
"//pkg/sql/colmem", # keep
"//pkg/sql/execinfrapb", # keep
"//pkg/sql/sem/builtins", # keep
"//pkg/sql/sem/tree", # keep
"//pkg/sql/types", # keep
"//pkg/util/mon", # keep
Expand Down Expand Up @@ -63,6 +64,7 @@ go_test(

# Map between target name and relevant template.
targets = [
("ntile.eg.go", "ntile_tmpl.go"),
("rank.eg.go", "rank_tmpl.go"),
("relative_rank.eg.go", "relative_rank_tmpl.go"),
("row_number.eg.go", "row_number_tmpl.go"),
Expand Down
Loading

0 comments on commit 4ee8ca0

Please sign in to comment.