Skip to content

Commit

Permalink
Optimize segmentwise recompression
Browse files Browse the repository at this point in the history
Instead of recompressing all the segments, try to find the segments
that contain uncompressed tuples and recompress only those segments.
  • Loading branch information
antekresic committed Nov 28, 2023
1 parent 7443c47 commit 645727b
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 136 deletions.
204 changes: 94 additions & 110 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -947,14 +947,18 @@ tsl_recompress_chunk_wrapper(Chunk *uncompressed_chunk)
return true;
}

/* This is a wrapper around row_compressor_append_sorted_rows. */
/*
 * Sort the accumulated tuples and recompress them into the compressed
 * chunk relation via the row compressor.
 *
 * Takes ownership of `tuplesortstate`: tuplesort_end() is called here, so
 * the caller must not use the sort state (or any tuple pointers obtained
 * from it) after this returns, and must create a fresh tuplesort for the
 * next segment.
 */
static void
recompress_segment(Tuplesortstate *tuplesortstate, Relation compressed_chunk_rel,
				   RowCompressor *row_compressor)
{
	/* Finish the sort so rows can be read back in sorted order. */
	tuplesort_performsort(tuplesortstate);
	/* Reset per-segment compressor state before appending this segment's rows. */
	row_compressor_reset(row_compressor);
	row_compressor_append_sorted_rows(row_compressor,
									  tuplesortstate,
									  RelationGetDescr(compressed_chunk_rel));
	/* Frees the sort state; pointers returned from it are now invalid. */
	tuplesort_end(tuplesortstate);
	/* Make the newly written compressed rows visible to later scans. */
	CommandCounterIncrement();
}

static bool
Expand All @@ -975,25 +979,19 @@ decompress_segment_update_current_segment(CompressedSegmentInfo **current_segmen
for (int i = 0; i < nsegmentby_cols; i++)
{
int16 col_offset = segby_col_offsets_compressed[i];
if (!compressed_chunk_column_is_segmentby(per_col[col_offset]))
continue;
else
if (compressed_chunk_column_is_segmentby(per_col[col_offset]))
{
val = slot_getattr(slot, AttrOffsetGetAttrNumber(col_offset), &is_null);
if (!segment_info_datum_is_in_group(current_segment[seg_idx++]->segment_info,
val,
is_null))
{
/* new segment, need to do per-segment processing */
pfree(
current_segment[seg_idx - 1]->segment_info); /* because increased previously */
SegmentInfo *segment_info =
segment_info_new(TupleDescAttr(slot->tts_tupleDescriptor, col_offset));
segment_info_update(segment_info, val, is_null);
current_segment[seg_idx - 1]->segment_info = segment_info;
current_segment[seg_idx - 1]->decompressed_chunk_offset =
per_col[col_offset].decompressed_column_offset;
}
/* new segment, need to do per-segment processing */
if (current_segment[seg_idx]->segment_info)
pfree(current_segment[seg_idx]->segment_info);
SegmentInfo *segment_info =
segment_info_new(TupleDescAttr(slot->tts_tupleDescriptor, col_offset));
segment_info_update(segment_info, val, is_null);
current_segment[seg_idx]->segment_info = segment_info;
current_segment[seg_idx]->decompressed_chunk_offset =
per_col[col_offset].decompressed_column_offset;
seg_idx++;
}
}
}
Expand All @@ -1010,9 +1008,7 @@ decompress_segment_changed_group(CompressedSegmentInfo **current_segment, TupleT
for (int i = 0; i < nsegmentby_cols; i++)
{
int16 col_offset = segby_col_offsets_compressed[i];
if (!compressed_chunk_column_is_segmentby(per_col[col_offset]))
continue;
else
if (compressed_chunk_column_is_segmentby(per_col[col_offset]))
{
val = slot_getattr(slot, AttrOffsetGetAttrNumber(col_offset), &is_null);
if (!segment_info_datum_is_in_group(current_segment[seg_idx++]->segment_info,
Expand Down Expand Up @@ -1157,7 +1153,7 @@ fetch_unmatched_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tuples
table_endscan(heapScan);
}

static void
static bool
fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tuplesortstate,
int nsegmentby_cols,
Relation uncompressed_chunk_rel,
Expand All @@ -1169,6 +1165,7 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso
int index = 0;
int nsegbycols_nonnull = 0;
Bitmapset *null_segbycols = NULL;
bool matching_exist = false;

for (int seg_col = 0; seg_col < nsegmentby_cols; seg_col++)
{
Expand Down Expand Up @@ -1227,6 +1224,7 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso
}
if (valid)
{
matching_exist = true;
ExecStoreHeapTuple(uncompressed_tuple, heap_tuple_slot, false);
slot_getallattrs(heap_tuple_slot);
tuplesort_puttupleslot(segment_tuplesortstate, heap_tuple_slot);
Expand All @@ -1243,6 +1241,7 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso

if (scankey != NULL)
pfree(scankey);
return matching_exist;
}

/*
Expand Down Expand Up @@ -1353,6 +1352,8 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)

/****** compression statistics ******/
RelationSize after_size;
int64 skipped_uncompressed_rows = 0;
int64 skipped_compressed_rows = 0;

Tuplesortstate *segment_tuplesortstate;

Expand Down Expand Up @@ -1417,7 +1418,6 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
HeapTuple compressed_tuple;

IndexScanDesc index_scan;
SegmentInfo *segment_info = NULL;
bool changed_segment = false;
/************ current segment **************/
CompressedSegmentInfo **current_segment =
Expand All @@ -1426,8 +1426,10 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
for (int i = 0; i < nsegmentby_cols; i++)
{
current_segment[i] = palloc(sizeof(CompressedSegmentInfo));
current_segment[i]->segment_info = NULL;
}
bool current_segment_init = false;
bool skip_current_segment = false;

/************** snapshot ****************************/
Snapshot snapshot = RegisterSnapshot(GetTransactionSnapshot());
Expand All @@ -1439,108 +1441,97 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
TupleTableSlot *slot = table_slot_create(compressed_chunk_rel, NULL);
index_rescan(index_scan, NULL, 0, NULL, 0);

Datum val;
bool is_null;

while (index_getnext_slot(index_scan, ForwardScanDirection, slot))
{
i = 0;
int col = 0;
slot_getallattrs(slot);

if (!current_segment_init)
{
current_segment_init = true;
Datum val;
bool is_null;
/* initialize current segment */
for (col = 0; col < slot->tts_tupleDescriptor->natts; col++)
{
val = slot_getattr(slot, AttrOffsetGetAttrNumber(col), &is_null);
if (compressed_chunk_column_is_segmentby(decompressor.per_compressed_cols[col]))
{
segment_info = segment_info_new(TupleDescAttr(slot->tts_tupleDescriptor, col));
current_segment[i]->decompressed_chunk_offset =
decompressor.per_compressed_cols[col].decompressed_column_offset;
/* also need to call segment_info_update here to update the val part */
segment_info_update(segment_info, val, is_null);
current_segment[i]->segment_info = segment_info;
i++;
}
}
decompress_segment_update_current_segment(current_segment,
slot, /*slot from compressed chunk*/
decompressor.per_compressed_cols,
segmentby_column_offsets_compressed,
nsegmentby_cols);

skip_current_segment =
!fetch_matching_uncompressed_chunk_into_tuplesort(segment_tuplesortstate,
nsegmentby_cols,
uncompressed_chunk_rel,
current_segment);
}
/* we have a segment already, so compare those */
changed_segment = decompress_segment_changed_group(current_segment,
slot,
decompressor.per_compressed_cols,
segmentby_column_offsets_compressed,
nsegmentby_cols);
if (!changed_segment)
if (changed_segment)
{
i = 0;
bool should_free;

compressed_tuple = ExecFetchSlotHeapTuple(slot, false, &should_free);

heap_deform_tuple(compressed_tuple,
compressed_rel_tupdesc,
decompressor.compressed_datums,
decompressor.compressed_is_nulls);

row_decompressor_decompress_row_to_tuplesort(&decompressor, segment_tuplesortstate);

simple_table_tuple_delete(compressed_chunk_rel, &(slot->tts_tid), snapshot);

if (should_free)
heap_freetuple(compressed_tuple);
}
else if (changed_segment)
{
fetch_matching_uncompressed_chunk_into_tuplesort(segment_tuplesortstate,
nsegmentby_cols,
uncompressed_chunk_rel,
current_segment);

tuplesort_performsort(segment_tuplesortstate);

row_compressor_reset(&row_compressor);
recompress_segment(segment_tuplesortstate, uncompressed_chunk_rel, &row_compressor);

/* now any pointers returned will be garbage */
tuplesort_end(segment_tuplesortstate);
if (!skip_current_segment)
{
recompress_segment(segment_tuplesortstate, uncompressed_chunk_rel, &row_compressor);

/* reinit tuplesort */
segment_tuplesortstate = tuplesort_begin_heap(uncompressed_rel_tupdesc,
n_keys,
sort_keys,
sort_operators,
sort_collations,
nulls_first,
maintenance_work_mem,
NULL,
false);
}

decompress_segment_update_current_segment(current_segment,
slot, /*slot from compressed chunk*/
decompressor.per_compressed_cols,
segmentby_column_offsets_compressed,
nsegmentby_cols);
/* reinit tuplesort and add the first tuple of the new segment to it */
segment_tuplesortstate = tuplesort_begin_heap(uncompressed_rel_tupdesc,
n_keys,
sort_keys,
sort_operators,
sort_collations,
nulls_first,
maintenance_work_mem,
NULL,
false);

bool should_free;
compressed_tuple = ExecFetchSlotHeapTuple(slot, false, &should_free);

heap_deform_tuple(compressed_tuple,
compressed_rel_tupdesc,
decompressor.compressed_datums,
decompressor.compressed_is_nulls);

row_decompressor_decompress_row_to_tuplesort(&decompressor, segment_tuplesortstate);

simple_table_tuple_delete(compressed_chunk_rel, &(slot->tts_tid), snapshot);
/* because this is the first tuple of the new segment */

changed_segment = false;
/* make changes visible */
CommandCounterIncrement();
skip_current_segment =
!fetch_matching_uncompressed_chunk_into_tuplesort(segment_tuplesortstate,
nsegmentby_cols,
uncompressed_chunk_rel,
current_segment);
}

if (should_free)
heap_freetuple(compressed_tuple);
if (skip_current_segment)
{
val = slot_getattr(slot,
AttrOffsetGetAttrNumber(row_compressor.count_metadata_column_offset),
&is_null);
Assert(!is_null);
skipped_uncompressed_rows += DatumGetInt32(val);
skipped_compressed_rows++;
continue;
}

/* Didn't change group and we are not skipping the current segment
* add it to the tuplesort
*/
bool should_free;

compressed_tuple = ExecFetchSlotHeapTuple(slot, false, &should_free);

heap_deform_tuple(compressed_tuple,
compressed_rel_tupdesc,
decompressor.compressed_datums,
decompressor.compressed_is_nulls);

row_decompressor_decompress_row_to_tuplesort(&decompressor, segment_tuplesortstate);

simple_table_tuple_delete(compressed_chunk_rel, &(slot->tts_tid), snapshot);
CommandCounterIncrement();

if (should_free)
heap_freetuple(compressed_tuple);
}

ExecClearTuple(slot);
Expand All @@ -1550,18 +1541,9 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
* the current segment could not be initialized in the case where two recompress operations
* execute concurrently: one blocks on the Exclusive lock but has already read the chunk
* status and determined that there is data in the uncompressed chunk */
if (!changed_segment && current_segment_init)
if (!changed_segment && !skip_current_segment && current_segment_init)
{
fetch_matching_uncompressed_chunk_into_tuplesort(segment_tuplesortstate,
nsegmentby_cols,
uncompressed_chunk_rel,
current_segment);
tuplesort_performsort(segment_tuplesortstate);
row_compressor_reset(&row_compressor);
recompress_segment(segment_tuplesortstate, uncompressed_chunk_rel, &row_compressor);
tuplesort_end(segment_tuplesortstate);

CommandCounterIncrement();
}
/* done with the compressed chunk segments that had new entries in the uncompressed
but there could be rows inserted into the uncompressed that don't already have a corresponding
Expand Down Expand Up @@ -1598,6 +1580,8 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
/* the compression size statistics we are able to update and accurately report are:
* rowcount pre/post compression,
* compressed chunk sizes */
row_compressor.rowcnt_pre_compression += skipped_uncompressed_rows;
row_compressor.num_compressed_rows += skipped_compressed_rows;
compression_chunk_size_catalog_update_recompressed(uncompressed_chunk->fd.id,
compressed_chunk->fd.id,
&after_size,
Expand Down
Loading

0 comments on commit 645727b

Please sign in to comment.