Skip to content

Commit

Permalink
colexec: implement vectorized ntile window function [WIP]
Browse files Browse the repository at this point in the history
This patch implements the `ntile` window function in the vectorized
engine. `ntile` takes in an integer argument `num_buckets` and then
distributes all rows in a partition equally between the buckets,
outputting the bucket number for each row.

In the vectorized implementation, batches are buffered until the end
of a partition is reached, at which point the `ntile` bucket values can
be calculated. The batches are emitted in a streaming fashion; as soon
as a batch is fully processed, it is returned and work is paused until
the next call to `Next()`.

Release note: None
  • Loading branch information
Drew Kimball authored and DrewKimball committed May 13, 2021
1 parent ac03340 commit 0e64034
Show file tree
Hide file tree
Showing 13 changed files with 1,556 additions and 82 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/colexecsel/default_cmp_sel_ops.eg.go \
pkg/sql/colexec/colexecsel/selection_ops.eg.go \
pkg/sql/colexec/colexecsel/sel_like_ops.eg.go \
pkg/sql/colexec/colexecwindow/ntile.eg.go \
pkg/sql/colexec/colexecwindow/rank.eg.go \
pkg/sql/colexec/colexecwindow/relative_rank.eg.go \
pkg/sql/colexec/colexecwindow/row_number.eg.go \
Expand Down
21 changes: 20 additions & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,10 @@ func NewColOperator(
// temporary columns that can be appended below.
typs := make([]*types.T, len(result.ColumnTypes), len(result.ColumnTypes)+2)
copy(typs, result.ColumnTypes)
argTypes := make([]*types.T, len(wf.ArgsIdxs))
for i, idx := range wf.ArgsIdxs {
argTypes[i] = typs[idx]
}
tempColOffset, partitionColIdx := uint32(0), tree.NoColumnIdx
peersColIdx := tree.NoColumnIdx
windowFn := *wf.Func.WindowFunc
Expand Down Expand Up @@ -1238,6 +1242,21 @@ func NewColOperator(
if c, ok := result.Root.(colexecop.Closer); ok {
result.ToClose = append(result.ToClose, c)
}
case execinfrapb.WindowerSpec_NTILE:
// We are using an unlimited memory monitor here because
// the ntile operators themselves are responsible for
// making sure that we stay within the memory limit, and
// they will fall back to disk if necessary.
opName := opNamePrefix + "ntile"
unlimitedAllocator := colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, opName, spec.ProcessorID), factory,
)
diskAcc := result.createDiskAccount(ctx, flowCtx, opName, spec.ProcessorID)
argIdx := int(wf.ArgsIdxs[0])
result.Root = colexecwindow.NewNTileOperator(
unlimitedAllocator, execinfra.GetWorkMemLimit(flowCtx), args.DiskQueueCfg,
args.FDSemaphore, input, typs, outputIdx, partitionColIdx, argIdx, diskAcc,
)
default:
return r, errors.AssertionFailedf("window function %s is not supported", wf.String())
}
Expand All @@ -1254,7 +1273,7 @@ func NewColOperator(
result.Root = colexecbase.NewSimpleProjectOp(result.Root, int(wf.OutputColIdx+tempColOffset), projection)
}

_, returnType, err := execinfrapb.GetWindowFunctionInfo(wf.Func, []*types.T{}...)
_, returnType, err := execinfrapb.GetWindowFunctionInfo(wf.Func, argTypes...)
if err != nil {
return r, err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexec/colexecwindow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ go_library(
"//pkg/sql/colexecop", # keep
"//pkg/sql/colmem", # keep
"//pkg/sql/execinfrapb", # keep
"//pkg/sql/pgwire/pgcode", # keep
"//pkg/sql/pgwire/pgerror", # keep
"//pkg/sql/sem/tree", # keep
"//pkg/sql/types", # keep
"//pkg/util/mon", # keep
Expand Down Expand Up @@ -48,6 +50,7 @@ go_test(
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/testutils/buildutil",
Expand All @@ -63,6 +66,7 @@ go_test(

# Map between target name and relevant template.
targets = [
("ntile.eg.go", "ntile_tmpl.go"),
("rank.eg.go", "rank_tmpl.go"),
("relative_rank.eg.go", "relative_rank_tmpl.go"),
("row_number.eg.go", "row_number_tmpl.go"),
Expand Down
Loading

0 comments on commit 0e64034

Please sign in to comment.