Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timestamp start/end for vacuuming/consolidation #2227

Merged
merged 1 commit into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 230 additions & 13 deletions test/src/unit-capi-array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct ArrayFx {
void create_dense_vector(const std::string& path);
void create_dense_array(const std::string& path);
static std::string random_name(const std::string& prefix);
static int get_fragment_timestamps(const char* path, void* data);
};

static const std::string test_ca_path =
Expand Down Expand Up @@ -117,6 +118,20 @@ std::string ArrayFx::random_name(const std::string& prefix) {
return ss.str();
}

/**
 * Callback passed to `tiledb_vfs_ls` that collects fragment timestamps.
 *
 * For every visited `path` that ends with the "ok" file suffix, the
 * timestamp range is parsed out of the corresponding URI. When parsing
 * succeeds, the first element of that range is appended to the vector
 * supplied through `data`. Paths that do not match the suffix, or whose
 * timestamp range cannot be parsed, are silently ignored.
 *
 * @param path The currently visited path.
 * @param data Opaque user data; must point to a `std::vector<uint64_t>`
 *     that receives the collected timestamps.
 * @return Always 1.
 *     NOTE(review): presumably 1 tells the listing to continue with the
 *     next entry -- confirm against the `tiledb_vfs_ls` documentation.
 */
int ArrayFx::get_fragment_timestamps(const char* path, void* data) {
  // The C API hands the user data back as `void*`; recover the real type.
  auto data_vec = static_cast<std::vector<uint64_t>*>(data);

  if (tiledb::sm::utils::parse::ends_with(
          path, tiledb::sm::constants::ok_file_suffix)) {
    auto uri = tiledb::sm::URI(path);
    std::pair<uint64_t, uint64_t> timestamp_range;
    if (tiledb::sm::utils::parse::get_timestamp_range(uri, &timestamp_range)
            .ok()) {
      // Only the first element of the parsed range is recorded.
      data_vec->push_back(timestamp_range.first);
    }
  }

  return 1;
}

void ArrayFx::create_sparse_vector(const std::string& path) {
int rc;
int64_t dim_domain[] = {-1, 2};
Expand Down Expand Up @@ -747,10 +762,6 @@ TEST_CASE_METHOD(
tiledb_array_free(&array);
tiledb_query_free(&query);

// Get timestamp after first write
auto timestamp = TILEDB_TIMESTAMP_NOW_MS;
std::this_thread::sleep_for(std::chrono::milliseconds(1));

// ---- UPDATE ----
int buffer_upd[] = {50, 60, 70};
uint64_t buffer_upd_size = sizeof(buffer_upd);
Expand Down Expand Up @@ -790,6 +801,15 @@ TEST_CASE_METHOD(
tiledb_array_free(&array);
tiledb_query_free(&query);

std::vector<uint64_t> fragment_timestamps;
rc = tiledb_vfs_ls(
ctx_,
vfs_,
array_name.c_str(),
&get_fragment_timestamps,
&fragment_timestamps);
CHECK(rc == TILEDB_OK);

// ---- NORMAL READ ----
int buffer_read[10];
uint64_t buffer_read_size = sizeof(buffer_read);
Expand Down Expand Up @@ -909,7 +929,10 @@ TEST_CASE_METHOD(
REQUIRE(tiledb_config_alloc(&cfg, &err) == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_config_set(
cfg, "sm.array.timestamp_end", std::to_string(timestamp).c_str(), &err);
cfg,
"sm.array.timestamp_end",
std::to_string(fragment_timestamps[0]).c_str(),
&err);
REQUIRE(rc == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_array_set_config(ctx_, array, cfg);
Expand Down Expand Up @@ -955,16 +978,17 @@ TEST_CASE_METHOD(
CHECK(buffer_read_size == sizeof(buffer_read_at_c));

// ---- READ AT LATER TIMESTAMP ----
uint64_t first_timestamp = timestamp;
timestamp = TILEDB_TIMESTAMP_NOW_MS;
// Open array
rc = tiledb_array_alloc(ctx_, array_name.c_str(), &array);
CHECK(rc == TILEDB_OK);
err = nullptr;
REQUIRE(tiledb_config_alloc(&cfg, &err) == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_config_set(
cfg, "sm.array.timestamp_end", std::to_string(timestamp).c_str(), &err);
cfg,
"sm.array.timestamp_end",
std::to_string(fragment_timestamps[1]).c_str(),
&err);
REQUIRE(rc == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_array_set_config(ctx_, array, cfg);
Expand All @@ -991,7 +1015,7 @@ TEST_CASE_METHOD(
tiledb_config_get(config, "sm.array.timestamp_end", &timestamp_get, &err);
CHECK(rc == TILEDB_OK);
CHECK(err == nullptr);
CHECK(!strcmp(timestamp_get, std::to_string(timestamp).c_str()));
CHECK(!strcmp(timestamp_get, std::to_string(fragment_timestamps[1]).c_str()));

// Submit query
rc = tiledb_query_alloc(ctx_, array, TILEDB_READ, &query);
Expand Down Expand Up @@ -1024,10 +1048,56 @@ TEST_CASE_METHOD(
rc = tiledb_config_set(
cfg,
"sm.array.timestamp_end",
std::to_string(first_timestamp).c_str(),
std::to_string(fragment_timestamps[1] - 1).c_str(),
&err);
REQUIRE(rc == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_array_set_config(ctx_, array, cfg);
REQUIRE(rc == TILEDB_OK);
rc = tiledb_array_reopen(ctx_, array);
CHECK(rc == TILEDB_OK);

// Submit query
rc = tiledb_query_alloc(ctx_, array, TILEDB_READ, &query);
CHECK(rc == TILEDB_OK);
rc = tiledb_query_set_layout(ctx_, query, TILEDB_ROW_MAJOR);
CHECK(rc == TILEDB_OK);
rc = tiledb_query_set_subarray(ctx_, query, subarray_read);
CHECK(rc == TILEDB_OK);
rc =
tiledb_query_set_buffer(ctx_, query, "a", buffer_read, &buffer_read_size);
CHECK(rc == TILEDB_OK);
rc = tiledb_query_submit(ctx_, query);
CHECK(rc == TILEDB_OK);

// Clean up but don't close the array yet (we will reopen it).
tiledb_query_free(&query);

// Check correctness
int buffer_read_reopen_c[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
CHECK(!std::memcmp(
buffer_read, buffer_read_reopen_c, sizeof(buffer_read_reopen_c)));
CHECK(buffer_read_size == sizeof(buffer_read_reopen_c));

// ---- REOPEN STARTING JUST AFTER FIRST TIMESTAMP ----
buffer_read_size = sizeof(buffer_read);

// Reopen array
tiledb_config_free(&cfg);
err = nullptr;
REQUIRE(tiledb_config_alloc(&cfg, &err) == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_config_set(
cfg,
"sm.array.timestamp_start",
std::to_string(fragment_timestamps[0] + 1).c_str(),
&err);
REQUIRE(rc == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_config_set(
cfg, "sm.array.timestamp_end", std::to_string(UINT64_MAX).c_str(), &err);
REQUIRE(rc == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_array_set_config(ctx_, array, cfg);
REQUIRE(rc == TILEDB_OK);
rc = tiledb_array_reopen(ctx_, array);
Expand Down Expand Up @@ -1055,10 +1125,157 @@ TEST_CASE_METHOD(
tiledb_config_free(&config);

// Check correctness
int buffer_read_reopen_c[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
int buffer_read_reopen_start_c[] = {INT_MIN,
INT_MIN,
INT_MIN,
INT_MIN,
50,
60,
70,
INT_MIN,
INT_MIN,
INT_MIN};
CHECK(!std::memcmp(
buffer_read, buffer_read_reopen_c, sizeof(buffer_read_reopen_c)));
CHECK(buffer_read_size == sizeof(buffer_read_reopen_c));
buffer_read,
buffer_read_reopen_start_c,
sizeof(buffer_read_reopen_start_c)));
CHECK(buffer_read_size == sizeof(buffer_read_reopen_start_c));

// ---- OPEN STARTING AT LAST TIMESTAMP ----
buffer_read_size = sizeof(buffer_read);

// Open array
rc = tiledb_array_alloc(ctx_, array_name.c_str(), &array);
CHECK(rc == TILEDB_OK);
err = nullptr;
REQUIRE(tiledb_config_alloc(&cfg, &err) == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_config_set(
cfg,
"sm.array.timestamp_start",
std::to_string(fragment_timestamps[1]).c_str(),
&err);
REQUIRE(rc == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_array_set_config(ctx_, array, cfg);
REQUIRE(rc == TILEDB_OK);
if (encryption_type_ == TILEDB_NO_ENCRYPTION) {
rc = tiledb_array_open(ctx_, array, TILEDB_READ);
} else {
rc = tiledb_array_open_with_key(
ctx_,
array,
TILEDB_READ,
encryption_type_,
encryption_key_,
(uint32_t)strlen(encryption_key_));
}
CHECK(rc == TILEDB_OK);

// Submit query
rc = tiledb_query_alloc(ctx_, array, TILEDB_READ, &query);
CHECK(rc == TILEDB_OK);
rc = tiledb_query_set_layout(ctx_, query, TILEDB_ROW_MAJOR);
CHECK(rc == TILEDB_OK);
rc = tiledb_query_set_subarray(ctx_, query, subarray_read);
CHECK(rc == TILEDB_OK);
rc =
tiledb_query_set_buffer(ctx_, query, "a", buffer_read, &buffer_read_size);
CHECK(rc == TILEDB_OK);
rc = tiledb_query_submit(ctx_, query);
CHECK(rc == TILEDB_OK);

// Close array and clean up
rc = tiledb_array_close(ctx_, array);
CHECK(rc == TILEDB_OK);
tiledb_query_free(&query);
tiledb_array_free(&array);
tiledb_config_free(&cfg);

// Check correctness
int buffer_read_open_start_c[] = {INT_MIN,
INT_MIN,
INT_MIN,
INT_MIN,
50,
60,
70,
INT_MIN,
INT_MIN,
INT_MIN};
CHECK(!std::memcmp(
buffer_read, buffer_read_open_start_c, sizeof(buffer_read_open_start_c)));
CHECK(buffer_read_size == sizeof(buffer_read_open_start_c));

// ---- OPEN STARTING PAST LAST TIMESTAMP ----
buffer_read_size = sizeof(buffer_read);

// Open array
rc = tiledb_array_alloc(ctx_, array_name.c_str(), &array);
CHECK(rc == TILEDB_OK);
err = nullptr;
REQUIRE(tiledb_config_alloc(&cfg, &err) == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_config_set(
cfg,
"sm.array.timestamp_start",
std::to_string(fragment_timestamps[1] + 1).c_str(),
&err);
REQUIRE(rc == TILEDB_OK);
REQUIRE(err == nullptr);
rc = tiledb_array_set_config(ctx_, array, cfg);
REQUIRE(rc == TILEDB_OK);
if (encryption_type_ == TILEDB_NO_ENCRYPTION) {
rc = tiledb_array_open(ctx_, array, TILEDB_READ);
} else {
rc = tiledb_array_open_with_key(
ctx_,
array,
TILEDB_READ,
encryption_type_,
encryption_key_,
(uint32_t)strlen(encryption_key_));
}
CHECK(rc == TILEDB_OK);

// Submit query
rc = tiledb_query_alloc(ctx_, array, TILEDB_READ, &query);
CHECK(rc == TILEDB_OK);
rc = tiledb_query_set_layout(ctx_, query, TILEDB_ROW_MAJOR);
CHECK(rc == TILEDB_OK);
rc = tiledb_query_set_subarray(ctx_, query, subarray_read);
CHECK(rc == TILEDB_OK);
rc =
tiledb_query_set_buffer(ctx_, query, "a", buffer_read, &buffer_read_size);
CHECK(rc == TILEDB_OK);
rc = tiledb_query_submit(ctx_, query);
CHECK(rc == TILEDB_OK);

// Close array and clean up
rc = tiledb_array_close(ctx_, array);
CHECK(rc == TILEDB_OK);
tiledb_query_free(&query);
tiledb_array_free(&array);
tiledb_config_free(&cfg);

// Check correctness
int buffer_read_open_start_now_c[] = {INT_MIN,
INT_MIN,
INT_MIN,
INT_MIN,
INT_MIN,
INT_MIN,
INT_MIN,
INT_MIN,
INT_MIN,
INT_MIN};
CHECK(!std::memcmp(
buffer_read,
buffer_read_open_start_now_c,
sizeof(buffer_read_open_start_now_c)));
CHECK(buffer_read_size == sizeof(buffer_read_open_start_now_c));

remove_temp_dir(temp_dir);
}
Expand Down
9 changes: 9 additions & 0 deletions test/src/unit-capi-config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ void check_save_to_file() {
ss << "sm.consolidation.step_min_frags 4294967295\n";
ss << "sm.consolidation.step_size_ratio 0.0\n";
ss << "sm.consolidation.steps 4294967295\n";
ss << "sm.consolidation.timestamp_end " << std::to_string(UINT64_MAX) << "\n";
ss << "sm.consolidation.timestamp_start 0\n";
ss << "sm.dedup_coords false\n";
ss << "sm.enable_signal_handlers true\n";
ss << "sm.io_concurrency_level " << std::thread::hardware_concurrency()
Expand All @@ -249,6 +251,8 @@ void check_save_to_file() {
ss << "sm.sub_partitioner_memory_budget 0\n";
ss << "sm.tile_cache_size 10000000\n";
ss << "sm.vacuum.mode fragments\n";
ss << "sm.vacuum.timestamp_end " << std::to_string(UINT64_MAX) << "\n";
ss << "sm.vacuum.timestamp_start 0\n";
ss << "sm.var_offsets.bitsize 64\n";
ss << "sm.var_offsets.extra_element false\n";
ss << "sm.var_offsets.mode bytes\n";
Expand Down Expand Up @@ -542,13 +546,18 @@ TEST_CASE("C API: Test config iter", "[capi], [config]") {
all_param_values["sm.skip_checksum_validation"] = "false";
all_param_values["sm.consolidation.amplification"] = "1.0";
all_param_values["sm.consolidation.steps"] = "4294967295";
all_param_values["sm.consolidation.timestamp_start"] = "0";
all_param_values["sm.consolidation.timestamp_end"] =
std::to_string(UINT64_MAX);
all_param_values["sm.consolidation.step_min_frags"] = "4294967295";
all_param_values["sm.consolidation.step_max_frags"] = "4294967295";
all_param_values["sm.consolidation.buffer_size"] = "50000000";
all_param_values["sm.consolidation.step_size_ratio"] = "0.0";
all_param_values["sm.consolidation.mode"] = "fragments";
all_param_values["sm.read_range_oob"] = "warn";
all_param_values["sm.vacuum.mode"] = "fragments";
all_param_values["sm.vacuum.timestamp_start"] = "0";
all_param_values["sm.vacuum.timestamp_end"] = std::to_string(UINT64_MAX);
all_param_values["sm.var_offsets.bitsize"] = "32";
all_param_values["sm.var_offsets.extra_element"] = "true";
all_param_values["sm.var_offsets.mode"] = "elements";
Expand Down
Loading