Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent consolidation with fragment list in dense arrays if it could result in data loss. #5251

Merged
163 changes: 163 additions & 0 deletions test/src/unit-cppapi-consolidation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ void create_array(const std::string& array_name) {
Array::create(array_name, schema);
}

void create_array_2d(const std::string& array_name) {
Context ctx;
Domain domain(ctx);
auto d1 = Dimension::create<int>(ctx, "d1", {{1, 10}}, 2);
auto d2 = Dimension::create<int>(ctx, "d2", {{1, 10}}, 2);
domain.add_dimensions(d1);
domain.add_dimensions(d2);
auto a = Attribute::create<int>(ctx, "a");
ArraySchema schema(ctx, TILEDB_DENSE);
schema.set_domain(domain);
schema.add_attributes(a);
Array::create(array_name, schema);
}

void write_array(
const std::string& array_name,
const std::vector<int>& subarray,
Expand Down Expand Up @@ -249,6 +263,155 @@ TEST_CASE(
remove_array(array_name);
}

TEST_CASE(
"C++ API: Test consolidation with wrong fragment list",
"[cppapi][consolidation][fragment-list-consolidation]") {
std::string array_name = "cppapi_consolidation";
remove_array(array_name);

Context ctx;
Config config;
std::string fragment_name1;
std::string fragment_name2;
bool throws = false;
int32_t number_of_fragments_before_consolidation = 0;

SECTION("Throws exception") {
throws = true;
create_array_2d(array_name);
// In this case we request to consolidate frag2 and frag4. We can see that
// frag1 has been created prior to frag3 so the first condition to abort
// the consolidation is satisfied. Additionally, frag1's
// domain intersects with the union of the domains of the
// selected fragments for consolidation(frag2, frag4), so the second
// condition is also satisfied. An exception is expected.
write_array(array_name, {1, 3, 7, 9}, {1, 2, 3, 4, 5, 6, 7, 8, 9});
write_array(array_name, {2, 4, 2, 3}, {10, 11, 12, 13, 14, 15});
write_array(array_name, {3, 5, 4, 5}, {16, 17, 18, 19, 20, 21});
write_array(array_name, {7, 9, 6, 8}, {22, 23, 24, 25, 26, 27, 28, 29, 30});

number_of_fragments_before_consolidation =
tiledb::test::num_fragments(array_name);
CHECK(number_of_fragments_before_consolidation == 4);

FragmentInfo fragment_info(ctx, array_name);
fragment_info.load();
fragment_name1 = fragment_info.fragment_uri(1);
fragment_name2 = fragment_info.fragment_uri(3);
}

SECTION("Throws exception because of overlap in extended domain") {
throws = true;
create_array_2d(array_name);
// In this case we request to consolidate frag1 and frag3. We can see that
// frag2 has been created prior to frag3 so the first condition to abort
// the consolidation is satisfied. Additionally, even though frag2's
// domain does not directly intersect with the union of the domains of the
// selected fragments for consolidation(frag1, frag3), it intersects with
// their expanded domain(Full tiles) because the tile extend is set to 2 and
// the domain range is 1-10.
write_array(array_name, {2, 4, 2, 3}, {1, 2, 3, 4, 5, 6});
write_array(array_name, {10, 10, 4, 4}, {16});
write_array(array_name, {7, 9, 6, 8}, {7, 8, 9, 10, 11, 12, 13, 14, 15});

number_of_fragments_before_consolidation =
tiledb::test::num_fragments(array_name);
CHECK(number_of_fragments_before_consolidation == 3);

FragmentInfo fragment_info(ctx, array_name);
fragment_info.load();
fragment_name1 = fragment_info.fragment_uri(0);
fragment_name2 = fragment_info.fragment_uri(2);
}

SECTION(
"Throws exception because of overlap with already consolidated "
"fragment") {
throws = true;
create_array_2d(array_name);
// In this case we request to consolidate frag1 and frag3. Before this main
// consolidation we run another secondary consolidation between frag2 and
// frag4. The consolidated frag2_frag4 has been created after frag3 but its
// start timestamp is older than frag3's start timestamp so the first
// condition to abort the consolidation is satisfied. Frag2_frag4's domain
// intersects with the union of the domains of the selected fragments for
// consolidation(frag1, frag3), so the second condition is also satisfied.
// An exception is expected.
write_array(array_name, {2, 4, 2, 3}, {10, 11, 12, 13, 14, 15});
write_array(array_name, {8, 9, 3, 4}, {32, 33, 34, 35});
write_array(array_name, {7, 9, 6, 8}, {22, 23, 24, 25, 26, 27, 28, 29, 30});
write_array(array_name, {7, 8, 3, 4}, {31, 32, 33, 34});

FragmentInfo fragment_info(ctx, array_name);
fragment_info.load();
fragment_name1 = fragment_info.fragment_uri(1);
fragment_name2 = fragment_info.fragment_uri(3);

std::string short_fragment_name1 =
fragment_name1.substr(fragment_name1.find_last_of('/') + 1);
std::string short_fragment_name2 =
fragment_name2.substr(fragment_name2.find_last_of('/') + 1);

const char* fragment_uris[2] = {
short_fragment_name1.c_str(), short_fragment_name2.c_str()};

REQUIRE_NOTHROW(
Array::consolidate(ctx, array_name, fragment_uris, 2, &config));

fragment_name1 = fragment_info.fragment_uri(0);
fragment_name2 = fragment_info.fragment_uri(2);

number_of_fragments_before_consolidation =
tiledb::test::num_fragments(array_name);
CHECK(number_of_fragments_before_consolidation == 5);
}

SECTION("Does not throw exception") {
create_array_2d(array_name);
// In this case we request to consolidate frag1 and frag2. We can see that
// frag3 has not been created prior to frag3 so the first condition to abort
// the consolidation is not satisfied. Frag3's domain intersects with the
// union of the domains of the selected fragments for consolidation(frag1,
// frag2), so the second condition is satisfied. An exception is expected.
write_array(array_name, {2, 4, 2, 3}, {10, 11, 12, 13, 14, 15});
write_array(array_name, {7, 9, 6, 8}, {22, 23, 24, 25, 26, 27, 28, 29, 30});
write_array(array_name, {7, 8, 3, 4}, {31, 32, 33, 34});

number_of_fragments_before_consolidation =
tiledb::test::num_fragments(array_name);
CHECK(number_of_fragments_before_consolidation == 3);

FragmentInfo fragment_info(ctx, array_name);
fragment_info.load();
fragment_name1 = fragment_info.fragment_uri(0);
fragment_name2 = fragment_info.fragment_uri(1);
}

std::string short_fragment_name1 =
fragment_name1.substr(fragment_name1.find_last_of('/') + 1);
std::string short_fragment_name2 =
fragment_name2.substr(fragment_name2.find_last_of('/') + 1);

const char* fragment_uris[2] = {
short_fragment_name1.c_str(), short_fragment_name2.c_str()};

if (throws) {
DimitrisStaratzis marked this conversation as resolved.
Show resolved Hide resolved
REQUIRE_THROWS_WITH(
Array::consolidate(ctx, array_name, fragment_uris, 2, &config),
Catch::Matchers::ContainsSubstring(
"Cannot consolidate; The non-empty domain of the fragment"));
} else {
REQUIRE_NOTHROW(
Array::consolidate(ctx, array_name, fragment_uris, 2, &config));

CHECK(
tiledb::test::num_fragments(array_name) ==
number_of_fragments_before_consolidation + 1);
}

remove_array(array_name);
}

DimitrisStaratzis marked this conversation as resolved.
Show resolved Hide resolved
TEST_CASE(
"C++ API: Test consolidation with timestamp and max domain",
"[cppapi][consolidation][timestamp][maxdomain]") {
Expand Down
2 changes: 0 additions & 2 deletions tiledb/sm/c_api/tiledb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1772,8 +1772,6 @@ int32_t tiledb_array_consolidate_fragments(
const char** fragment_uris,
const uint64_t num_fragments,
tiledb_config_t* config) {
// Sanity checks

// Convert the list of fragments to a vector
std::vector<std::string> uris;
uris.reserve(num_fragments);
Expand Down
55 changes: 55 additions & 0 deletions tiledb/sm/consolidator/fragment_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "tiledb/sm/consolidator/fragment_consolidator.h"
#include "tiledb/common/logger.h"
#include "tiledb/sm/array_schema/array_schema.h"
#include "tiledb/sm/enums/array_type.h"
#include "tiledb/sm/enums/datatype.h"
#include "tiledb/sm/enums/query_status.h"
#include "tiledb/sm/enums/query_type.h"
Expand Down Expand Up @@ -407,6 +408,60 @@ Status FragmentConsolidator::consolidate_fragments(
std::to_string(fragment_uris.size()) + " required fragments.");
}

// In case we have a dense array check that the fragments can be consolidated
// without data loss. More specifically, if the union of the non-empty domains
// of the fragments which are selected for consolidation (which is equal to
// the non-empty domain of the resulting consolidated fragment) overlaps with
// any fragment created prior to this subset, then the subset is marked as
// non-consolidatable. Therefore, empty regions in the non-emtpy
// domain of the consolidated fragment will be filled with special values.
// Those values may erroneously overwrite older valid cell values.
if (array_for_reads->array_schema_latest().array_type() == ArrayType::DENSE) {
// Search every other fragment in this array if any of them overlaps in
// ranges and its timestamp range falls between the range of the fragments
// to consolidate throw error

// First calculate the max timestamp among the fragments which are selected
// for consolidation and use it as an upper bound.
uint64_t max_timestamp = std::numeric_limits<uint64_t>::min();
for (const auto& item : to_consolidate) {
const auto& range = item.timestamp_range();
max_timestamp = std::max(max_timestamp, range.second);
}

// Expand domain to full tiles
auto expanded_union_non_empty_domains = union_non_empty_domains;
domain.expand_to_tiles(&expanded_union_non_empty_domains);

// Now iterate all fragments and see if the consolidation can lead to data
// loss
for (auto& frag_info : frag_info_vec) {
// Ignore the fragments that are requested to be consolidated
auto uri = frag_info.uri().last_path_part();
if (to_consolidate_set.count(uri) != 0) {
continue;
}

// Check domain and timestamp overlap. Do timestamp check first as it is
// cheaper. We compare the current fragment's start timestamp against the
// upper bound we calculated previously.
auto timestamp_range{frag_info.timestamp_range()};
bool timestamp_overlap = !(timestamp_range.first > max_timestamp);
if (timestamp_overlap &&
domain.overlap(
expanded_union_non_empty_domains, frag_info.non_empty_domain())) {
throw FragmentConsolidatorException(
"Cannot consolidate; The non-empty domain of the fragment with "
"URI: " +
uri +
" overlaps with the union of the non-empty domains of the "
"fragments selected for consolidation and was created before "
"these fragments. For more information refer to our "
"documentation on consolidation for Dense arrays.");
}
}
}

FragmentConsolidationWorkspace cw(consolidator_memory_tracker_);

// Consolidate the selected fragments
Expand Down
Loading