From a2fb4e1e16a434365306b585ec89efc4f0e574cb Mon Sep 17 00:00:00 2001 From: Ypatia Tsavliri Date: Wed, 22 Jan 2025 16:20:49 +0200 Subject: [PATCH] Constrain non blocking tasks to I/O --- test/src/unit-ReadCellSlabIter.cc | 3 +- ...it-cppapi-consolidation-with-timestamps.cc | 2 +- test/src/unit-result-tile.cc | 12 +- tiledb/sm/filter/test/filter_test_support.cc | 3 +- tiledb/sm/filter/test/tile_data_generator.h | 3 +- tiledb/sm/metadata/test/unit_metadata.cc | 9 +- tiledb/sm/query/readers/dense_reader.cc | 17 +- tiledb/sm/query/readers/filtered_data.h | 16 +- tiledb/sm/query/readers/reader_base.cc | 156 +++++++----------- tiledb/sm/query/readers/reader_base.h | 15 +- tiledb/sm/query/readers/result_tile.cc | 6 +- tiledb/sm/query/readers/result_tile.h | 50 ++---- .../readers/sparse_global_order_reader.cc | 6 - tiledb/sm/query/test/unit_query_condition.cc | 39 ++--- tiledb/sm/tile/generic_tile_io.cc | 2 +- tiledb/sm/tile/test/unit_tile.cc | 3 +- tiledb/sm/tile/tile.cc | 53 +++--- tiledb/sm/tile/tile.h | 89 +++------- 18 files changed, 183 insertions(+), 301 deletions(-) diff --git a/test/src/unit-ReadCellSlabIter.cc b/test/src/unit-ReadCellSlabIter.cc index e25635da940..bdeef4e9c22 100644 --- a/test/src/unit-ReadCellSlabIter.cc +++ b/test/src/unit-ReadCellSlabIter.cc @@ -186,8 +186,7 @@ void set_result_tile_dim( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_coord_tile( constants::format_version, array_schema, diff --git a/test/src/unit-cppapi-consolidation-with-timestamps.cc b/test/src/unit-cppapi-consolidation-with-timestamps.cc index e6f9fbc1a1a..20a4d3f463f 100644 --- a/test/src/unit-cppapi-consolidation-with-timestamps.cc +++ b/test/src/unit-cppapi-consolidation-with-timestamps.cc @@ -636,7 +636,7 @@ TEST_CASE_METHOD( // Will only allow to load two tiles out of 3. 
Config cfg; - cfg.set("sm.mem.total_budget", "65000"); + cfg.set("sm.mem.total_budget", "50000"); cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15"); ctx_ = Context(cfg); diff --git a/test/src/unit-result-tile.cc b/test/src/unit-result-tile.cc index d0ca3b869d4..a04bba077b7 100644 --- a/test/src/unit-result-tile.cc +++ b/test/src/unit-result-tile.cc @@ -216,8 +216,7 @@ TEST_CASE_METHOD( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; rt.init_coord_tile( constants::format_version, array_schema, @@ -237,8 +236,7 @@ TEST_CASE_METHOD( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; rt.init_coord_tile( constants::format_version, array_schema, @@ -337,8 +335,7 @@ TEST_CASE_METHOD( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; rt.init_coord_tile( constants::format_version, array_schema, @@ -358,8 +355,7 @@ TEST_CASE_METHOD( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; rt.init_coord_tile( constants::format_version, array_schema, diff --git a/tiledb/sm/filter/test/filter_test_support.cc b/tiledb/sm/filter/test/filter_test_support.cc index 09a2d01cbf3..9b5f9b8cce0 100644 --- a/tiledb/sm/filter/test/filter_test_support.cc +++ b/tiledb/sm/filter/test/filter_test_support.cc @@ -204,8 +204,7 @@ Tile create_tile_for_unfiltering( tile->filtered_buffer().data(), tile->filtered_buffer().size(), tracker, - ThreadPool::SharedTask(), - nullptr}; + ThreadPool::SharedTask()}; } void run_reverse( diff --git a/tiledb/sm/filter/test/tile_data_generator.h b/tiledb/sm/filter/test/tile_data_generator.h index 626d4425f66..3c4329054a7 100644 --- a/tiledb/sm/filter/test/tile_data_generator.h +++ b/tiledb/sm/filter/test/tile_data_generator.h @@ -100,8 +100,7 @@ class TileDataGenerator { filtered_buffer.data(), filtered_buffer.size(), memory_tracker, - ThreadPool::SharedTask(), - nullptr); + ThreadPool::SharedTask()); } /** Returns the size of the original unfiltered data. 
*/ diff --git a/tiledb/sm/metadata/test/unit_metadata.cc b/tiledb/sm/metadata/test/unit_metadata.cc index ef5762bd835..0b91d0246bd 100644 --- a/tiledb/sm/metadata/test/unit_metadata.cc +++ b/tiledb/sm/metadata/test/unit_metadata.cc @@ -124,8 +124,7 @@ TEST_CASE( tile1->filtered_buffer().data(), tile1->filtered_buffer().size(), tracker, - ThreadPool::SharedTask(), - nullptr); + ThreadPool::SharedTask()); memcpy(metadata_tiles[0]->data(), tile1->data(), tile1->size()); metadata_tiles[1] = tdb::make_shared( @@ -138,8 +137,7 @@ TEST_CASE( tile2->filtered_buffer().data(), tile2->filtered_buffer().size(), tracker, - ThreadPool::SharedTask(), - nullptr); + ThreadPool::SharedTask()); memcpy(metadata_tiles[1]->data(), tile2->data(), tile2->size()); metadata_tiles[2] = tdb::make_shared( @@ -152,8 +150,7 @@ TEST_CASE( tile3->filtered_buffer().data(), tile3->filtered_buffer().size(), tracker, - ThreadPool::SharedTask(), - nullptr); + ThreadPool::SharedTask()); memcpy(metadata_tiles[2]->data(), tile3->data(), tile3->size()); meta = Metadata::deserialize(metadata_tiles); diff --git a/tiledb/sm/query/readers/dense_reader.cc b/tiledb/sm/query/readers/dense_reader.cc index 729abf18d45..28d0a333c10 100644 --- a/tiledb/sm/query/readers/dense_reader.cc +++ b/tiledb/sm/query/readers/dense_reader.cc @@ -40,6 +40,7 @@ #include "tiledb/sm/misc/parallel_functions.h" #include "tiledb/sm/query/legacy/cell_slab_iter.h" #include "tiledb/sm/query/query_macros.h" +#include "tiledb/sm/query/readers/filtered_data.h" #include "tiledb/sm/query/readers/result_tile.h" #include "tiledb/sm/stats/global_stats.h" #include "tiledb/sm/subarray/subarray.h" @@ -462,6 +463,7 @@ Status DenseReader::dense_read() { // clear the memory. Also, a name in names might not be in the user buffers // so we might skip the copy but still clear the memory. for (auto& name : names) { + shared_ptr> filtered_data; std::vector result_tiles; bool validity_only = null_count_aggregate_only(name); bool dense_dim = name == constants::coords || array_schema_.is_dim(name); @@ -485,7 +487,8 @@ Status DenseReader::dense_read() { // Read and unfilter tiles. std::vector to_load; to_load.emplace_back(name, validity_only); - read_attribute_tiles(to_load, result_tiles); + filtered_data = make_shared>( + read_attribute_tiles(to_load, result_tiles)); } if (compute_task.valid()) { @@ -497,6 +500,7 @@ Status DenseReader::dense_read() { compute_task = resources_.compute_tp().execute([&, iteration_tile_data, + filtered_data, dense_dim, name, validity_only, @@ -508,6 +512,9 @@ Status DenseReader::dense_read() { // Unfilter tiles. RETURN_NOT_OK(unfilter_tiles(name, validity_only, result_tiles)); + // The filtered data is no longer required, release it. + filtered_data.reset(); + // Only copy names that are present in the user buffers. if (buffers_.count(name) != 0) { // Copy attribute data to users buffers. @@ -1068,13 +1075,16 @@ Status DenseReader::apply_query_condition( tiles_cell_num); // Read and unfilter query condition attributes. 
- read_attribute_tiles(NameToLoad::from_string_vec(qc_names), result_tiles); + shared_ptr> filtered_data = + make_shared>(read_attribute_tiles( + NameToLoad::from_string_vec(qc_names), result_tiles)); if (compute_task.valid()) { throw_if_not_ok(compute_task.wait()); } compute_task = resources_.compute_tp().execute([&, + filtered_data, iteration_tile_data, qc_names, num_range_threads, @@ -1091,6 +1101,9 @@ Status DenseReader::apply_query_condition( RETURN_NOT_OK(unfilter_tiles(name, false, result_tiles)); } + // The filtered data is no longer required, release it. + filtered_data.reset(); + if (stride == UINT64_MAX) { stride = 1; } diff --git a/tiledb/sm/query/readers/filtered_data.h b/tiledb/sm/query/readers/filtered_data.h index 72c29e5f8e9..915de7f28c3 100644 --- a/tiledb/sm/query/readers/filtered_data.h +++ b/tiledb/sm/query/readers/filtered_data.h @@ -329,13 +329,14 @@ class FilteredData { /* ********************************* */ /** - * Get the fixed filtered data for the result tile. + * Get a pointer to the fixed filtered data for the result tile and a future + * which signals when the data is valid. * * @param fragment Fragment metadata for the tile. * @param rt Result tile. * @return Fixed filtered data pointer. */ - inline std::tuple fixed_filtered_data( + inline std::pair fixed_filtered_data( const FragmentMetadata* fragment, const ResultTile* rt) { auto offset{ fragment->loaded_metadata()->file_offset(name_, rt->tile_idx())}; @@ -346,13 +347,13 @@ class FilteredData { } /** - * Get the var filtered data for the result tile. - * + * Get a pointer to the var filtered data for the result tile and a future + * which signals when the data is valid. * * @param fragment Fragment metadata for the tile. * @param rt Result tile. * @return Var filtered data pointer. */ - inline std::tuple var_filtered_data( + inline std::pair var_filtered_data( const FragmentMetadata* fragment, const ResultTile* rt) { if (!var_sized_) { return {nullptr, ThreadPool::SharedTask()}; @@ -367,13 +368,14 @@ class FilteredData { } /** - * Get the nullable filtered data for the result tile. + * Get a pointer to the nullable filtered data for the result tile and a + * future which signals when the data is valid. * * @param fragment Fragment metadata for the tile. * @param rt Result tile. * @return Nullable filtered data pointer. */ - inline std::tuple nullable_filtered_data( + inline std::pair nullable_filtered_data( const FragmentMetadata* fragment, const ResultTile* rt) { if (!nullable_) { return {nullptr, ThreadPool::SharedTask()}; diff --git a/tiledb/sm/query/readers/reader_base.cc b/tiledb/sm/query/readers/reader_base.cc index 642f133b84d..78045e5ce29 100644 --- a/tiledb/sm/query/readers/reader_base.cc +++ b/tiledb/sm/query/readers/reader_base.cc @@ -649,7 +649,7 @@ Status ReaderBase::read_and_unfilter_attribute_tiles( // eventually get rid of it altogether so that we can clarify the data flow. // At the end of this function call, all memory inside of 'filtered_data' has // been used and the tiles are unfiltered so the data can be deleted. - read_attribute_tiles(names, result_tiles); + auto filtered_data{read_attribute_tiles(names, result_tiles)}; for (auto& name : names) { RETURN_NOT_OK( unfilter_tiles(name.name(), name.validity_only(), result_tiles)); @@ -663,7 +663,7 @@ Status ReaderBase::read_and_unfilter_coordinate_tiles( const std::vector& result_tiles) { // See the comment in 'read_and_unfilter_attribute_tiles' to get more // information about the lifetime of this object. 
- read_coordinate_tiles(names, result_tiles); + auto filtered_data{read_coordinate_tiles(names, result_tiles)}; for (auto& name : names) { RETURN_NOT_OK(unfilter_tiles(name, false, result_tiles)); } @@ -671,28 +671,29 @@ Status ReaderBase::read_and_unfilter_coordinate_tiles( return Status::Ok(); } -void ReaderBase::read_attribute_tiles( +std::list ReaderBase::read_attribute_tiles( const std::vector& names, const std::vector& result_tiles) const { auto timer_se = stats_->start_timer("read_attribute_tiles"); return read_tiles(names, result_tiles); } -void ReaderBase::read_coordinate_tiles( +std::list ReaderBase::read_coordinate_tiles( const std::vector& names, const std::vector& result_tiles) const { auto timer_se = stats_->start_timer("read_coordinate_tiles"); return read_tiles(NameToLoad::from_string_vec(names), result_tiles); } -void ReaderBase::read_tiles( +std::list ReaderBase::read_tiles( const std::vector& names, const std::vector& result_tiles) const { auto timer_se = stats_->start_timer("read_tiles"); + std::list filtered_data; // Shortcut for empty tile vec. if (result_tiles.empty() || names.empty()) { - return; + return filtered_data; } uint64_t num_tiles_read{0}; @@ -707,8 +708,7 @@ void ReaderBase::read_tiles( // read and memory allocations. const bool var_sized{array_schema_.var_size(name)}; const bool nullable{array_schema_.is_nullable(name)}; - shared_ptr filtered_data = make_shared( - HERE(), + filtered_data.emplace_back( resources_, *this, min_batch_size_, @@ -747,14 +747,16 @@ void ReaderBase::read_tiles( // 'TileData' objects should be returned by this function and passed into // 'unfilter_tiles' so that the filter pipeline can stop using the // 'ResultTile' object to get access to the filtered data. - std::tuple n = { + std::pair t = { nullptr, ThreadPool::SharedTask()}; ResultTile::TileData tile_data{ - val_only ? n : - filtered_data->fixed_filtered_data(fragment.get(), tile), - val_only ? n : filtered_data->var_filtered_data(fragment.get(), tile), - filtered_data->nullable_filtered_data(fragment.get(), tile), - filtered_data}; + val_only ? + t : + filtered_data.back().fixed_filtered_data(fragment.get(), tile), + val_only ? + t : + filtered_data.back().var_filtered_data(fragment.get(), tile), + filtered_data.back().nullable_filtered_data(fragment.get(), tile)}; // Initialize the tile(s) const format_version_t format_version{fragment->format_version()}; @@ -777,7 +779,7 @@ void ReaderBase::read_tiles( stats_->add_counter("num_tiles_read", num_tiles_read); - return; + return filtered_data; } tuple, optional, optional> @@ -927,93 +929,59 @@ Status ReaderBase::unfilter_tiles( num_range_threads = 1 + ((num_threads - 1) / num_tiles); } - for (size_t i = 0; i < num_tiles; i++) { - auto result_tile = result_tiles[i]; - // if (skip_field(result_tile->frag_idx(), name)) { - // continue; - // } - ThreadPool::SharedTask task = - resources_.compute_tp().execute([name, - validity_only, - var_size, - nullable, - num_range_threads, - result_tile, - this]() { - const auto stat_type = - (array_schema_.is_attr(name)) ? 
- "unfilter_attr_tiles_builder.unfilter_attr_tiles" : - "unfilter_coord_tiles_builder.unfilter_coord_tiles"; - - const auto timer_se = stats_->start_timer(stat_type); - // Chunks for unfiltering - ChunkData tiles_chunk_data; - ChunkData tiles_chunk_var_data; - ChunkData tiles_chunk_validity_data; - auto&& [st, tile_size, tile_var_size, tile_validity_size] = - load_tile_chunk_data( - name, - validity_only, - result_tile, - var_size, - nullable, - tiles_chunk_data, - tiles_chunk_var_data, - tiles_chunk_validity_data); - if (!st.ok()) { - return st; - } - - if (tile_size.value_or(0) == 0 && tile_var_size.value_or(0) == 0 && - tile_validity_size.value_or(0) == 0) { - return Status::Ok(); - } + // Vectors with all the necessary chunk data for unfiltering + std::vector tiles_chunk_data(num_tiles); + std::vector tiles_chunk_var_data(num_tiles); + std::vector tiles_chunk_validity_data(num_tiles); - // The current threadpool design does not allow for unfiltering to - // happen in chunks using a parallel for within this async task as the - // wait_all in the end of the parallel for can deadlock. - for (uint64_t range_thread_idx = 0; - range_thread_idx < num_range_threads; - range_thread_idx++) { - st = unfilter_tile( + // Pre-compute chunk offsets. + auto status = parallel_for( + &resources_.compute_tp(), 0, num_tiles, [&, this](uint64_t i) { + auto&& [st, tile_size, tile_var_size, tile_validity_size] = + load_tile_chunk_data( name, validity_only, - result_tile, + result_tiles[i], var_size, nullable, - range_thread_idx, - num_range_threads, - tiles_chunk_data, - tiles_chunk_var_data, - tiles_chunk_validity_data); - if (!st.ok()) { - return st; - } - } - - // Perform required post-processing of unfiltered tiles - return post_process_unfiltered_tile( - name, validity_only, result_tile, var_size, nullable); - }); - - if (skip_field(result_tile->frag_idx(), name)) { - RETURN_NOT_OK(task.wait()); - continue; - } + tiles_chunk_data[i], + tiles_chunk_var_data[i], + tiles_chunk_validity_data[i]); + throw_if_not_ok(st); + return Status::Ok(); + }); + RETURN_NOT_OK_ELSE(status, throw_if_not_ok(logger_->status(status))); - // Unfiltering tasks have been launched, set the tasks to wait for in the - // corresponding tiles. When those tasks(futures) will be ready the tile - // processing that depends on the unfiltered tile will get unblocked. - auto tile_tuple = result_tile->tile_tuple(name); - tile_tuple->fixed_tile().set_unfilter_data_compute_task(task); + if (tiles_chunk_data.empty()) + return Status::Ok(); - if (var_size && !validity_only) { - tile_tuple->var_tile().set_unfilter_data_compute_task(task); - } + // Unfilter all tiles/chunks in parallel using the precomputed offsets. 
+ status = parallel_for_2d( + &resources_.compute_tp(), + 0, + num_tiles, + 0, + num_range_threads, + [&](uint64_t i, uint64_t range_thread_idx) { + throw_if_not_ok(unfilter_tile( + name, + validity_only, + result_tiles[i], + var_size, + nullable, + range_thread_idx, + num_range_threads, + tiles_chunk_data[i], + tiles_chunk_var_data[i], + tiles_chunk_validity_data[i])); + return Status::Ok(); + }); + RETURN_CANCEL_OR_ERROR(status); - if (nullable) { - tile_tuple->validity_tile().set_unfilter_data_compute_task(task); - } + // Perform required post-processing of unfiltered tiles + for (size_t i = 0; i < num_tiles; i++) { + RETURN_NOT_OK(post_process_unfiltered_tile( + name, validity_only, result_tiles[i], var_size, nullable)); } return Status::Ok(); diff --git a/tiledb/sm/query/readers/reader_base.h b/tiledb/sm/query/readers/reader_base.h index c62a2f78acd..71b0bc524ad 100644 --- a/tiledb/sm/query/readers/reader_base.h +++ b/tiledb/sm/query/readers/reader_base.h @@ -543,7 +543,8 @@ class ReaderBase : public StrategyBase { /** * Concurrently executes across each name in `names` and each result tile - * in 'result_tiles'. + * in 'result_tiles'. Attaches a future to each result_tile that is signaling + * when reading the corresponding data from disk is done. * * This must be the entry point for reading attribute tiles because it * generates stats for reading attributes. @@ -553,13 +554,14 @@ class ReaderBase : public StrategyBase { * `ResultTile` instances in this vector. * @return Filtered data blocks. */ - void read_attribute_tiles( + std::list read_attribute_tiles( const std::vector& names, const std::vector& result_tiles) const; /** * Concurrently executes across each name in `names` and each result tile - * in 'result_tiles'. + * in 'result_tiles'. Attaches a future to each result_tile that is signaling + * when reading the corresponding data from disk is done. * * This must be the entry point for reading coordinate tiles because it * generates stats for reading coordinates. @@ -569,7 +571,7 @@ class ReaderBase : public StrategyBase { * `ResultTile` instances in this vector. * @return Filtered data blocks. */ - void read_coordinate_tiles( + std::list read_coordinate_tiles( const std::vector& names, const std::vector& result_tiles) const; @@ -578,7 +580,8 @@ class ReaderBase : public StrategyBase { * in the appropriate result tile. * * Concurrently executes across each name in `names` and each result tile - * in 'result_tiles'. + * in 'result_tiles'. Attaches a future to each result_tile that is signaling + * when reading the corresponding data from disk is done. * * @param names The field names. * @param result_tiles The retrieved tiles will be stored inside the @@ -586,7 +589,7 @@ class ReaderBase : public StrategyBase { * @param validity_only Is the field read for validity only. * @return Filtered data blocks. */ - void read_tiles( + std::list read_tiles( const std::vector& names, const std::vector& result_tiles) const; diff --git a/tiledb/sm/query/readers/result_tile.cc b/tiledb/sm/query/readers/result_tile.cc index c5901e40d15..3ae40019da6 100644 --- a/tiledb/sm/query/readers/result_tile.cc +++ b/tiledb/sm/query/readers/result_tile.cc @@ -95,9 +95,13 @@ ResultTile::~ResultTile() { try { // Wait for all tasks to be done wait_all_attrs(); + } catch (...) { + } + + try { + // Wait for all tasks to be done wait_all_coords(); } catch (...) 
{ - return; } } diff --git a/tiledb/sm/query/readers/result_tile.h b/tiledb/sm/query/readers/result_tile.h index b762d660613..25a8b99fba2 100644 --- a/tiledb/sm/query/readers/result_tile.h +++ b/tiledb/sm/query/readers/result_tile.h @@ -64,7 +64,6 @@ class Domain; class FragmentMetadata; class QueryCondition; class Subarray; -class FilteredData; /** * Utilitary function to sort result tiles by fragment first then tile index. @@ -214,17 +213,15 @@ class ResultTile { /* CONSTRUCTORS & DESTRUCTORS */ /* ********************************* */ TileData( - std::tuple fixed_filtered_data, - std::tuple var_filtered_data, - std::tuple validity_filtered_data, - shared_ptr filtered_data) - : fixed_filtered_data_(std::get<0>(fixed_filtered_data)) - , var_filtered_data_(std::get<0>(var_filtered_data)) - , validity_filtered_data_(std::get<0>(validity_filtered_data)) - , fixed_filtered_data_task_(std::get<1>(fixed_filtered_data)) - , var_filtered_data_task_(std::get<1>(var_filtered_data)) - , validity_filtered_data_task_(std::get<1>(validity_filtered_data)) - , filtered_data_(std::move(filtered_data)) { + std::pair fixed_filtered_data, + std::pair var_filtered_data, + std::pair validity_filtered_data) + : fixed_filtered_data_(fixed_filtered_data.first) + , var_filtered_data_(var_filtered_data.first) + , validity_filtered_data_(validity_filtered_data.first) + , fixed_filtered_data_task_(fixed_filtered_data.second) + , var_filtered_data_task_(var_filtered_data.second) + , validity_filtered_data_task_(validity_filtered_data.second) { } ~TileData() { @@ -232,16 +229,21 @@ class ResultTile { if (fixed_filtered_data_task_.valid()) { auto st = fixed_filtered_data_task_.wait(); } + } catch (...) { + } + try { if (var_filtered_data_task_.valid()) { auto st = var_filtered_data_task_.wait(); } + } catch (...) { + } + try { if (validity_filtered_data_task_.valid()) { auto st = validity_filtered_data_task_.wait(); } } catch (...) { - return; } } @@ -279,16 +281,6 @@ class ResultTile { return validity_filtered_data_task_; } - /** @return shared_ptr to FilteredData block used by this Tile. */ - inline shared_ptr filtered_data() const { - return filtered_data_; - } - - /** Clear the held filtered data. */ - inline void clear_filtered_data() { - filtered_data_ = nullptr; - } - private: /* ********************************* */ /* PRIVATE ATTRIBUTES */ @@ -311,9 +303,6 @@ class ResultTile { /** Stores the validity filtered data I/O task. */ ThreadPool::SharedTask validity_filtered_data_task_; - - /** Pointer to hold the filtered data block as long as needed. 
*/ - shared_ptr filtered_data_; }; /** @@ -348,8 +337,7 @@ class ResultTile { tile_data.fixed_filtered_data(), tile_sizes.tile_persisted_size(), memory_tracker_, - tile_data.fixed_filtered_data_task(), - tile_data.filtered_data()) { + tile_data.fixed_filtered_data_task()) { if (tile_sizes.has_var_tile()) { auto type = array_schema.type(name); var_tile_.emplace( @@ -361,8 +349,7 @@ class ResultTile { tile_data.var_filtered_data(), tile_sizes.tile_var_persisted_size(), memory_tracker_, - tile_data.var_filtered_data_task(), - tile_data.filtered_data()); + tile_data.var_filtered_data_task()); } if (tile_sizes.has_validity_tile()) { @@ -375,8 +362,7 @@ class ResultTile { tile_data.validity_filtered_data(), tile_sizes.tile_validity_persisted_size(), memory_tracker_, - tile_data.validity_filtered_data_task(), - tile_data.filtered_data()); + tile_data.validity_filtered_data_task()); } } diff --git a/tiledb/sm/query/readers/sparse_global_order_reader.cc b/tiledb/sm/query/readers/sparse_global_order_reader.cc index 2204c631b90..b6437244a18 100644 --- a/tiledb/sm/query/readers/sparse_global_order_reader.cc +++ b/tiledb/sm/query/readers/sparse_global_order_reader.cc @@ -815,12 +815,6 @@ bool SparseGlobalOrderReader::add_next_cell_to_queue( return true; } - // This enforces all the coords unfiltering results to be available before - // taking the lock on tile_queue_mutex_. This is to avoid a deadlock where a - // lock is held forever while waiting for a result to be available, while - // the next scheduled task is deadlocking on that lock - rc.tile_->wait_all_coords(); - std::unique_lock ul(tile_queue_mutex_); // Add all the cells in this tile with the same coordinates as this cell diff --git a/tiledb/sm/query/test/unit_query_condition.cc b/tiledb/sm/query/test/unit_query_condition.cc index 9ede86f6c17..a90e941f3c6 100644 --- a/tiledb/sm/query/test/unit_query_condition.cc +++ b/tiledb/sm/query/test/unit_query_condition.cc @@ -1631,8 +1631,7 @@ void test_apply(const Datatype type, bool var_size, bool nullable) { ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -1694,8 +1693,7 @@ void test_apply(const Datatype type, bool var_size, bool nullable) { ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -1814,8 +1812,7 @@ TEST_CASE( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -2360,8 +2357,7 @@ void test_apply_dense( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -2423,8 +2419,7 @@ void test_apply_dense(const Datatype type, bool var_size, bool nullable) { ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, 
ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -2542,8 +2537,7 @@ TEST_CASE( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -3081,8 +3075,7 @@ void test_apply_sparse( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -3144,8 +3137,7 @@ void test_apply_sparse(const Datatype type, bool var_size, bool nullable) { ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -3927,8 +3919,7 @@ TEST_CASE( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -4223,8 +4214,7 @@ TEST_CASE( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -4645,8 +4635,7 @@ TEST_CASE( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -4914,8 +4903,7 @@ TEST_CASE( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -5024,8 +5012,7 @@ TEST_CASE( ResultTile::TileData tile_data{ {nullptr, ThreadPool::SharedTask()}, {nullptr, ThreadPool::SharedTask()}, - {nullptr, ThreadPool::SharedTask()}, - nullptr}; + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, diff --git a/tiledb/sm/tile/generic_tile_io.cc b/tiledb/sm/tile/generic_tile_io.cc index 76fbff633f2..c74dc4b577a 100644 --- a/tiledb/sm/tile/generic_tile_io.cc +++ b/tiledb/sm/tile/generic_tile_io.cc @@ -122,7 +122,7 @@ shared_ptr GenericTileIO::read_generic( header.persisted_size, memory_tracker->get_resource(MemoryType::GENERIC_TILE_IO), ThreadPool::SharedTask(), - nullptr); + true); // Read the tile. throw_if_not_ok(resources_.vfs().read( diff --git a/tiledb/sm/tile/test/unit_tile.cc b/tiledb/sm/tile/test/unit_tile.cc index 82649d08428..7d99a5d74ed 100644 --- a/tiledb/sm/tile/test/unit_tile.cc +++ b/tiledb/sm/tile/test/unit_tile.cc @@ -58,8 +58,7 @@ TEST_CASE("Tile: Test basic IO", "[Tile][basic_io]") { nullptr, 0, tracker, - ThreadPool::SharedTask(), - nullptr); + ThreadPool::SharedTask()); CHECK(tile.size() == tile_size); // Create a buffer to write to the test Tile. 
diff --git a/tiledb/sm/tile/tile.cc b/tiledb/sm/tile/tile.cc index 9d5b3badb9b..6699d1aeb2e 100644 --- a/tiledb/sm/tile/tile.cc +++ b/tiledb/sm/tile/tile.cc @@ -71,8 +71,7 @@ shared_ptr Tile::from_generic( nullptr, 0, memory_tracker->get_resource(MemoryType::GENERIC_TILE_IO), - ThreadPool::SharedTask(), - nullptr); + ThreadPool::SharedTask()); } shared_ptr WriterTile::from_generic( @@ -110,15 +109,13 @@ TileBase::TileBase( const Datatype type, const uint64_t cell_size, const uint64_t size, - tdb::pmr::memory_resource* resource, - const bool skip_waiting_on_io_task) + tdb::pmr::memory_resource* resource) : resource_(resource) , data_(tdb::pmr::make_unique(resource_, size)) , size_(size) , cell_size_(cell_size) , format_version_(format_version) - , type_(type) - , skip_waiting_on_io_task_(skip_waiting_on_io_task) { + , type_(type) { /* * We can check for a bad allocation after initialization without risk * because none of the other member variables use its value for their own @@ -139,7 +136,7 @@ Tile::Tile( uint64_t filtered_size, shared_ptr memory_tracker, ThreadPool::SharedTask data_io_task, - shared_ptr filtered_data_block) + const bool skip_waiting_on_io_task) : Tile( format_version, type, @@ -150,7 +147,7 @@ Tile::Tile( filtered_size, memory_tracker->get_resource(MemoryType::TILE_DATA), std::move(data_io_task), - std::move(filtered_data_block)) { + skip_waiting_on_io_task) { } Tile::Tile( @@ -163,19 +160,13 @@ Tile::Tile( uint64_t filtered_size, tdb::pmr::memory_resource* resource, ThreadPool::SharedTask filtered_data_io_task, - shared_ptr filtered_data_block) - : TileBase( - format_version, - type, - cell_size, - size, - resource, - filtered_data_block == nullptr) + const bool skip_waiting_on_io_task) + : TileBase(format_version, type, cell_size, size, resource) , zipped_coords_dim_num_(zipped_coords_dim_num) , filtered_data_(filtered_data) , filtered_size_(filtered_size) , filtered_data_io_task_(std::move(filtered_data_io_task)) - , filtered_data_block_(std::move(filtered_data_block)) { + , skip_waiting_on_io_task_(skip_waiting_on_io_task) { } WriterTile::WriterTile( @@ -189,8 +180,7 @@ WriterTile::WriterTile( type, cell_size, size, - memory_tracker->get_resource(MemoryType::WRITER_TILE_DATA), - true) + memory_tracker->get_resource(MemoryType::WRITER_TILE_DATA)) , filtered_buffer_(0) { } @@ -200,7 +190,7 @@ WriterTile::WriterTile( const uint64_t cell_size, const uint64_t size, tdb::pmr::memory_resource* resource) - : TileBase(format_version, type, cell_size, size, resource, true) + : TileBase(format_version, type, cell_size, size, resource) , filtered_buffer_(0) { } @@ -210,14 +200,6 @@ WriterTile::WriterTile( void TileBase::read( void* const buffer, const uint64_t offset, const uint64_t nbytes) const { - if (!skip_waiting_on_io_task_) { - if (unfilter_data_compute_task_.valid()) { - throw_if_not_ok(unfilter_data_compute_task_.wait()); - } else { - throw std::future_error(std::future_errc::no_state); - } - } - if (nbytes > size_ - offset) { throw TileException("Read tile overflow; may not read beyond buffer size"); } @@ -279,11 +261,6 @@ void WriterTile::clear_data() { size_ = 0; } -void Tile::set_unfilter_data_compute_task( - ThreadPool::SharedTask unfilter_data_compute_task) { - unfilter_data_compute_task_ = std::move(unfilter_data_compute_task); -} - void WriterTile::write_var(const void* data, uint64_t offset, uint64_t nbytes) { if (size_ - offset < nbytes) { auto new_alloc_size = size_ == 0 ? 
offset + nbytes : size_; @@ -311,9 +288,17 @@ void WriterTile::write_var(const void* data, uint64_t offset, uint64_t nbytes) { uint64_t Tile::load_chunk_data( ChunkData& unfiltered_tile, uint64_t expected_original_size) { - std::scoped_lock lock{filtered_data_io_task_mtx_}; assert(filtered()); + if (!skip_waiting_on_io_task_) { + std::scoped_lock lock{filtered_data_io_task_mtx_}; + if (filtered_data_io_task_.valid()) { + throw_if_not_ok(filtered_data_io_task_.wait()); + } else { + throw std::future_error(std::future_errc::no_state); + } + } + Deserializer deserializer(filtered_data(), filtered_size()); // Make a pass over the tile to get the chunk information. diff --git a/tiledb/sm/tile/tile.h b/tiledb/sm/tile/tile.h index a0c01cdfd3f..a823f74643c 100644 --- a/tiledb/sm/tile/tile.h +++ b/tiledb/sm/tile/tile.h @@ -62,30 +62,18 @@ class TileBase { * @param cell_size The cell size. * @param size The size of the tile. * @param resource The memory resource to use. - * @param skip_waiting_on_io_task whether to skip waiting on I/O tasks and - * directly access data() or block. By default is false, so by default we - * block waiting. Used when we create generic tiles or in testing. */ TileBase( const format_version_t format_version, const Datatype type, const uint64_t cell_size, const uint64_t size, - tdb::pmr::memory_resource* resource, - const bool skip_waiting_on_io_task = false); + tdb::pmr::memory_resource* resource); DISABLE_COPY_AND_COPY_ASSIGN(TileBase); DISABLE_MOVE_AND_MOVE_ASSIGN(TileBase); - virtual ~TileBase() { - if (unfilter_data_compute_task_.valid()) { - try { - auto st = unfilter_data_compute_task_.wait(); - } catch (...) { - return; - } - } - } + virtual ~TileBase() = default; /* ********************************* */ /* API */ @@ -125,14 +113,6 @@ class TileBase { /** Returns the internal buffer. */ inline void* data() const { - if (!skip_waiting_on_io_task_) { - if (unfilter_data_compute_task_.valid()) { - throw_if_not_ok(unfilter_data_compute_task_.wait()); - } else { - throw std::future_error(std::future_errc::no_state); - } - } - return data_.get(); } @@ -201,14 +181,6 @@ class TileBase { /** The tile data type. */ Datatype type_; - - /** - * Whether to block waiting for io data to be ready before accessing data() - */ - const bool skip_waiting_on_io_task_; - - /** Compute task to check and block on if unfiltered data is ready. */ - mutable ThreadPool::SharedTask unfilter_data_compute_task_; }; /** @@ -242,8 +214,9 @@ class Tile : public TileBase { * @param filtered_size The filtered size to allocate. * @param memory_tracker The memory resource to use. * @param filtered_data_io_task The I/O task to wait on for data to be valid. - * @param filtered_data_block The FilteredData block class which backs the - * memory for this filtered tile. + * @param skip_waiting_on_io_task whether to skip waiting on I/O tasks and + * directly access data() or block. By default is false, so by default we + * block waiting. Used when we create generic tiles or in testing. */ Tile( const format_version_t format_version, @@ -255,7 +228,7 @@ class Tile : public TileBase { uint64_t filtered_size, shared_ptr memory_tracker, ThreadPool::SharedTask filtered_data_io_task, - shared_ptr filtered_data_block); + const bool skip_waiting_on_io_task = false); /** * Constructor. @@ -270,8 +243,9 @@ class Tile : public TileBase { * @param filtered_size The filtered size to allocate. * @param resource The memory resource to use. * @param filtered_data_io_task The I/O task to wait on for data to be valid. 
- * @param filtered_data_block The FilteredData block class which backs the - * memory for this filtered tile. + * @param skip_waiting_on_io_task whether to skip waiting on I/O tasks and + * directly access data() or block. By default is false, so by default we + * block waiting. Used when we create generic tiles or in testing. */ Tile( const format_version_t format_version, @@ -283,20 +257,12 @@ class Tile : public TileBase { uint64_t filtered_size, tdb::pmr::memory_resource* resource, ThreadPool::SharedTask filtered_data_io_task, - shared_ptr filtered_data_block); + const bool skip_waiting_on_io_task = false); DISABLE_MOVE_AND_MOVE_ASSIGN(Tile); DISABLE_COPY_AND_COPY_ASSIGN(Tile); - ~Tile() { - if (unfilter_data_compute_task_.valid()) { - try { - auto st = unfilter_data_compute_task_.wait(); - } catch (...) { - return; - } - } - } + ~Tile() = default; /* ********************************* */ /* API */ @@ -323,8 +289,7 @@ class Tile : public TileBase { /** Returns the buffer that contains the filtered, on-disk format. */ inline char* filtered_data() { - // if an i/o task has been launched - if (filtered_data_block_ != nullptr) { + if (!skip_waiting_on_io_task_) { std::scoped_lock lock{filtered_data_io_task_mtx_}; if (filtered_data_io_task_.valid()) { throw_if_not_ok(filtered_data_io_task_.wait()); @@ -332,21 +297,17 @@ class Tile : public TileBase { throw std::future_error(std::future_errc::no_state); } } - return static_cast(filtered_data_); } /** Returns the data casted as a type. */ template inline T* filtered_data_as() { - // if an i/o task has been launched - if (filtered_data_block_ != nullptr) { - std::scoped_lock lock{filtered_data_io_task_mtx_}; - if (filtered_data_io_task_.valid()) { - throw_if_not_ok(filtered_data_io_task_.wait()); - } else { - throw std::future_error(std::future_errc::no_state); - } + std::scoped_lock lock{filtered_data_io_task_mtx_}; + if (filtered_data_io_task_.valid()) { + throw_if_not_ok(filtered_data_io_task_.wait()); + } else { + throw std::future_error(std::future_errc::no_state); } return static_cast(filtered_data_); @@ -354,7 +315,7 @@ class Tile : public TileBase { /** Clears the filtered buffer. */ void clear_filtered_buffer() { - if (filtered_data_block_ != nullptr) { + if (!skip_waiting_on_io_task_) { std::scoped_lock lock{filtered_data_io_task_mtx_}; if (filtered_data_io_task_.valid()) { throw_if_not_ok(filtered_data_io_task_.wait()); @@ -365,7 +326,6 @@ class Tile : public TileBase { filtered_data_ = nullptr; filtered_size_ = 0; - filtered_data_block_ = nullptr; } /** @@ -402,14 +362,6 @@ class Tile : public TileBase { */ uint64_t load_offsets_chunk_data(ChunkData& chunk_data); - /** - * Set task for filter pipeline unfiltering to allow async monitoring - * - * @param unfilter_data_compute_task task for unfiltering - */ - void set_unfilter_data_compute_task( - ThreadPool::SharedTask unfilter_data_compute_task); - private: /* ********************************* */ /* PRIVATE FUNCTIONS */ @@ -478,10 +430,9 @@ class Tile : public TileBase { mutable std::recursive_mutex filtered_data_io_task_mtx_; /** - * shared_ptr to the FilteredData class that backs this tile. We keep a shared - * pointer to maintain the lifetime. + * Whether to block waiting for io data to be ready before accessing data() */ - shared_ptr filtered_data_block_; + const bool skip_waiting_on_io_task_; }; /**
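Below is a minimal, self-contained C++ sketch of the ownership pattern this patch moves to, for readers who want the idea without the TileDB internals: the reader owns the filtered-data blocks (the std::list<FilteredData> returned by read_attribute_tiles / read_coordinate_tiles), each tile holds only a raw pointer into those blocks plus a ThreadPool::SharedTask-style future for the I/O, consumers wait on that future before touching the bytes, and the blocks are released once unfiltering has produced the tile's own buffer. The names here (Block, MiniTile, io_done) are illustrative stand-ins, not TileDB classes, and std::async/std::shared_future stand in for the ThreadPool tasks; this is a model of the pattern, not the implementation in the diff above.

// Sketch only: models the filtered-data lifetime introduced by this patch.
#include <cassert>
#include <cstdint>
#include <future>
#include <list>
#include <vector>

struct Block {                         // stands in for FilteredData: reader-owned storage
  std::vector<uint8_t> bytes;
};

struct MiniTile {                      // stands in for Tile: pointer + readiness future
  const uint8_t* filtered = nullptr;   // points into a reader-owned Block
  std::shared_future<void> io_done;    // signals that the read has completed
  std::vector<uint8_t> unfiltered;     // tile-owned result of unfiltering
};

int main() {
  // Reader-owned container, analogous to the std::list<FilteredData> kept alive
  // until unfiltering is done.
  std::list<Block> filtered_data;
  filtered_data.emplace_back();
  Block& block = filtered_data.back();
  block.bytes.resize(8);

  // "I/O" task: fills the block asynchronously; only I/O runs as a non-blocking task.
  std::shared_future<void> io = std::async(std::launch::async, [&block] {
    for (size_t i = 0; i < block.bytes.size(); i++)
      block.bytes[i] = static_cast<uint8_t>(i);
  }).share();

  MiniTile tile{block.bytes.data(), io, {}};

  // "Unfiltering": wait on the I/O future before reading the filtered bytes,
  // then materialize the tile's own unfiltered buffer.
  tile.io_done.wait();
  tile.unfiltered.assign(tile.filtered, tile.filtered + block.bytes.size());

  // Once every tile of the batch is unfiltered, the filtered blocks can be
  // released; tiles keep only their own unfiltered buffers.
  tile.filtered = nullptr;
  filtered_data.clear();

  assert(tile.unfiltered[3] == 3);
  return 0;
}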