Skip to content

Commit

Permalink
query_deserialize() transaction
Browse files Browse the repository at this point in the history
This change ensures that if query_deserialize() returns
a non-OK status, both 'copy_state' and 'query' args are
in the same state that they entered the routine in.

Changes:
1. The existing query_deserialize() implementation has
   been moved to do_query_deserialize().
2. query_deserialize() maintains the same interface.
3. query_deserialize() makes a copy of the
   'copy_state'.
4. query_deserialize() serializes 'query' into a backup
   buffer.
5. query_deserialize() invokes do_query_deserialize(). If
   it fails, query_deserialize() will reset 'copy_state' to
   the copy in change #3 and it will deserialize the backup
   buffer from change #4 into 'query.
6. Reverts the logical changes in #1424 because we now handle
   all deserialization failures, not just user buffer overflow.
  • Loading branch information
Joe maley committed Nov 15, 2019
1 parent c04b3fe commit 0ef61f8
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 102 deletions.
3 changes: 1 addition & 2 deletions tiledb/sm/c_api/tiledb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4172,8 +4172,7 @@ int32_t tiledb_deserialize_query(
(tiledb::sm::SerializationType)serialize_type,
client_side == 1,
nullptr,
query->query_,
nullptr)))
query->query_)))
return TILEDB_ERR;

return TILEDB_OK;
Expand Down
6 changes: 6 additions & 0 deletions tiledb/sm/misc/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ inline Status LOG_STATUS(const Status& st) {
}
#endif

/** Logs an error and exits with a non-zero status. */
inline void LOG_FATAL(const std::string& msg) {
global_logger().error(msg.c_str());
exit(1);
}

} // namespace sm
} // namespace tiledb

Expand Down
11 changes: 10 additions & 1 deletion tiledb/sm/misc/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ std::string to_str(const T& value);
namespace datatype {

/**
* Check if a given type T is quivalent to the tiledb::sm::DataTtpe
* Check if a given type T is quivalent to the tiledb::sm::DataType
* @tparam T
* @param datatype to compare T to
* @return Status indicating Ok() on equal data types else Status:Error()
Expand Down Expand Up @@ -438,6 +438,15 @@ inline T decode_be(const void* const data) {

} // namespace endianness

/**
* Constructs an object of type T and wraps it in a std::unique_ptr.
* This is available in the std namespace in C++14.
*/
template <typename T, typename... Args>
std::unique_ptr<T> make_unique(Args&&... args) {
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}

} // namespace utils

} // namespace sm
Expand Down
54 changes: 13 additions & 41 deletions tiledb/sm/rest/rest_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ Status RestClient::submit_query_to_rest(const URI& uri, Query* query) {
// Local state tracking for the current offsets into the user's query buffers.
// This allows resubmission of incomplete queries while appending to the
// same user buffers.
std::unordered_map<std::string, serialization::QueryBufferCopyState>
copy_state;
serialization::CopyState copy_state;

RETURN_NOT_OK(post_query_submit(uri, query, &copy_state));

Expand All @@ -243,10 +242,7 @@ Status RestClient::submit_query_to_rest(const URI& uri, Query* query) {
}

Status RestClient::post_query_submit(
const URI& uri,
Query* query,
std::unordered_map<std::string, serialization::QueryBufferCopyState>*
copy_state) {
const URI& uri, Query* query, serialization::CopyState* copy_state) {
// Get array
const Array* array = query->array();
if (array == nullptr) {
Expand Down Expand Up @@ -276,7 +272,6 @@ Status RestClient::post_query_submit(
// Create the callback that will process the response buffers as they
// are received.
Buffer scratch;
bool user_buffers_overflowed = false;
auto write_cb = std::bind(
&RestClient::post_data_write_cb,
this,
Expand All @@ -286,19 +281,16 @@ Status RestClient::post_query_submit(
std::placeholders::_4,
&scratch,
query,
copy_state,
&user_buffers_overflowed);
copy_state);

const Status st = curlc.post_data(
url, serialization_type_, &serialized, std::move(write_cb));

// When 'user_buffers_overflowed' is true, we will return all of
// the known query data even if the response parsing was unsuccessful.
// This is a temporary work-around to allow returning an incomplete
// query when the client-side buffer is too small to completely
// store the returned query.
if (!user_buffers_overflowed) {
RETURN_NOT_OK(st);
// We can safely return 'query' to the client even when the request
// fails, because we garauntee that 'query' will be in a good state,
// even if we were unable to deserialize all query objects.
if (!st.ok()) {
LOG_STATUS(st);
}

if (copy_state->empty()) {
Expand All @@ -316,9 +308,7 @@ size_t RestClient::post_data_write_cb(
bool* const skip_retries,
Buffer* const scratch,
Query* query,
std::unordered_map<std::string, serialization::QueryBufferCopyState>*
copy_state,
bool* const user_buffers_overflowed) {
serialization::CopyState* copy_state) {
// All return statements in this function must pass through this wrapper.
// This is responsible for two things:
// 1. The 'bytes_processed' may be negative in error scenarios. The negative
Expand All @@ -341,12 +331,6 @@ size_t RestClient::post_data_write_cb(
return bytes_processed;
};

// If 'user_buffers_overflowed' has been set, we should prevent all future
// response processing.
if (*user_buffers_overflowed) {
return return_wrapper(0);
}

// This is the return value that represents the amount of bytes processed
// in 'contents'. This will act as the return value and will always be
// less-than-or-equal-to 'content_nbytes'.
Expand Down Expand Up @@ -423,12 +407,7 @@ size_t RestClient::post_data_write_cb(
// error status.
aux.reset_offset();
st = serialization::query_deserialize(
aux,
serialization_type_,
true,
copy_state,
query,
user_buffers_overflowed);
aux, serialization_type_, true, copy_state, query);
if (!st.ok()) {
scratch->set_offset(scratch->offset() - 8);
scratch->set_size(scratch->offset());
Expand All @@ -440,12 +419,7 @@ size_t RestClient::post_data_write_cb(
// data when deserializing read queries, this will return an
// error status.
st = serialization::query_deserialize(
*scratch,
serialization_type_,
true,
copy_state,
query,
user_buffers_overflowed);
*scratch, serialization_type_, true, copy_state, query);
if (!st.ok()) {
scratch->set_offset(scratch->offset() - 8);
scratch->set_size(scratch->offset());
Expand Down Expand Up @@ -521,7 +495,7 @@ Status RestClient::finalize_query_to_rest(const URI& uri, Query* query) {
// Deserialize data returned
returned_data.set_offset(0);
return serialization::query_deserialize(
returned_data, serialization_type_, true, nullptr, query, nullptr);
returned_data, serialization_type_, true, nullptr, query);
}

Status RestClient::subarray_to_str(
Expand Down Expand Up @@ -598,9 +572,7 @@ Status RestClient::subarray_to_str(
}

Status RestClient::update_attribute_buffer_sizes(
const std::unordered_map<std::string, serialization::QueryBufferCopyState>&
copy_state,
Query* query) const {
const serialization::CopyState& copy_state, Query* query) const {
// Applicable only to reads
if (query->type() != QueryType::READ)
return Status::Ok();
Expand Down
17 changes: 3 additions & 14 deletions tiledb/sm/rest/rest_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,7 @@ class RestClient {
* @return
*/
Status post_query_submit(
const URI& uri,
Query* query,
std::unordered_map<std::string, serialization::QueryBufferCopyState>*
copy_state);
const URI& uri, Query* query, serialization::CopyState* copy_state);

/**
* Callback to invoke as partial, buffered response data is received from
Expand All @@ -200,9 +197,6 @@ class RestClient {
* @param copy_state Map of copy state per attribute. As attribute data is
* copied into user buffers on reads, the state of each attribute in this
* map is updated accordingly.
* @param user_buffers_overflowed If mutated to true, the 'query' and
* 'copy_state' are in incomplete but valid states and may be returned
* to the user regardless of the return status.
* @return Number of acknowledged bytes
*/
size_t post_data_write_cb(
Expand All @@ -212,9 +206,7 @@ class RestClient {
bool* skip_retries,
Buffer* scratch,
Query* query,
std::unordered_map<std::string, serialization::QueryBufferCopyState>*
copy_state,
bool* user_buffers_overflowed);
serialization::CopyState* copy_state);

/**
* Returns a string representation of the given subarray. The format is:
Expand All @@ -240,10 +232,7 @@ class RestClient {
* @return Status
*/
Status update_attribute_buffer_sizes(
const std::unordered_map<
std::string,
serialization::QueryBufferCopyState>& copy_state,
Query* query) const;
const serialization::CopyState& copy_state, Query* query) const;
};

} // namespace sm
Expand Down
Loading

0 comments on commit 0ef61f8

Please sign in to comment.