Skip to content

Commit

Permalink
Add support for serialization of vacuum and consolidation requests (#…
Browse files Browse the repository at this point in the history
…3902) (#4229)

This adds Cap'n Proto support for the consolidation and vacuuming
requests.

Co-authored-by: Seth Shelnutt <[email protected]>
  • Loading branch information
KiterLuc and Shelnutt2 authored Aug 2, 2023
1 parent 28e42f8 commit bf4447d
Show file tree
Hide file tree
Showing 13 changed files with 1,647 additions and 0 deletions.
6 changes: 6 additions & 0 deletions tiledb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ set(TILEDB_CORE_SOURCES
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_metadata.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/group.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/query.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/consolidation.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/vacuum.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/stats/global_stats.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/stats/stats.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/storage_manager/context.cc
Expand Down Expand Up @@ -329,6 +331,8 @@ if (TILEDB_SERIALIZATION)
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_metadata.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/group.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/query.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/consolidation.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/vacuum.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/tiledb-rest.capnp.c++
PROPERTIES
COMPILE_FLAGS "-Wno-unused-parameter"
Expand All @@ -346,6 +350,8 @@ if (TILEDB_SERIALIZATION)
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/fragment_metadata.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/group.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/query.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/consolidation.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/vacuum.cc
${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/serialization/tiledb-rest.capnp.c++
PROPERTIES
# C4267: 'argument': conversion from 'size_t' to '<various-lesser-types>', possible loss of data
Expand Down
61 changes: 61 additions & 0 deletions tiledb/sm/rest/rest_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
#include "tiledb/sm/serialization/array_schema_evolution.h"
#include "tiledb/sm/serialization/array.h"
#include "tiledb/sm/serialization/config.h"
#include "tiledb/sm/serialization/consolidation.h"
#include "tiledb/sm/serialization/fragment_info.h"
#include "tiledb/sm/serialization/group.h"
#include "tiledb/sm/serialization/query.h"
#include "tiledb/sm/serialization/tiledb-rest.h"
#include "tiledb/sm/serialization/vacuum.h"
#include "tiledb/sm/rest/curl.h" // must be included last to avoid Windows.h
#endif
// clang-format on
Expand Down Expand Up @@ -1334,6 +1336,55 @@ Status RestClient::ensure_json_null_delimited_string(Buffer* buffer) {
return Status::Ok();
}

/**
 * Serializes `config` as an array consolidation request and POSTs it to the
 * `/consolidate` endpoint of the array addressed by `uri`.
 *
 * @param uri Array URI; its REST namespace and array components are used to
 *     build the endpoint URL and the redirect cache key.
 * @param config Config that is serialized into the request body.
 * @return The first non-OK Status from serialization, URI parsing or curl
 *     initialization; otherwise the Status returned by the POST.
 */
Status RestClient::post_consolidation_to_rest(
    const URI& uri, const Config& config) {
  // Encode the request with the client's configured serialization type.
  Buffer request_body;
  RETURN_NOT_OK(serialization::array_consolidation_request_serialize(
      config, serialization_type_, &request_body));

  // The POST call consumes a BufferList, so wrap the single buffer.
  BufferList payload;
  RETURN_NOT_OK(payload.add_buffer(std::move(request_body)));

  // Set up curl and split the array URI into its REST components.
  Curl curlc(logger_);
  std::string array_ns;
  std::string array_uri;
  RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
  const std::string cache_key = array_ns + ":" + array_uri;
  RETURN_NOT_OK(
      curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));

  // <server>/v1/arrays/<namespace>/<escaped array uri>/consolidate
  std::string url = redirect_uri(cache_key);
  url += "/v1/arrays/";
  url += array_ns;
  url += "/";
  url += curlc.url_escape(array_uri);
  url += "/consolidate";

  // The response body is collected but not inspected; only the Status of the
  // POST is reported to the caller.
  Buffer response;
  return curlc.post_data(
      stats_, url, serialization_type_, &payload, &response, cache_key);
}

/**
 * Serializes `config` as an array vacuum request and POSTs it to the
 * `/vacuum` endpoint of the array addressed by `uri`.
 *
 * @param uri Array URI; its REST namespace and array components are used to
 *     build the endpoint URL and the redirect cache key.
 * @param config Config that is serialized into the request body.
 * @return The first non-OK Status from serialization, URI parsing or curl
 *     initialization; otherwise the Status returned by the POST.
 */
Status RestClient::post_vacuum_to_rest(const URI& uri, const Config& config) {
  // Encode the request with the client's configured serialization type.
  Buffer request_body;
  RETURN_NOT_OK(serialization::array_vacuum_request_serialize(
      config, serialization_type_, &request_body));

  // The POST call consumes a BufferList, so wrap the single buffer.
  BufferList payload;
  RETURN_NOT_OK(payload.add_buffer(std::move(request_body)));

  // Set up curl and split the array URI into its REST components.
  Curl curlc(logger_);
  std::string array_ns;
  std::string array_uri;
  RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
  const std::string cache_key = array_ns + ":" + array_uri;
  RETURN_NOT_OK(
      curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));

  // <server>/v1/arrays/<namespace>/<escaped array uri>/vacuum
  std::string url = redirect_uri(cache_key);
  url += "/v1/arrays/";
  url += array_ns;
  url += "/";
  url += curlc.url_escape(array_uri);
  url += "/vacuum";

  // The response body is collected but not inspected; only the Status of the
  // POST is reported to the caller.
  Buffer response;
  return curlc.post_data(
      stats_, url, serialization_type_, &payload, &response, cache_key);
}

#else

RestClient::RestClient() {
Expand Down Expand Up @@ -1484,6 +1535,16 @@ void RestClient::delete_group_from_rest(const URI&, bool) {
Status_RestError("Cannot use rest client; serialization not enabled."));
}

// Stub compiled when TILEDB_SERIALIZATION is not defined: consolidation
// requests cannot be serialized, so this always throws a StatusException
// carrying a REST error instead of returning a Status.
Status RestClient::post_consolidation_to_rest(const URI&, const Config&) {
throw StatusException(
Status_RestError("Cannot use rest client; serialization not enabled."));
}

// Stub compiled when TILEDB_SERIALIZATION is not defined: vacuum requests
// cannot be serialized, so this always throws a StatusException carrying a
// REST error instead of returning a Status.
Status RestClient::post_vacuum_to_rest(const URI&, const Config&) {
throw StatusException(
Status_RestError("Cannot use rest client; serialization not enabled."));
}

#endif // TILEDB_SERIALIZATION

} // namespace sm
Expand Down
18 changes: 18 additions & 0 deletions tiledb/sm/rest/rest_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,24 @@ class RestClient {
*/
Status post_group_create_to_rest(const URI& uri, Group* group);

/**
 * Post an array consolidation request to the REST server.
 *
 * @param uri Array URI; its REST namespace and array components select the
 *     `/consolidate` endpoint that is called.
 * @param config Config serialized into the body of the request.
 * @return Status of the request. In builds without serialization support
 *     the call throws a StatusException instead of returning.
 */
Status post_consolidation_to_rest(const URI& uri, const Config& config);

/**
 * Post an array vacuum request to the REST server.
 *
 * @param uri Array URI; its REST namespace and array components select the
 *     `/vacuum` endpoint that is called.
 * @param config Config serialized into the body of the request.
 * @return Status of the request. In builds without serialization support
 *     the call throws a StatusException instead of returning.
 */
Status post_vacuum_to_rest(const URI& uri, const Config& config);

private:
/* ********************************* */
/* PRIVATE ATTRIBUTES */
Expand Down
207 changes: 207 additions & 0 deletions tiledb/sm/serialization/consolidation.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/**
* @file consolidation.cc
*
* @section LICENSE
*
* The MIT License
*
* @copyright Copyright (c) 2017-2022 TileDB, Inc.
* @copyright Copyright (c) 2016 MIT and Intel Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* @section DESCRIPTION
*
* This file defines serialization for the Consolidation class
*/

#include <sstream>

// clang-format off
#ifdef TILEDB_SERIALIZATION
#include <capnp/compat/json.h>
#include <capnp/message.h>
#include <capnp/serialize.h>
#include "tiledb/sm/serialization/capnp_utils.h"
#endif
// clang-format on

#include "tiledb/common/logger_public.h"
#include "tiledb/sm/enums/serialization_type.h"
#include "tiledb/sm/serialization/config.h"

using namespace tiledb::common;

namespace tiledb {
namespace sm {
namespace serialization {

#ifdef TILEDB_SERIALIZATION

/**
 * Fills a Cap'n Proto ArrayConsolidationRequest builder from a Config.
 *
 * @param config Config to copy into the request's `config` field.
 * @param array_consolidation_request_builder Builder to populate.
 * @return Status of the underlying config conversion.
 */
Status array_consolidation_request_to_capnp(
    const Config& config,
    capnp::ArrayConsolidationRequest::Builder*
        array_consolidation_request_builder) {
  // The request carries a single field: the config.
  auto cfg_builder = array_consolidation_request_builder->initConfig();
  return config_to_capnp(config, &cfg_builder);
}

/**
 * Extracts the Config carried by a Cap'n Proto ArrayConsolidationRequest.
 *
 * @param array_consolidation_request_reader Reader over the request message.
 * @param config Output; receives the decoded Config.
 * @return Status of the underlying config conversion.
 */
Status array_consolidation_request_from_capnp(
    const capnp::ArrayConsolidationRequest::Reader&
        array_consolidation_request_reader,
    tdb_unique_ptr<Config>* config) {
  // The request carries a single field: the config.
  auto cfg_reader = array_consolidation_request_reader.getConfig();
  return config_from_capnp(cfg_reader, config);
}

/**
 * Serializes an array consolidation request (which wraps `config`) into
 * `serialized_buffer` using the requested wire format.
 *
 * @param config Config to embed in the request.
 * @param serialize_type Wire format: JSON or CAPNP.
 * @param serialized_buffer Output buffer. Its size and offset are reset
 *     before writing. JSON output is followed by a NUL terminator.
 * @return Ok on success; a serialization error for an unknown format or
 *     when Cap'n Proto / the standard library throws; otherwise the first
 *     non-OK Status from the conversion or from the buffer operations.
 */
Status array_consolidation_request_serialize(
    const Config& config,
    SerializationType serialize_type,
    Buffer* serialized_buffer) {
  try {
    ::capnp::MallocMessageBuilder message;
    auto request_builder =
        message.initRoot<capnp::ArrayConsolidationRequest>();
    RETURN_NOT_OK(
        array_consolidation_request_to_capnp(config, &request_builder));

    // Start writing from the beginning of the output buffer.
    serialized_buffer->reset_size();
    serialized_buffer->reset_offset();

    if (serialize_type == SerializationType::JSON) {
      ::capnp::JsonCodec codec;
      kj::String encoded = codec.encode(request_builder);
      const auto encoded_len = encoded.size();
      // size() excludes the NUL terminator, so reserve one extra byte and
      // append the terminator explicitly.
      const char terminator = '\0';
      RETURN_NOT_OK(serialized_buffer->realloc(encoded_len + 1));
      RETURN_NOT_OK(serialized_buffer->write(encoded.cStr(), encoded_len));
      RETURN_NOT_OK(serialized_buffer->write(&terminator, 1));
    } else if (serialize_type == SerializationType::CAPNP) {
      // Flatten the message into one contiguous array of words and copy its
      // raw bytes.
      kj::Array<::capnp::word> flat = messageToFlatArray(message);
      kj::ArrayPtr<const char> flat_bytes = flat.asChars();
      const auto byte_count = flat_bytes.size();
      RETURN_NOT_OK(serialized_buffer->realloc(byte_count));
      RETURN_NOT_OK(serialized_buffer->write(flat_bytes.begin(), byte_count));
    } else {
      return LOG_STATUS(Status_SerializationError(
          "Error serializing config; Unknown serialization type "
          "passed"));
    }
  } catch (const kj::Exception& e) {
    return LOG_STATUS(Status_SerializationError(
        "Error serializing config; kj::Exception: " +
        std::string(e.getDescription().cStr())));
  } catch (const std::exception& e) {
    return LOG_STATUS(Status_SerializationError(
        "Error serializing config; exception " + std::string(e.what())));
  }

  return Status::Ok();
}

/**
 * Deserializes an array consolidation request and hands back the Config it
 * carries.
 *
 * @param config Output. On success `*config` is set to a newly decoded
 *     Config released from its owning smart pointer, so the caller takes
 *     ownership. It is left untouched on failure.
 * @param serialize_type Wire format of `serialized_buffer`: JSON or CAPNP.
 * @param serialized_buffer Buffer holding the serialized request.
 * @return Ok on success; a serialization error for an unknown format, a
 *     null decoded config, or when Cap'n Proto / the standard library
 *     throws; otherwise the first non-OK Status from the conversion.
 */
Status array_consolidation_request_deserialize(
    Config** config,
    SerializationType serialize_type,
    const Buffer& serialized_buffer) {
  try {
    tdb_unique_ptr<Config> decoded_config = nullptr;

    switch (serialize_type) {
      case SerializationType::JSON: {
        ::capnp::JsonCodec json;
        ::capnp::MallocMessageBuilder message_builder;
        capnp::ArrayConsolidationRequest::Builder
            array_consolidation_request_builder =
                message_builder.initRoot<capnp::ArrayConsolidationRequest>();
        // NOTE(review): the buffer is viewed as a C string here, so it is
        // presumably expected to be NUL-terminated (the JSON serializer
        // appends a terminator) - confirm for externally supplied buffers.
        json.decode(
            kj::StringPtr(static_cast<const char*>(serialized_buffer.data())),
            array_consolidation_request_builder);
        capnp::ArrayConsolidationRequest::Reader
            array_consolidation_request_reader =
                array_consolidation_request_builder.asReader();
        RETURN_NOT_OK(array_consolidation_request_from_capnp(
            array_consolidation_request_reader, &decoded_config));
        break;
      }
      case SerializationType::CAPNP: {
        // View the raw bytes as whole Cap'n Proto words; the integer
        // division drops any trailing bytes that do not fill a word.
        const auto mBytes =
            reinterpret_cast<const kj::byte*>(serialized_buffer.data());
        ::capnp::FlatArrayMessageReader reader(kj::arrayPtr(
            reinterpret_cast<const ::capnp::word*>(mBytes),
            serialized_buffer.size() / sizeof(::capnp::word)));
        capnp::ArrayConsolidationRequest::Reader
            array_consolidation_request_reader =
                reader.getRoot<capnp::ArrayConsolidationRequest>();
        RETURN_NOT_OK(array_consolidation_request_from_capnp(
            array_consolidation_request_reader, &decoded_config));
        break;
      }
      default: {
        return LOG_STATUS(Status_SerializationError(
            "Error deserializing config; Unknown serialization type "
            "passed"));
      }
    }

    // Fixed message: this is the deserialization path, but the error used
    // to read "Error serializing config".
    if (decoded_config == nullptr)
      return LOG_STATUS(Status_SerializationError(
          "Error deserializing config; deserialized config is null"));

    // Transfer ownership of the decoded config to the caller.
    *config = decoded_config.release();
  } catch (const kj::Exception& e) {
    return LOG_STATUS(Status_SerializationError(
        "Error deserializing config; kj::Exception: " +
        std::string(e.getDescription().cStr())));
  } catch (const std::exception& e) {
    return LOG_STATUS(Status_SerializationError(
        "Error deserializing config; exception " + std::string(e.what())));
  }

  return Status::Ok();
}

#else

// Stub compiled when TILEDB_SERIALIZATION is not defined: ignores its
// arguments and always returns (and logs) a serialization error.
Status array_consolidation_request_serialize(
const Config&, SerializationType, Buffer*) {
return LOG_STATUS(Status_SerializationError(
"Cannot serialize; serialization not enabled."));
}

// Stub compiled when TILEDB_SERIALIZATION is not defined: ignores its
// arguments and always returns (and logs) a serialization error.
Status array_consolidation_request_deserialize(
Config**, SerializationType, const Buffer&) {
return LOG_STATUS(Status_SerializationError(
"Cannot deserialize; serialization not enabled."));
}

#endif // TILEDB_SERIALIZATION

} // namespace serialization
} // namespace sm
} // namespace tiledb
Loading

0 comments on commit bf4447d

Please sign in to comment.