Skip to content

Commit

Permalink
Use a stateful detoaster for compressed data
Browse files Browse the repository at this point in the history
The normal Postgres detoasting code locks and opens toast tables and
indexes for each toast value, which can take a large percentage of CPU time
on simple queries. Since in decompression we're working with one table
at a time, the toast table and index are the same for every datum, so we
don't have to redo this work.
  • Loading branch information
akuzm committed Dec 19, 2023
1 parent 301612a commit a64ae61
Show file tree
Hide file tree
Showing 12 changed files with 582 additions and 27 deletions.
10 changes: 3 additions & 7 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS)
table_close(compressed_chunk_rel, NoLock);
table_close(uncompressed_chunk_rel, NoLock);

row_compressor_finish(&row_compressor);
row_compressor_close(&row_compressor);

if (OidIsValid(row_compressor.index_oid))
{
Expand Down Expand Up @@ -1194,10 +1194,6 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
/******************** row decompressor **************/

RowDecompressor decompressor = build_decompressor(compressed_chunk_rel, uncompressed_chunk_rel);
/* do not need the indexes on the uncompressed chunk as we do not write to it anymore */
ts_catalog_close_indexes(decompressor.indexstate);
/* also do not need estate because we don't insert into indexes */
FreeExecutorState(decompressor.estate);
/********** row compressor *******************/
RowCompressor row_compressor;
row_compressor_init(settings,
Expand Down Expand Up @@ -1392,12 +1388,12 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
row_compressor.rowcnt_pre_compression,
row_compressor.num_compressed_rows);

row_compressor_finish(&row_compressor);
FreeBulkInsertState(decompressor.bistate);
row_compressor_close(&row_compressor);
ExecDropSingleTupleTableSlot(slot);
index_endscan(index_scan);
UnregisterSnapshot(snapshot);
index_close(index_rel, AccessExclusiveLock);
row_decompressor_close(&decompressor);

#if PG14_LT
int options = 0;
Expand Down
36 changes: 22 additions & 14 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ compress_chunk(Hypertable *ht, Oid in_table, Oid out_table, int insert_options)
tuplesort_end(sorted_rel);
}

row_compressor_finish(&row_compressor);
row_compressor_close(&row_compressor);
DEBUG_WAITPOINT("compression_done_before_truncate_uncompressed");
truncate_relation(in_table);

Expand Down Expand Up @@ -1269,7 +1269,7 @@ row_compressor_reset(RowCompressor *row_compressor)
}

void
row_compressor_finish(RowCompressor *row_compressor)
row_compressor_close(RowCompressor *row_compressor)
{
if (row_compressor->bistate)
FreeBulkInsertState(row_compressor->bistate);
Expand Down Expand Up @@ -1395,9 +1395,21 @@ build_decompressor(Relation in_rel, Relation out_rel)
*/
memset(decompressor.decompressed_is_nulls, true, out_desc->natts);

detoaster_init(&decompressor.detoaster, CurrentMemoryContext);

return decompressor;
}

/*
 * Release the resources held by a RowDecompressor.
 *
 * Bundles the teardown calls that callers otherwise issue one by one:
 * frees the bulk insert state, deletes the per-compressed-row memory
 * context, closes the catalog indexes, frees the executor state, and
 * closes the detoaster that build_decompressor() initialized.
 *
 * NOTE(review): unlike row_compressor_close(), bistate is freed without a
 * NULL check -- presumably build_decompressor() always sets it; confirm.
 */
void
row_decompressor_close(RowDecompressor *decompressor)
{
FreeBulkInsertState(decompressor->bistate);
MemoryContextDelete(decompressor->per_compressed_row_ctx);
ts_catalog_close_indexes(decompressor->indexstate);
FreeExecutorState(decompressor->estate);
/* Counterpart of the detoaster_init() call in build_decompressor(). */
detoaster_close(&decompressor->detoaster);
}

void
decompress_chunk(Oid in_table, Oid out_table)
{
Expand Down Expand Up @@ -1438,10 +1450,7 @@ decompress_chunk(Oid in_table, Oid out_table)

table_endscan(scan);
ExecDropSingleTupleTableSlot(slot);
FreeBulkInsertState(decompressor.bistate);
MemoryContextDelete(decompressor.per_compressed_row_ctx);
ts_catalog_close_indexes(decompressor.indexstate);
FreeExecutorState(decompressor.estate);
row_decompressor_close(&decompressor);

table_close(out_rel, NoLock);
table_close(in_rel, NoLock);
Expand Down Expand Up @@ -1555,8 +1564,11 @@ decompress_batch(RowDecompressor *decompressor)
}

/* Normal compressed column. */
CompressedDataHeader *header =
get_compressed_data_header(decompressor->compressed_datums[input_column]);
Datum compressed_datum = PointerGetDatum(
detoaster_detoast_attr((struct varlena *) DatumGetPointer(
decompressor->compressed_datums[input_column]),
&decompressor->detoaster));
CompressedDataHeader *header = get_compressed_data_header(compressed_datum);
column_info->iterator =
definitions[header->compression_algorithm]
.iterator_init_forward(PointerGetDatum(header), column_info->decompressed_type);
Expand Down Expand Up @@ -2201,9 +2213,7 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo

table_endscan(scan);
ExecDropSingleTupleTableSlot(compressed_slot);
ts_catalog_close_indexes(decompressor.indexstate);
FreeExecutorState(decompressor.estate);
FreeBulkInsertState(decompressor.bistate);
row_decompressor_close(&decompressor);

CommandCounterIncrement();
table_close(in_rel, NoLock);
Expand Down Expand Up @@ -3337,9 +3347,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
if (chunk_status_changed == true)
ts_chunk_set_partial(chunk);

ts_catalog_close_indexes(decompressor.indexstate);
FreeExecutorState(decompressor.estate);
FreeBulkInsertState(decompressor.bistate);
row_decompressor_close(&decompressor);

table_close(chunk_rel, NoLock);
table_close(comp_chunk_rel, NoLock);
Expand Down
6 changes: 5 additions & 1 deletion tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
typedef struct BulkInsertStateData *BulkInsertState;

#include "compat/compat.h"
#include "nodes/decompress_chunk/detoaster.h"
#include "hypertable.h"
#include "segment_meta.h"
#include "ts_catalog/compression_settings.h"
Expand Down Expand Up @@ -148,6 +149,8 @@ typedef struct RowDecompressor
int64 tuples_decompressed;

TupleTableSlot **decompressed_slots;

Detoaster detoaster;
} RowDecompressor;

/*
Expand Down Expand Up @@ -358,13 +361,14 @@ extern void row_compressor_init(CompressionSettings *settings, RowCompressor *ro
int16 num_columns_in_compressed_table, bool need_bistate,
bool reset_sequence, int insert_options);
extern void row_compressor_reset(RowCompressor *row_compressor);
extern void row_compressor_finish(RowCompressor *row_compressor);
extern void row_compressor_close(RowCompressor *row_compressor);
extern void row_compressor_append_sorted_rows(RowCompressor *row_compressor,
Tuplesortstate *sorted_rel, TupleDesc sorted_desc);
extern void segment_info_update(SegmentInfo *segment_info, Datum val, bool is_null);

extern RowDecompressor build_decompressor(Relation in_rel, Relation out_rel);

extern void row_decompressor_close(RowDecompressor *decompressor);
extern enum CompressionAlgorithms compress_get_default_algorithm(Oid typeoid);
/*
* A convenience macro to throw an error about the corrupted compressed data, if
Expand Down
1 change: 1 addition & 0 deletions tsl/src/nodes/decompress_chunk/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/batch_queue_fifo.c
${CMAKE_CURRENT_SOURCE_DIR}/compressed_batch.c
${CMAKE_CURRENT_SOURCE_DIR}/decompress_chunk.c
${CMAKE_CURRENT_SOURCE_DIR}/detoaster.c
${CMAKE_CURRENT_SOURCE_DIR}/exec.c
${CMAKE_CURRENT_SOURCE_DIR}/planner.c
${CMAKE_CURRENT_SOURCE_DIR}/pred_vector_array.c
Expand Down
6 changes: 5 additions & 1 deletion tsl/src/nodes/decompress_chunk/compressed_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,12 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
return;
}

/* Detoast the compressed datum. */
value = PointerGetDatum(
detoaster_detoast_attr((struct varlena *) DatumGetPointer(value), &dcontext->detoaster));

/* Decompress the entire batch if it is supported. */
CompressedDataHeader *header = (CompressedDataHeader *) PG_DETOAST_DATUM(value);
CompressedDataHeader *header = (CompressedDataHeader *) value;
ArrowArray *arrow = NULL;
if (dcontext->enable_bulk_decompression && column_description->bulk_decompression_supported)
{
Expand Down
3 changes: 3 additions & 0 deletions tsl/src/nodes/decompress_chunk/decompress_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <nodes/pg_list.h>

#include "batch_array.h"
#include "detoaster.h"

typedef enum CompressionColumnType
{
Expand Down Expand Up @@ -74,6 +75,8 @@ typedef struct DecompressContext
TupleDesc compressed_slot_tdesc;

PlanState *ps; /* Set for filtering and instrumentation */

Detoaster detoaster;
} DecompressContext;

#endif /* TIMESCALEDB_DECOMPRESS_CONTEXT_H */
Loading

0 comments on commit a64ae61

Please sign in to comment.