From 6885b87568b6aea152ead8cc15cc983ea5a841a2 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Mon, 23 Oct 2023 13:46:07 +0300 Subject: [PATCH 01/11] Decouple `StorageManager` from `FragmentMetadata`. --- tiledb/sm/fragment/fragment_info.cc | 2 +- tiledb/sm/fragment/fragment_metadata.cc | 101 +++++++++--------- tiledb/sm/fragment/fragment_metadata.h | 15 ++- .../sm/query/writers/global_order_writer.cc | 3 +- tiledb/sm/query/writers/unordered_writer.cc | 2 +- tiledb/sm/query/writers/writer_base.cc | 2 +- tiledb/sm/serialization/array.cc | 2 +- tiledb/sm/serialization/fragment_metadata.cc | 6 +- tiledb/sm/serialization/fragment_metadata.h | 4 +- tiledb/sm/storage_manager/storage_manager.cc | 4 +- 10 files changed, 71 insertions(+), 70 deletions(-) diff --git a/tiledb/sm/fragment/fragment_info.cc b/tiledb/sm/fragment/fragment_info.cc index 72b140abdb0..255e2747666 100644 --- a/tiledb/sm/fragment/fragment_info.cc +++ b/tiledb/sm/fragment/fragment_info.cc @@ -1061,7 +1061,7 @@ tuple> FragmentInfo::load( // Get fragment non-empty domain auto meta = make_shared( HERE(), - storage_manager_, + &storage_manager_->resources(), nullptr, array_schema_latest, new_fragment_uri, diff --git a/tiledb/sm/fragment/fragment_metadata.cc b/tiledb/sm/fragment/fragment_metadata.cc index f4afa12b5ac..5c678273f64 100644 --- a/tiledb/sm/fragment/fragment_metadata.cc +++ b/tiledb/sm/fragment/fragment_metadata.cc @@ -81,7 +81,7 @@ FragmentMetadata::FragmentMetadata() { } FragmentMetadata::FragmentMetadata( - StorageManager* storage_manager, + ContextResources* resources, MemoryTracker* memory_tracker, const shared_ptr& array_schema, const URI& fragment_uri, @@ -89,7 +89,7 @@ FragmentMetadata::FragmentMetadata( bool dense, bool has_timestamps, bool has_deletes_meta) - : storage_manager_(storage_manager) + : context_resources_(resources) , memory_tracker_(memory_tracker) , array_schema_(array_schema) , dense_(dense) @@ -115,7 +115,7 @@ FragmentMetadata::~FragmentMetadata() = default; // Copy initialization FragmentMetadata::FragmentMetadata(const FragmentMetadata& other) { - storage_manager_ = other.storage_manager_; + context_resources_ = other.context_resources_; array_schema_ = other.array_schema_; dense_ = other.dense_; fragment_uri_ = other.fragment_uri_; @@ -136,7 +136,7 @@ FragmentMetadata::FragmentMetadata(const FragmentMetadata& other) { } FragmentMetadata& FragmentMetadata::operator=(const FragmentMetadata& other) { - storage_manager_ = other.storage_manager_; + context_resources_ = other.context_resources_; array_schema_ = other.array_schema_; dense_ = other.dense_; fragment_uri_ = other.fragment_uri_; @@ -377,7 +377,7 @@ void FragmentMetadata::compute_fragment_min_max_sum_null_count() { // Process all attributes in parallel. throw_if_not_ok(parallel_for( - storage_manager_->compute_tp(), 0, idx_map_.size(), [&](uint64_t n) { + &context_resources_->compute_tp(), 0, idx_map_.size(), [&](uint64_t n) { // For easy reference. const auto& name = names[n]; const auto& idx = idx_map_[name]; @@ -681,7 +681,7 @@ uint64_t FragmentMetadata::fragment_size() const { auto meta_uri = fragment_uri_.join_path( std::string(constants::fragment_metadata_filename)); throw_if_not_ok( - storage_manager_->vfs()->file_size(meta_uri, &meta_file_size)); + context_resources_->vfs().file_size(meta_uri, &meta_file_size)); } // Validate that the meta_file_size is not zero, either preloaded or fetched // above @@ -795,7 +795,7 @@ Status FragmentMetadata::load( // buffer if (fragment_metadata_tile == nullptr) { RETURN_NOT_OK( - storage_manager_->vfs()->file_size(meta_uri, &meta_file_size_)); + context_resources_->vfs().file_size(meta_uri, &meta_file_size_)); } // Get fragment name version @@ -812,7 +812,7 @@ Status FragmentMetadata::load( void FragmentMetadata::store(const EncryptionKey& encryption_key) { auto timer_se = - storage_manager_->stats()->start_timer("write_store_frag_meta"); + context_resources_->stats().start_timer("write_store_frag_meta"); if (version_ < 7) { auto fragment_metadata_uri = @@ -890,7 +890,7 @@ Status FragmentMetadata::store_v7_v10(const EncryptionKey& encryption_key) { throw_if_not_ok(store_footer(encryption_key)); // Close file - return storage_manager_->vfs()->close_file(fragment_metadata_uri); + return context_resources_->vfs().close_file(fragment_metadata_uri); } Status FragmentMetadata::store_v11(const EncryptionKey& encryption_key) { @@ -972,7 +972,7 @@ Status FragmentMetadata::store_v11(const EncryptionKey& encryption_key) { RETURN_NOT_OK_ELSE(store_footer(encryption_key), clean_up()); // Close file - return storage_manager_->vfs()->close_file(fragment_metadata_uri); + return context_resources_->vfs().close_file(fragment_metadata_uri); } Status FragmentMetadata::store_v12_v14(const EncryptionKey& encryption_key) { @@ -1059,7 +1059,7 @@ Status FragmentMetadata::store_v12_v14(const EncryptionKey& encryption_key) { throw_if_not_ok(store_footer(encryption_key)); // Close file - return storage_manager_->vfs()->close_file(fragment_metadata_uri); + return context_resources_->vfs().close_file(fragment_metadata_uri); } Status FragmentMetadata::store_v15_or_higher( @@ -1152,7 +1152,7 @@ Status FragmentMetadata::store_v15_or_higher( throw_if_not_ok(store_footer(encryption_key)); // Close file - return storage_manager_->vfs()->close_file(fragment_metadata_uri); + return context_resources_->vfs().close_file(fragment_metadata_uri); } Status FragmentMetadata::set_num_tiles(uint64_t num_tiles) { @@ -1478,7 +1478,7 @@ Status FragmentMetadata::load_fragment_min_max_sum_null_count( RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter( + context_resources_->stats().add_counter( "read_fragment_min_max_sum_null_count_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); @@ -1506,7 +1506,7 @@ Status FragmentMetadata::load_processed_conditions( RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter( + context_resources_->stats().add_counter( "read_processed_conditions_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); @@ -2002,7 +2002,7 @@ Status FragmentMetadata::load_rtree(const EncryptionKey& encryption_key) { RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter("read_rtree_size", tile.size()); + context_resources_->stats().add_counter("read_rtree_size", tile.size()); // Use the serialized buffer size to approximate memory usage of the rtree. if (memory_tracker_ != nullptr && @@ -2126,10 +2126,10 @@ Status FragmentMetadata::get_footer_offset_and_size( URI fragment_metadata_uri = fragment_uri_.join_path( std::string(constants::fragment_metadata_filename)); uint64_t size_offset = meta_file_size_ - sizeof(uint64_t); - RETURN_NOT_OK(storage_manager_->vfs()->read( + RETURN_NOT_OK(context_resources_->vfs().read( fragment_metadata_uri, size_offset, size, sizeof(uint64_t))); *offset = meta_file_size_ - *size - sizeof(uint64_t); - storage_manager_->stats()->add_counter( + context_resources_->stats().add_counter( "read_frag_meta_size", sizeof(uint64_t)); } @@ -2407,7 +2407,8 @@ Status FragmentMetadata::load_tile_offsets( RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter("read_tile_offsets_size", tile.size()); + context_resources_->stats().add_counter( + "read_tile_offsets_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_offsets(idx, deserializer); @@ -2436,7 +2437,7 @@ Status FragmentMetadata::load_tile_var_offsets( RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter( + context_resources_->stats().add_counter( "read_tile_var_offsets_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); @@ -2462,7 +2463,7 @@ Status FragmentMetadata::load_tile_var_sizes( RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter( + context_resources_->stats().add_counter( "read_tile_var_sizes_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); @@ -2488,7 +2489,7 @@ Status FragmentMetadata::load_tile_validity_offsets( RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter( + context_resources_->stats().add_counter( "read_tile_validity_offsets_size", tile.size()); ConstBuffer cbuff(tile.data(), tile.size()); @@ -2514,7 +2515,7 @@ Status FragmentMetadata::load_tile_min_values( RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter("read_tile_min_size", tile.size()); + context_resources_->stats().add_counter("read_tile_min_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_min_values(idx, deserializer); @@ -2539,7 +2540,7 @@ Status FragmentMetadata::load_tile_max_values( RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter("read_tile_max_size", tile.size()); + context_resources_->stats().add_counter("read_tile_max_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_max_values(idx, deserializer); @@ -2564,7 +2565,7 @@ Status FragmentMetadata::load_tile_sum_values( RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter("read_tile_sum_size", tile.size()); + context_resources_->stats().add_counter("read_tile_sum_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_sum_values(idx, deserializer); @@ -2589,7 +2590,7 @@ Status FragmentMetadata::load_tile_null_count_values( RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter( + context_resources_->stats().add_counter( "read_tile_null_count_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); @@ -3584,13 +3585,13 @@ Status FragmentMetadata::load_v1_v2( URI fragment_metadata_uri = fragment_uri_.join_path( std::string(constants::fragment_metadata_filename)); // Read metadata - GenericTileIO tile_io(storage_manager_->resources(), fragment_metadata_uri); + GenericTileIO tile_io(*context_resources_, fragment_metadata_uri); auto&& [st, tile_opt] = - tile_io.read_generic(0, encryption_key, storage_manager_->config()); + tile_io.read_generic(0, encryption_key, context_resources_->config()); RETURN_NOT_OK(st); auto& tile = *tile_opt; - storage_manager_->stats()->add_counter("read_frag_meta_size", tile.size()); + context_resources_->stats().add_counter("read_frag_meta_size", tile.size()); // Pre-v10 format fragments we need to set the schema and schema name to // the "old" schema. This way "old" fragments are still loaded fine @@ -3869,7 +3870,7 @@ Status FragmentMetadata::store_rtree( const EncryptionKey& encryption_key, uint64_t* nbytes) { auto rtree_tile = write_rtree(); RETURN_NOT_OK(write_generic_tile_to_file(encryption_key, rtree_tile, nbytes)); - storage_manager_->stats()->add_counter("write_rtree_size", *nbytes); + context_resources_->stats().add_counter("write_rtree_size", *nbytes); return Status::Ok(); } @@ -3934,9 +3935,9 @@ tuple> FragmentMetadata::read_generic_tile_from_file( std::string(constants::fragment_metadata_filename)); // Read metadata - GenericTileIO tile_io(storage_manager_->resources(), fragment_metadata_uri); - auto&& [st, tile_opt] = - tile_io.read_generic(offset, encryption_key, storage_manager_->config()); + GenericTileIO tile_io(*context_resources_, fragment_metadata_uri); + auto&& [st, tile_opt] = tile_io.read_generic( + offset, encryption_key, context_resources_->config()); RETURN_NOT_OK_TUPLE(st, nullopt); return {Status::Ok(), std::move(*tile_opt)}; @@ -3954,7 +3955,7 @@ Status FragmentMetadata::read_file_footer( tile = make_shared(HERE(), Tile::from_generic(*footer_size)); - storage_manager_->stats()->add_counter("read_frag_meta_size", *footer_size); + context_resources_->stats().add_counter("read_frag_meta_size", *footer_size); if (memory_tracker_ != nullptr && !memory_tracker_->take_memory( @@ -3968,7 +3969,7 @@ Status FragmentMetadata::read_file_footer( } // Read footer - return storage_manager_->vfs()->read( + return context_resources_->vfs().read( fragment_metadata_uri, *footer_offset, tile->data_as(), @@ -3982,7 +3983,7 @@ Status FragmentMetadata::write_generic_tile_to_file( URI fragment_metadata_uri = fragment_uri_.join_path( std::string(constants::fragment_metadata_filename)); - GenericTileIO tile_io(storage_manager_->resources(), fragment_metadata_uri); + GenericTileIO tile_io(*context_resources_, fragment_metadata_uri); RETURN_NOT_OK(tile_io.write_generic(&tile, encryption_key, nbytes)); return Status::Ok(); @@ -3993,12 +3994,12 @@ Status FragmentMetadata::write_footer_to_file(WriterTile& tile) const { std::string(constants::fragment_metadata_filename)); uint64_t size = tile.size(); - RETURN_NOT_OK(storage_manager_->vfs()->write( + RETURN_NOT_OK(context_resources_->vfs().write( fragment_metadata_uri, tile.data(), tile.size())); // Write the size in the end if there is at least one var-sized dimension if (!array_schema_->domain().all_dims_fixed() || version_ >= 10) - return storage_manager_->vfs()->write( + return context_resources_->vfs().write( fragment_metadata_uri, &size, sizeof(uint64_t)); return Status::Ok(); } @@ -4015,7 +4016,7 @@ void FragmentMetadata::store_tile_offsets( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - storage_manager_->stats()->add_counter("write_tile_offsets_size", *nbytes); + context_resources_->stats().add_counter("write_tile_offsets_size", *nbytes); } void FragmentMetadata::write_tile_offsets( @@ -4043,7 +4044,7 @@ void FragmentMetadata::store_tile_var_offsets( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - storage_manager_->stats()->add_counter( + context_resources_->stats().add_counter( "write_tile_var_offsets_size", *nbytes); } @@ -4073,7 +4074,7 @@ void FragmentMetadata::store_tile_var_sizes( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - storage_manager_->stats()->add_counter("write_tile_var_sizes_size", *nbytes); + context_resources_->stats().add_counter("write_tile_var_sizes_size", *nbytes); } void FragmentMetadata::write_tile_var_sizes( @@ -4101,7 +4102,7 @@ void FragmentMetadata::store_tile_validity_offsets( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - storage_manager_->stats()->add_counter( + context_resources_->stats().add_counter( "write_tile_validity_offsets_size", *nbytes); } @@ -4131,7 +4132,7 @@ void FragmentMetadata::store_tile_mins( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - storage_manager_->stats()->add_counter("write_mins_size", *nbytes); + context_resources_->stats().add_counter("write_mins_size", *nbytes); } void FragmentMetadata::write_tile_mins(unsigned idx, Serializer& serializer) { @@ -4168,7 +4169,7 @@ void FragmentMetadata::store_tile_maxs( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - storage_manager_->stats()->add_counter("write_maxs_size", *nbytes); + context_resources_->stats().add_counter("write_maxs_size", *nbytes); } void FragmentMetadata::write_tile_maxs(unsigned idx, Serializer& serializer) { @@ -4205,7 +4206,7 @@ void FragmentMetadata::store_tile_sums( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - storage_manager_->stats()->add_counter("write_sums_size", *nbytes); + context_resources_->stats().add_counter("write_sums_size", *nbytes); } void FragmentMetadata::write_tile_sums(unsigned idx, Serializer& serializer) { @@ -4231,7 +4232,7 @@ void FragmentMetadata::store_tile_null_counts( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - storage_manager_->stats()->add_counter("write_null_counts_size", *nbytes); + context_resources_->stats().add_counter("write_null_counts_size", *nbytes); } void FragmentMetadata::write_tile_null_counts( @@ -4285,7 +4286,7 @@ void FragmentMetadata::store_fragment_min_max_sum_null_count( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - storage_manager_->stats()->add_counter("write_null_counts_size", *nbytes); + context_resources_->stats().add_counter("write_null_counts_size", *nbytes); } void FragmentMetadata::store_processed_conditions( @@ -4312,7 +4313,7 @@ void FragmentMetadata::store_processed_conditions( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - storage_manager_->stats()->add_counter( + context_resources_->stats().add_counter( "write_processed_conditions_size", *nbytes); } @@ -4642,7 +4643,7 @@ Status FragmentMetadata::store_footer(const EncryptionKey&) { RETURN_NOT_OK(write_footer(serializer)); RETURN_NOT_OK(write_footer_to_file(tile)); - storage_manager_->stats()->add_counter( + context_resources_->stats().add_counter( "write_frag_meta_footer_size", tile.size()); return Status::Ok(); @@ -4669,8 +4670,8 @@ void FragmentMetadata::clean_up() { auto fragment_metadata_uri = fragment_uri_.join_path(constants::fragment_metadata_filename); - throw_if_not_ok(storage_manager_->vfs()->close_file(fragment_metadata_uri)); - throw_if_not_ok(storage_manager_->vfs()->remove_file(fragment_metadata_uri)); + throw_if_not_ok(context_resources_->vfs().close_file(fragment_metadata_uri)); + throw_if_not_ok(context_resources_->vfs().remove_file(fragment_metadata_uri)); } const shared_ptr& FragmentMetadata::array_schema() const { diff --git a/tiledb/sm/fragment/fragment_metadata.h b/tiledb/sm/fragment/fragment_metadata.h index 7e89f6f83a9..68a43dcc950 100644 --- a/tiledb/sm/fragment/fragment_metadata.h +++ b/tiledb/sm/fragment/fragment_metadata.h @@ -45,7 +45,6 @@ #include "tiledb/sm/filesystem/uri.h" #include "tiledb/sm/misc/types.h" #include "tiledb/sm/rtree/rtree.h" -#include "tiledb/sm/storage_manager/storage_manager_declaration.h" using namespace tiledb::common; using namespace tiledb::type; @@ -75,7 +74,7 @@ class FragmentMetadata { /** * Constructor. * - * @param storage_manager A storage manager instance. + * @param resources A context resources instance. * @param memory_tracker The memory tracker of the array this fragment * metadata corresponds to. * @param array_schema The schema of the array the fragment belongs to. @@ -88,7 +87,7 @@ class FragmentMetadata { * @param has_delete_meta Does the fragment contains delete metadata. */ FragmentMetadata( - StorageManager* storage_manager, + ContextResources* resources, MemoryTracker* memory_tracker, const shared_ptr& array_schema, const URI& fragment_uri, @@ -1176,12 +1175,12 @@ class FragmentMetadata { return gt_offsets_; } - /** set the SM pointer during deserialization*/ - void set_storage_manager(StorageManager* sm) { - storage_manager_ = sm; + /** set the CR pointer during deserialization*/ + void set_context_resources(ContextResources* cr) { + context_resources_ = cr; } - /** set the SM pointer during deserialization*/ + /** set the memory tracker pointer during deserialization*/ void set_memory_tracker(MemoryTracker* memory_tracker) { memory_tracker_ = memory_tracker; } @@ -1222,7 +1221,7 @@ class FragmentMetadata { /* ********************************* */ /** The storage manager. */ - StorageManager* storage_manager_; + ContextResources* context_resources_; /** * The memory tracker of the array this fragment metadata corresponds to. diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc index d389075cf6a..99f6bfa8f20 100644 --- a/tiledb/sm/query/writers/global_order_writer.cc +++ b/tiledb/sm/query/writers/global_order_writer.cc @@ -192,7 +192,8 @@ Status GlobalOrderWriter::alloc_global_write_state() { // Alloc FragmentMetadata object global_write_state_->frag_meta_ = make_shared(HERE()); // Used in serialization when FragmentMetadata is built from ground up - global_write_state_->frag_meta_->set_storage_manager(storage_manager_); + global_write_state_->frag_meta_->set_context_resources( + &storage_manager_->resources()); return Status::Ok(); } diff --git a/tiledb/sm/query/writers/unordered_writer.cc b/tiledb/sm/query/writers/unordered_writer.cc index 2875df2b37b..f707ccf2604 100644 --- a/tiledb/sm/query/writers/unordered_writer.cc +++ b/tiledb/sm/query/writers/unordered_writer.cc @@ -183,7 +183,7 @@ Status UnorderedWriter::alloc_frag_meta() { // Alloc FragmentMetadata object. frag_meta_ = make_shared(HERE()); // Used in serialization when FragmentMetadata is built from ground up. - frag_meta_->set_storage_manager(storage_manager_); + frag_meta_->set_context_resources(&storage_manager_->resources()); return Status::Ok(); } diff --git a/tiledb/sm/query/writers/writer_base.cc b/tiledb/sm/query/writers/writer_base.cc index 7aa0640c4d6..130eacb78e4 100644 --- a/tiledb/sm/query/writers/writer_base.cc +++ b/tiledb/sm/query/writers/writer_base.cc @@ -810,7 +810,7 @@ Status WriterBase::create_fragment( buffers_.count(constants::delete_timestamps) != 0; frag_meta = make_shared( HERE(), - storage_manager_, + &storage_manager_->resources(), nullptr, array_->array_schema_latest_ptr(), fragment_uri_, diff --git a/tiledb/sm/serialization/array.cc b/tiledb/sm/serialization/array.cc index 861ef705917..89e8227b5ce 100644 --- a/tiledb/sm/serialization/array.cc +++ b/tiledb/sm/serialization/array.cc @@ -314,7 +314,7 @@ Status array_from_capnp( array->array_schema_latest_ptr(), frag_meta_reader, meta, - storage_manager, + &storage_manager->resources(), array->memory_tracker())); if (client_side) { meta->set_rtree_loaded(); diff --git a/tiledb/sm/serialization/fragment_metadata.cc b/tiledb/sm/serialization/fragment_metadata.cc index 7c1763ea9e8..36f636aab75 100644 --- a/tiledb/sm/serialization/fragment_metadata.cc +++ b/tiledb/sm/serialization/fragment_metadata.cc @@ -120,12 +120,12 @@ Status fragment_metadata_from_capnp( const shared_ptr& array_schema, const capnp::FragmentMetadata::Reader& frag_meta_reader, shared_ptr frag_meta, - StorageManager* storage_manager, + ContextResources* resources, MemoryTracker* memory_tracker) { // TODO: consider a new constructor for fragment meta or using the // existing one - if (storage_manager) { - frag_meta->set_storage_manager(storage_manager); + if (resources) { + frag_meta->set_context_resources(resources); } if (memory_tracker) { frag_meta->set_memory_tracker(memory_tracker); diff --git a/tiledb/sm/serialization/fragment_metadata.h b/tiledb/sm/serialization/fragment_metadata.h index be740a10f49..59868fd4794 100644 --- a/tiledb/sm/serialization/fragment_metadata.h +++ b/tiledb/sm/serialization/fragment_metadata.h @@ -57,7 +57,7 @@ namespace serialization { * @param array_schema the schema of the array the metadata belongs * @param frag_meta_reader cap'n proto class * @param frag_meta fragment metadata object to deserialize into - * @param storage_manager storage manager associated + * @param resources ContextResources associated * @param memory_tracker memory tracker associated * @return Status */ @@ -65,7 +65,7 @@ Status fragment_metadata_from_capnp( const shared_ptr& array_schema, const capnp::FragmentMetadata::Reader& frag_meta_reader, shared_ptr frag_meta, - StorageManager* storage_manager = nullptr, + ContextResources* resources = nullptr, MemoryTracker* memory_tracker = nullptr); /** diff --git a/tiledb/sm/storage_manager/storage_manager.cc b/tiledb/sm/storage_manager/storage_manager.cc index e7377c6658b..81cf0e0d1d8 100644 --- a/tiledb/sm/storage_manager/storage_manager.cc +++ b/tiledb/sm/storage_manager/storage_manager.cc @@ -1956,7 +1956,7 @@ StorageManagerCanonical::load_fragment_metadata( RETURN_NOT_OK(vfs()->is_file(coords_uri, &sparse)); metadata = make_shared( HERE(), - this, + &resources(), memory_tracker, array_schema_latest, sf.uri_, @@ -1966,7 +1966,7 @@ StorageManagerCanonical::load_fragment_metadata( // Fragment format version > 2 metadata = make_shared( HERE(), - this, + &resources(), memory_tracker, array_schema_latest, sf.uri_, From 51115a164f2f4abeb6aebc3d99a6ee02802f0a10 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Tue, 24 Oct 2023 13:44:38 +0300 Subject: [PATCH 02/11] Decouple `FragmentInfo` from `StorageManager`. --- tiledb/sm/c_api/tiledb.cc | 2 +- .../sm/consolidator/fragment_consolidator.cc | 4 +- tiledb/sm/fragment/fragment_info.cc | 37 ++++++++----------- tiledb/sm/fragment/fragment_info.h | 8 ++-- 4 files changed, 22 insertions(+), 29 deletions(-) diff --git a/tiledb/sm/c_api/tiledb.cc b/tiledb/sm/c_api/tiledb.cc index 111474dd9d4..43b6898f06a 100644 --- a/tiledb/sm/c_api/tiledb.cc +++ b/tiledb/sm/c_api/tiledb.cc @@ -4488,7 +4488,7 @@ int32_t tiledb_fragment_info_alloc( // Allocate a fragment info object (*fragment_info)->fragment_info_ = - new (std::nothrow) tiledb::sm::FragmentInfo(uri, ctx->storage_manager()); + new (std::nothrow) tiledb::sm::FragmentInfo(uri, ctx->resources()); if ((*fragment_info)->fragment_info_ == nullptr) { delete *fragment_info; *fragment_info = nullptr; diff --git a/tiledb/sm/consolidator/fragment_consolidator.cc b/tiledb/sm/consolidator/fragment_consolidator.cc index 58c818c604d..4f9f2d8bd7e 100644 --- a/tiledb/sm/consolidator/fragment_consolidator.cc +++ b/tiledb/sm/consolidator/fragment_consolidator.cc @@ -107,7 +107,7 @@ Status FragmentConsolidator::consolidate( // must be fetched (even before `config_.timestamp_start_`), // to compute the anterior ND range that can help determine // which dense fragments are consolidatable. - FragmentInfo fragment_info(URI(array_name), storage_manager_); + FragmentInfo fragment_info(URI(array_name), storage_manager_->resources()); auto st = fragment_info.load( array_for_reads->array_directory(), config_.timestamp_start_, @@ -216,7 +216,7 @@ Status FragmentConsolidator::consolidate_fragments( } // Get all fragment info - FragmentInfo fragment_info(URI(array_name), storage_manager_); + FragmentInfo fragment_info(URI(array_name), storage_manager_->resources()); auto st = fragment_info.load( array_for_reads->array_directory(), 0, diff --git a/tiledb/sm/fragment/fragment_info.cc b/tiledb/sm/fragment/fragment_info.cc index 255e2747666..a33a6260100 100644 --- a/tiledb/sm/fragment/fragment_info.cc +++ b/tiledb/sm/fragment/fragment_info.cc @@ -51,15 +51,14 @@ namespace tiledb::sm { /* ****************************** */ FragmentInfo::FragmentInfo() - : storage_manager_(nullptr) + : resources_(nullptr) , unconsolidated_metadata_num_(0) { } -FragmentInfo::FragmentInfo( - const URI& array_uri, StorageManager* storage_manager) +FragmentInfo::FragmentInfo(const URI& array_uri, ContextResources& resources) : array_uri_(array_uri) - , config_(storage_manager->config()) - , storage_manager_(storage_manager) + , config_(resources.config()) + , resources_(&resources) , unconsolidated_metadata_num_(0) { } @@ -778,7 +777,7 @@ shared_ptr FragmentInfo::get_array_schema(uint32_t fid) { EncryptionKey encryption_key; return ArrayDirectory::load_array_schema_from_uri( - storage_manager_->resources(), schema_uri, encryption_key); + *resources_, schema_uri, encryption_key); } Status FragmentInfo::get_array_schema_name( @@ -825,7 +824,7 @@ Status FragmentInfo::load() { RETURN_NOT_OK(set_default_timestamp_range()); if (array_uri_.is_tiledb()) { - auto rest_client = storage_manager_->rest_client(); + auto rest_client = resources_->rest_client(); if (rest_client == nullptr) { return LOG_STATUS(Status_ArrayError( "Cannot load fragment info; remote array with no REST client.")); @@ -841,10 +840,7 @@ Status FragmentInfo::load() { // Create an ArrayDirectory object and load ArrayDirectory array_dir( - storage_manager_->resources(), - array_uri_, - timestamp_start_, - timestamp_end_); + *resources_, array_uri_, timestamp_start_, timestamp_end_); return load(array_dir); } @@ -858,10 +854,7 @@ Status FragmentInfo::load( // Create an ArrayDirectory object and load ArrayDirectory array_dir( - storage_manager_->resources(), - array_uri_, - timestamp_start_, - timestamp_end_); + *resources_, array_uri_, timestamp_start_, timestamp_end_); return load(array_dir); } @@ -890,7 +883,7 @@ Status FragmentInfo::load(const ArrayDirectory& array_dir) { // Get the array schemas and fragment metadata. auto&& [st_schemas, array_schema_latest, array_schemas_all, fragment_metadata] = - storage_manager_->load_array_schemas_and_fragment_metadata( + resources_->load_array_schemas_and_fragment_metadata( array_dir, nullptr, enc_key_); RETURN_NOT_OK(st_schemas); const auto& fragment_metadata_value = fragment_metadata.value(); @@ -901,7 +894,7 @@ Status FragmentInfo::load(const ArrayDirectory& array_dir) { // Get fragment sizes std::vector sizes(fragment_num, 0); RETURN_NOT_OK(parallel_for( - storage_manager_->compute_tp(), + &resources_->compute_tp(), 0, fragment_num, [this, &fragment_metadata_value, &sizes, preload_rtrees](uint64_t i) { @@ -1032,7 +1025,7 @@ Status FragmentInfo::set_default_timestamp_range() { tuple> FragmentInfo::load( const URI& new_fragment_uri) const { SingleFragmentInfo ret; - auto vfs = storage_manager_->vfs(); + auto& vfs = resources_->vfs(); const auto& array_schema_latest = single_fragment_info_vec_.back().meta()->array_schema(); @@ -1049,7 +1042,7 @@ tuple> FragmentInfo::load( if (fragment_version <= 2) { URI coords_uri = new_fragment_uri.join_path(constants::coords + constants::file_suffix); - RETURN_NOT_OK_TUPLE(vfs->is_file(coords_uri, &sparse), nullopt); + RETURN_NOT_OK_TUPLE(vfs.is_file(coords_uri, &sparse), nullopt); } else { // Do nothing. It does not matter what the `sparse` value // is, since the FragmentMetadata object will load the correct @@ -1061,7 +1054,7 @@ tuple> FragmentInfo::load( // Get fragment non-empty domain auto meta = make_shared( HERE(), - &storage_manager_->resources(), + resources_, nullptr, array_schema_latest, new_fragment_uri, @@ -1138,7 +1131,7 @@ FragmentInfo FragmentInfo::clone() const { clone.array_schemas_all_ = array_schemas_all_; clone.config_ = config_; clone.single_fragment_info_vec_ = single_fragment_info_vec_; - clone.storage_manager_ = storage_manager_; + clone.resources_ = resources_; clone.to_vacuum_ = to_vacuum_; clone.unconsolidated_metadata_num_ = unconsolidated_metadata_num_; clone.anterior_ndrange_ = anterior_ndrange_; @@ -1154,7 +1147,7 @@ void FragmentInfo::swap(FragmentInfo& fragment_info) { std::swap(array_schemas_all_, fragment_info.array_schemas_all_); std::swap(config_, fragment_info.config_); std::swap(single_fragment_info_vec_, fragment_info.single_fragment_info_vec_); - std::swap(storage_manager_, fragment_info.storage_manager_); + std::swap(resources_, fragment_info.resources_); std::swap(to_vacuum_, fragment_info.to_vacuum_); std::swap( unconsolidated_metadata_num_, fragment_info.unconsolidated_metadata_num_); diff --git a/tiledb/sm/fragment/fragment_info.h b/tiledb/sm/fragment/fragment_info.h index 1539af3834f..902e9458b91 100644 --- a/tiledb/sm/fragment/fragment_info.h +++ b/tiledb/sm/fragment/fragment_info.h @@ -40,7 +40,7 @@ #include "tiledb/sm/crypto/encryption_key.h" #include "tiledb/sm/filesystem/uri.h" #include "tiledb/sm/fragment/single_fragment_info.h" -#include "tiledb/sm/storage_manager/storage_manager.h" +#include "tiledb/sm/storage_manager/context_resources.h" using namespace tiledb::common; @@ -58,7 +58,7 @@ class FragmentInfo { FragmentInfo(); /** Constructor. */ - FragmentInfo(const URI& array_uri, StorageManager* storage_manager); + FragmentInfo(const URI& array_uri, ContextResources& resources); /** Destructor. */ ~FragmentInfo(); @@ -395,8 +395,8 @@ class FragmentInfo { /** Information about fragments in the array. */ std::vector single_fragment_info_vec_; - /** The storage manager. */ - StorageManager* storage_manager_; + /** The context resources. */ + ContextResources* resources_; /** The URIs of the fragments to vacuum. */ std::vector to_vacuum_; From 5494b01f84ba65bef2274c5d03fc42a7bea8ec19 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Tue, 24 Oct 2023 15:18:49 +0300 Subject: [PATCH 03/11] Update includes to account for missing `storage_header.h`. --- tiledb/sm/query/readers/filtered_data.h | 1 + tiledb/sm/serialization/array.cc | 1 + tiledb/sm/serialization/query.cc | 2 +- tiledb/sm/serialization/query_aggregates.h | 1 + tiledb/sm/subarray/subarray.cc | 1 + 5 files changed, 5 insertions(+), 1 deletion(-) diff --git a/tiledb/sm/query/readers/filtered_data.h b/tiledb/sm/query/readers/filtered_data.h index 2a899f9e0d7..0c2b2a9fb0e 100644 --- a/tiledb/sm/query/readers/filtered_data.h +++ b/tiledb/sm/query/readers/filtered_data.h @@ -35,6 +35,7 @@ #include "tiledb/common/common.h" #include "tiledb/common/status.h" +#include "tiledb/sm/storage_manager/storage_manager.h" using namespace tiledb::common; diff --git a/tiledb/sm/serialization/array.cc b/tiledb/sm/serialization/array.cc index 89e8227b5ce..812430e97ac 100644 --- a/tiledb/sm/serialization/array.cc +++ b/tiledb/sm/serialization/array.cc @@ -50,6 +50,7 @@ #include "tiledb/sm/serialization/array_directory.h" #include "tiledb/sm/serialization/array_schema.h" #include "tiledb/sm/serialization/fragment_metadata.h" +#include "tiledb/sm/storage_manager/storage_manager.h" using namespace tiledb::common; using namespace tiledb::sm::stats; diff --git a/tiledb/sm/serialization/query.cc b/tiledb/sm/serialization/query.cc index 44e8dabc693..65070c0f40e 100644 --- a/tiledb/sm/serialization/query.cc +++ b/tiledb/sm/serialization/query.cc @@ -74,7 +74,7 @@ #include "tiledb/sm/serialization/config.h" #include "tiledb/sm/serialization/fragment_metadata.h" #include "tiledb/sm/serialization/query.h" -#include "tiledb/sm/storage_manager/storage_manager_declaration.h" +#include "tiledb/sm/storage_manager/storage_manager.h" #include "tiledb/sm/subarray/subarray_partitioner.h" using namespace tiledb::common; diff --git a/tiledb/sm/serialization/query_aggregates.h b/tiledb/sm/serialization/query_aggregates.h index 774c9439c2d..9b254c9c750 100644 --- a/tiledb/sm/serialization/query_aggregates.h +++ b/tiledb/sm/serialization/query_aggregates.h @@ -35,6 +35,7 @@ #ifdef TILEDB_SERIALIZATION #include "tiledb/sm/serialization/capnp_utils.h" +#include "tiledb/sm/query/query.h" #endif namespace tiledb::sm::serialization { diff --git a/tiledb/sm/subarray/subarray.cc b/tiledb/sm/subarray/subarray.cc index 275615ddf90..cdd51e7b6fe 100644 --- a/tiledb/sm/subarray/subarray.cc +++ b/tiledb/sm/subarray/subarray.cc @@ -59,6 +59,7 @@ #include "tiledb/sm/stats/global_stats.h" #include "tiledb/sm/subarray/relevant_fragment_generator.h" #include "tiledb/sm/subarray/subarray.h" +#include "tiledb/sm/storage_manager/storage_manager.h" #include "tiledb/type/apply_with_type.h" #include "tiledb/type/range/range.h" From 54f3ea77dda17fbf02dc6303069cbff1283185a2 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Tue, 24 Oct 2023 15:35:42 +0300 Subject: [PATCH 04/11] Move `load_array_schemas_and_fragment_metadata` and its dependencies to `FragmentInfo` and `FragmentMetadata`. --- tiledb/sm/array/array.cc | 11 +- tiledb/sm/fragment/fragment_info.cc | 119 ++++++++++- tiledb/sm/fragment/fragment_info.h | 29 +++ tiledb/sm/fragment/fragment_metadata.cc | 80 ++++++++ tiledb/sm/fragment/fragment_metadata.h | 40 ++++ tiledb/sm/storage_manager/storage_manager.cc | 188 +----------------- .../storage_manager_canonical.h | 94 --------- 7 files changed, 263 insertions(+), 298 deletions(-) diff --git a/tiledb/sm/array/array.cc b/tiledb/sm/array/array.cc index 730063ae24f..512de382a9c 100644 --- a/tiledb/sm/array/array.cc +++ b/tiledb/sm/array/array.cc @@ -1259,16 +1259,13 @@ tuple< Array::open_for_reads() { auto timer_se = resources_.stats().start_timer( "array_open_read_load_schemas_and_fragment_meta"); - auto&& [st, array_schema_latest, array_schemas_all, fragment_metadata] = - storage_manager_->load_array_schemas_and_fragment_metadata( - array_directory(), memory_tracker(), *encryption_key()); + auto result = FragmentInfo::load_array_schemas_and_fragment_metadata( + resources_, array_directory(), memory_tracker(), *encryption_key()); - throw_if_not_ok(st); - - auto version = array_schema_latest.value()->version(); + auto version = std::get<0>(result)->version(); ensure_supported_schema_version_for_read(version); - return {array_schema_latest, array_schemas_all, fragment_metadata}; + return result; } tuple< diff --git a/tiledb/sm/fragment/fragment_info.cc b/tiledb/sm/fragment/fragment_info.cc index a33a6260100..45dfcf8be99 100644 --- a/tiledb/sm/fragment/fragment_info.cc +++ b/tiledb/sm/fragment/fragment_info.cc @@ -42,6 +42,7 @@ #include "tiledb/sm/misc/tdb_time.h" #include "tiledb/sm/misc/utils.h" #include "tiledb/sm/rest/rest_client.h" +#include "tiledb/sm/tile/generic_tile_io.h" #include "tiledb/storage_format/uri/parse_uri.h" namespace tiledb::sm { @@ -882,14 +883,11 @@ Status FragmentInfo::load(const ArrayDirectory& array_dir) { } // Get the array schemas and fragment metadata. - auto&& [st_schemas, array_schema_latest, array_schemas_all, fragment_metadata] = - resources_->load_array_schemas_and_fragment_metadata( - array_dir, nullptr, enc_key_); - RETURN_NOT_OK(st_schemas); - const auto& fragment_metadata_value = fragment_metadata.value(); - array_schema_latest_ = array_schema_latest.value(); - array_schemas_all_ = std::move(array_schemas_all.value()); - auto fragment_num = (uint32_t)fragment_metadata_value.size(); + std::vector> fragment_metadata; + std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata) = + load_array_schemas_and_fragment_metadata( + *resources_, array_dir, nullptr, enc_key_); + auto fragment_num = (uint32_t)fragment_metadata.size(); // Get fragment sizes std::vector sizes(fragment_num, 0); @@ -897,10 +895,10 @@ Status FragmentInfo::load(const ArrayDirectory& array_dir) { &resources_->compute_tp(), 0, fragment_num, - [this, &fragment_metadata_value, &sizes, preload_rtrees](uint64_t i) { + [this, &fragment_metadata, &sizes, preload_rtrees](uint64_t i) { // Get fragment size. Applicable only to relevant fragments, including // fragments that are in the range [timestamp_start_, timestamp_end_]. - auto meta = fragment_metadata_value[i]; + auto meta = fragment_metadata[i]; if (meta->timestamp_range().first >= timestamp_start_ && meta->timestamp_range().second <= timestamp_end_) { sizes[i] = meta->fragment_size(); @@ -919,7 +917,7 @@ Status FragmentInfo::load(const ArrayDirectory& array_dir) { // Create the vector that will store the SingleFragmentInfo objects for (uint64_t fid = 0; fid < fragment_num; fid++) { - const auto meta = fragment_metadata_value[fid]; + const auto meta = fragment_metadata[fid]; const auto& array_schema = meta->array_schema(); const auto& non_empty_domain = meta->non_empty_domain(); @@ -980,6 +978,105 @@ Status FragmentInfo::load_and_replace( return Status::Ok(); } +tuple, optional>>> +load_consolidated_fragment_meta( + ContextResources& resources, + const URI& uri, + const EncryptionKey& enc_key) { + auto timer_se = + resources.stats().start_timer("sm_read_load_consolidated_frag_meta"); + + // No consolidated fragment metadata file + if (uri.to_string().empty()) + return {nullopt, nullopt}; + + auto&& tile = GenericTileIO::load(resources, uri, 0, enc_key); + + resources.stats().add_counter("consolidated_frag_meta_size", tile.size()); + + uint32_t fragment_num; + Deserializer deserializer(tile.data(), tile.size()); + fragment_num = deserializer.read(); + + uint64_t name_size, offset; + std::string name; + std::vector> ret; + ret.reserve(fragment_num); + for (uint32_t f = 0; f < fragment_num; ++f) { + name_size = deserializer.read(); + name.resize(name_size); + deserializer.read(&name[0], name_size); + offset = deserializer.read(); + ret.emplace_back(name, offset); + } + + return {std::move(tile), std::move(ret)}; +} + +std::tuple< + shared_ptr, + std::unordered_map>, + std::vector>> +FragmentInfo::load_array_schemas_and_fragment_metadata( + ContextResources& resources, + const ArrayDirectory& array_dir, + MemoryTracker* memory_tracker, + const EncryptionKey& enc_key) { + auto timer_se = resources.stats().start_timer( + "sm_load_array_schemas_and_fragment_metadata"); + + // Load array schemas + auto&& [array_schema_latest, array_schemas_all] = + array_dir.load_array_schemas(enc_key); + + const auto filtered_fragment_uris = [&]() { + auto timer_se = + resources.stats().start_timer("sm_load_filtered_fragment_uris"); + return array_dir.filtered_fragment_uris(array_schema_latest->dense()); + }(); + const auto& meta_uris = array_dir.fragment_meta_uris(); + const auto& fragments_to_load = filtered_fragment_uris.fragment_uris(); + + // Get the consolidated fragment metadatas + std::vector> fragment_metadata_tiles(meta_uris.size()); + std::vector>> offsets_vectors( + meta_uris.size()); + auto status = + parallel_for(&resources.compute_tp(), 0, meta_uris.size(), [&](size_t i) { + auto&& [tile_opt, offsets] = + load_consolidated_fragment_meta(resources, meta_uris[i], enc_key); + fragment_metadata_tiles[i] = + make_shared(HERE(), std::move(*tile_opt)); + offsets_vectors[i] = std::move(offsets.value()); + return Status::Ok(); + }); + throw_if_not_ok(status); + + // Get the unique fragment metadatas into a map. + std::unordered_map> offsets; + for (uint64_t i = 0; i < offsets_vectors.size(); i++) { + for (auto& offset : offsets_vectors[i]) { + if (offsets.count(offset.first) == 0) { + offsets.emplace( + offset.first, + std::make_pair(fragment_metadata_tiles[i].get(), offset.second)); + } + } + } + + // Load the fragment metadata + auto&& fragment_metadata = FragmentMetadata::load( + resources, + memory_tracker, + array_schema_latest, + array_schemas_all, + enc_key, + fragments_to_load, + offsets); + + return {array_schema_latest, array_schemas_all, fragment_metadata}; +} + const std::vector& FragmentInfo::single_fragment_info_vec() const { return single_fragment_info_vec_; diff --git a/tiledb/sm/fragment/fragment_info.h b/tiledb/sm/fragment/fragment_info.h index 902e9458b91..04c095b86cb 100644 --- a/tiledb/sm/fragment/fragment_info.h +++ b/tiledb/sm/fragment/fragment_info.h @@ -298,6 +298,35 @@ class FragmentInfo { const URI& new_fragment_uri, const std::vector& to_replace); + /** + * Returns the array schemas and fragment metadata for the given array. + * The function will focus only on relevant schemas and metadata as + * dictated by the input URI manager. + * + * @param array_dir The ArrayDirectory object used to retrieve the + * various URIs in the array directory. + * @param memory_tracker The memory tracker of the array + * for which the fragment metadata is loaded. + * @param enc_key The encryption key to use. + * @return tuple of Status, latest ArraySchema, map of all array schemas and + * vector of FragmentMetadata + * Status Ok on success, else error + * ArraySchema The array schema to be retrieved after the + * array is opened. + * ArraySchemaMap Map of all array schemas found keyed by name + * fragment_metadata The fragment metadata to be retrieved + * after the array is opened. + */ + static tuple< + shared_ptr, + std::unordered_map>, + std::vector>> + load_array_schemas_and_fragment_metadata( + ContextResources& resources, + const ArrayDirectory& array_dir, + MemoryTracker* memory_tracker, + const EncryptionKey& enc_key); + /** Returns the vector with the info about individual fragments. */ const std::vector& single_fragment_info_vec() const; diff --git a/tiledb/sm/fragment/fragment_metadata.cc b/tiledb/sm/fragment/fragment_metadata.cc index 5c678273f64..e6f1d35ef6a 100644 --- a/tiledb/sm/fragment/fragment_metadata.cc +++ b/tiledb/sm/fragment/fragment_metadata.cc @@ -784,6 +784,86 @@ Status FragmentMetadata::init(const NDRange& non_empty_domain) { return Status::Ok(); } +std::vector> +FragmentMetadata::load( + ContextResources& resources, + MemoryTracker* memory_tracker, + const shared_ptr array_schema_latest, + const std::unordered_map>& + array_schemas_all, + const EncryptionKey& encryption_key, + const std::vector& fragments_to_load, + const std::unordered_map>& + offsets) { + auto timer_se = resources.stats().start_timer("sm_load_fragment_metadata"); + + // Load the metadata for each fragment + auto fragment_num = fragments_to_load.size(); + std::vector> fragment_metadata; + fragment_metadata.resize(fragment_num); + auto status = parallel_for(&resources.compute_tp(), 0, fragment_num, [&](size_t f) { + const auto& sf = fragments_to_load[f]; + + URI coords_uri = + sf.uri_.join_path(constants::coords + constants::file_suffix); + + auto name = sf.uri_.remove_trailing_slash().last_path_part(); + auto format_version = utils::parse::get_fragment_version(name); + + // Note that the fragment metadata version is >= the array schema + // version. Therefore, the check below is defensive and will always + // ensure backwards compatibility. + shared_ptr metadata; + if (format_version <= 2) { + bool sparse; + RETURN_NOT_OK(resources.vfs().is_file(coords_uri, &sparse)); + metadata = make_shared( + HERE(), + &resources, + memory_tracker, + array_schema_latest, + sf.uri_, + sf.timestamp_range_, + !sparse); + } else { + // Fragment format version > 2 + metadata = make_shared( + HERE(), + &resources, + memory_tracker, + array_schema_latest, + sf.uri_, + sf.timestamp_range_); + } + + // Potentially find the basic fragment metadata in the consolidated + // metadata buffer + Tile* fragment_metadata_tile = nullptr; + uint64_t offset = 0; + + auto it = offsets.end(); + if (metadata->format_version() >= 9) { + it = offsets.find(name); + } else { + it = offsets.find(sf.uri_.to_string()); + } + if (it != offsets.end()) { + fragment_metadata_tile = it->second.first; + offset = it->second.second; + } + + // Load fragment metadata + RETURN_NOT_OK(metadata->load( + encryption_key, fragment_metadata_tile, offset, array_schemas_all)); + + fragment_metadata[f] = metadata; + return Status::Ok(); + }); + throw_if_not_ok(status); + + return fragment_metadata; +} + Status FragmentMetadata::load( const EncryptionKey& encryption_key, Tile* fragment_metadata_tile, diff --git a/tiledb/sm/fragment/fragment_metadata.h b/tiledb/sm/fragment/fragment_metadata.h index 68a43dcc950..61509202b5d 100644 --- a/tiledb/sm/fragment/fragment_metadata.h +++ b/tiledb/sm/fragment/fragment_metadata.h @@ -45,6 +45,7 @@ #include "tiledb/sm/filesystem/uri.h" #include "tiledb/sm/misc/types.h" #include "tiledb/sm/rtree/rtree.h" +#include "tiledb/sm/storage_manager/context_resources.h" using namespace tiledb::common; using namespace tiledb::type; @@ -452,6 +453,45 @@ class FragmentMetadata { uint64_t offset, std::unordered_map> array_schemas); + /** + * Loads the fragment metadata of an open array given a vector of + * fragment URIs `fragments_to_load`. + * The function stores the fragment metadata of each fragment + * in `fragments_to_load` into the returned vector, such + * that there is a one-to-one correspondence between the two vectors. + * + * If `meta_buf` has data, then some fragment metadata may be contained + * in there and does not need to be loaded from storage. In that + * case, `offsets` helps identifying each fragment metadata in the + * buffer. + * + * @param resources A context resources instance. + * @param memory_tracker The memory tracker of the array + * for which the metadata is loaded. This will be passed to + * the constructor of each of the metadata loaded. + * @param array_schema_latest The latest array schema. + * @param array_schemas_all All the array schemas in a map keyed by the + * schema filename. + * @param encryption_key The encryption key to use. + * @param fragments_to_load The fragments whose metadata to load. + * @param offsets A map from a fragment name to an offset in `meta_buff` + * where the basic fragment metadata can be found. If the offset + * cannot be found, then the metadata of that fragment will be loaded from + * storage instead. + * @return Vector of FragmentMetadata is the fragment metadata to be + * retrieved. + */ + static std::vector> load( + ContextResources& resources, + MemoryTracker* memory_tracker, + const shared_ptr array_schema, + const std::unordered_map>& + array_schemas_all, + const EncryptionKey& encryption_key, + const std::vector& fragments_to_load, + const std::unordered_map>& + offsets); + /** Stores all the metadata to storage. */ void store(const EncryptionKey& encryption_key); diff --git a/tiledb/sm/storage_manager/storage_manager.cc b/tiledb/sm/storage_manager/storage_manager.cc index 81cf0e0d1d8..60dc4cf2710 100644 --- a/tiledb/sm/storage_manager/storage_manager.cc +++ b/tiledb/sm/storage_manager/storage_manager.cc @@ -169,68 +169,6 @@ Status StorageManagerCanonical::group_close_for_writes(Group* group) { return Status::Ok(); } -std::tuple< - Status, - optional>, - optional>>, - optional>>> -StorageManagerCanonical::load_array_schemas_and_fragment_metadata( - const ArrayDirectory& array_dir, - MemoryTracker* memory_tracker, - const EncryptionKey& enc_key) { - auto timer_se = - stats()->start_timer("sm_load_array_schemas_and_fragment_metadata"); - - // Load array schemas - auto&& [array_schema_latest, array_schemas_all] = - array_dir.load_array_schemas(enc_key); - - const auto filtered_fragment_uris = - load_filtered_fragment_uris(array_schema_latest->dense(), array_dir); - const auto& meta_uris = array_dir.fragment_meta_uris(); - const auto& fragments_to_load = filtered_fragment_uris.fragment_uris(); - - // Get the consolidated fragment metadatas - std::vector> fragment_metadata_tiles(meta_uris.size()); - std::vector>> offsets_vectors( - meta_uris.size()); - auto status = parallel_for(compute_tp(), 0, meta_uris.size(), [&](size_t i) { - auto&& [st, tile_opt, offsets] = - load_consolidated_fragment_meta(meta_uris[i], enc_key); - RETURN_NOT_OK(st); - fragment_metadata_tiles[i] = - make_shared(HERE(), std::move(*tile_opt)); - offsets_vectors[i] = std::move(offsets.value()); - return st; - }); - RETURN_NOT_OK_TUPLE(status, nullopt, nullopt, nullopt); - - // Get the unique fragment metadatas into a map. - std::unordered_map> offsets; - for (uint64_t i = 0; i < offsets_vectors.size(); i++) { - for (auto& offset : offsets_vectors[i]) { - if (offsets.count(offset.first) == 0) { - offsets.emplace( - offset.first, - std::make_pair(fragment_metadata_tiles[i].get(), offset.second)); - } - } - } - - // Load the fragment metadata - auto&& [st_fragment_meta, fragment_metadata] = load_fragment_metadata( - memory_tracker, - array_schema_latest, - array_schemas_all, - enc_key, - fragments_to_load, - offsets); - RETURN_NOT_OK_TUPLE(st_fragment_meta, nullopt, nullopt, nullopt); - - return { - Status::Ok(), array_schema_latest, array_schemas_all, fragment_metadata}; -} - tuple>>> StorageManagerCanonical::array_load_fragments( Array* array, const std::vector& fragments_to_load) { @@ -238,14 +176,14 @@ StorageManagerCanonical::array_load_fragments( // Load the fragment metadata std::unordered_map> offsets; - auto&& [st_fragment_meta, fragment_metadata] = load_fragment_metadata( + auto&& fragment_metadata = FragmentMetadata::load( + resources(), array->memory_tracker(), array->array_schema_latest_ptr(), array->array_schemas_all(), *array->encryption_key(), fragments_to_load, offsets); - RETURN_NOT_OK_TUPLE(st_fragment_meta, nullopt); return {Status::Ok(), fragment_metadata}; } @@ -1922,128 +1860,6 @@ void StorageManagerCanonical::load_group_metadata( /* PRIVATE METHODS */ /* ****************************** */ -tuple>>> -StorageManagerCanonical::load_fragment_metadata( - MemoryTracker* memory_tracker, - const shared_ptr& array_schema_latest, - const std::unordered_map>& - array_schemas_all, - const EncryptionKey& encryption_key, - const std::vector& fragments_to_load, - const std::unordered_map>& - offsets) { - auto timer_se = stats()->start_timer("sm_load_fragment_metadata"); - - // Load the metadata for each fragment - auto fragment_num = fragments_to_load.size(); - std::vector> fragment_metadata; - fragment_metadata.resize(fragment_num); - auto status = parallel_for(compute_tp(), 0, fragment_num, [&](size_t f) { - const auto& sf = fragments_to_load[f]; - - URI coords_uri = - sf.uri_.join_path(constants::coords + constants::file_suffix); - - auto name = sf.uri_.remove_trailing_slash().last_path_part(); - auto format_version = utils::parse::get_fragment_version(name); - - // Note that the fragment metadata version is >= the array schema - // version. Therefore, the check below is defensive and will always - // ensure backwards compatibility. - shared_ptr metadata; - if (format_version <= 2) { - bool sparse; - RETURN_NOT_OK(vfs()->is_file(coords_uri, &sparse)); - metadata = make_shared( - HERE(), - &resources(), - memory_tracker, - array_schema_latest, - sf.uri_, - sf.timestamp_range_, - !sparse); - } else { - // Fragment format version > 2 - metadata = make_shared( - HERE(), - &resources(), - memory_tracker, - array_schema_latest, - sf.uri_, - sf.timestamp_range_); - } - - // Potentially find the basic fragment metadata in the consolidated - // metadata buffer - Tile* fragment_metadata_tile = nullptr; - uint64_t offset = 0; - - auto it = offsets.end(); - if (metadata->format_version() >= 9) { - it = offsets.find(name); - } else { - it = offsets.find(sf.uri_.to_string()); - } - if (it != offsets.end()) { - fragment_metadata_tile = it->second.first; - offset = it->second.second; - } - - // Load fragment metadata - RETURN_NOT_OK(metadata->load( - encryption_key, fragment_metadata_tile, offset, array_schemas_all)); - - fragment_metadata[f] = metadata; - return Status::Ok(); - }); - RETURN_NOT_OK_TUPLE(status, nullopt); - - return {Status::Ok(), fragment_metadata}; -} - -tuple< - Status, - optional, - optional>>> -StorageManagerCanonical::load_consolidated_fragment_meta( - const URI& uri, const EncryptionKey& enc_key) { - auto timer_se = stats()->start_timer("sm_read_load_consolidated_frag_meta"); - - // No consolidated fragment metadata file - if (uri.to_string().empty()) - return {Status::Ok(), nullopt, nullopt}; - - auto&& tile = GenericTileIO::load(resources_, uri, 0, enc_key); - - stats()->add_counter("consolidated_frag_meta_size", tile.size()); - - uint32_t fragment_num; - Deserializer deserializer(tile.data(), tile.size()); - fragment_num = deserializer.read(); - - uint64_t name_size, offset; - std::string name; - std::vector> ret; - ret.reserve(fragment_num); - for (uint32_t f = 0; f < fragment_num; ++f) { - name_size = deserializer.read(); - name.resize(name_size); - deserializer.read(&name[0], name_size); - offset = deserializer.read(); - ret.emplace_back(name, offset); - } - - return {Status::Ok(), std::move(tile), ret}; -} - -const ArrayDirectory::FilteredFragmentUris -StorageManagerCanonical::load_filtered_fragment_uris( - const bool dense, const ArrayDirectory& array_dir) { - auto timer_se = stats()->start_timer("sm_load_filtered_fragment_uris"); - - return array_dir.filtered_fragment_uris(dense); -} - Status StorageManagerCanonical::set_default_tags() { const auto version = std::to_string(constants::library_version[0]) + "." + std::to_string(constants::library_version[1]) + "." + diff --git a/tiledb/sm/storage_manager/storage_manager_canonical.h b/tiledb/sm/storage_manager/storage_manager_canonical.h index d2a502978f9..3c5a2e26909 100644 --- a/tiledb/sm/storage_manager/storage_manager_canonical.h +++ b/tiledb/sm/storage_manager/storage_manager_canonical.h @@ -227,35 +227,6 @@ class StorageManagerCanonical { tdb_shared_ptr group, const EncryptionKey& encryption_key); - /** - * Returns the array schemas and fragment metadata for the given array. - * The function will focus only on relevant schemas and metadata as - * dictated by the input URI manager. - * - * @param array_dir The ArrayDirectory object used to retrieve the - * various URIs in the array directory. - * @param memory_tracker The memory tracker of the array - * for which the fragment metadata is loaded. - * @param enc_key The encryption key to use. - * @return tuple of Status, latest ArraySchema, map of all array schemas and - * vector of FragmentMetadata - * Status Ok on success, else error - * ArraySchema The array schema to be retrieved after the - * array is opened. - * ArraySchemaMap Map of all array schemas found keyed by name - * fragment_metadata The fragment metadata to be retrieved - * after the array is opened. - */ - tuple< - Status, - optional>, - optional>>, - optional>>> - load_array_schemas_and_fragment_metadata( - const ArrayDirectory& array_dir, - MemoryTracker* memory_tracker, - const EncryptionKey& enc_key); - /** * Opens an group for reads. * @@ -929,71 +900,6 @@ class StorageManagerCanonical { /** Increment the count of in-progress queries. */ void increment_in_progress(); - /** - * Loads the fragment metadata of an open array given a vector of - * fragment URIs `fragments_to_load`. - * The function stores the fragment metadata of each fragment - * in `fragments_to_load` into the returned vector, such - * that there is a one-to-one correspondence between the two vectors. - * - * If `meta_buf` has data, then some fragment metadata may be contained - * in there and does not need to be loaded from storage. In that - * case, `offsets` helps identifying each fragment metadata in the - * buffer. - * - * @param memory_tracker The memory tracker of the array - * for which the metadata is loaded. This will be passed to - * the constructor of each of the metadata loaded. - * @param array_schema_latest The latest array schema. - * @param array_schemas_all All the array schemas in a map keyed by the - * schema filename. - * @param encryption_key The encryption key to use. - * @param fragments_to_load The fragments whose metadata to load. - * @param offsets A map from a fragment name to an offset in `meta_buff` - * where the basic fragment metadata can be found. If the offset - * cannot be found, then the metadata of that fragment will be loaded from - * storage instead. - * @return tuple of Status and vector of FragmentMetadata - * Status Ok on success, else error - * Vector of FragmentMetadata is the fragment metadata to be retrieved. - */ - tuple>>> - load_fragment_metadata( - MemoryTracker* memory_tracker, - const shared_ptr& array_schema, - const std::unordered_map>& - array_schemas_all, - const EncryptionKey& encryption_key, - const std::vector& fragments_to_load, - const std::unordered_map>& - offsets); - - /** - * Loads the latest consolidated fragment metadata from storage. - * - * @param uri The URI of the consolidated fragment metadata. - * @param enc_key The encryption key that may be needed to access the file. - * @param f_buff The buffer to hold the consolidated fragment metadata. - * @return Status, vector from the fragment name to the offset in `f_buff` - * where the basic fragment metadata starts. - */ - tuple< - Status, - optional, - optional>>> - load_consolidated_fragment_meta(const URI& uri, const EncryptionKey& enc_key); - - /** - * Loads the filtered fragment URIs from the array directory. - * - * @param dense Is this a dense array. - * @param array_dir The ArrayDirectory object used to retrieve the - * various URIs in the array directory. - * @return Filtered fragment URIs. - */ - const ArrayDirectory::FilteredFragmentUris load_filtered_fragment_uris( - const bool dense, const ArrayDirectory& array_dir); - /** Block until there are zero in-progress queries. */ void wait_for_zero_in_progress(); From f1ea9c98af32e8cd287e0c6c941d9d886866be38 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Tue, 24 Oct 2023 15:46:27 +0300 Subject: [PATCH 05/11] Move `array_load_fragments` to `Array` and refactor it a bit. --- test/src/unit-average-cell-size.cc | 2 +- tiledb/sm/array/array.cc | 55 ++++++++----------- tiledb/sm/array/array.h | 19 +++---- .../sm/consolidator/fragment_consolidator.cc | 2 +- tiledb/sm/storage_manager/storage_manager.cc | 19 ------- .../storage_manager_canonical.h | 11 ---- 6 files changed, 34 insertions(+), 74 deletions(-) diff --git a/test/src/unit-average-cell-size.cc b/test/src/unit-average-cell-size.cc index f05cd410eb7..a6663fb1e34 100644 --- a/test/src/unit-average-cell-size.cc +++ b/test/src/unit-average-cell-size.cc @@ -212,7 +212,7 @@ struct CPPAverageCellSizeFx { ->open_without_fragments( sm::EncryptionType::NO_ENCRYPTION, nullptr, 0) .ok()); - REQUIRE(array_for_reads->load_fragments(uris).ok()); + array_for_reads->load_fragments(uris); auto avg_cell_sizes = array_for_reads->get_average_var_cell_sizes(); CHECK(avg_cell_sizes["d2"] == d2_size); diff --git a/tiledb/sm/array/array.cc b/tiledb/sm/array/array.cc index 512de382a9c..c53e0fb88be 100644 --- a/tiledb/sm/array/array.cc +++ b/tiledb/sm/array/array.cc @@ -209,10 +209,8 @@ Status Array::open_without_fragments( array_dir_ = ArrayDirectory( resources_, array_uri_, 0, UINT64_MAX, ArrayDirectoryMode::READ); } - auto&& [array_schema, array_schemas] = open_for_reads_without_fragments(); - - array_schema_latest_ = array_schema.value(); - array_schemas_all_ = array_schemas.value(); + std::tie(array_schema_latest_, array_schemas_all_) = + open_for_reads_without_fragments(); } } catch (std::exception& e) { set_array_closed(); @@ -223,15 +221,20 @@ Status Array::open_without_fragments( return Status::Ok(); } -Status Array::load_fragments( +void Array::load_fragments( const std::vector& fragments_to_load) { - auto&& [st, fragment_metadata] = - storage_manager_->array_load_fragments(this, fragments_to_load); - RETURN_NOT_OK(st); + auto timer_se = resources_.stats().start_timer("sm_array_load_fragments"); - fragment_metadata_ = std::move(fragment_metadata.value()); - - return Status::Ok(); + // Load the fragment metadata + std::unordered_map> offsets; + fragment_metadata_ = FragmentMetadata::load( + resources_, + memory_tracker(), + array_schema_latest_ptr(), + array_schemas_all(), + *encryption_key(), + fragments_to_load, + offsets); } Status Array::open( @@ -380,13 +383,8 @@ Status Array::open( array_dir_ = ArrayDirectory( resources_, array_uri_, timestamp_start_, timestamp_end_opened_at_); } - auto&& [array_schema_latest, array_schemas, fragment_metadata] = + std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) = open_for_reads(); - - // Set schemas - array_schema_latest_ = array_schema_latest.value(); - array_schemas_all_ = array_schemas.value(); - fragment_metadata_ = fragment_metadata.value(); } else if ( query_type == QueryType::WRITE || query_type == QueryType::MODIFY_EXCLUSIVE) { @@ -895,13 +893,9 @@ Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) { return LOG_STATUS(Status_ArrayDirectoryError(le.what())); } - auto&& [array_schema_latest, array_schemas, fragment_metadata] = + std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) = open_for_reads(); - array_schema_latest_ = array_schema_latest.value(); - array_schemas_all_ = array_schemas.value(); - fragment_metadata_ = fragment_metadata.value(); - return Status::Ok(); } @@ -1253,9 +1247,9 @@ std::unordered_map Array::get_average_var_cell_sizes() /* ********************************* */ tuple< - optional>, - optional>>, - optional>>> + shared_ptr, + std::unordered_map>, + std::vector>> Array::open_for_reads() { auto timer_se = resources_.stats().start_timer( "array_open_read_load_schemas_and_fragment_meta"); @@ -1269,20 +1263,19 @@ Array::open_for_reads() { } tuple< - optional>, - optional>>> + shared_ptr, + std::unordered_map>> Array::open_for_reads_without_fragments() { auto timer_se = resources_.stats().start_timer( "array_open_read_without_fragments_load_schemas"); // Load array schemas - auto&& [array_schema_latest, array_schemas_all] = - array_dir_.load_array_schemas(*encryption_key()); + auto result = array_dir_.load_array_schemas(*encryption_key()); - auto version = array_schema_latest->version(); + auto version = std::get<0>(result)->version(); ensure_supported_schema_version_for_read(version); - return {array_schema_latest, array_schemas_all}; + return result; } tuple< diff --git a/tiledb/sm/array/array.h b/tiledb/sm/array/array.h index 24965a75a83..8023c5efc9e 100644 --- a/tiledb/sm/array/array.h +++ b/tiledb/sm/array/array.h @@ -179,9 +179,8 @@ class Array { * Reload the array with the specified fragments. * * @param fragments_to_load The list of fragments to load. - * @return Status */ - Status load_fragments(const std::vector& fragments_to_load); + void load_fragments(const std::vector& fragments_to_load); /** * Opens the array for reading. @@ -716,9 +715,8 @@ class Array { * `timestamp_start` and `timestamp_end`. * * @param array The array to be opened. - * @return tuple of Status, latest ArraySchema, map of all array schemas and + * @return tuple latest ArraySchema, map of all array schemas and * vector of FragmentMetadata - * Status Ok on success, else error * ArraySchema The array schema to be retrieved after the * array is opened. * ArraySchemaMap Map of all array schemas found keyed by name @@ -726,24 +724,23 @@ class Array { * after the array is opened. */ tuple< - optional>, - optional>>, - optional>>> + shared_ptr, + std::unordered_map>, + std::vector>> open_for_reads(); /** * Opens an array for reads without fragments. * * @param array The array to be opened. - * @return tuple of Status, latest ArraySchema and map of all array schemas - * Status Ok on success, else error + * @return tuple of latest ArraySchema and map of all array schemas * ArraySchema The array schema to be retrieved after the * array is opened. * ArraySchemaMap Map of all array schemas found keyed by name */ tuple< - optional>, - optional>>> + shared_ptr, + std::unordered_map>> open_for_reads_without_fragments(); /** Opens an array for writes. diff --git a/tiledb/sm/consolidator/fragment_consolidator.cc b/tiledb/sm/consolidator/fragment_consolidator.cc index 4f9f2d8bd7e..a4045621a80 100644 --- a/tiledb/sm/consolidator/fragment_consolidator.cc +++ b/tiledb/sm/consolidator/fragment_consolidator.cc @@ -371,7 +371,7 @@ Status FragmentConsolidator::consolidate_internal( URI* new_fragment_uri) { auto timer_se = stats_->start_timer("consolidate_internal"); - RETURN_NOT_OK(array_for_reads->load_fragments(to_consolidate)); + array_for_reads->load_fragments(to_consolidate); if (array_for_reads->is_empty()) { return Status::Ok(); diff --git a/tiledb/sm/storage_manager/storage_manager.cc b/tiledb/sm/storage_manager/storage_manager.cc index 60dc4cf2710..13773ed8ba5 100644 --- a/tiledb/sm/storage_manager/storage_manager.cc +++ b/tiledb/sm/storage_manager/storage_manager.cc @@ -169,25 +169,6 @@ Status StorageManagerCanonical::group_close_for_writes(Group* group) { return Status::Ok(); } -tuple>>> -StorageManagerCanonical::array_load_fragments( - Array* array, const std::vector& fragments_to_load) { - auto timer_se = stats()->start_timer("sm_array_load_fragments"); - - // Load the fragment metadata - std::unordered_map> offsets; - auto&& fragment_metadata = FragmentMetadata::load( - resources(), - array->memory_tracker(), - array->array_schema_latest_ptr(), - array->array_schemas_all(), - *array->encryption_key(), - fragments_to_load, - offsets); - - return {Status::Ok(), fragment_metadata}; -} - Status StorageManagerCanonical::array_consolidate( const char* array_name, EncryptionType encryption_type, diff --git a/tiledb/sm/storage_manager/storage_manager_canonical.h b/tiledb/sm/storage_manager/storage_manager_canonical.h index 3c5a2e26909..903157f833d 100644 --- a/tiledb/sm/storage_manager/storage_manager_canonical.h +++ b/tiledb/sm/storage_manager/storage_manager_canonical.h @@ -246,17 +246,6 @@ class StorageManagerCanonical { std::tuple>> group_open_for_writes(Group* group); - /** - * Load fragments for an already open array. - * - * @param array The open array. - * @param fragment_info The list of fragment info. - * @return Status, the fragment metadata to be loaded. - */ - tuple>>> - array_load_fragments( - Array* array, const std::vector& fragment_info); - /** * Consolidates the fragments of an array into a single one. * From 8e4f1abfd7155f84eeddd886a40fd8bfd4569d0c Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Tue, 24 Oct 2023 20:04:43 +0300 Subject: [PATCH 06/11] clang-format --- tiledb/sm/fragment/fragment_info.cc | 4 +- tiledb/sm/fragment/fragment_metadata.cc | 112 ++++++++++----------- tiledb/sm/serialization/query_aggregates.h | 3 +- tiledb/sm/subarray/subarray.cc | 2 +- 4 files changed, 60 insertions(+), 61 deletions(-) diff --git a/tiledb/sm/fragment/fragment_info.cc b/tiledb/sm/fragment/fragment_info.cc index 45dfcf8be99..dd0835394a0 100644 --- a/tiledb/sm/fragment/fragment_info.cc +++ b/tiledb/sm/fragment/fragment_info.cc @@ -980,9 +980,7 @@ Status FragmentInfo::load_and_replace( tuple, optional>>> load_consolidated_fragment_meta( - ContextResources& resources, - const URI& uri, - const EncryptionKey& enc_key) { + ContextResources& resources, const URI& uri, const EncryptionKey& enc_key) { auto timer_se = resources.stats().start_timer("sm_read_load_consolidated_frag_meta"); diff --git a/tiledb/sm/fragment/fragment_metadata.cc b/tiledb/sm/fragment/fragment_metadata.cc index e6f1d35ef6a..e93f90f9c2b 100644 --- a/tiledb/sm/fragment/fragment_metadata.cc +++ b/tiledb/sm/fragment/fragment_metadata.cc @@ -784,8 +784,7 @@ Status FragmentMetadata::init(const NDRange& non_empty_domain) { return Status::Ok(); } -std::vector> -FragmentMetadata::load( +std::vector> FragmentMetadata::load( ContextResources& resources, MemoryTracker* memory_tracker, const shared_ptr array_schema_latest, @@ -801,64 +800,65 @@ FragmentMetadata::load( auto fragment_num = fragments_to_load.size(); std::vector> fragment_metadata; fragment_metadata.resize(fragment_num); - auto status = parallel_for(&resources.compute_tp(), 0, fragment_num, [&](size_t f) { - const auto& sf = fragments_to_load[f]; - - URI coords_uri = - sf.uri_.join_path(constants::coords + constants::file_suffix); - - auto name = sf.uri_.remove_trailing_slash().last_path_part(); - auto format_version = utils::parse::get_fragment_version(name); - - // Note that the fragment metadata version is >= the array schema - // version. Therefore, the check below is defensive and will always - // ensure backwards compatibility. - shared_ptr metadata; - if (format_version <= 2) { - bool sparse; - RETURN_NOT_OK(resources.vfs().is_file(coords_uri, &sparse)); - metadata = make_shared( - HERE(), - &resources, - memory_tracker, - array_schema_latest, - sf.uri_, - sf.timestamp_range_, - !sparse); - } else { - // Fragment format version > 2 - metadata = make_shared( - HERE(), - &resources, - memory_tracker, - array_schema_latest, - sf.uri_, - sf.timestamp_range_); - } + auto status = + parallel_for(&resources.compute_tp(), 0, fragment_num, [&](size_t f) { + const auto& sf = fragments_to_load[f]; + + URI coords_uri = + sf.uri_.join_path(constants::coords + constants::file_suffix); + + auto name = sf.uri_.remove_trailing_slash().last_path_part(); + auto format_version = utils::parse::get_fragment_version(name); + + // Note that the fragment metadata version is >= the array schema + // version. Therefore, the check below is defensive and will always + // ensure backwards compatibility. + shared_ptr metadata; + if (format_version <= 2) { + bool sparse; + RETURN_NOT_OK(resources.vfs().is_file(coords_uri, &sparse)); + metadata = make_shared( + HERE(), + &resources, + memory_tracker, + array_schema_latest, + sf.uri_, + sf.timestamp_range_, + !sparse); + } else { + // Fragment format version > 2 + metadata = make_shared( + HERE(), + &resources, + memory_tracker, + array_schema_latest, + sf.uri_, + sf.timestamp_range_); + } - // Potentially find the basic fragment metadata in the consolidated - // metadata buffer - Tile* fragment_metadata_tile = nullptr; - uint64_t offset = 0; + // Potentially find the basic fragment metadata in the consolidated + // metadata buffer + Tile* fragment_metadata_tile = nullptr; + uint64_t offset = 0; - auto it = offsets.end(); - if (metadata->format_version() >= 9) { - it = offsets.find(name); - } else { - it = offsets.find(sf.uri_.to_string()); - } - if (it != offsets.end()) { - fragment_metadata_tile = it->second.first; - offset = it->second.second; - } + auto it = offsets.end(); + if (metadata->format_version() >= 9) { + it = offsets.find(name); + } else { + it = offsets.find(sf.uri_.to_string()); + } + if (it != offsets.end()) { + fragment_metadata_tile = it->second.first; + offset = it->second.second; + } - // Load fragment metadata - RETURN_NOT_OK(metadata->load( - encryption_key, fragment_metadata_tile, offset, array_schemas_all)); + // Load fragment metadata + RETURN_NOT_OK(metadata->load( + encryption_key, fragment_metadata_tile, offset, array_schemas_all)); - fragment_metadata[f] = metadata; - return Status::Ok(); - }); + fragment_metadata[f] = metadata; + return Status::Ok(); + }); throw_if_not_ok(status); return fragment_metadata; diff --git a/tiledb/sm/serialization/query_aggregates.h b/tiledb/sm/serialization/query_aggregates.h index 9b254c9c750..c54cfa503cc 100644 --- a/tiledb/sm/serialization/query_aggregates.h +++ b/tiledb/sm/serialization/query_aggregates.h @@ -34,8 +34,9 @@ #define TILEDB_SERIALIZATION_QUERY_AGGREGATES_H #ifdef TILEDB_SERIALIZATION -#include "tiledb/sm/serialization/capnp_utils.h" #include "tiledb/sm/query/query.h" +#include "tiledb/sm/serialization/capnp_utils.h" + #endif namespace tiledb::sm::serialization { diff --git a/tiledb/sm/subarray/subarray.cc b/tiledb/sm/subarray/subarray.cc index cdd51e7b6fe..6c3ec9600e1 100644 --- a/tiledb/sm/subarray/subarray.cc +++ b/tiledb/sm/subarray/subarray.cc @@ -57,9 +57,9 @@ #include "tiledb/sm/rest/rest_client.h" #include "tiledb/sm/rtree/rtree.h" #include "tiledb/sm/stats/global_stats.h" +#include "tiledb/sm/storage_manager/storage_manager.h" #include "tiledb/sm/subarray/relevant_fragment_generator.h" #include "tiledb/sm/subarray/subarray.h" -#include "tiledb/sm/storage_manager/storage_manager.h" #include "tiledb/type/apply_with_type.h" #include "tiledb/type/range/range.h" From 62276506616dabc3bc68452dff780d9ce1bb531f Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Tue, 24 Oct 2023 21:14:08 +0300 Subject: [PATCH 07/11] Fix compiler errors. We cannot capture structured binding variables in lambdas. --- tiledb/sm/fragment/fragment_info.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tiledb/sm/fragment/fragment_info.cc b/tiledb/sm/fragment/fragment_info.cc index dd0835394a0..d0ec3934ed5 100644 --- a/tiledb/sm/fragment/fragment_info.cc +++ b/tiledb/sm/fragment/fragment_info.cc @@ -1024,7 +1024,10 @@ FragmentInfo::load_array_schemas_and_fragment_metadata( "sm_load_array_schemas_and_fragment_metadata"); // Load array schemas - auto&& [array_schema_latest, array_schemas_all] = + std::shared_ptr array_schema_latest; + std::unordered_map> + array_schemas_all; + std::tie(array_schema_latest, array_schemas_all) = array_dir.load_array_schemas(enc_key); const auto filtered_fragment_uris = [&]() { From b51147ab913837bc1ed4c7145bb049b41e65c14d Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Tue, 24 Oct 2023 22:42:42 +0300 Subject: [PATCH 08/11] Fix the standalone unit tests. --- tiledb/sm/array/test/unit_consistency.h | 1 + tiledb/sm/query_plan/test/unit_query_plan.cc | 1 + 2 files changed, 2 insertions(+) diff --git a/tiledb/sm/array/test/unit_consistency.h b/tiledb/sm/array/test/unit_consistency.h index 2b37c7292ac..8e2aea539fe 100644 --- a/tiledb/sm/array/test/unit_consistency.h +++ b/tiledb/sm/array/test/unit_consistency.h @@ -43,6 +43,7 @@ #include "tiledb/sm/enums/array_type.h" #include "tiledb/sm/enums/encryption_type.h" #include "tiledb/sm/enums/layout.h" +#include "tiledb/sm/storage_manager/storage_manager.h" #include "tiledb/storage_format/uri/parse_uri.h" using namespace tiledb; diff --git a/tiledb/sm/query_plan/test/unit_query_plan.cc b/tiledb/sm/query_plan/test/unit_query_plan.cc index de34d6b11e5..45709aa8cf9 100644 --- a/tiledb/sm/query_plan/test/unit_query_plan.cc +++ b/tiledb/sm/query_plan/test/unit_query_plan.cc @@ -40,6 +40,7 @@ #include "tiledb/sm/enums/layout.h" #include "tiledb/sm/query/query.h" #include "tiledb/sm/stats/stats.h" +#include "tiledb/sm/storage_manager/storage_manager.h" #include "tiledb/storage_format/uri/parse_uri.h" using namespace tiledb; From 038a4a45327810f416e566284c2de1dc27ca2b31 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Wed, 25 Oct 2023 00:24:53 +0300 Subject: [PATCH 09/11] Rename `context_resources_` to `resources_`. --- tiledb/sm/fragment/fragment_metadata.cc | 112 +++++++++++------------- tiledb/sm/fragment/fragment_metadata.h | 4 +- 2 files changed, 52 insertions(+), 64 deletions(-) diff --git a/tiledb/sm/fragment/fragment_metadata.cc b/tiledb/sm/fragment/fragment_metadata.cc index e93f90f9c2b..b8f823d2fdb 100644 --- a/tiledb/sm/fragment/fragment_metadata.cc +++ b/tiledb/sm/fragment/fragment_metadata.cc @@ -89,7 +89,7 @@ FragmentMetadata::FragmentMetadata( bool dense, bool has_timestamps, bool has_deletes_meta) - : context_resources_(resources) + : resources_(resources) , memory_tracker_(memory_tracker) , array_schema_(array_schema) , dense_(dense) @@ -115,7 +115,7 @@ FragmentMetadata::~FragmentMetadata() = default; // Copy initialization FragmentMetadata::FragmentMetadata(const FragmentMetadata& other) { - context_resources_ = other.context_resources_; + resources_ = other.resources_; array_schema_ = other.array_schema_; dense_ = other.dense_; fragment_uri_ = other.fragment_uri_; @@ -136,7 +136,7 @@ FragmentMetadata::FragmentMetadata(const FragmentMetadata& other) { } FragmentMetadata& FragmentMetadata::operator=(const FragmentMetadata& other) { - context_resources_ = other.context_resources_; + resources_ = other.resources_; array_schema_ = other.array_schema_; dense_ = other.dense_; fragment_uri_ = other.fragment_uri_; @@ -377,7 +377,7 @@ void FragmentMetadata::compute_fragment_min_max_sum_null_count() { // Process all attributes in parallel. throw_if_not_ok(parallel_for( - &context_resources_->compute_tp(), 0, idx_map_.size(), [&](uint64_t n) { + &resources_->compute_tp(), 0, idx_map_.size(), [&](uint64_t n) { // For easy reference. const auto& name = names[n]; const auto& idx = idx_map_[name]; @@ -680,8 +680,7 @@ uint64_t FragmentMetadata::fragment_size() const { if (meta_file_size == 0) { auto meta_uri = fragment_uri_.join_path( std::string(constants::fragment_metadata_filename)); - throw_if_not_ok( - context_resources_->vfs().file_size(meta_uri, &meta_file_size)); + throw_if_not_ok(resources_->vfs().file_size(meta_uri, &meta_file_size)); } // Validate that the meta_file_size is not zero, either preloaded or fetched // above @@ -874,8 +873,7 @@ Status FragmentMetadata::load( // Load the metadata file size when we are not reading from consolidated // buffer if (fragment_metadata_tile == nullptr) { - RETURN_NOT_OK( - context_resources_->vfs().file_size(meta_uri, &meta_file_size_)); + RETURN_NOT_OK(resources_->vfs().file_size(meta_uri, &meta_file_size_)); } // Get fragment name version @@ -891,8 +889,7 @@ Status FragmentMetadata::load( } void FragmentMetadata::store(const EncryptionKey& encryption_key) { - auto timer_se = - context_resources_->stats().start_timer("write_store_frag_meta"); + auto timer_se = resources_->stats().start_timer("write_store_frag_meta"); if (version_ < 7) { auto fragment_metadata_uri = @@ -970,7 +967,7 @@ Status FragmentMetadata::store_v7_v10(const EncryptionKey& encryption_key) { throw_if_not_ok(store_footer(encryption_key)); // Close file - return context_resources_->vfs().close_file(fragment_metadata_uri); + return resources_->vfs().close_file(fragment_metadata_uri); } Status FragmentMetadata::store_v11(const EncryptionKey& encryption_key) { @@ -1052,7 +1049,7 @@ Status FragmentMetadata::store_v11(const EncryptionKey& encryption_key) { RETURN_NOT_OK_ELSE(store_footer(encryption_key), clean_up()); // Close file - return context_resources_->vfs().close_file(fragment_metadata_uri); + return resources_->vfs().close_file(fragment_metadata_uri); } Status FragmentMetadata::store_v12_v14(const EncryptionKey& encryption_key) { @@ -1139,7 +1136,7 @@ Status FragmentMetadata::store_v12_v14(const EncryptionKey& encryption_key) { throw_if_not_ok(store_footer(encryption_key)); // Close file - return context_resources_->vfs().close_file(fragment_metadata_uri); + return resources_->vfs().close_file(fragment_metadata_uri); } Status FragmentMetadata::store_v15_or_higher( @@ -1232,7 +1229,7 @@ Status FragmentMetadata::store_v15_or_higher( throw_if_not_ok(store_footer(encryption_key)); // Close file - return context_resources_->vfs().close_file(fragment_metadata_uri); + return resources_->vfs().close_file(fragment_metadata_uri); } Status FragmentMetadata::set_num_tiles(uint64_t num_tiles) { @@ -1558,7 +1555,7 @@ Status FragmentMetadata::load_fragment_min_max_sum_null_count( RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter( + resources_->stats().add_counter( "read_fragment_min_max_sum_null_count_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); @@ -1586,7 +1583,7 @@ Status FragmentMetadata::load_processed_conditions( RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter( + resources_->stats().add_counter( "read_processed_conditions_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); @@ -2082,7 +2079,7 @@ Status FragmentMetadata::load_rtree(const EncryptionKey& encryption_key) { RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter("read_rtree_size", tile.size()); + resources_->stats().add_counter("read_rtree_size", tile.size()); // Use the serialized buffer size to approximate memory usage of the rtree. if (memory_tracker_ != nullptr && @@ -2206,11 +2203,10 @@ Status FragmentMetadata::get_footer_offset_and_size( URI fragment_metadata_uri = fragment_uri_.join_path( std::string(constants::fragment_metadata_filename)); uint64_t size_offset = meta_file_size_ - sizeof(uint64_t); - RETURN_NOT_OK(context_resources_->vfs().read( + RETURN_NOT_OK(resources_->vfs().read( fragment_metadata_uri, size_offset, size, sizeof(uint64_t))); *offset = meta_file_size_ - *size - sizeof(uint64_t); - context_resources_->stats().add_counter( - "read_frag_meta_size", sizeof(uint64_t)); + resources_->stats().add_counter("read_frag_meta_size", sizeof(uint64_t)); } return Status::Ok(); @@ -2487,8 +2483,7 @@ Status FragmentMetadata::load_tile_offsets( RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter( - "read_tile_offsets_size", tile.size()); + resources_->stats().add_counter("read_tile_offsets_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_offsets(idx, deserializer); @@ -2517,8 +2512,7 @@ Status FragmentMetadata::load_tile_var_offsets( RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter( - "read_tile_var_offsets_size", tile.size()); + resources_->stats().add_counter("read_tile_var_offsets_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_var_offsets(idx, deserializer); @@ -2543,8 +2537,7 @@ Status FragmentMetadata::load_tile_var_sizes( RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter( - "read_tile_var_sizes_size", tile.size()); + resources_->stats().add_counter("read_tile_var_sizes_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_var_sizes(idx, deserializer); @@ -2569,7 +2562,7 @@ Status FragmentMetadata::load_tile_validity_offsets( RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter( + resources_->stats().add_counter( "read_tile_validity_offsets_size", tile.size()); ConstBuffer cbuff(tile.data(), tile.size()); @@ -2595,7 +2588,7 @@ Status FragmentMetadata::load_tile_min_values( RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter("read_tile_min_size", tile.size()); + resources_->stats().add_counter("read_tile_min_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_min_values(idx, deserializer); @@ -2620,7 +2613,7 @@ Status FragmentMetadata::load_tile_max_values( RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter("read_tile_max_size", tile.size()); + resources_->stats().add_counter("read_tile_max_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_max_values(idx, deserializer); @@ -2645,7 +2638,7 @@ Status FragmentMetadata::load_tile_sum_values( RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter("read_tile_sum_size", tile.size()); + resources_->stats().add_counter("read_tile_sum_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_sum_values(idx, deserializer); @@ -2670,8 +2663,7 @@ Status FragmentMetadata::load_tile_null_count_values( RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter( - "read_tile_null_count_size", tile.size()); + resources_->stats().add_counter("read_tile_null_count_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_null_count_values(idx, deserializer); @@ -3665,13 +3657,13 @@ Status FragmentMetadata::load_v1_v2( URI fragment_metadata_uri = fragment_uri_.join_path( std::string(constants::fragment_metadata_filename)); // Read metadata - GenericTileIO tile_io(*context_resources_, fragment_metadata_uri); + GenericTileIO tile_io(*resources_, fragment_metadata_uri); auto&& [st, tile_opt] = - tile_io.read_generic(0, encryption_key, context_resources_->config()); + tile_io.read_generic(0, encryption_key, resources_->config()); RETURN_NOT_OK(st); auto& tile = *tile_opt; - context_resources_->stats().add_counter("read_frag_meta_size", tile.size()); + resources_->stats().add_counter("read_frag_meta_size", tile.size()); // Pre-v10 format fragments we need to set the schema and schema name to // the "old" schema. This way "old" fragments are still loaded fine @@ -3950,7 +3942,7 @@ Status FragmentMetadata::store_rtree( const EncryptionKey& encryption_key, uint64_t* nbytes) { auto rtree_tile = write_rtree(); RETURN_NOT_OK(write_generic_tile_to_file(encryption_key, rtree_tile, nbytes)); - context_resources_->stats().add_counter("write_rtree_size", *nbytes); + resources_->stats().add_counter("write_rtree_size", *nbytes); return Status::Ok(); } @@ -4015,9 +4007,9 @@ tuple> FragmentMetadata::read_generic_tile_from_file( std::string(constants::fragment_metadata_filename)); // Read metadata - GenericTileIO tile_io(*context_resources_, fragment_metadata_uri); - auto&& [st, tile_opt] = tile_io.read_generic( - offset, encryption_key, context_resources_->config()); + GenericTileIO tile_io(*resources_, fragment_metadata_uri); + auto&& [st, tile_opt] = + tile_io.read_generic(offset, encryption_key, resources_->config()); RETURN_NOT_OK_TUPLE(st, nullopt); return {Status::Ok(), std::move(*tile_opt)}; @@ -4035,7 +4027,7 @@ Status FragmentMetadata::read_file_footer( tile = make_shared(HERE(), Tile::from_generic(*footer_size)); - context_resources_->stats().add_counter("read_frag_meta_size", *footer_size); + resources_->stats().add_counter("read_frag_meta_size", *footer_size); if (memory_tracker_ != nullptr && !memory_tracker_->take_memory( @@ -4049,7 +4041,7 @@ Status FragmentMetadata::read_file_footer( } // Read footer - return context_resources_->vfs().read( + return resources_->vfs().read( fragment_metadata_uri, *footer_offset, tile->data_as(), @@ -4063,7 +4055,7 @@ Status FragmentMetadata::write_generic_tile_to_file( URI fragment_metadata_uri = fragment_uri_.join_path( std::string(constants::fragment_metadata_filename)); - GenericTileIO tile_io(*context_resources_, fragment_metadata_uri); + GenericTileIO tile_io(*resources_, fragment_metadata_uri); RETURN_NOT_OK(tile_io.write_generic(&tile, encryption_key, nbytes)); return Status::Ok(); @@ -4074,12 +4066,12 @@ Status FragmentMetadata::write_footer_to_file(WriterTile& tile) const { std::string(constants::fragment_metadata_filename)); uint64_t size = tile.size(); - RETURN_NOT_OK(context_resources_->vfs().write( - fragment_metadata_uri, tile.data(), tile.size())); + RETURN_NOT_OK( + resources_->vfs().write(fragment_metadata_uri, tile.data(), tile.size())); // Write the size in the end if there is at least one var-sized dimension if (!array_schema_->domain().all_dims_fixed() || version_ >= 10) - return context_resources_->vfs().write( + return resources_->vfs().write( fragment_metadata_uri, &size, sizeof(uint64_t)); return Status::Ok(); } @@ -4096,7 +4088,7 @@ void FragmentMetadata::store_tile_offsets( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - context_resources_->stats().add_counter("write_tile_offsets_size", *nbytes); + resources_->stats().add_counter("write_tile_offsets_size", *nbytes); } void FragmentMetadata::write_tile_offsets( @@ -4124,8 +4116,7 @@ void FragmentMetadata::store_tile_var_offsets( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - context_resources_->stats().add_counter( - "write_tile_var_offsets_size", *nbytes); + resources_->stats().add_counter("write_tile_var_offsets_size", *nbytes); } void FragmentMetadata::write_tile_var_offsets( @@ -4154,7 +4145,7 @@ void FragmentMetadata::store_tile_var_sizes( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - context_resources_->stats().add_counter("write_tile_var_sizes_size", *nbytes); + resources_->stats().add_counter("write_tile_var_sizes_size", *nbytes); } void FragmentMetadata::write_tile_var_sizes( @@ -4182,8 +4173,7 @@ void FragmentMetadata::store_tile_validity_offsets( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - context_resources_->stats().add_counter( - "write_tile_validity_offsets_size", *nbytes); + resources_->stats().add_counter("write_tile_validity_offsets_size", *nbytes); } void FragmentMetadata::write_tile_validity_offsets( @@ -4212,7 +4202,7 @@ void FragmentMetadata::store_tile_mins( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - context_resources_->stats().add_counter("write_mins_size", *nbytes); + resources_->stats().add_counter("write_mins_size", *nbytes); } void FragmentMetadata::write_tile_mins(unsigned idx, Serializer& serializer) { @@ -4249,7 +4239,7 @@ void FragmentMetadata::store_tile_maxs( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - context_resources_->stats().add_counter("write_maxs_size", *nbytes); + resources_->stats().add_counter("write_maxs_size", *nbytes); } void FragmentMetadata::write_tile_maxs(unsigned idx, Serializer& serializer) { @@ -4286,7 +4276,7 @@ void FragmentMetadata::store_tile_sums( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - context_resources_->stats().add_counter("write_sums_size", *nbytes); + resources_->stats().add_counter("write_sums_size", *nbytes); } void FragmentMetadata::write_tile_sums(unsigned idx, Serializer& serializer) { @@ -4312,7 +4302,7 @@ void FragmentMetadata::store_tile_null_counts( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - context_resources_->stats().add_counter("write_null_counts_size", *nbytes); + resources_->stats().add_counter("write_null_counts_size", *nbytes); } void FragmentMetadata::write_tile_null_counts( @@ -4366,7 +4356,7 @@ void FragmentMetadata::store_fragment_min_max_sum_null_count( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - context_resources_->stats().add_counter("write_null_counts_size", *nbytes); + resources_->stats().add_counter("write_null_counts_size", *nbytes); } void FragmentMetadata::store_processed_conditions( @@ -4393,8 +4383,7 @@ void FragmentMetadata::store_processed_conditions( throw_if_not_ok(write_generic_tile_to_file(encryption_key, tile, nbytes)); - context_resources_->stats().add_counter( - "write_processed_conditions_size", *nbytes); + resources_->stats().add_counter("write_processed_conditions_size", *nbytes); } template @@ -4723,8 +4712,7 @@ Status FragmentMetadata::store_footer(const EncryptionKey&) { RETURN_NOT_OK(write_footer(serializer)); RETURN_NOT_OK(write_footer_to_file(tile)); - context_resources_->stats().add_counter( - "write_frag_meta_footer_size", tile.size()); + resources_->stats().add_counter("write_frag_meta_footer_size", tile.size()); return Status::Ok(); } @@ -4750,8 +4738,8 @@ void FragmentMetadata::clean_up() { auto fragment_metadata_uri = fragment_uri_.join_path(constants::fragment_metadata_filename); - throw_if_not_ok(context_resources_->vfs().close_file(fragment_metadata_uri)); - throw_if_not_ok(context_resources_->vfs().remove_file(fragment_metadata_uri)); + throw_if_not_ok(resources_->vfs().close_file(fragment_metadata_uri)); + throw_if_not_ok(resources_->vfs().remove_file(fragment_metadata_uri)); } const shared_ptr& FragmentMetadata::array_schema() const { diff --git a/tiledb/sm/fragment/fragment_metadata.h b/tiledb/sm/fragment/fragment_metadata.h index 61509202b5d..e1c6c4c382b 100644 --- a/tiledb/sm/fragment/fragment_metadata.h +++ b/tiledb/sm/fragment/fragment_metadata.h @@ -1217,7 +1217,7 @@ class FragmentMetadata { /** set the CR pointer during deserialization*/ void set_context_resources(ContextResources* cr) { - context_resources_ = cr; + resources_ = cr; } /** set the memory tracker pointer during deserialization*/ @@ -1261,7 +1261,7 @@ class FragmentMetadata { /* ********************************* */ /** The storage manager. */ - ContextResources* context_resources_; + ContextResources* resources_; /** * The memory tracker of the array this fragment metadata corresponds to. From 2907840c7a73e773fbcd31a3dde6962fe6ec7838 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Thu, 26 Oct 2023 18:08:17 +0300 Subject: [PATCH 10/11] clang-format --- tiledb/sm/fragment/fragment_metadata.cc | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/tiledb/sm/fragment/fragment_metadata.cc b/tiledb/sm/fragment/fragment_metadata.cc index bb11cb7db99..85980985d45 100644 --- a/tiledb/sm/fragment/fragment_metadata.cc +++ b/tiledb/sm/fragment/fragment_metadata.cc @@ -863,8 +863,7 @@ void FragmentMetadata::load( // Load the metadata file size when we are not reading from consolidated // buffer if (fragment_metadata_tile == nullptr) { - throw_if_not_ok( - resources_->vfs().file_size(meta_uri, &meta_file_size_)); + throw_if_not_ok(resources_->vfs().file_size(meta_uri, &meta_file_size_)); } // Get fragment name version @@ -2457,8 +2456,7 @@ void FragmentMetadata::load_tile_var_offsets( auto tile = read_generic_tile_from_file( encryption_key, gt_offsets_.tile_var_offsets_[idx]); - resources_->stats().add_counter( - "read_tile_var_offsets_size", tile.size()); + resources_->stats().add_counter("read_tile_var_offsets_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_var_offsets(idx, deserializer); @@ -2480,8 +2478,7 @@ void FragmentMetadata::load_tile_var_sizes( auto tile = read_generic_tile_from_file( encryption_key, gt_offsets_.tile_var_sizes_[idx]); - resources_->stats().add_counter( - "read_tile_var_sizes_size", tile.size()); + resources_->stats().add_counter("read_tile_var_sizes_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_var_sizes(idx, deserializer); @@ -2592,8 +2589,7 @@ void FragmentMetadata::load_tile_null_count_values( auto tile = read_generic_tile_from_file( encryption_key, gt_offsets_.tile_null_count_offsets_[idx]); - resources_->stats().add_counter( - "read_tile_null_count_size", tile.size()); + resources_->stats().add_counter("read_tile_null_count_size", tile.size()); Deserializer deserializer(tile.data(), tile.size()); load_tile_null_count_values(idx, deserializer); @@ -3972,8 +3968,8 @@ void FragmentMetadata::write_footer_to_file(WriterTile& tile) const { std::string(constants::fragment_metadata_filename)); uint64_t size = tile.size(); - throw_if_not_ok(resources_->vfs().write( - fragment_metadata_uri, tile.data(), tile.size())); + throw_if_not_ok( + resources_->vfs().write(fragment_metadata_uri, tile.data(), tile.size())); // Write the size in the end if there is at least one var-sized dimension if (!array_schema_->domain().all_dims_fixed() || version_ >= 10) { @@ -4603,8 +4599,7 @@ void FragmentMetadata::store_footer(const EncryptionKey&) { write_footer(serializer); write_footer_to_file(tile); - resources_->stats().add_counter( - "write_frag_meta_footer_size", tile.size()); + resources_->stats().add_counter("write_frag_meta_footer_size", tile.size()); } void FragmentMetadata::resize_tile_offsets_vectors(uint64_t size) { From b6e4f5650faa921478634a8c511426ac3f6c0ac7 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Thu, 26 Oct 2023 18:26:22 +0300 Subject: [PATCH 11/11] Address PR review. --- tiledb/sm/fragment/fragment_info.cc | 14 +++++++------- tiledb/sm/fragment/fragment_info.h | 3 +-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/tiledb/sm/fragment/fragment_info.cc b/tiledb/sm/fragment/fragment_info.cc index 63d9b4ac562..41354d2166b 100644 --- a/tiledb/sm/fragment/fragment_info.cc +++ b/tiledb/sm/fragment/fragment_info.cc @@ -978,7 +978,7 @@ Status FragmentInfo::load_and_replace( return Status::Ok(); } -tuple, optional>>> +tuple>> load_consolidated_fragment_meta( ContextResources& resources, const URI& uri, const EncryptionKey& enc_key) { auto timer_se = @@ -986,7 +986,8 @@ load_consolidated_fragment_meta( // No consolidated fragment metadata file if (uri.to_string().empty()) - return {nullopt, nullopt}; + throw StatusException(Status_FragmentInfoError( + "Cannot load consolidated fragment metadata; URI is empty.")); auto&& tile = GenericTileIO::load(resources, uri, 0, enc_key); @@ -1042,16 +1043,15 @@ FragmentInfo::load_array_schemas_and_fragment_metadata( std::vector> fragment_metadata_tiles(meta_uris.size()); std::vector>> offsets_vectors( meta_uris.size()); - auto status = + throw_if_not_ok( parallel_for(&resources.compute_tp(), 0, meta_uris.size(), [&](size_t i) { auto&& [tile_opt, offsets] = load_consolidated_fragment_meta(resources, meta_uris[i], enc_key); fragment_metadata_tiles[i] = - make_shared(HERE(), std::move(*tile_opt)); - offsets_vectors[i] = std::move(offsets.value()); + make_shared(HERE(), std::move(tile_opt)); + offsets_vectors[i] = std::move(offsets); return Status::Ok(); - }); - throw_if_not_ok(status); + })); // Get the unique fragment metadatas into a map. std::unordered_map> offsets; diff --git a/tiledb/sm/fragment/fragment_info.h b/tiledb/sm/fragment/fragment_info.h index 04c095b86cb..e488af94c8a 100644 --- a/tiledb/sm/fragment/fragment_info.h +++ b/tiledb/sm/fragment/fragment_info.h @@ -308,9 +308,8 @@ class FragmentInfo { * @param memory_tracker The memory tracker of the array * for which the fragment metadata is loaded. * @param enc_key The encryption key to use. - * @return tuple of Status, latest ArraySchema, map of all array schemas and + * @return tuple latest ArraySchema, map of all array schemas and * vector of FragmentMetadata - * Status Ok on success, else error * ArraySchema The array schema to be retrieved after the * array is opened. * ArraySchemaMap Map of all array schemas found keyed by name