From 4ddf002a3b554bc209af9044c058a0b56f67b4e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Thu, 30 Jan 2025 16:53:45 +0100 Subject: [PATCH] Prepare VectorAgg exec path to handle arrow slots The VectorAgg exec loop reads tuples directly from a compressed relation, thus bypassing the DecompressChunk child node. This won't work with arrow slots, which are read via a table access method. To make the VectorAgg exec code similar to the standard pattern of reading slots from child nodes, code specific to decompressing batches is moved out of the main VectorAgg exec loop so that the loop only deals with the final compressed batch slot instead of the raw compressed slot. The code is instead put in a "get_next_slot" function, which is called from the loop. Also move the code to initialize vectorized filters to its own "init_vector_qual" function, since it is specific to compressed batches. With these two function interfaces, it is possible to provide implementations of the functions for handling arrow slots. --- tsl/src/nodes/vector_agg/exec.c | 224 +++++++++++++++++++++----------- tsl/src/nodes/vector_agg/exec.h | 20 ++- 2 files changed, 165 insertions(+), 79 deletions(-) diff --git a/tsl/src/nodes/vector_agg/exec.c b/tsl/src/nodes/vector_agg/exec.c index b9184797998..3b1cbb6eff8 100644 --- a/tsl/src/nodes/vector_agg/exec.c +++ b/tsl/src/nodes/vector_agg/exec.c @@ -16,7 +16,6 @@ #include "nodes/vector_agg/exec.h" #include "compression/arrow_c_data_interface.h" -#include "guc.h" #include "nodes/decompress_chunk/compressed_batch.h" #include "nodes/decompress_chunk/exec.h" #include "nodes/decompress_chunk/vector_quals.h" @@ -24,24 +23,25 @@ #include "nodes/vector_agg/plan.h" static int -get_input_offset(DecompressChunkState *decompress_state, Var *var) +get_input_offset(const CustomScanState *state, const Var *var) { - DecompressContext *dcontext = &decompress_state->decompress_context; + const DecompressChunkState *decompress_state = (DecompressChunkState *) state; + const DecompressContext *dcontext = &decompress_state->decompress_context; /* * All variable references in the vectorized aggregation node were * translated to uncompressed chunk variables when it was created. */ - CustomScan *cscan = castNode(CustomScan, decompress_state->csstate.ss.ps.plan); + const CustomScan *cscan = castNode(CustomScan, decompress_state->csstate.ss.ps.plan); Ensure((Index) var->varno == (Index) cscan->scan.scanrelid, "got vector varno %d expected %d", var->varno, cscan->scan.scanrelid); - CompressionColumnDescription *value_column_description = NULL; + const CompressionColumnDescription *value_column_description = NULL; for (int i = 0; i < dcontext->num_data_columns; i++) { - CompressionColumnDescription *current_column = &dcontext->compressed_chunk_columns[i]; + const CompressionColumnDescription *current_column = &dcontext->compressed_chunk_columns[i]; if (current_column->uncompressed_chunk_attno == var->varattno) { value_column_description = current_column; @@ -57,6 +57,15 @@ get_input_offset(DecompressChunkState *decompress_state, Var *var) return index; } +static int +get_value_bytes(const CustomScanState *state, int input_offset) +{ + const DecompressChunkState *decompress_state = (DecompressChunkState *) state; + const DecompressContext *dcontext = &decompress_state->decompress_context; + const CompressionColumnDescription *desc = &dcontext->compressed_chunk_columns[input_offset]; + return desc->value_bytes; +} + static void vector_agg_begin(CustomScanState *node, EState *estate, int eflags) { @@ -66,9 +75,7 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags) VectorAggState *vector_agg_state = (VectorAggState *) node; vector_agg_state->input_ended = false; - - DecompressChunkState *decompress_state = - (DecompressChunkState *) linitial(vector_agg_state->custom.custom_ps); + CustomScanState *childstate = (CustomScanState *) linitial(vector_agg_state->custom.custom_ps); /* * Set up the helper structures used to evaluate stable expressions in @@ -157,7 +164,7 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags) Assert(aggref->aggsplit == AGGSPLIT_INITIAL_SERIAL); Var *var = castNode(Var, castNode(TargetEntry, linitial(aggref->args))->expr); - def->input_offset = get_input_offset(decompress_state, var); + def->input_offset = get_input_offset(childstate, var); } else { @@ -179,11 +186,8 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags) col->output_offset = i; Var *var = castNode(Var, tlentry->expr); - col->input_offset = get_input_offset(decompress_state, var); - DecompressContext *dcontext = &decompress_state->decompress_context; - CompressionColumnDescription *desc = - &dcontext->compressed_chunk_columns[col->input_offset]; - col->value_bytes = desc->value_bytes; + col->input_offset = get_input_offset(childstate, var); + col->value_bytes = get_value_bytes(childstate, col->input_offset); } } @@ -237,6 +241,104 @@ vector_agg_rescan(CustomScanState *node) state->grouping->gp_reset(state->grouping); } +/* + * Get the next slot to aggregate for a compressed batch. + * + * Implements "get next slot" on top of DecompressChunk. Note that compressed + * tuples are read directly from the DecompressChunk child node, which means + * that the processing normally done in DecompressChunk is actually done here + * (batch processing and filtering). + * + * Returns an TupleTableSlot that implements a compressed batch. + */ +static TupleTableSlot * +compressed_batch_get_next_slot(VectorAggState *vector_agg_state) +{ + DecompressChunkState *decompress_state = + (DecompressChunkState *) linitial(vector_agg_state->custom.custom_ps); + DecompressContext *dcontext = &decompress_state->decompress_context; + BatchQueue *batch_queue = decompress_state->batch_queue; + DecompressBatchState *batch_state = batch_array_get_at(&batch_queue->batch_array, 0); + + do + { + /* + * We discard the previous compressed batch here and not earlier, + * because the grouping column values returned by the batch grouping + * policy are owned by the compressed batch memory context. This is done + * to avoid generic value copying in the grouping policy to simplify its + * code. + */ + compressed_batch_discard_tuples(batch_state); + + TupleTableSlot *compressed_slot = + ExecProcNode(linitial(decompress_state->csstate.custom_ps)); + + if (TupIsNull(compressed_slot)) + { + vector_agg_state->input_ended = true; + return NULL; + } + + compressed_batch_set_compressed_tuple(dcontext, batch_state, compressed_slot); + + /* If the entire batch is filtered out, then immediately read the next + * one */ + } while (batch_state->next_batch_row >= batch_state->total_batch_rows); + + /* + * Count rows filtered out by vectorized filters for EXPLAIN. Normally + * this is done in tuple-by-tuple interface of DecompressChunk, so that + * it doesn't say it filtered out more rows that were returned (e.g. + * with LIMIT). Here we always work in full batches. The batches that + * were fully filtered out, and their rows, were already counted in + * compressed_batch_set_compressed_tuple(). + */ + const int not_filtered_rows = + arrow_num_valid(batch_state->vector_qual_result, batch_state->total_batch_rows); + InstrCountFiltered1(dcontext->ps, batch_state->total_batch_rows - not_filtered_rows); + if (dcontext->ps->instrument) + { + /* + * These values are normally updated by InstrStopNode(), and are + * required so that the calculations in InstrEndLoop() run properly. + */ + dcontext->ps->instrument->running = true; + dcontext->ps->instrument->tuplecount += not_filtered_rows; + } + + return &batch_state->decompressed_scan_slot_data.base; +} + +/* + * Initialize vector quals for a compressed batch. + * + * Used to implement vectorized aggregate function filter clause. + */ +static VectorQualState * +compressed_batch_init_vector_quals(VectorAggState *agg_state, VectorAggDef *agg_def, + TupleTableSlot *slot) +{ + DecompressChunkState *decompress_state = + (DecompressChunkState *) linitial(agg_state->custom.custom_ps); + DecompressContext *dcontext = &decompress_state->decompress_context; + DecompressBatchState *batch_state = (DecompressBatchState *) slot; + + agg_state->vqual_state = (CompressedBatchVectorQualState) { + .vqstate = { + .vectorized_quals_constified = agg_def->filter_clauses, + .num_results = batch_state->total_batch_rows, + .per_vector_mcxt = batch_state->per_batch_context, + .slot = decompress_state->csstate.ss.ss_ScanTupleSlot, + .get_arrow_array = compressed_batch_get_arrow_array, + }, + .batch_state = batch_state, + .dcontext = dcontext, + }; + + return &agg_state->vqual_state.vqstate; +} + static TupleTableSlot * vector_agg_exec(CustomScanState *node) { @@ -275,14 +377,6 @@ vector_agg_exec(CustomScanState *node) */ grouping->gp_reset(grouping); - DecompressChunkState *decompress_state = - (DecompressChunkState *) linitial(vector_agg_state->custom.custom_ps); - - DecompressContext *dcontext = &decompress_state->decompress_context; - - BatchQueue *batch_queue = decompress_state->batch_queue; - DecompressBatchState *batch_state = batch_array_get_at(&batch_queue->batch_array, 0); - /* * Now we loop through the input compressed tuples, until they end or until * the grouping policy asks us to emit partials. @@ -290,52 +384,21 @@ vector_agg_exec(CustomScanState *node) while (!grouping->gp_should_emit(grouping)) { /* - * We discard the previous compressed batch here and not earlier, - * because the grouping column values returned by the batch grouping - * policy are owned by the compressed batch memory context. This is done - * to avoid generic value copying in the grouping policy to simplify its - * code. + * Get the next slot to aggregate. It will be either a compressed + * batch or an arrow tuple table slot. Both hold arrow arrays of data + * that can be vectorized. */ - compressed_batch_discard_tuples(batch_state); - - TupleTableSlot *compressed_slot = - ExecProcNode(linitial(decompress_state->csstate.custom_ps)); - - if (TupIsNull(compressed_slot)) - { - /* The input has ended. */ - vector_agg_state->input_ended = true; - break; - } - - compressed_batch_set_compressed_tuple(dcontext, batch_state, compressed_slot); - - if (batch_state->next_batch_row >= batch_state->total_batch_rows) - { - /* This batch was fully filtered out. */ - continue; - } + TupleTableSlot *slot = vector_agg_state->get_next_slot(vector_agg_state); /* - * Count rows filtered out by vectorized filters for EXPLAIN. Normally - * this is done in tuple-by-tuple interface of DecompressChunk, so that - * it doesn't say it filtered out more rows that were returned (e.g. - * with LIMIT). Here we always work in full batches. The batches that - * were fully filtered out, and their rows, were already counted in - * compressed_batch_set_compressed_tuple(). + * Exit if there is no more data. Note that it is not possible to do + * the standard TupIsNull() check here because the compressed batch's + * implementation of TupleTableSlot never clears the empty flag bit + * (TTS_EMPTY), so it will always look empty. Therefore, look at the + * "input_ended" flag instead. */ - const int not_filtered_rows = - arrow_num_valid(batch_state->vector_qual_result, batch_state->total_batch_rows); - InstrCountFiltered1(dcontext->ps, batch_state->total_batch_rows - not_filtered_rows); - if (dcontext->ps->instrument) - { - /* - * These values are normally updated by InstrStopNode(), and are - * required so that the calculations in InstrEndLoop() run properly. - */ - dcontext->ps->instrument->running = true; - dcontext->ps->instrument->tuplecount += not_filtered_rows; - } + if (vector_agg_state->input_ended) + break; /* * Compute the vectorized filters for the aggregate function FILTER @@ -349,18 +412,9 @@ vector_agg_exec(CustomScanState *node) { continue; } - CompressedBatchVectorQualState cbvqstate = { - .vqstate = { - .vectorized_quals_constified = agg_def->filter_clauses, - .num_results = batch_state->total_batch_rows, - .per_vector_mcxt = batch_state->per_batch_context, - .slot = compressed_slot, - .get_arrow_array = compressed_batch_get_arrow_array, - }, - .batch_state = batch_state, - .dcontext = dcontext, - }; - VectorQualState *vqstate = &cbvqstate.vqstate; + + VectorQualState *vqstate = + vector_agg_state->init_vector_quals(vector_agg_state, agg_def, slot); vector_qual_compute(vqstate); agg_def->filter_result = vqstate->vector_qual_result; } @@ -368,7 +422,7 @@ vector_agg_exec(CustomScanState *node) /* * Finally, pass the compressed batch to the grouping policy. */ - grouping->gp_add_batch(grouping, &batch_state->decompressed_scan_slot_data.base); + grouping->gp_add_batch(grouping, slot); } /* @@ -425,6 +479,20 @@ Node * vector_agg_state_create(CustomScan *cscan) { VectorAggState *state = (VectorAggState *) newNode(sizeof(VectorAggState), T_CustomScanState); + state->custom.methods = &exec_methods; + + /* + * Initialize VectorAggState to process vector slots from different + * subnodes. Currently, only compressed batches are supported, but arrow + * slots will be supported as well. + * + * The vector qual init functions are needed to implement vectorized + * aggregate function FILTER clauses for arrow tuple table slots and + * compressed batches, respectively. + */ + state->get_next_slot = compressed_batch_get_next_slot; + state->init_vector_quals = compressed_batch_init_vector_quals; + return (Node *) state; } diff --git a/tsl/src/nodes/vector_agg/exec.h b/tsl/src/nodes/vector_agg/exec.h index 8c2e5d53d00..88b7ff9574c 100644 --- a/tsl/src/nodes/vector_agg/exec.h +++ b/tsl/src/nodes/vector_agg/exec.h @@ -8,6 +8,7 @@ #include +#include "nodes/decompress_chunk/compressed_batch.h" #include #include "function/functions.h" @@ -29,7 +30,7 @@ typedef struct GroupingColumn int value_bytes; } GroupingColumn; -typedef struct +typedef struct VectorAggState { CustomScanState custom; @@ -47,6 +48,23 @@ typedef struct bool input_ended; GroupingPolicy *grouping; + + /* + * State to compute vector quals for FILTER clauses. + */ + CompressedBatchVectorQualState vqual_state; + + /* + * Initialization function for vectorized quals depending on slot type. + */ + VectorQualState *(*init_vector_quals)(struct VectorAggState *agg_state, VectorAggDef *agg_def, + TupleTableSlot *slot); + + /* + * Function for getting the next slot from the child node depending on + * child node type. + */ + TupleTableSlot *(*get_next_slot)(struct VectorAggState *vector_agg_state); } VectorAggState; extern Node *vector_agg_state_create(CustomScan *cscan);