Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Put vectorized aggregation results in short-lived memory context #7461

Merged
merged 8 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions tsl/src/nodes/vector_agg/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,23 +194,31 @@ static TupleTableSlot *
vector_agg_exec(CustomScanState *node)
{
VectorAggState *vector_agg_state = (VectorAggState *) node;
ExprContext *econtext = node->ss.ps.ps_ExprContext;
ResetExprContext(econtext);

TupleTableSlot *aggregated_slot = vector_agg_state->custom.ss.ps.ps_ResultTupleSlot;
ExecClearTuple(aggregated_slot);

/*
* If we have more partial aggregation results, continue returning them.
*/
GroupingPolicy *grouping = vector_agg_state->grouping;
if (grouping->gp_do_emit(grouping, aggregated_slot))
MemoryContext old_context = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
bool have_partial = grouping->gp_do_emit(grouping, aggregated_slot);
MemoryContextSwitchTo(old_context);
if (have_partial)
{
/* The grouping policy produced a partial aggregation result. */
return ExecStoreVirtualTuple(aggregated_slot);
}

/*
* If the partial aggregation results have ended, and the input has ended,
* we're done.
*/
if (vector_agg_state->input_ended)
{
/*
* The partial aggregation results have ended, and the input has ended,
* so we're done.
*/
return NULL;
}

Expand Down Expand Up @@ -285,7 +293,13 @@ vector_agg_exec(CustomScanState *node)
grouping->gp_add_batch(grouping, batch_state);
}

if (grouping->gp_do_emit(grouping, aggregated_slot))
/*
* If we have partial aggregation results, start returning them.
*/
old_context = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
have_partial = grouping->gp_do_emit(grouping, aggregated_slot);
MemoryContextSwitchTo(old_context);
if (have_partial)
{
/* Have partial aggregation results. */
return ExecStoreVirtualTuple(aggregated_slot);
Expand Down
10 changes: 10 additions & 0 deletions tsl/src/nodes/vector_agg/plan.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ has_vector_agg_node(Plan *plan, bool *has_normal_agg)
append_plans = custom->custom_plans;
}
}
else if (IsA(plan, SubqueryScan))
{
SubqueryScan *subquery = castNode(SubqueryScan, plan);
append_plans = list_make1(subquery->subplan);
}

if (append_plans)
{
Expand Down Expand Up @@ -437,6 +442,11 @@ try_insert_vector_agg_node(Plan *plan)
append_plans = custom->custom_plans;
}
}
else if (IsA(plan, SubqueryScan))
{
SubqueryScan *subquery = castNode(SubqueryScan, plan);
append_plans = list_make1(subquery->subplan);
}

if (append_plans)
{
Expand Down
121 changes: 121 additions & 0 deletions tsl/test/expected/vector_agg_memory.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER
-- Helper function that returns the amount of memory currently allocated in a
-- given memory context.
create or replace function ts_debug_allocated_bytes(text = 'PortalContext') returns bigint
as :MODULE_PATHNAME, 'ts_debug_allocated_bytes'
language c strict volatile;
create table mvagg (t int, s0 int, s1 int);
select create_hypertable('mvagg', 't', chunk_time_interval => pow(10, 9)::int);
NOTICE: adding not-null constraint to column "t"
create_hypertable
--------------------
(1,public,mvagg,t)
(1 row)

insert into mvagg select generate_series(1, 2 * pow(10, 6)::int) t, 1 s0, 1 s1;
-- Need two segmentbys to prevent compressed index scans to force hash aggregation.
-- Otherwise we might get GroupAggregate with Sort which uses linear memory in
-- the number of compressed batches.
alter table mvagg set (timescaledb.compress, timescaledb.compress_segmentby='s0, s1');
NOTICE: default order by for hypertable "mvagg" is set to "t DESC"
-- Need to inflate the estimated cardinalities of segmentby columns, again to
-- force the hash aggregation.
insert into mvagg select -1 - x t, -x s0, -x s1 from generate_series(1, 1000) x;
-- Need two chunks for chunkwise aggregation.
insert into mvagg select -1 t, 1 s0, 1 s1;
select count(compress_chunk(x)) from show_chunks('mvagg') x;
count
-------
2
(1 row)

vacuum analyze mvagg;
-- We are going to log memory usage as a function of number of aggregated elements
-- here.
create table log(n int, bytes int, a bigint, b bigint, c bigint, d bigint, e bigint, f bigint);
-- First, ensure that the underlying decompression has constant memory usage.
explain (costs off) select distinct on (s0, s1) ts_debug_allocated_bytes() bytes,
s0, s1, t
from mvagg where t >= -1 and t < 1000000 order by s0, s1, t desc;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------
Result
-> Unique
-> Merge Append
Sort Key: _hyper_1_1_chunk.s0, _hyper_1_1_chunk.s1, _hyper_1_1_chunk.t DESC
-> Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000))
-> Index Scan using compress_hyper_2_3_chunk_s0_s1__ts_meta_min_1__ts_meta_max__idx on compress_hyper_2_3_chunk
Index Cond: ((_ts_meta_min_1 < 1000000) AND (_ts_meta_max_1 >= '-1'::integer))
-> Custom Scan (DecompressChunk) on _hyper_1_2_chunk
Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000))
-> Index Scan using compress_hyper_2_4_chunk_s0_s1__ts_meta_min_1__ts_meta_max__idx on compress_hyper_2_4_chunk
Index Cond: ((_ts_meta_min_1 < 1000000) AND (_ts_meta_max_1 >= '-1'::integer))
(12 rows)

truncate log;
\set ECHO none
select * from log where (
-- Ideally the memory usage should be constant, but we have to allow for
-- small spurious changes to make this test more robust.
select regr_slope(bytes, n) > 1.0 / 65536 from log
);
n | bytes | a | b | c | d | e | f
---+-------+---+---+---+---+---+---
(0 rows)

-- Test the vectorized aggregation with grouping by segmentby with various number
-- of input row. We expect approximately constant memory usage.
truncate log;
set max_parallel_workers_per_gather = 0;
set timescaledb.debug_require_vector_agg = 'require';
-- Despite the tweaks above, we are unable to force the HashAggregation, because
-- the unsorted DecompressChunk paths for aggregation are not created properly
-- (see issue #6836). Limit the memory consumed by tuplesort.
set work_mem = '64kB';
explain (costs off) select ts_debug_allocated_bytes() bytes,
count(*) a, count(t) b, sum(t) c, avg(t) d, min(t) e, max(t) f
from mvagg where t >= -1 and t < 1000000 group by s1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------
Finalize GroupAggregate
Group Key: _hyper_1_1_chunk.s1
-> Sort
Sort Key: _hyper_1_1_chunk.s1
-> Append
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000))
-> Sort
Sort Key: compress_hyper_2_3_chunk.s1
-> Seq Scan on compress_hyper_2_3_chunk
Filter: ((_ts_meta_max_1 >= '-1'::integer) AND (_ts_meta_min_1 < 1000000))
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_1_2_chunk
Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000))
-> Sort
Sort Key: compress_hyper_2_4_chunk.s1
-> Index Scan using compress_hyper_2_4_chunk_s0_s1__ts_meta_min_1__ts_meta_max__idx on compress_hyper_2_4_chunk
Index Cond: ((_ts_meta_min_1 < 1000000) AND (_ts_meta_max_1 >= '-1'::integer))
(19 rows)

\set ECHO none
reset timescaledb.debug_require_vector_agg;
reset max_parallel_workers_per_gather;
reset work_mem;
select * from log where (
-- For aggregation by segmentby, memory usage should be constant regardless
-- of the number of tuples. Still, we have to allow for small variations
-- that can be caused by other reasons. Currently the major increase is
-- caused by tuplesort, because we are unable to force hash aggregation due
-- to unrelated planning bugs.
select regr_slope(bytes, n) > 0.05 from log
);
n | bytes | a | b | c | d | e | f
---+-------+---+---+---+---+---+---
(0 rows)

reset timescaledb.debug_require_vector_agg;
1 change: 1 addition & 0 deletions tsl/test/sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ if(CMAKE_BUILD_TYPE MATCHES Debug)
recompress_chunk_segmentwise.sql
feature_flags.sql
vector_agg_default.sql
vector_agg_memory.sql
vector_agg_segmentby.sql)

list(
Expand Down
99 changes: 99 additions & 0 deletions tsl/test/sql/vector_agg_memory.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

\c :TEST_DBNAME :ROLE_SUPERUSER

-- Helper function that returns the amount of memory currently allocated in a
-- given memory context.
create or replace function ts_debug_allocated_bytes(text = 'PortalContext') returns bigint
as :MODULE_PATHNAME, 'ts_debug_allocated_bytes'
language c strict volatile;


create table mvagg (t int, s0 int, s1 int);
select create_hypertable('mvagg', 't', chunk_time_interval => pow(10, 9)::int);
insert into mvagg select generate_series(1, 2 * pow(10, 6)::int) t, 1 s0, 1 s1;

-- Need two segmentbys to prevent compressed index scans to force hash aggregation.
-- Otherwise we might get GroupAggregate with Sort which uses linear memory in
-- the number of compressed batches.
alter table mvagg set (timescaledb.compress, timescaledb.compress_segmentby='s0, s1');
-- Need to inflate the estimated cardinalities of segmentby columns, again to
-- force the hash aggregation.
insert into mvagg select -1 - x t, -x s0, -x s1 from generate_series(1, 1000) x;
-- Need two chunks for chunkwise aggregation.
insert into mvagg select -1 t, 1 s0, 1 s1;

select count(compress_chunk(x)) from show_chunks('mvagg') x;

vacuum analyze mvagg;

-- We are going to log memory usage as a function of number of aggregated elements
-- here.
create table log(n int, bytes int, a bigint, b bigint, c bigint, d bigint, e bigint, f bigint);


-- First, ensure that the underlying decompression has constant memory usage.
explain (costs off) select distinct on (s0, s1) ts_debug_allocated_bytes() bytes,
s0, s1, t
from mvagg where t >= -1 and t < 1000000 order by s0, s1, t desc;

truncate log;
\set ECHO none
select
format('insert into log
select distinct on (s0, s1) %1$s,
ts_debug_allocated_bytes() bytes,
0 a, 0 b, 0 c, 0 d, 0 e, 0 f
from mvagg where t >= -1 and t < %1$s
order by s0, s1, t desc',
pow(10, generate_series(1, 7)))
\gexec
\set ECHO all

select * from log where (
-- Ideally the memory usage should be constant, but we have to allow for
-- small spurious changes to make this test more robust.
select regr_slope(bytes, n) > 1.0 / 65536 from log
);

-- Test the vectorized aggregation with grouping by segmentby with various number
-- of input row. We expect approximately constant memory usage.
truncate log;
set max_parallel_workers_per_gather = 0;
set timescaledb.debug_require_vector_agg = 'require';
-- Despite the tweaks above, we are unable to force the HashAggregation, because
-- the unsorted DecompressChunk paths for aggregation are not created properly
-- (see issue #6836). Limit the memory consumed by tuplesort.
set work_mem = '64kB';

explain (costs off) select ts_debug_allocated_bytes() bytes,
count(*) a, count(t) b, sum(t) c, avg(t) d, min(t) e, max(t) f
from mvagg where t >= -1 and t < 1000000 group by s1;

\set ECHO none
select
format('insert into log
select %1$s,
ts_debug_allocated_bytes() bytes,
count(*) a, count(t) b, sum(t) c, avg(t) d, min(t) e, max(t) f
from mvagg where t >= -1 and t < %1$s group by s1',
pow(10, generate_series(1, 7)))
\gexec
\set ECHO all

reset timescaledb.debug_require_vector_agg;
reset max_parallel_workers_per_gather;
reset work_mem;

select * from log where (
-- For aggregation by segmentby, memory usage should be constant regardless
-- of the number of tuples. Still, we have to allow for small variations
-- that can be caused by other reasons. Currently the major increase is
-- caused by tuplesort, because we are unable to force hash aggregation due
-- to unrelated planning bugs.
select regr_slope(bytes, n) > 0.05 from log
);

reset timescaledb.debug_require_vector_agg;
Loading