From b2d6612e81d95190621e12fb18f0878a374a686b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Thu, 23 Jan 2025 09:05:18 +0100 Subject: [PATCH] Refactor vector qual handling for arrow slots Move the code for vector qual execution to its own module. The vector qual execution will produce a result in the form of a bitmap filter for the arrow array. Add functions to the arrow slot to carry the result bitmap in the arrow tuple table slot. This allows passing the filter result to nodes above the node that computed the vector qual result. This is necessary to, e.g., run vectorized aggregation above a columnar scan. --- tsl/src/hypercore/CMakeLists.txt | 3 +- tsl/src/hypercore/arrow_tts.c | 9 ++ tsl/src/hypercore/arrow_tts.h | 38 ++++- tsl/src/hypercore/vector_quals.c | 138 ++++++++++++++++++ tsl/src/hypercore/vector_quals.h | 17 +++ tsl/src/nodes/columnar_scan/columnar_scan.c | 150 +++++--------------- tsl/src/nodes/columnar_scan/columnar_scan.h | 1 + 7 files changed, 236 insertions(+), 120 deletions(-) create mode 100644 tsl/src/hypercore/vector_quals.c create mode 100644 tsl/src/hypercore/vector_quals.h diff --git a/tsl/src/hypercore/CMakeLists.txt b/tsl/src/hypercore/CMakeLists.txt index 8719d603956..06d0efee6f3 100644 --- a/tsl/src/hypercore/CMakeLists.txt +++ b/tsl/src/hypercore/CMakeLists.txt @@ -7,7 +7,8 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/hypercore_handler.c ${CMAKE_CURRENT_SOURCE_DIR}/hypercore_proxy.c ${CMAKE_CURRENT_SOURCE_DIR}/relstats.c - ${CMAKE_CURRENT_SOURCE_DIR}/utils.c) + ${CMAKE_CURRENT_SOURCE_DIR}/utils.c + ${CMAKE_CURRENT_SOURCE_DIR}/vector_quals.c) if(PG_VERSION VERSION_GREATER_EQUAL "17.0") list(APPEND SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/import/analyze.c) endif() diff --git a/tsl/src/hypercore/arrow_tts.c b/tsl/src/hypercore/arrow_tts.c index b6e91e874e9..c666434cdfc 100644 --- a/tsl/src/hypercore/arrow_tts.c +++ b/tsl/src/hypercore/arrow_tts.c @@ -93,6 +93,7 @@ tts_arrow_init(TupleTableSlot *slot) aslot->tuple_index = InvalidTupleIndex; aslot->total_row_count = 0; aslot->referenced_attrs = NULL; + aslot->arrow_qual_result = NULL; /* * Set up child slots, one for the non-compressed relation and one for the @@ -120,6 +121,11 @@ tts_arrow_init(TupleTableSlot *slot) Assert(TTS_EMPTY(slot)); Assert(TTS_EMPTY(aslot->noncompressed_slot)); + + /* Memory context reset every new segment. Used to store, e.g., vectorized + * filters */ + aslot->per_segment_mcxt = + GenerationContextCreateCompat(slot->tts_mcxt, "Per-segment memory context", 64 * 1024); } /* @@ -262,6 +268,8 @@ tts_arrow_clear(TupleTableSlot *slot) /* Clear arrow slot fields */ memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts); aslot->arrow_cache_entry = NULL; + aslot->arrow_qual_result = NULL; + MemoryContextReset(aslot->per_segment_mcxt); } static inline void @@ -333,6 +341,7 @@ tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 t aslot->arrow_cache_entry = NULL; /* Clear valid attributes */ memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts); + MemoryContextReset(aslot->per_segment_mcxt); } /* diff --git a/tsl/src/hypercore/arrow_tts.h b/tsl/src/hypercore/arrow_tts.h index 48c9f9c8303..2af5306d96f 100644 --- a/tsl/src/hypercore/arrow_tts.h +++ b/tsl/src/hypercore/arrow_tts.h @@ -79,6 +79,15 @@ typedef struct ArrowTupleTableSlot int16 *attrs_offset_map; /* Offset number mappings between the * non-compressed and compressed * relation */ + + /* Per-segment data. The following data is allocated on the per-segment + * memory context which is reset for every new segment stored and + * processed in the slot. */ + MemoryContext per_segment_mcxt; + + const uint64 *arrow_qual_result; /* Bitmap with result of qual + * filtering over arrow_array. NULL if + * no filtering has been applied. */ } ArrowTupleTableSlot; extern const TupleTableSlotOps TTSOpsArrowTuple; @@ -197,9 +206,9 @@ arrow_slot_get_noncompressed_slot(TupleTableSlot *slot) } static inline uint16 -arrow_slot_total_row_count(TupleTableSlot *slot) +arrow_slot_total_row_count(const TupleTableSlot *slot) { - ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot; + const ArrowTupleTableSlot *aslot = (const ArrowTupleTableSlot *) slot; Assert(TTS_IS_ARROWTUPLE(slot)); Assert(aslot->total_row_count > 0); @@ -271,6 +280,23 @@ arrow_slot_is_last(const TupleTableSlot *slot) return aslot->tuple_index == InvalidTupleIndex || aslot->tuple_index == aslot->total_row_count; } +static inline void +arrow_slot_set_qual_result(TupleTableSlot *slot, const uint64 *qual_result) +{ + ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot; + + Assert(TTS_IS_ARROWTUPLE(slot)); + aslot->arrow_qual_result = qual_result; +} + +static inline const uint64 * +arrow_slot_get_qual_result(const TupleTableSlot *slot) +{ + const ArrowTupleTableSlot *aslot = (const ArrowTupleTableSlot *) slot; + + return aslot->arrow_qual_result; +} + /* * Increment or decrement an arrow slot to point to a subsequent row. * @@ -368,6 +394,14 @@ arrow_slot_try_getnext(TupleTableSlot *slot, ScanDirection direction) return false; } +static inline MemoryContext +arrow_slot_per_segment_memory_context(const TupleTableSlot *slot) +{ + const ArrowTupleTableSlot *aslot = (const ArrowTupleTableSlot *) slot; + Assert(TTS_IS_ARROWTUPLE(slot)); + return aslot->per_segment_mcxt; +} + extern bool is_compressed_col(const TupleDesc tupdesc, AttrNumber attno); extern const ArrowArray *arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno); extern void arrow_slot_set_referenced_attrs(TupleTableSlot *slot, Bitmapset *attrs); diff --git a/tsl/src/hypercore/vector_quals.c b/tsl/src/hypercore/vector_quals.c new file mode 100644 index 00000000000..3635a5247c3 --- /dev/null +++ b/tsl/src/hypercore/vector_quals.c @@ -0,0 +1,138 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include "nodes/decompress_chunk/vector_quals.h" +#include + +#include "arrow_tts.h" +#include "vector_quals.h" + +/* + * Support functions to execute vectorized quals over arrow tuple table slots. + */ + +/* + * Initialize the vector qual state. + */ +void +vector_qual_state_init(VectorQualState *vqstate, List *quals, TupleTableSlot *slot) +{ + MemSet(vqstate, 0, sizeof(VectorQualState)); + vqstate->vectorized_quals_constified = quals; + vqstate->per_vector_mcxt = arrow_slot_per_segment_memory_context(slot); + vqstate->get_arrow_array = vector_qual_state_get_arrow_array; + vqstate->num_results = TupIsNull(slot) ? 0 : arrow_slot_total_row_count(slot); + vqstate->slot = slot; +} + +/* + * Reset the vector qual state. + * + * The function should be called when all values in the arrow array have been + * processed. + */ +void +vector_qual_state_reset(VectorQualState *vqstate) +{ + MemoryContextReset(vqstate->per_vector_mcxt); + vqstate->vector_qual_result = NULL; + vqstate->num_results = arrow_slot_total_row_count(vqstate->slot); + arrow_slot_set_qual_result(vqstate->slot, NULL); +} + +/* + * Implementation of VectorQualState->get_arrow_array() for arrow tuple table + * slots. + * + * Given a VectorQualState return the ArrowArray in the contained slot. + */ +const ArrowArray * +vector_qual_state_get_arrow_array(VectorQualState *vqstate, Expr *expr, bool *is_default_value) +{ + TupleTableSlot *slot = vqstate->slot; + const Var *var = castNode(Var, expr); + const int attoff = AttrNumberGetAttrOffset(var->varattno); + const ArrowArray *array = arrow_slot_get_array(slot, var->varattno); + + if (array == NULL) + { + Form_pg_attribute attr = &slot->tts_tupleDescriptor->attrs[attoff]; + /* + * If getting here, this is a non-compressed value or a compressed + * column with a default value. We can treat non-compressed values the + * same as default ones. It is not possible to fall back to the + * non-vectorized quals now, so build a single-value ArrowArray with + * this (default) value, check if it passes the predicate, and apply + * it to the entire batch. + */ + array = make_single_value_arrow(attr->atttypid, + slot->tts_values[attoff], + slot->tts_isnull[attoff]); + *is_default_value = true; + } + else + *is_default_value = false; + + return array; +} + +/* + * Execute vectorized filter over a vector/array of values. + * + * Returns the number of values filtered until the first valid value. + */ +uint16 +ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext) +{ + TupleTableSlot *slot = econtext->ecxt_scantuple; + const uint16 rowindex = arrow_slot_row_index(slot); + + /* Compute the vector quals over both compressed and non-compressed + * tuples. In case a non-compressed tuple is filtered, return SomeRowsPass + * although only one row will pass. */ + if (rowindex <= 1) + { + vector_qual_state_reset(vqstate); + VectorQualSummary vector_qual_summary = vqstate->vectorized_quals_constified != NIL ? + vector_qual_compute(vqstate) : + AllRowsPass; + + switch (vector_qual_summary) + { + case NoRowsPass: + return arrow_slot_total_row_count(slot); + case AllRowsPass: + /* + * If all rows pass, no need to test the vector qual for each row. This + * is a common case for time range conditions. + */ + vector_qual_state_reset(vqstate); + return 0; + case SomeRowsPass: + break; + } + } + + /* Fast path when all rows have passed (i.e., no rows filtered). No need + * to check qual result and it should be NULL. */ + if (vqstate->vector_qual_result == NULL) + return 0; + + const uint16 nrows = arrow_slot_total_row_count(slot); + const uint16 off = arrow_slot_arrow_offset(slot); + uint16 nfiltered = 0; + + for (uint16 i = off; i < nrows; i++) + { + if (arrow_row_is_valid(vqstate->vector_qual_result, i)) + break; + nfiltered++; + } + + arrow_slot_set_qual_result(slot, vqstate->vector_qual_result); + + return nfiltered; +} diff --git a/tsl/src/hypercore/vector_quals.h b/tsl/src/hypercore/vector_quals.h new file mode 100644 index 00000000000..5563f0676a1 --- /dev/null +++ b/tsl/src/hypercore/vector_quals.h @@ -0,0 +1,17 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#pragma once + +#include +#include + +#include "nodes/decompress_chunk/vector_quals.h" + +extern void vector_qual_state_init(VectorQualState *vqstate, List *quals, TupleTableSlot *slot); +extern void vector_qual_state_reset(VectorQualState *vqstate); +extern const ArrowArray *vector_qual_state_get_arrow_array(VectorQualState *vqstate, Expr *expr, + bool *is_default_value); +extern uint16 ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext); diff --git a/tsl/src/nodes/columnar_scan/columnar_scan.c b/tsl/src/nodes/columnar_scan/columnar_scan.c index 1231eeb85d3..db48f3ea492 100644 --- a/tsl/src/nodes/columnar_scan/columnar_scan.c +++ b/tsl/src/nodes/columnar_scan/columnar_scan.c @@ -27,12 +27,11 @@ #include #include "columnar_scan.h" -#include "compression/arrow_c_data_interface.h" #include "compression/compression.h" #include "hypercore/arrow_tts.h" #include "hypercore/hypercore_handler.h" +#include "hypercore/vector_quals.h" #include "import/ts_explain.h" -#include "nodes/decompress_chunk/vector_quals.h" typedef struct SimpleProjInfo { @@ -67,60 +66,6 @@ match_relvar(Expr *expr, Index relid) return false; } -/* - * ColumnarScan implementation of VectorQualState->get_arrow_array(). - * - * Given a VectorQualState return the ArrowArray in the contained slot. - */ -static const ArrowArray * -vector_qual_state_get_arrow_array(VectorQualState *vqstate, Expr *expr, bool *is_default_value) -{ - TupleTableSlot *slot = vqstate->slot; - const Var *var = castNode(Var, expr); - const int attoff = AttrNumberGetAttrOffset(var->varattno); - const ArrowArray *array = arrow_slot_get_array(slot, var->varattno); - - if (array == NULL) - { - Form_pg_attribute attr = &slot->tts_tupleDescriptor->attrs[attoff]; - /* - * If getting here, this is a non-compressed value or a compressed - * column with a default value. We can treat non-compressed values the - * same as default ones. It is not possible to fall back to the - * non-vectorized quals now, so build a single-value ArrowArray with - * this (default) value, check if it passes the predicate, and apply - * it to the entire batch. - */ - array = make_single_value_arrow(attr->atttypid, - slot->tts_values[attoff], - slot->tts_isnull[attoff]); - *is_default_value = true; - } - else - *is_default_value = false; - - return array; -} - -static void -vector_qual_state_reset(VectorQualState *vqstate, ExprContext *econtext) -{ - MemoryContextReset(vqstate->per_vector_mcxt); - vqstate->vector_qual_result = NULL; - vqstate->slot = econtext->ecxt_scantuple; - vqstate->num_results = arrow_slot_total_row_count(vqstate->slot); -} - -static void -vector_qual_state_init(VectorQualState *vqstate, ExprContext *econtext) -{ - vqstate->per_vector_mcxt = GenerationContextCreateCompat(econtext->ecxt_per_query_memory, - "Per-vector memory context", - 64 * 1024); - vqstate->get_arrow_array = vector_qual_state_get_arrow_array; - vqstate->slot = econtext->ecxt_scantuple; -} - /* * Utility function to extract quals that can be used as scankeys. The * remaining "normal" quals are optionally collected in the corresponding @@ -284,62 +229,6 @@ create_scankeys_from_quals(const HypercoreInfo *hsinfo, Index relid, const List return scankeys; } -/* - * Execute vectorized filter over a vector/array of values. - * - * Returns the number of values filtered until the first valid value. - */ -static inline uint16 -ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext) -{ - TupleTableSlot *slot = econtext->ecxt_scantuple; - const uint16 rowindex = arrow_slot_row_index(slot); - - /* Compute the vector quals over both compressed and non-compressed - * tuples. In case a non-compressed tuple is filtered, return SomeRowsPass - * although only one row will pass. */ - if (rowindex <= 1) - { - vector_qual_state_reset(vqstate, econtext); - VectorQualSummary vector_qual_summary = vqstate->vectorized_quals_constified != NIL ? - vector_qual_compute(vqstate) : - AllRowsPass; - - switch (vector_qual_summary) - { - case NoRowsPass: - return arrow_slot_total_row_count(slot); - case AllRowsPass: - /* - * If all rows pass, no need to test the vector qual for each row. This - * is a common case for time range conditions. - */ - vector_qual_state_reset(vqstate, econtext); - return 0; - case SomeRowsPass: - break; - } - } - - /* Fast path when all rows have passed (i.e., no rows filtered). No need - * to check qual result and it should be NULL. */ - if (vqstate->vector_qual_result == NULL) - return 0; - - const uint16 nrows = arrow_slot_total_row_count(slot); - const uint16 off = arrow_slot_arrow_offset(slot); - uint16 nfiltered = 0; - - for (uint16 i = off; i < nrows; i++) - { - if (arrow_row_is_valid(vqstate->vector_qual_result, i)) - break; - nfiltered++; - } - - return nfiltered; -} - static pg_attribute_always_inline TupleTableSlot * exec_projection(SimpleProjInfo *spi) { @@ -391,6 +280,17 @@ getnextslot(TableScanDesc scandesc, ScanDirection direction, TupleTableSlot *slo return table_scan_getnextslot(scandesc, direction, slot); } +static bool +should_project(const CustomScanState *state) +{ +#if PG15_GE + const CustomScan *scan = castNode(CustomScan, state->ss.ps.plan); + return scan->flags & CUSTOMPATH_SUPPORT_PROJECTION; +#else + return false; +#endif +} + static TupleTableSlot * columnar_scan_exec(CustomScanState *state) { @@ -399,16 +299,19 @@ columnar_scan_exec(CustomScanState *state) EState *estate; ExprContext *econtext; ExprState *qual; - ProjectionInfo *projinfo; ScanDirection direction; TupleTableSlot *slot; bool has_vecquals = cstate->vqstate.vectorized_quals_constified != NIL; + /* + * The VectorAgg node could have requested no projection by unsetting the + * "projection support flag", so only project if the flag is still set. + */ + ProjectionInfo *projinfo = should_project(state) ? state->ss.ps.ps_ProjInfo : NULL; scandesc = state->ss.ss_currentScanDesc; estate = state->ss.ps.state; econtext = state->ss.ps.ps_ExprContext; qual = state->ss.ps.qual; - projinfo = state->ss.ps.ps_ProjInfo; direction = estate->es_direction; slot = state->ss.ss_ScanTupleSlot; @@ -627,7 +530,7 @@ columnar_scan_begin(CustomScanState *state, EState *estate, int eflags) ExecAssignScanProjectionInfo(&state->ss); state->ss.ps.qual = ExecInitQual(state->ss.ps.plan->qual, (PlanState *) state); #endif - vector_qual_state_init(&cstate->vqstate, state->ss.ps.ps_ExprContext); + List *vectorized_quals_constified = NIL; if (cstate->nscankeys > 0) { @@ -647,10 +550,16 @@ columnar_scan_begin(CustomScanState *state, EState *estate, int eflags) foreach (lc, cstate->vectorized_quals_orig) { Node *constified = estimate_expression_value(&root, (Node *) lfirst(lc)); - cstate->vqstate.vectorized_quals_constified = - lappend(cstate->vqstate.vectorized_quals_constified, constified); + vectorized_quals_constified = lappend(vectorized_quals_constified, constified); } + /* + * Initialize the state to compute vectorized quals. + */ + vector_qual_state_init(&cstate->vqstate, + vectorized_quals_constified, + state->ss.ss_ScanTupleSlot); + /* If the node is supposed to project, then try to make it a simple * projection. If not possible, it will fall back to standard PostgreSQL * projection. */ @@ -811,6 +720,7 @@ columnar_scan_initialize_worker(CustomScanState *node, shm_toc *toc, void *arg) } static CustomExecMethods columnar_scan_state_methods = { + .CustomName = "ColumnarScan", .BeginCustomScan = columnar_scan_begin, .ExecCustomScan = columnar_scan_exec, .EndCustomScan = columnar_scan_end, @@ -845,6 +755,12 @@ static CustomScanMethods columnar_scan_plan_methods = { .CreateCustomScanState = columnar_scan_state_create, }; +bool +is_columnar_scan(const CustomScan *scan) +{ + return scan->methods == &columnar_scan_plan_methods; +} + typedef struct VectorQualInfoHypercore { VectorQualInfo vqinfo; diff --git a/tsl/src/nodes/columnar_scan/columnar_scan.h b/tsl/src/nodes/columnar_scan/columnar_scan.h index 9350e8fe2c5..1dc9c94f436 100644 --- a/tsl/src/nodes/columnar_scan/columnar_scan.h +++ b/tsl/src/nodes/columnar_scan/columnar_scan.h @@ -20,6 +20,7 @@ typedef struct ColumnarScanPath extern ColumnarScanPath *columnar_scan_path_create(PlannerInfo *root, RelOptInfo *rel, Relids required_outer, int parallel_workers); extern void columnar_scan_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht); +extern bool is_columnar_scan(const CustomScan *scan); extern void _columnar_scan_init(void); #endif /* TIMESCALEDB_COLUMNAR_SCAN_H */