Skip to content

Commit

Permalink
WIP: query_deserialize() transaction
Browse files Browse the repository at this point in the history
This change ensures that if query_deserialize() returns
a non-OK status, both 'copy_state' and 'query' args are
in the same state that they entered the routine in.

Changes:
1. The existing query_deserialize() implementation has
   been moved to do_query_deserialize().
2. query_deserialize() maintains the same interface.
3. query_deserialize() makes a copy of the
   'copy_state'.
4. query_deserialize() serializes 'query' into a backup
   buffer.
5. query_deserialize() invokes do_query_deserialize(). If
   it fails, query_deserialize() will reset 'copy_state' to
   the copy in change #3 and it will deserialize the backup
   buffer from change #4 into 'query.
6. Reverts the logical changes in #1424 because we now handle
   all deserialization failures, not just user buffer overflow.

This is marked as a work-in-progress because it is untested, and
will not be checked in until tested.
  • Loading branch information
Joe maley committed Nov 14, 2019
1 parent e651eb9 commit 13b9925
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 93 deletions.
3 changes: 1 addition & 2 deletions tiledb/sm/c_api/tiledb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4172,8 +4172,7 @@ int32_t tiledb_deserialize_query(
(tiledb::sm::SerializationType)serialize_type,
client_side == 1,
nullptr,
query->query_,
nullptr)))
query->query_)))
return TILEDB_ERR;

return TILEDB_OK;
Expand Down
6 changes: 6 additions & 0 deletions tiledb/sm/misc/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ inline Status LOG_STATUS(const Status& st) {
}
#endif

/** Logs an error and exits with a non-zero status. */
inline void LOG_FATAL(const std::string& msg) {
global_logger().error(msg.c_str());
exit(1);
}

} // namespace sm
} // namespace tiledb

Expand Down
54 changes: 13 additions & 41 deletions tiledb/sm/rest/rest_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ Status RestClient::submit_query_to_rest(const URI& uri, Query* query) {
// Local state tracking for the current offsets into the user's query buffers.
// This allows resubmission of incomplete queries while appending to the
// same user buffers.
std::unordered_map<std::string, serialization::QueryBufferCopyState>
copy_state;
serialization::CopyState copy_state;

RETURN_NOT_OK(post_query_submit(uri, query, &copy_state));

Expand All @@ -243,10 +242,7 @@ Status RestClient::submit_query_to_rest(const URI& uri, Query* query) {
}

Status RestClient::post_query_submit(
const URI& uri,
Query* query,
std::unordered_map<std::string, serialization::QueryBufferCopyState>*
copy_state) {
const URI& uri, Query* query, serialization::CopyState* copy_state) {
// Get array
const Array* array = query->array();
if (array == nullptr) {
Expand Down Expand Up @@ -276,7 +272,6 @@ Status RestClient::post_query_submit(
// Create the callback that will process the response buffers as they
// are received.
Buffer scratch;
bool user_buffers_overflowed = false;
auto write_cb = std::bind(
&RestClient::post_data_write_cb,
this,
Expand All @@ -286,19 +281,16 @@ Status RestClient::post_query_submit(
std::placeholders::_4,
&scratch,
query,
copy_state,
&user_buffers_overflowed);
copy_state);

const Status st = curlc.post_data(
url, serialization_type_, &serialized, std::move(write_cb));

// When 'user_buffers_overflowed' is true, we will return all of
// the known query data even if the response parsing was unsuccessful.
// This is a temporary work-around to allow returning an incomplete
// query when the client-side buffer is too small to completely
// store the returned query.
if (!user_buffers_overflowed) {
RETURN_NOT_OK(st);
// We can safely return 'query' to the client even when the request
// fails, because we garauntee that 'query' will be in a good state,
// even if we were unable to deserialize all query objects.
if (!st.ok()) {
LOG_STATUS(st);
}

if (copy_state->empty()) {
Expand All @@ -316,9 +308,7 @@ size_t RestClient::post_data_write_cb(
bool* const skip_retries,
Buffer* const scratch,
Query* query,
std::unordered_map<std::string, serialization::QueryBufferCopyState>*
copy_state,
bool* const user_buffers_overflowed) {
serialization::CopyState* copy_state) {
// All return statements in this function must pass through this wrapper.
// This is responsible for two things:
// 1. The 'bytes_processed' may be negative in error scenarios. The negative
Expand All @@ -341,12 +331,6 @@ size_t RestClient::post_data_write_cb(
return bytes_processed;
};

// If 'user_buffers_overflowed' has been set, we should prevent all future
// response processing.
if (*user_buffers_overflowed) {
return return_wrapper(0);
}

// This is the return value that represents the amount of bytes processed
// in 'contents'. This will act as the return value and will always be
// less-than-or-equal-to 'content_nbytes'.
Expand Down Expand Up @@ -423,12 +407,7 @@ size_t RestClient::post_data_write_cb(
// error status.
aux.reset_offset();
st = serialization::query_deserialize(
aux,
serialization_type_,
true,
copy_state,
query,
user_buffers_overflowed);
aux, serialization_type_, true, copy_state, query);
if (!st.ok()) {
scratch->set_offset(scratch->offset() - 8);
scratch->set_size(scratch->offset());
Expand All @@ -440,12 +419,7 @@ size_t RestClient::post_data_write_cb(
// data when deserializing read queries, this will return an
// error status.
st = serialization::query_deserialize(
*scratch,
serialization_type_,
true,
copy_state,
query,
user_buffers_overflowed);
*scratch, serialization_type_, true, copy_state, query);
if (!st.ok()) {
scratch->set_offset(scratch->offset() - 8);
scratch->set_size(scratch->offset());
Expand Down Expand Up @@ -521,7 +495,7 @@ Status RestClient::finalize_query_to_rest(const URI& uri, Query* query) {
// Deserialize data returned
returned_data.set_offset(0);
return serialization::query_deserialize(
returned_data, serialization_type_, true, nullptr, query, nullptr);
returned_data, serialization_type_, true, nullptr, query);
}

Status RestClient::subarray_to_str(
Expand Down Expand Up @@ -598,9 +572,7 @@ Status RestClient::subarray_to_str(
}

Status RestClient::update_attribute_buffer_sizes(
const std::unordered_map<std::string, serialization::QueryBufferCopyState>&
copy_state,
Query* query) const {
const serialization::CopyState& copy_state, Query* query) const {
// Applicable only to reads
if (query->type() != QueryType::READ)
return Status::Ok();
Expand Down
17 changes: 3 additions & 14 deletions tiledb/sm/rest/rest_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,7 @@ class RestClient {
* @return
*/
Status post_query_submit(
const URI& uri,
Query* query,
std::unordered_map<std::string, serialization::QueryBufferCopyState>*
copy_state);
const URI& uri, Query* query, serialization::CopyState* copy_state);

/**
* Callback to invoke as partial, buffered response data is received from
Expand All @@ -200,9 +197,6 @@ class RestClient {
* @param copy_state Map of copy state per attribute. As attribute data is
* copied into user buffers on reads, the state of each attribute in this
* map is updated accordingly.
* @param user_buffers_overflowed If mutated to true, the 'query' and
* 'copy_state' are in incomplete but valid states and may be returned
* to the user regardless of the return status.
* @return Number of acknowledged bytes
*/
size_t post_data_write_cb(
Expand All @@ -212,9 +206,7 @@ class RestClient {
bool* skip_retries,
Buffer* scratch,
Query* query,
std::unordered_map<std::string, serialization::QueryBufferCopyState>*
copy_state,
bool* user_buffers_overflowed);
serialization::CopyState* copy_state);

/**
* Returns a string representation of the given subarray. The format is:
Expand All @@ -240,10 +232,7 @@ class RestClient {
* @return Status
*/
Status update_attribute_buffer_sizes(
const std::unordered_map<
std::string,
serialization::QueryBufferCopyState>& copy_state,
Query* query) const;
const serialization::CopyState& copy_state, Query* query) const;
};

} // namespace sm
Expand Down
87 changes: 56 additions & 31 deletions tiledb/sm/serialization/query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
* The MIT License
*
* @copyright Copyright (c) 2017-2018 TileDB, Inc.
* @copyright Copyright (c) 2017-2019 TileDB, Inc.
* @copyright Copyright (c) 2016 MIT and Intel Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
Expand Down Expand Up @@ -472,11 +472,10 @@ Status query_to_capnp(

Status query_from_capnp(
const capnp::Query::Reader& query_reader,
bool clientside,
const bool clientside,
void* buffer_start,
std::unordered_map<std::string, QueryBufferCopyState>* copy_state,
Query* query,
bool* const user_buffers_overflowed) {
CopyState* const copy_state,
Query* const query) {
using namespace tiledb::sm;

auto type = query->type();
Expand Down Expand Up @@ -575,9 +574,6 @@ Status query_from_capnp(
if ((var_size && (offset_size_left < fixedlen_size ||
data_size_left < varlen_size)) ||
(!var_size && data_size_left < fixedlen_size)) {
if (user_buffers_overflowed != NULL) {
*user_buffers_overflowed = true;
}
return LOG_STATUS(Status::SerializationError(
"Error deserializing read query; buffer too small for attribute "
"'" +
Expand Down Expand Up @@ -842,13 +838,12 @@ Status query_serialize(
STATS_FUNC_OUT(serialization_query_serialize);
}

Status query_deserialize(
Status do_query_deserialize(
const Buffer& serialized_buffer,
SerializationType serialize_type,
bool clientside,
std::unordered_map<std::string, QueryBufferCopyState>* copy_state,
Query* query,
bool* const user_buffers_overflowed) {
const bool clientside,
CopyState* const copy_state,
Query* query) {
STATS_FUNC_IN(serialization_query_deserialize);

if (serialize_type == SerializationType::JSON)
Expand All @@ -868,12 +863,7 @@ Status query_deserialize(
query_builder);
capnp::Query::Reader query_reader = query_builder.asReader();
return query_from_capnp(
query_reader,
clientside,
nullptr,
copy_state,
query,
user_buffers_overflowed);
query_reader, clientside, nullptr, copy_state, query);
}
case SerializationType::CAPNP: {
// Capnp FlatArrayMessageReader requires 64-bit alignment.
Expand All @@ -899,12 +889,7 @@ Status query_deserialize(
auto attribute_buffer_start = reader.getEnd();
auto buffer_start = const_cast<::capnp::word*>(attribute_buffer_start);
return query_from_capnp(
query_reader,
clientside,
buffer_start,
copy_state,
query,
user_buffers_overflowed);
query_reader, clientside, buffer_start, copy_state, query);
}
default:
return LOG_STATUS(Status::SerializationError(
Expand All @@ -923,6 +908,51 @@ Status query_deserialize(
STATS_FUNC_OUT(serialization_query_deserialize);
}

Status query_deserialize(
const Buffer& serialized_buffer,
SerializationType serialize_type,
bool clientside,
CopyState* copy_state,
Query* query) {
// Create an original, serialized copy of the 'query' that we will revert
// to if we are unable to deserialize 'serialized_buffer'.
BufferList original_bufferlist;
Status st =
query_serialize(query, serialize_type, clientside, &original_bufferlist);
if (!st.ok()) {
return st;
}

// The first buffer is always the serialized Query object.
tiledb::sm::Buffer* original_buffer;
st = original_bufferlist.get_buffer(0, &original_buffer);
if (!st.ok()) {
return st;
}

// Similarly, we must create a copy of 'copy_state'.
const CopyState original_copy_state(*copy_state);

// Deserialize 'serialized_buffer'.
st = do_query_deserialize(
serialized_buffer, serialize_type, clientside, copy_state, query);

// If the deserialization failed, deserialize 'serialized_query_original'
// into 'query' to ensure that 'query' is in the state it was before the
// deserialization of 'serialized_buffer' failed.
if (!st.ok()) {
*copy_state = original_copy_state;

const Status st2 = do_query_deserialize(
*original_buffer, serialize_type, clientside, copy_state, query);
if (!st2.ok()) {
LOG_FATAL(st2.message());
}
}

return st;
}

#else

Status query_serialize(Query*, SerializationType, bool, BufferList*) {
Expand All @@ -931,12 +961,7 @@ Status query_serialize(Query*, SerializationType, bool, BufferList*) {
}

Status query_deserialize(
const Buffer&,
SerializationType,
bool,
std::unordered_map<std::string, QueryBufferCopyState>*,
Query*,
bool*) {
const Buffer&, SerializationType, bool, CopyState*, Query*, bool*) {
return LOG_STATUS(Status::SerializationError(
"Cannot serialize; serialization not enabled."));
}
Expand Down
24 changes: 19 additions & 5 deletions tiledb/sm/serialization/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,25 @@ struct QueryBufferCopyState {
: offset_size(0)
, data_size(0) {
}

/** Copy constructor. */
QueryBufferCopyState(const QueryBufferCopyState& rhs)
: offset_size(rhs.offset_size)
, data_size(rhs.data_size) {
}

/** Assignment operator. */
QueryBufferCopyState& operator=(const QueryBufferCopyState& rhs) {
offset_size = rhs.offset_size;
data_size = rhs.data_size;
return *this;
}
};

/** Maps a buffer name to an associated QueryBufferCopyState. */
using CopyState =
std::unordered_map<std::string, serialization::QueryBufferCopyState>;

/**
* Serialize a query
*
Expand All @@ -85,16 +102,13 @@ Status query_serialize(
* query's buffer sizes are updated directly. If it is not null, the buffer
* sizes are not modified but the entries in the map are.
* @param query Query to deserialize into
* @param user_buffers_overflowed If non-null, set to true if the user buffer
* was not large enough to deserialize the query.
*/
Status query_deserialize(
const Buffer& serialized_buffer,
SerializationType serialize_type,
bool clientside,
std::unordered_map<std::string, QueryBufferCopyState>* copy_state,
Query* query,
bool* user_buffers_overflowed);
CopyState* copy_state,
Query* query);

} // namespace serialization
} // namespace sm
Expand Down

0 comments on commit 13b9925

Please sign in to comment.