Skip to content

Commit

Permalink
Remove limitation of compression policy for continuous aggregates
Browse files Browse the repository at this point in the history
Before compressed chunks were mutable, adding a compression policy
to a continuous aggregate that could include portions of the refrsh
window were blocked. When a continuous aggregate is refreshed, the
underlying chunks needed to allow DELETEs and INSERTs, so they
could not be compressed. Now, compressed chunks allow both operations
and there is no longer a need to prevent the refresh window and
compression window from overlapping.
  • Loading branch information
RobAtticus committed Feb 5, 2025
1 parent d0c2bb7 commit 30b47b8
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 39 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7656
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7656 Remove limitation of compression policy for continuous aggregates
15 changes: 0 additions & 15 deletions tsl/src/bgw_policy/compression_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,21 +338,6 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
format_type_be(compress_after_type))));
}
/*
* If this is a compression policy for a cagg, verify that
* compress_after > refresh_start of cagg policy. We do not want
* to compress regions that can be refreshed by the cagg policy.
*/
if (is_cagg && !policy_refresh_cagg_refresh_start_lt(hypertable->fd.id,
compress_after_type,
compress_after_datum))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("compress_after value for compression policy should be greater than the "
"start of the refresh window of continuous aggregate policy for %s",
get_rel_name(user_rel_oid))));
}

JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
Jsonb *config = JsonbValueToJsonb(result);
Expand Down
6 changes: 0 additions & 6 deletions tsl/test/expected/cagg_errors.out
Original file line number Diff line number Diff line change
Expand Up @@ -595,14 +595,8 @@ ALTER MATERIALIZED VIEW i2980_cagg SET ( timescaledb.compress );
NOTICE: defaulting compress_orderby to time_bucket
WARNING: there was some uncertainty picking the default segment by for the hypertable: You do not have any indexes on columns that can be used for segment_by and thus we are not using segment_by for compression. Please make sure you are not missing any indexes
NOTICE: default segment by for hypertable "_materialized_hypertable_13" is set to ""
SELECT add_compression_policy('i2980_cagg', '8 day'::interval);
ERROR: compress_after value for compression policy should be greater than the start of the refresh window of continuous aggregate policy for i2980_cagg
SELECT add_continuous_aggregate_policy('i2980_cagg2', '10 day'::interval, '6 day'::interval);
ERROR: function add_continuous_aggregate_policy(unknown, interval, interval) does not exist at character 8
SELECT add_compression_policy('i2980_cagg2', '3 day'::interval);
ERROR: compress_after value for compression policy should be greater than the start of the refresh window of continuous aggregate policy for i2980_cagg2
SELECT add_compression_policy('i2980_cagg2', '1 day'::interval);
ERROR: compress_after value for compression policy should be greater than the start of the refresh window of continuous aggregate policy for i2980_cagg2
SELECT add_compression_policy('i2980_cagg2', '3'::integer);
ERROR: unsupported compress_after argument type, expected type : interval
SELECT add_compression_policy('i2980_cagg2', 13::integer);
Expand Down
42 changes: 30 additions & 12 deletions tsl/test/expected/cagg_policy.out
Original file line number Diff line number Diff line change
Expand Up @@ -1158,24 +1158,42 @@ ALTER MATERIALIZED VIEW mat_smallint SET (timescaledb.compress);
NOTICE: defaulting compress_orderby to a
WARNING: there was some uncertainty picking the default segment by for the hypertable: You do not have any indexes on columns that can be used for segment_by and thus we are not using segment_by for compression. Please make sure you are not missing any indexes
NOTICE: default segment by for hypertable "_materialized_hypertable_11" is set to ""
\set ON_ERROR_STOP 0
SELECT add_compression_policy('mat_smallint', 0::smallint);
ERROR: compress_after value for compression policy should be greater than the start of the refresh window of continuous aggregate policy for mat_smallint
-- With immutable compressed chunks, these policies would fail by overlapping the refresh window
SELECT add_compression_policy('mat_smallint', -4::smallint);
ERROR: compress_after value for compression policy should be greater than the start of the refresh window of continuous aggregate policy for mat_smallint
add_compression_policy
------------------------
1053
(1 row)

SELECT remove_compression_policy('mat_smallint');
remove_compression_policy
---------------------------
t
(1 row)

SELECT add_compression_policy('mat_bigint', 0::bigint);
ERROR: compress_after value for compression policy should be greater than the start of the refresh window of continuous aggregate policy for mat_bigint
\set ON_ERROR_STOP 1
add_compression_policy
------------------------
1054
(1 row)

SELECT remove_compression_policy('mat_bigint');
remove_compression_policy
---------------------------
t
(1 row)

-- End previous limitation tests
SELECT add_compression_policy('mat_smallint', 5::smallint);
add_compression_policy
------------------------
1053
1055
(1 row)

SELECT add_compression_policy('mat_bigint', 20::bigint);
add_compression_policy
------------------------
1054
1056
(1 row)

-- end of coverage tests
Expand Down Expand Up @@ -1227,7 +1245,7 @@ WITH NO DATA;
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, NULL, '1 h'::interval, if_not_exists => true);
add_continuous_aggregate_policy
---------------------------------
1055
1057
(1 row)

SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval, if_not_exists => true);
Expand All @@ -1246,7 +1264,7 @@ SELECT remove_continuous_aggregate_policy('metrics_cagg');
SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true);
add_continuous_aggregate_policy
---------------------------------
1056
1058
(1 row)

SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE
Expand Down Expand Up @@ -1287,7 +1305,7 @@ ERROR: cannot use "compress_created_before" with continuous aggregate "metrics_
SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
COMP_JOB
----------
1058
1060
(1 row)

SELECT remove_compression_policy('metrics_cagg');
Expand Down Expand Up @@ -1429,7 +1447,7 @@ SELECT set_integer_now_func('t', 'unix_now');
SELECT add_retention_policy('t', 20);
add_retention_policy
----------------------
1060
1062
(1 row)

CREATE MATERIALIZED VIEW cagg(a, sumb) WITH (timescaledb.continuous)
Expand Down
3 changes: 0 additions & 3 deletions tsl/test/sql/cagg_errors.sql
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,8 @@ ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress, timescaledb.comp
select add_continuous_aggregate_policy('i2980_cagg2', interval '10 day', interval '2 day' ,'4h') AS job_id ;
SELECT add_compression_policy('i2980_cagg', '8 day'::interval);
ALTER MATERIALIZED VIEW i2980_cagg SET ( timescaledb.compress );
SELECT add_compression_policy('i2980_cagg', '8 day'::interval);

SELECT add_continuous_aggregate_policy('i2980_cagg2', '10 day'::interval, '6 day'::interval);
SELECT add_compression_policy('i2980_cagg2', '3 day'::interval);
SELECT add_compression_policy('i2980_cagg2', '1 day'::interval);
SELECT add_compression_policy('i2980_cagg2', '3'::integer);
SELECT add_compression_policy('i2980_cagg2', 13::integer);

Expand Down
8 changes: 5 additions & 3 deletions tsl/test/sql/cagg_policy.sql
Original file line number Diff line number Diff line change
Expand Up @@ -505,11 +505,13 @@ SELECT * FROM mat_bigint WHERE a>100 ORDER BY 1;

ALTER MATERIALIZED VIEW mat_bigint SET (timescaledb.compress);
ALTER MATERIALIZED VIEW mat_smallint SET (timescaledb.compress);
\set ON_ERROR_STOP 0
SELECT add_compression_policy('mat_smallint', 0::smallint);
-- With immutable compressed chunks, these policies would fail by overlapping the refresh window
SELECT add_compression_policy('mat_smallint', -4::smallint);
SELECT remove_compression_policy('mat_smallint');
SELECT add_compression_policy('mat_bigint', 0::bigint);
\set ON_ERROR_STOP 1
SELECT remove_compression_policy('mat_bigint');
-- End previous limitation tests

SELECT add_compression_policy('mat_smallint', 5::smallint);
SELECT add_compression_policy('mat_bigint', 20::bigint);

Expand Down

0 comments on commit 30b47b8

Please sign in to comment.