Skip to content

Commit

Permalink
Prevent consolidation with fragment list in dense arrays if it could …
Browse files Browse the repository at this point in the history
…result in data loss. (#5251)

When using fragment list consolidation with dense arrays, there is a
possibility that consolidating some fragments would lead to data loss
(see
https://docs.tiledb.com/main/background/internal-mechanics/consolidation#algorithm
for more details). This PR fixes this issue.

---
TYPE: IMPROVEMENT
DESC: Prevent consolidation with fragment list in dense arrays if it
could result in data loss.
  • Loading branch information
DimitrisStaratzis authored Sep 9, 2024
1 parent edb48b1 commit 119c46f
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 2 deletions.
163 changes: 163 additions & 0 deletions test/src/unit-cppapi-consolidation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ void create_array(const std::string& array_name) {
Array::create(array_name, schema);
}

/**
 * Creates a dense 2D array at `array_name`.
 *
 * The array has two `int` dimensions, "d1" and "d2", each spanning [1, 10]
 * with a tile extent of 2, and a single `int` attribute "a".
 */
void create_array_2d(const std::string& array_name) {
  Context ctx;

  // Both dimensions share the same bounds and tile extent.
  auto rows = Dimension::create<int>(ctx, "d1", {{1, 10}}, 2);
  auto cols = Dimension::create<int>(ctx, "d2", {{1, 10}}, 2);
  Domain domain(ctx);
  domain.add_dimensions(rows, cols);

  auto attr = Attribute::create<int>(ctx, "a");

  ArraySchema schema(ctx, TILEDB_DENSE);
  schema.set_domain(domain).add_attributes(attr);

  Array::create(array_name, schema);
}

void write_array(
const std::string& array_name,
const std::vector<int>& subarray,
Expand Down Expand Up @@ -249,6 +263,155 @@ TEST_CASE(
remove_array(array_name);
}

// Checks fragment-list consolidation on a dense 2D array. Each SECTION below
// writes a different set of fragments and picks two of them; the shared tail
// of the test then calls Array::consolidate on that pair and expects either
// the "Cannot consolidate" exception (throws == true) or a successful
// consolidation that increases the fragment count by one.
TEST_CASE(
"C++ API: Test consolidation with wrong fragment list",
"[cppapi][consolidation][fragment-list-consolidation]") {
std::string array_name = "cppapi_consolidation";
remove_array(array_name);

Context ctx;
Config config;
// URIs of the two fragments each section selects for consolidation.
std::string fragment_name1;
std::string fragment_name2;
// Set to true by the sections that expect the consolidation to be rejected.
bool throws = false;
int32_t number_of_fragments_before_consolidation = 0;

SECTION("Throws exception") {
throws = true;
create_array_2d(array_name);
// In this case we request to consolidate frag2 and frag4. We can see that
// frag1 has been created prior to frag4 (the newest selected fragment) so
// the first condition to abort the consolidation is satisfied.
// Additionally, frag1's domain intersects with the union of the domains of
// the selected fragments for consolidation (frag2, frag4), so the second
// condition is also satisfied. An exception is expected.
write_array(array_name, {1, 3, 7, 9}, {1, 2, 3, 4, 5, 6, 7, 8, 9});
write_array(array_name, {2, 4, 2, 3}, {10, 11, 12, 13, 14, 15});
write_array(array_name, {3, 5, 4, 5}, {16, 17, 18, 19, 20, 21});
write_array(array_name, {7, 9, 6, 8}, {22, 23, 24, 25, 26, 27, 28, 29, 30});

number_of_fragments_before_consolidation =
tiledb::test::num_fragments(array_name);
CHECK(number_of_fragments_before_consolidation == 4);

// Indices 1 and 3 select the second and fourth fragments listed.
FragmentInfo fragment_info(ctx, array_name);
fragment_info.load();
fragment_name1 = fragment_info.fragment_uri(1);
fragment_name2 = fragment_info.fragment_uri(3);
}

SECTION("Throws exception because of overlap in extended domain") {
throws = true;
create_array_2d(array_name);
// In this case we request to consolidate frag1 and frag3. We can see that
// frag2 has been created prior to frag3 so the first condition to abort
// the consolidation is satisfied. Additionally, even though frag2's
// domain does not directly intersect with the union of the domains of the
// selected fragments for consolidation (frag1, frag3), it intersects with
// their expanded domain (full tiles) because the tile extent is set to 2
// and the domain range is 1-10. An exception is expected.
write_array(array_name, {2, 4, 2, 3}, {1, 2, 3, 4, 5, 6});
write_array(array_name, {10, 10, 4, 4}, {16});
write_array(array_name, {7, 9, 6, 8}, {7, 8, 9, 10, 11, 12, 13, 14, 15});

number_of_fragments_before_consolidation =
tiledb::test::num_fragments(array_name);
CHECK(number_of_fragments_before_consolidation == 3);

// Indices 0 and 2 select the first and third fragments listed.
FragmentInfo fragment_info(ctx, array_name);
fragment_info.load();
fragment_name1 = fragment_info.fragment_uri(0);
fragment_name2 = fragment_info.fragment_uri(2);
}

SECTION(
"Throws exception because of overlap with already consolidated "
"fragment") {
throws = true;
create_array_2d(array_name);
// In this case we request to consolidate frag1 and frag3. Before this main
// consolidation we run another secondary consolidation between frag2 and
// frag4. The consolidated frag2_frag4 has been created after frag3 but its
// start timestamp is older than frag3's start timestamp so the first
// condition to abort the consolidation is satisfied. Frag2_frag4's domain
// intersects with the union of the domains of the selected fragments for
// consolidation (frag1, frag3), so the second condition is also satisfied.
// An exception is expected.
write_array(array_name, {2, 4, 2, 3}, {10, 11, 12, 13, 14, 15});
write_array(array_name, {8, 9, 3, 4}, {32, 33, 34, 35});
write_array(array_name, {7, 9, 6, 8}, {22, 23, 24, 25, 26, 27, 28, 29, 30});
write_array(array_name, {7, 8, 3, 4}, {31, 32, 33, 34});

// Secondary consolidation: select frag2 and frag4 (indices 1 and 3).
FragmentInfo fragment_info(ctx, array_name);
fragment_info.load();
fragment_name1 = fragment_info.fragment_uri(1);
fragment_name2 = fragment_info.fragment_uri(3);

// Keep only the last path component of each fragment URI.
std::string short_fragment_name1 =
fragment_name1.substr(fragment_name1.find_last_of('/') + 1);
std::string short_fragment_name2 =
fragment_name2.substr(fragment_name2.find_last_of('/') + 1);

const char* fragment_uris[2] = {
short_fragment_name1.c_str(), short_fragment_name2.c_str()};

REQUIRE_NOTHROW(
Array::consolidate(ctx, array_name, fragment_uris, 2, &config));

// Main consolidation targets: frag1 and frag3 (indices 0 and 2).
// NOTE(review): fragment_info is not reloaded after the secondary
// consolidation, so these indices presumably still refer to the listing
// loaded before it -- confirm this is intentional.
fragment_name1 = fragment_info.fragment_uri(0);
fragment_name2 = fragment_info.fragment_uri(2);

// Four written fragments plus the one produced by the secondary
// consolidation.
number_of_fragments_before_consolidation =
tiledb::test::num_fragments(array_name);
CHECK(number_of_fragments_before_consolidation == 5);
}

SECTION("Does not throw exception") {
create_array_2d(array_name);
// In this case we request to consolidate frag1 and frag2. We can see that
// frag3 has not been created prior to frag2 (the newest selected fragment)
// so the first condition to abort the consolidation is not satisfied.
// Frag3's domain intersects with the union of the domains of the selected
// fragments for consolidation (frag1, frag2), so the second condition is
// satisfied. Since both conditions must hold to abort, no exception is
// expected.
write_array(array_name, {2, 4, 2, 3}, {10, 11, 12, 13, 14, 15});
write_array(array_name, {7, 9, 6, 8}, {22, 23, 24, 25, 26, 27, 28, 29, 30});
write_array(array_name, {7, 8, 3, 4}, {31, 32, 33, 34});

number_of_fragments_before_consolidation =
tiledb::test::num_fragments(array_name);
CHECK(number_of_fragments_before_consolidation == 3);

// Indices 0 and 1 select the first and second fragments listed.
FragmentInfo fragment_info(ctx, array_name);
fragment_info.load();
fragment_name1 = fragment_info.fragment_uri(0);
fragment_name2 = fragment_info.fragment_uri(1);
}

// Shared tail, run after whichever section was entered: keep only the last
// path component of each selected fragment URI and pass the pair to
// Array::consolidate.
std::string short_fragment_name1 =
fragment_name1.substr(fragment_name1.find_last_of('/') + 1);
std::string short_fragment_name2 =
fragment_name2.substr(fragment_name2.find_last_of('/') + 1);

const char* fragment_uris[2] = {
short_fragment_name1.c_str(), short_fragment_name2.c_str()};

if (throws) {
// The consolidation must be rejected with the data-loss error message.
REQUIRE_THROWS_WITH(
Array::consolidate(ctx, array_name, fragment_uris, 2, &config),
Catch::Matchers::ContainsSubstring(
"Cannot consolidate; The non-empty domain of the fragment"));
} else {
REQUIRE_NOTHROW(
Array::consolidate(ctx, array_name, fragment_uris, 2, &config));

// A successful consolidation increases the fragment count by one.
CHECK(
tiledb::test::num_fragments(array_name) ==
number_of_fragments_before_consolidation + 1);
}

remove_array(array_name);
}

TEST_CASE(
"C++ API: Test consolidation with timestamp and max domain",
"[cppapi][consolidation][timestamp][maxdomain]") {
Expand Down
2 changes: 0 additions & 2 deletions tiledb/sm/c_api/tiledb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1727,8 +1727,6 @@ int32_t tiledb_array_consolidate_fragments(
const char** fragment_uris,
const uint64_t num_fragments,
tiledb_config_t* config) {
// Sanity checks

// Convert the list of fragments to a vector
std::vector<std::string> uris;
uris.reserve(num_fragments);
Expand Down
55 changes: 55 additions & 0 deletions tiledb/sm/consolidator/fragment_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "tiledb/sm/consolidator/fragment_consolidator.h"
#include "tiledb/common/logger.h"
#include "tiledb/sm/array_schema/array_schema.h"
#include "tiledb/sm/enums/array_type.h"
#include "tiledb/sm/enums/datatype.h"
#include "tiledb/sm/enums/query_status.h"
#include "tiledb/sm/enums/query_type.h"
Expand Down Expand Up @@ -407,6 +408,60 @@ Status FragmentConsolidator::consolidate_fragments(
std::to_string(fragment_uris.size()) + " required fragments.");
}

// In case we have a dense array check that the fragments can be consolidated
// without data loss. More specifically, if the union of the non-empty domains
// of the fragments which are selected for consolidation (which is equal to
// the non-empty domain of the resulting consolidated fragment) overlaps with
// any fragment created prior to this subset, then the subset is marked as
// non-consolidatable. This is because empty regions in the non-empty
// domain of the consolidated fragment will be filled with special values,
// and those values may erroneously overwrite older valid cell values.
if (array_for_reads->array_schema_latest().array_type() == ArrayType::DENSE) {
// Search every other fragment in this array. If any of them overlaps in
// domain with the fragments to consolidate and its start timestamp is not
// later than the latest end timestamp of those fragments, throw an error.

// First calculate the max timestamp among the fragments which are selected
// for consolidation and use it as an upper bound.
uint64_t max_timestamp = std::numeric_limits<uint64_t>::min();
for (const auto& item : to_consolidate) {
const auto& range = item.timestamp_range();
max_timestamp = std::max(max_timestamp, range.second);
}

// Expand domain to full tiles
auto expanded_union_non_empty_domains = union_non_empty_domains;
domain.expand_to_tiles(&expanded_union_non_empty_domains);

// Now iterate all fragments and see if the consolidation can lead to data
// loss
for (auto& frag_info : frag_info_vec) {
// Ignore the fragments that are requested to be consolidated
auto uri = frag_info.uri().last_path_part();
if (to_consolidate_set.count(uri) != 0) {
continue;
}

// Check domain and timestamp overlap. Do timestamp check first as it is
// cheaper. We compare the current fragment's start timestamp against the
// upper bound we calculated previously.
auto timestamp_range{frag_info.timestamp_range()};
bool timestamp_overlap = !(timestamp_range.first > max_timestamp);
if (timestamp_overlap &&
domain.overlap(
expanded_union_non_empty_domains, frag_info.non_empty_domain())) {
throw FragmentConsolidatorException(
"Cannot consolidate; The non-empty domain of the fragment with "
"URI: " +
uri +
" overlaps with the union of the non-empty domains of the "
"fragments selected for consolidation and was created before "
"these fragments. For more information refer to our "
"documentation on consolidation for Dense arrays.");
}
}
}

FragmentConsolidationWorkspace cw(consolidator_memory_tracker_);

// Consolidate the selected fragments
Expand Down

0 comments on commit 119c46f

Please sign in to comment.