Skip to content

Commit

Permalink
Store per chunk compression settings
Browse files Browse the repository at this point in the history
This patch materializes the hypertable compression configuration
when the compressed chunk is created and adjusts the code to use
the per chunk settings whenever working with compressed data.
Allowing chunk settings to differ from the hypertable settings will be
implemented in a follow-up patch.
  • Loading branch information
svenklemm committed Dec 21, 2023
1 parent e41137d commit 7409002
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 47 deletions.
4 changes: 4 additions & 0 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
#include "trigger.h"
#include "ts_catalog/catalog.h"
#include "ts_catalog/compression_chunk_size.h"
#include "ts_catalog/compression_settings.h"
#include "ts_catalog/continuous_agg.h"
#include "ts_catalog/continuous_aggs_watermark.h"
#include "utils.h"
Expand Down Expand Up @@ -2927,9 +2928,12 @@ chunk_tuple_delete(TupleInfo *ti, DropBehavior behavior, bool preserve_chunk_cat

/* The chunk may have been delete by a CASCADE */
if (compressed_chunk != NULL)
{
/* Plain drop without preserving catalog row because this is the compressed
* chunk */
ts_compression_settings_delete(compressed_chunk->table_id);
ts_chunk_drop(compressed_chunk, behavior, DEBUG1);
}
}

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
Expand Down
11 changes: 8 additions & 3 deletions src/event_trigger.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,15 @@ make_event_trigger_drop_index(const char *index_name, const char *schema)
}

static EventTriggerDropRelation *
make_event_trigger_drop_table(const char *table_name, const char *schema, char relkind)
make_event_trigger_drop_table(Oid relid, const char *table_name, const char *schema, char relkind)
{
EventTriggerDropRelation *obj = palloc(sizeof(EventTriggerDropRelation));

*obj = (EventTriggerDropRelation){
.obj = {
.type = (relkind == RELKIND_RELATION) ? EVENT_TRIGGER_DROP_TABLE : EVENT_TRIGGER_DROP_FOREIGN_TABLE,
},
.relid = relid,
.name = table_name,
.schema = schema,
};
Expand Down Expand Up @@ -278,9 +279,12 @@ ts_event_trigger_dropped_objects(void)
eventobj =
make_event_trigger_drop_index(lsecond(addrnames), linitial(addrnames));
else if (strcmp(objtype, "table") == 0)
eventobj = make_event_trigger_drop_table(lsecond(addrnames),
{
eventobj = make_event_trigger_drop_table(DatumGetInt32(values[1]),
lsecond(addrnames),
linitial(addrnames),
RELKIND_RELATION);
}
else if (strcmp(objtype, "view") == 0)
{
List *addrnames = extract_addrnames(DatumGetArrayTypeP(values[10]));
Expand All @@ -290,7 +294,8 @@ ts_event_trigger_dropped_objects(void)
linitial(addrnames)));
}
else if (strcmp(objtype, "foreign table") == 0)
eventobj = make_event_trigger_drop_table(lsecond(addrnames),
eventobj = make_event_trigger_drop_table(DatumGetInt32(values[1]),
lsecond(addrnames),
linitial(addrnames),
RELKIND_FOREIGN_TABLE);
break;
Expand Down
1 change: 1 addition & 0 deletions src/event_trigger.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ typedef struct EventTriggerDropTableConstraint
typedef struct EventTriggerDropRelation
{
EventTriggerDropObject obj;
Oid relid;
const char *name;
const char *schema;
} EventTriggerDropRelation;
Expand Down
4 changes: 1 addition & 3 deletions src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -614,9 +614,6 @@ hypertable_tuple_delete(TupleInfo *ti, void *data)
/* Remove any dependent continuous aggs */
ts_continuous_agg_drop_hypertable_callback(hypertable_id);

/* remove any associated compression definitions */
ts_compression_settings_delete(ts_hypertable_id_to_relid(hypertable_id, true));

if (!compressed_hypertable_id_isnull)
{
Hypertable *compressed_hypertable = ts_hypertable_get_by_id(compressed_hypertable_id);
Expand Down Expand Up @@ -709,6 +706,7 @@ ts_hypertable_drop(Hypertable *hypertable, DropBehavior behavior)
/* Drop the postgres table */
performDeletion(&hypertable_addr, behavior, 0);
}

/* Clean up catalog */
ts_hypertable_delete_by_name(NameStr(hypertable->fd.schema_name),
NameStr(hypertable->fd.table_name));
Expand Down
3 changes: 2 additions & 1 deletion src/process_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -1889,7 +1889,7 @@ process_rename_column(ProcessUtilityArgs *args, Cache *hcache, Oid relid, Rename
* we don't do anything. */
if (ht)
{
ts_compression_settings_rename_column(ht->main_table_relid, stmt->subname, stmt->newname);
ts_compression_settings_rename_column_hypertable(ht, stmt->subname, stmt->newname);
add_hypertable_to_process_args(args, ht);
dim = ts_hyperspace_get_mutable_dimension_by_name(ht->space,
DIMENSION_TYPE_ANY,
Expand Down Expand Up @@ -4235,6 +4235,7 @@ process_drop_table(EventTriggerDropObject *obj)
Assert(obj->type == EVENT_TRIGGER_DROP_TABLE || obj->type == EVENT_TRIGGER_DROP_FOREIGN_TABLE);
ts_hypertable_delete_by_name(table->schema, table->name);
ts_chunk_delete_by_name(table->schema, table->name, DROP_RESTRICT);
ts_compression_settings_delete(table->relid);
}

static void
Expand Down
31 changes: 31 additions & 0 deletions src/ts_catalog/compression_settings.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <postgres.h>
#include <utils/builtins.h>

#include "chunk.h"
#include "hypertable.h"
#include "hypertable_cache.h"
#include "ts_catalog/catalog.h"
Expand All @@ -18,6 +19,20 @@ static ScanTupleResult compression_settings_tuple_update(TupleInfo *ti, void *da
static HeapTuple compression_settings_formdata_make_tuple(const FormData_compression_settings *fd,
TupleDesc desc);

CompressionSettings *
ts_compression_settings_materialize(Oid ht_relid, Oid dst_relid)
{
CompressionSettings *src = ts_compression_settings_get(ht_relid);
Assert(src);
CompressionSettings *dst = ts_compression_settings_create(dst_relid,
src->fd.segmentby,
src->fd.orderby,
src->fd.orderby_desc,
src->fd.orderby_nullsfirst);

return dst;
}

CompressionSettings *
ts_compression_settings_create(Oid relid, ArrayType *segmentby, ArrayType *orderby,
ArrayType *orderby_desc, ArrayType *orderby_nullsfirst)
Expand Down Expand Up @@ -154,6 +169,22 @@ ts_compression_settings_delete(Oid relid)
return count > 0;
}

TSDLLEXPORT void
ts_compression_settings_rename_column_hypertable(Hypertable *ht, char *old, char *new)
{
ts_compression_settings_rename_column(ht->main_table_relid, old, new);
if (ht->fd.compressed_hypertable_id)
{
ListCell *lc;
List *chunk_ids = ts_chunk_get_chunk_ids_by_hypertable_id(ht->fd.compressed_hypertable_id);
foreach (lc, chunk_ids)
{
Oid relid = ts_chunk_get_relid(lfirst_int(lc), false);
ts_compression_settings_rename_column(relid, old, new);
}
}
}

TSDLLEXPORT void
ts_compression_settings_rename_column(Oid relid, char *old, char *new)
{
Expand Down
4 changes: 4 additions & 0 deletions src/ts_catalog/compression_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ TSDLLEXPORT CompressionSettings *ts_compression_settings_create(Oid relid, Array
ArrayType *orderby_desc,
ArrayType *orderby_nullsfirst);
TSDLLEXPORT CompressionSettings *ts_compression_settings_get(Oid relid);
TSDLLEXPORT CompressionSettings *ts_compression_settings_materialize(Oid ht_relid, Oid dst_relid);
TSDLLEXPORT bool ts_compression_settings_delete(Oid relid);

TSDLLEXPORT int ts_compression_settings_update(CompressionSettings *settings);

TSDLLEXPORT void ts_compression_settings_rename_column(Oid relid, char *old, char *new);
TSDLLEXPORT void ts_compression_settings_rename_column_hypertable(Hypertable *ht, char *old,
char *new);
24 changes: 9 additions & 15 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
typedef struct CompressChunkCxt
{
Hypertable *srcht;
CompressionSettings *settings;
Chunk *srcht_chunk; /* chunk from srcht */
Hypertable *compress_ht; /*compressed table for srcht */
} CompressChunkCxt;
Expand Down Expand Up @@ -294,8 +293,6 @@ compresschunkcxt_init(CompressChunkCxt *cxt, Cache *hcache, Oid hypertable_relid

ts_hypertable_permissions_check(srcht->main_table_relid, GetUserId());

cxt->settings = ts_compression_settings_get(srcht->main_table_relid);

if (!TS_HYPERTABLE_HAS_COMPRESSION_TABLE(srcht))
{
NameData cagg_ht_name;
Expand Down Expand Up @@ -418,8 +415,11 @@ check_is_chunk_order_violated_by_merge(CompressChunkCxt *cxt, const Dimension *t
return true;
}

CompressionSettings *ht_settings =
ts_compression_settings_get(mergable_chunk->hypertable_relid);

char *attname = get_attname(cxt->srcht->main_table_relid, time_dim->column_attno, false);
int index = ts_array_position(cxt->settings->fd.orderby, attname);
int index = ts_array_position(ht_settings->fd.orderby, attname);

/* Primary dimension column should be first compress_orderby column. */
if (index != 1)
Expand All @@ -430,7 +430,7 @@ check_is_chunk_order_violated_by_merge(CompressChunkCxt *cxt, const Dimension *t
* NULLS FIRST/LAST here because partitioning columns have NOT NULL
* constraint.
*/
if (ts_array_get_element_bool(cxt->settings->fd.orderby_desc, index))
if (ts_array_get_element_bool(ht_settings->fd.orderby_desc, index))
return true;

return false;
Expand Down Expand Up @@ -507,10 +507,7 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
int insert_options = new_compressed_chunk ? HEAP_INSERT_FROZEN : 0;

before_size = ts_relation_size_impl(cxt.srcht_chunk->table_id);
cstat = compress_chunk(cxt.srcht,
cxt.srcht_chunk->table_id,
compress_ht_chunk->table_id,
insert_options);
cstat = compress_chunk(cxt.srcht_chunk->table_id, compress_ht_chunk->table_id, insert_options);

/* Drop all FK constraints on the uncompressed chunk. This is needed to allow
* cascading deleted data in FK-referenced tables, while blocking deleting data
Expand Down Expand Up @@ -651,6 +648,7 @@ decompress_chunk_impl(Oid uncompressed_hypertable_relid, Oid uncompressed_chunk_
/* Delete the compressed chunk */
ts_compression_chunk_size_delete(uncompressed_chunk->fd.id);
ts_chunk_clear_compressed_chunk(uncompressed_chunk);
ts_compression_settings_delete(compressed_chunk->table_id);

/*
* Lock the compressed chunk that is going to be deleted. At this point,
Expand Down Expand Up @@ -911,14 +909,12 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS)
if (NULL == uncompressed_chunk)
elog(ERROR, "unknown chunk id %d", uncompressed_chunk_id);

int32 srcht_id = uncompressed_chunk->fd.hypertable_id;
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);

Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ExclusiveLock);
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, ExclusiveLock);

Hypertable *ht = ts_hypertable_get_by_id(srcht_id);
CompressionSettings *settings = ts_compression_settings_get(ht->main_table_relid);
CompressionSettings *settings = ts_compression_settings_get(compressed_chunk->table_id);

TupleDesc compressed_rel_tupdesc = RelationGetDescr(compressed_chunk_rel);
TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel);
Expand Down Expand Up @@ -1117,10 +1113,8 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
NameStr(uncompressed_chunk->fd.table_name));

/* need it to find the segby cols from the catalog */
int32 srcht_id = uncompressed_chunk->fd.hypertable_id;
Hypertable *ht = ts_hypertable_get_by_id(srcht_id);
CompressionSettings *settings = ts_compression_settings_get(ht->main_table_relid);
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);
CompressionSettings *settings = ts_compression_settings_get(compressed_chunk->table_id);

int nsegmentby_cols = ts_array_length(settings->fd.segmentby);

Expand Down
26 changes: 12 additions & 14 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ truncate_relation(Oid table_oid)
}

CompressionStats
compress_chunk(Hypertable *ht, Oid in_table, Oid out_table, int insert_options)
compress_chunk(Oid in_table, Oid out_table, int insert_options)
{
int n_keys;
ListCell *lc;
Expand All @@ -235,7 +235,7 @@ compress_chunk(Hypertable *ht, Oid in_table, Oid out_table, int insert_options)
HeapTuple in_table_tp = NULL, index_tp = NULL;
Form_pg_attribute in_table_attr_tp, index_attr_tp;
CompressionStats cstat;
CompressionSettings *settings = ts_compression_settings_get(ht->main_table_relid);
CompressionSettings *settings = ts_compression_settings_get(out_table);

/* We want to prevent other compressors from compressing this table,
* and we want to prevent INSERTs or UPDATEs which could mess up our compression.
Expand Down Expand Up @@ -1951,14 +1951,14 @@ compression_get_default_algorithm(Oid typeoid)
* columns of the uncompressed chunk.
*/
static ScanKeyData *
build_scankeys(int32 hypertable_id, Oid hypertable_relid, RowDecompressor *decompressor,
build_scankeys(Oid hypertable_relid, Oid out_rel, RowDecompressor *decompressor,
Bitmapset *key_columns, Bitmapset **null_columns, TupleTableSlot *slot,
int *num_scankeys)
{
int key_index = 0;
ScanKeyData *scankeys = NULL;

CompressionSettings *settings = ts_compression_settings_get(hypertable_relid);
CompressionSettings *settings = ts_compression_settings_get(out_rel);
Assert(settings);

if (!bms_is_empty(key_columns))
Expand Down Expand Up @@ -2137,8 +2137,8 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
Bitmapset *null_columns = NULL;

int num_scankeys;
ScanKeyData *scankeys = build_scankeys(chunk->fd.hypertable_id,
chunk->hypertable_relid,
ScanKeyData *scankeys = build_scankeys(chunk->hypertable_relid,
in_rel->rd_id,
&decompressor,
key_columns,
&null_columns,
Expand Down Expand Up @@ -2380,13 +2380,9 @@ find_matching_index(Relation comp_chunk_rel, List **index_filters, List **heap_f
* be used to build scan keys later.
*/
static void
fill_predicate_context(Chunk *ch, List *predicates, List **heap_filters, List **index_filters,
List **is_null)
fill_predicate_context(Chunk *ch, CompressionSettings *settings, List *predicates,
List **heap_filters, List **index_filters, List **is_null)
{
Hypertable *ht = ts_hypertable_get_by_id(ch->fd.hypertable_id);
CompressionSettings *settings = ts_compression_settings_get(ht->main_table_relid);
Assert(settings);

ListCell *lc;
foreach (lc, predicates)
{
Expand Down Expand Up @@ -2940,10 +2936,12 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
ScanKeyData *index_scankeys = NULL;
int num_index_scankeys = 0;

fill_predicate_context(chunk, predicates, &heap_filters, &index_filters, &is_null);
comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
CompressionSettings *settings = ts_compression_settings_get(comp_chunk->table_id);

fill_predicate_context(chunk, settings, predicates, &heap_filters, &index_filters, &is_null);

chunk_rel = table_open(chunk->table_id, RowExclusiveLock);
comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock);
decompressor = build_decompressor(comp_chunk_rel, chunk_rel);

Expand Down
3 changes: 1 addition & 2 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,7 @@ pg_attribute_unused() assert_num_compression_algorithms_sane(void)
extern CompressionStorage compression_get_toast_storage(CompressionAlgorithm algo);
extern CompressionAlgorithm compression_get_default_algorithm(Oid typeoid);

extern CompressionStats compress_chunk(Hypertable *ht, Oid in_table, Oid out_table,
int insert_options);
extern CompressionStats compress_chunk(Oid in_table, Oid out_table, int insert_options);
extern void decompress_chunk(Oid in_table, Oid out_table);

extern DecompressionIterator *(*tsl_get_decompression_iterator_init(
Expand Down
18 changes: 9 additions & 9 deletions tsl/src/compression/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -575,22 +575,22 @@ create_compression_table(Oid owner, CompressColInfo *compress_cols, Oid tablespa
Chunk *
create_compress_chunk(Hypertable *compress_ht, Chunk *src_chunk, Oid table_id)
{
Hyperspace *hs = compress_ht->space;
Catalog *catalog = ts_catalog_get();
CatalogSecurityContext sec_ctx;
Chunk *compress_chunk;
int namelen;
Oid tablespace_oid;
const char *tablespace;

/* Create a new chunk based on the hypercube */
Assert(compress_ht->space->num_dimensions == 0);

/* Create a new catalog entry for chunk based on the hypercube */
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
compress_chunk = ts_chunk_create_base(ts_catalog_table_next_seq_id(catalog, CHUNK),
hs->num_dimensions,
RELKIND_RELATION);
compress_chunk =
ts_chunk_create_base(ts_catalog_table_next_seq_id(catalog, CHUNK), 0, RELKIND_RELATION);
ts_catalog_restore_user(&sec_ctx);

compress_chunk->fd.hypertable_id = hs->hypertable_id;
compress_chunk->fd.hypertable_id = compress_ht->fd.id;
compress_chunk->cube = src_chunk->cube;
compress_chunk->hypertable_relid = compress_ht->main_table_relid;
compress_chunk->constraints = ts_chunk_constraints_alloc(1, CurrentMemoryContext);
Expand Down Expand Up @@ -648,6 +648,9 @@ create_compress_chunk(Hypertable *compress_ht, Chunk *src_chunk, Oid table_id)
if (!OidIsValid(compress_chunk->table_id))
elog(ERROR, "could not create compressed chunk table");

/* Materialize current compression settings for this chunk */
ts_compression_settings_materialize(src_chunk->hypertable_relid, compress_chunk->table_id);

/* if the src chunk is not in the default tablespace, the compressed indexes
* should also be in a non-default tablespace. IN the usual case, this is inferred
* from the hypertable's and chunk's tablespace info. We do not propagate
Expand Down Expand Up @@ -1136,9 +1139,6 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
segmentby_cols,
orderby_cols);

settings = ts_compression_settings_get(ht->main_table_relid);
Assert(settings);

/* Check if we can create a compressed hypertable with existing
* constraints and indexes. */
List *indexes = NIL;
Expand Down
Loading

0 comments on commit 7409002

Please sign in to comment.