Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple the code in the fragment directory from StorageManager. #4453

Merged
merged 13 commits into from
Oct 26, 2023
Merged
2 changes: 1 addition & 1 deletion test/src/unit-average-cell-size.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ struct CPPAverageCellSizeFx {
->open_without_fragments(
sm::EncryptionType::NO_ENCRYPTION, nullptr, 0)
.ok());
REQUIRE(array_for_reads->load_fragments(uris).ok());
array_for_reads->load_fragments(uris);
auto avg_cell_sizes = array_for_reads->get_average_var_cell_sizes();

CHECK(avg_cell_sizes["d2"] == d2_size);
Expand Down
66 changes: 28 additions & 38 deletions tiledb/sm/array/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,8 @@ Status Array::open_without_fragments(
array_dir_ = ArrayDirectory(
resources_, array_uri_, 0, UINT64_MAX, ArrayDirectoryMode::READ);
}
auto&& [array_schema, array_schemas] = open_for_reads_without_fragments();

array_schema_latest_ = array_schema.value();
array_schemas_all_ = array_schemas.value();
std::tie(array_schema_latest_, array_schemas_all_) =
open_for_reads_without_fragments();
}
} catch (std::exception& e) {
set_array_closed();
Expand All @@ -223,15 +221,20 @@ Status Array::open_without_fragments(
return Status::Ok();
}

Status Array::load_fragments(
void Array::load_fragments(
const std::vector<TimestampedURI>& fragments_to_load) {
auto&& [st, fragment_metadata] =
storage_manager_->array_load_fragments(this, fragments_to_load);
RETURN_NOT_OK(st);
auto timer_se = resources_.stats().start_timer("sm_array_load_fragments");

fragment_metadata_ = std::move(fragment_metadata.value());

return Status::Ok();
// Load the fragment metadata
std::unordered_map<std::string, std::pair<Tile*, uint64_t>> offsets;
fragment_metadata_ = FragmentMetadata::load(
resources_,
memory_tracker(),
array_schema_latest_ptr(),
array_schemas_all(),
*encryption_key(),
fragments_to_load,
offsets);
}

Status Array::open(
Expand Down Expand Up @@ -380,13 +383,8 @@ Status Array::open(
array_dir_ = ArrayDirectory(
resources_, array_uri_, timestamp_start_, timestamp_end_opened_at_);
}
auto&& [array_schema_latest, array_schemas, fragment_metadata] =
std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) =
open_for_reads();

// Set schemas
array_schema_latest_ = array_schema_latest.value();
array_schemas_all_ = array_schemas.value();
fragment_metadata_ = fragment_metadata.value();
} else if (
query_type == QueryType::WRITE ||
query_type == QueryType::MODIFY_EXCLUSIVE) {
Expand Down Expand Up @@ -895,13 +893,9 @@ Status Array::reopen(uint64_t timestamp_start, uint64_t timestamp_end) {
return LOG_STATUS(Status_ArrayDirectoryError(le.what()));
}

auto&& [array_schema_latest, array_schemas, fragment_metadata] =
std::tie(array_schema_latest_, array_schemas_all_, fragment_metadata_) =
open_for_reads();

array_schema_latest_ = array_schema_latest.value();
array_schemas_all_ = array_schemas.value();
fragment_metadata_ = fragment_metadata.value();

return Status::Ok();
}

Expand Down Expand Up @@ -1254,39 +1248,35 @@ std::unordered_map<std::string, uint64_t> Array::get_average_var_cell_sizes()
/* ********************************* */

tuple<
optional<shared_ptr<ArraySchema>>,
optional<std::unordered_map<std::string, shared_ptr<ArraySchema>>>,
optional<std::vector<shared_ptr<FragmentMetadata>>>>
shared_ptr<ArraySchema>,
std::unordered_map<std::string, shared_ptr<ArraySchema>>,
std::vector<shared_ptr<FragmentMetadata>>>
Array::open_for_reads() {
auto timer_se = resources_.stats().start_timer(
"array_open_read_load_schemas_and_fragment_meta");
auto&& [st, array_schema_latest, array_schemas_all, fragment_metadata] =
storage_manager_->load_array_schemas_and_fragment_metadata(
array_directory(), memory_tracker(), *encryption_key());
auto result = FragmentInfo::load_array_schemas_and_fragment_metadata(
resources_, array_directory(), memory_tracker(), *encryption_key());

throw_if_not_ok(st);

auto version = array_schema_latest.value()->version();
auto version = std::get<0>(result)->version();
ensure_supported_schema_version_for_read(version);

return {array_schema_latest, array_schemas_all, fragment_metadata};
return result;
}

tuple<
optional<shared_ptr<ArraySchema>>,
optional<std::unordered_map<std::string, shared_ptr<ArraySchema>>>>
shared_ptr<ArraySchema>,
std::unordered_map<std::string, shared_ptr<ArraySchema>>>
Array::open_for_reads_without_fragments() {
auto timer_se = resources_.stats().start_timer(
"array_open_read_without_fragments_load_schemas");

// Load array schemas
auto&& [array_schema_latest, array_schemas_all] =
array_dir_.load_array_schemas(*encryption_key());
auto result = array_dir_.load_array_schemas(*encryption_key());

auto version = array_schema_latest->version();
auto version = std::get<0>(result)->version();
ensure_supported_schema_version_for_read(version);

return {array_schema_latest, array_schemas_all};
return result;
}

tuple<
Expand Down
19 changes: 8 additions & 11 deletions tiledb/sm/array/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,8 @@ class Array {
* Reload the array with the specified fragments.
*
* @param fragments_to_load The list of fragments to load.
* @return Status
*/
Status load_fragments(const std::vector<TimestampedURI>& fragments_to_load);
void load_fragments(const std::vector<TimestampedURI>& fragments_to_load);

/**
* Opens the array for reading.
Expand Down Expand Up @@ -716,34 +715,32 @@ class Array {
* `timestamp_start` and `timestamp_end`.
*
* @param array The array to be opened.
* @return tuple of Status, latest ArraySchema, map of all array schemas and
* @return tuple latest ArraySchema, map of all array schemas and
* vector of FragmentMetadata
* Status Ok on success, else error
* ArraySchema The array schema to be retrieved after the
* array is opened.
* ArraySchemaMap Map of all array schemas found keyed by name
* fragment_metadata The fragment metadata to be retrieved
* after the array is opened.
*/
tuple<
optional<shared_ptr<ArraySchema>>,
optional<std::unordered_map<std::string, shared_ptr<ArraySchema>>>,
optional<std::vector<shared_ptr<FragmentMetadata>>>>
shared_ptr<ArraySchema>,
std::unordered_map<std::string, shared_ptr<ArraySchema>>,
std::vector<shared_ptr<FragmentMetadata>>>
open_for_reads();

/**
* Opens an array for reads without fragments.
*
* @param array The array to be opened.
* @return tuple of Status, latest ArraySchema and map of all array schemas
* Status Ok on success, else error
* @return tuple of latest ArraySchema and map of all array schemas
* ArraySchema The array schema to be retrieved after the
* array is opened.
* ArraySchemaMap Map of all array schemas found keyed by name
*/
tuple<
optional<shared_ptr<ArraySchema>>,
optional<std::unordered_map<std::string, shared_ptr<ArraySchema>>>>
shared_ptr<ArraySchema>,
std::unordered_map<std::string, shared_ptr<ArraySchema>>>
open_for_reads_without_fragments();

/** Opens an array for writes.
Expand Down
1 change: 1 addition & 0 deletions tiledb/sm/array/test/unit_consistency.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "tiledb/sm/enums/array_type.h"
#include "tiledb/sm/enums/encryption_type.h"
#include "tiledb/sm/enums/layout.h"
#include "tiledb/sm/storage_manager/storage_manager.h"
#include "tiledb/storage_format/uri/parse_uri.h"

using namespace tiledb;
Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/c_api/tiledb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4504,7 +4504,7 @@ int32_t tiledb_fragment_info_alloc(

// Allocate a fragment info object
(*fragment_info)->fragment_info_ =
new (std::nothrow) tiledb::sm::FragmentInfo(uri, ctx->storage_manager());
new (std::nothrow) tiledb::sm::FragmentInfo(uri, ctx->resources());
if ((*fragment_info)->fragment_info_ == nullptr) {
delete *fragment_info;
*fragment_info = nullptr;
Expand Down
6 changes: 3 additions & 3 deletions tiledb/sm/consolidator/fragment_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Status FragmentConsolidator::consolidate(
// must be fetched (even before `config_.timestamp_start_`),
// to compute the anterior ND range that can help determine
// which dense fragments are consolidatable.
FragmentInfo fragment_info(URI(array_name), storage_manager_);
FragmentInfo fragment_info(URI(array_name), storage_manager_->resources());
auto st = fragment_info.load(
array_for_reads->array_directory(),
config_.timestamp_start_,
Expand Down Expand Up @@ -216,7 +216,7 @@ Status FragmentConsolidator::consolidate_fragments(
}

// Get all fragment info
FragmentInfo fragment_info(URI(array_name), storage_manager_);
FragmentInfo fragment_info(URI(array_name), storage_manager_->resources());
auto st = fragment_info.load(
array_for_reads->array_directory(),
0,
Expand Down Expand Up @@ -371,7 +371,7 @@ Status FragmentConsolidator::consolidate_internal(
URI* new_fragment_uri) {
auto timer_se = stats_->start_timer("consolidate_internal");

RETURN_NOT_OK(array_for_reads->load_fragments(to_consolidate));
array_for_reads->load_fragments(to_consolidate);

if (array_for_reads->is_empty()) {
return Status::Ok();
Expand Down
Loading