Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for UTC offset serialization/deserialization. #8

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions MODULE.bazel.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cpp/src/ffi_go/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ typedef struct {
typedef struct {
StringView m_log_message;
epoch_time_ms_t m_timestamp;
epoch_time_ms_t m_utc_offset;
} LogEventView;

// NOLINTEND(modernize-use-using)
Expand Down
170 changes: 98 additions & 72 deletions cpp/src/ffi_go/ir/deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <span>
#include <string_view>
#include <type_traits>
Expand All @@ -16,6 +17,7 @@
#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <clp/ir/types.hpp>
#include <clp/string_utils/string_utils.hpp>
#include <clp/time_types.hpp>

#include "ffi_go/api_decoration.h"
#include "ffi_go/defs.h"
Expand All @@ -25,10 +27,6 @@

namespace ffi_go::ir {
using clp::BufferReader;
using clp::ffi::ir_stream::cProtocol::Eof;
using clp::ffi::ir_stream::deserialize_preamble;
using clp::ffi::ir_stream::deserialize_tag;
using clp::ffi::ir_stream::get_encoding_type;
using clp::ffi::ir_stream::IRErrorCode;
using clp::ir::eight_byte_encoded_variable_t;
using clp::ir::four_byte_encoded_variable_t;
Expand Down Expand Up @@ -59,6 +57,18 @@ template <class encoded_variable_t>
size_t* matching_query
) -> int;

/**
* Deserializes packets from the given IR buffer until a new log event is fully deserialized.
* @tparam encoded_variable_t
* @param ir_buf
* @param deserializer The deserializer of the current IR stream to buffer the deserialized
* log event.
* @return ffi::ir_stream::IRErrorCode forwarded from the underlying deserialization methods.
*/
template <class encoded_variable_t>
[[nodiscard]] auto
deserialize_to_next_log_event(BufferReader& ir_buf, Deserializer* deserializer) -> IRErrorCode;

template <class encoded_variable_t>
auto deserialize_log_event(
ByteSpan ir_view,
Expand All @@ -72,39 +82,11 @@ auto deserialize_log_event(
BufferReader ir_buf{static_cast<char const*>(ir_view.m_data), ir_view.m_size};
Deserializer* deserializer{static_cast<Deserializer*>(ir_deserializer)};

clp::ffi::ir_stream::encoded_tag_t tag{};
if (auto const err{deserialize_tag(ir_buf, tag)}; IRErrorCode::IRErrorCode_Success != err) {
return static_cast<int>(err);
}
if (Eof == tag) {
return static_cast<int>(IRErrorCode::IRErrorCode_Eof);
}

IRErrorCode err{};
epoch_time_ms_t timestamp{};
if constexpr (std::is_same_v<encoded_variable_t, eight_byte_encoded_variable_t>) {
err = clp::ffi::ir_stream::eight_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp
);
} else if constexpr (std::is_same_v<encoded_variable_t, four_byte_encoded_variable_t>) {
epoch_time_ms_t timestamp_delta{};
err = clp::ffi::ir_stream::four_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp_delta
);
timestamp = deserializer->m_timestamp + timestamp_delta;
} else {
static_assert(cAlwaysFalse<encoded_variable_t>, "Invalid/unhandled encoding type");
}
if (IRErrorCode::IRErrorCode_Success != err) {
return static_cast<int>(err);
if (auto const err{deserialize_to_next_log_event<encoded_variable_t>(ir_buf, deserializer)};
IRErrorCode::IRErrorCode_Success != err)
{
return err;
}
deserializer->m_timestamp = timestamp;

size_t pos{0};
if (clp::ErrorCode_Success != ir_buf.try_get_pos(pos)) {
Expand All @@ -114,6 +96,7 @@ auto deserialize_log_event(
log_event->m_log_message.m_data = deserializer->m_log_event.m_log_message.data();
log_event->m_log_message.m_size = deserializer->m_log_event.m_log_message.size();
log_event->m_timestamp = deserializer->m_timestamp;
log_event->m_utc_offset = static_cast<epoch_time_ms_t>(deserializer->m_utc_offset.count());
return static_cast<int>(IRErrorCode::IRErrorCode_Success);
}

Expand Down Expand Up @@ -173,40 +156,12 @@ auto deserialize_wildcard_match(
query_fn = [](ffi_go::LogMessage const&) -> std::pair<bool, size_t> { return {true, 0}; };
}

IRErrorCode err{};
while (true) {
clp::ffi::ir_stream::encoded_tag_t tag{};
if (err = deserialize_tag(ir_buf, tag); IRErrorCode::IRErrorCode_Success != err) {
return static_cast<int>(err);
}
if (Eof == tag) {
return static_cast<int>(IRErrorCode::IRErrorCode_Eof);
}

epoch_time_ms_t timestamp{};
if constexpr (std::is_same_v<encoded_variable_t, eight_byte_encoded_variable_t>) {
err = clp::ffi::ir_stream::eight_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp
);
} else if constexpr (std::is_same_v<encoded_variable_t, four_byte_encoded_variable_t>) {
epoch_time_ms_t timestamp_delta{};
err = clp::ffi::ir_stream::four_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp_delta
);
timestamp = deserializer->m_timestamp + timestamp_delta;
} else {
static_assert(cAlwaysFalse<encoded_variable_t>, "Invalid/unhandled encoding type");
if (auto const err{deserialize_to_next_log_event<encoded_variable_t>(ir_buf, deserializer)};
IRErrorCode::IRErrorCode_Success != err)
{
return err;
}
if (IRErrorCode::IRErrorCode_Success != err) {
return static_cast<int>(err);
}
deserializer->m_timestamp = timestamp;

if (time_interval.m_upper <= deserializer->m_timestamp) {
// TODO this is an extremely fragile hack until the CLP ffi ir
Expand All @@ -231,10 +186,78 @@ auto deserialize_wildcard_match(
log_event->m_log_message.m_data = deserializer->m_log_event.m_log_message.data();
log_event->m_log_message.m_size = deserializer->m_log_event.m_log_message.size();
log_event->m_timestamp = deserializer->m_timestamp;
log_event->m_utc_offset = static_cast<epoch_time_ms_t>(deserializer->m_utc_offset.count());
*matching_query = matching_query_idx;
return static_cast<int>(IRErrorCode::IRErrorCode_Success);
}
}

template <class encoded_variable_t>
auto deserialize_to_next_log_event(BufferReader& ir_buf, Deserializer* deserializer)
-> IRErrorCode {
static_assert(
(std::is_same_v<encoded_variable_t, eight_byte_encoded_variable_t>
|| std::is_same_v<encoded_variable_t, four_byte_encoded_variable_t>)
);

clp::ffi::ir_stream::encoded_tag_t tag{};
std::optional<clp::UtcOffset> utc_offset;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using std::optional for deserialized utc offsets, and set it only when we return success. This will ensure the state of deserializer is unchanged if an error happens.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is possible to fail outside of deserialize_to_next_log_event (even when it returns success), this isn't actually a complete defense... We would need to only update the stored utc_offset after we decide to return an updated ir position.


while (true) {
if (IRErrorCode const err{clp::ffi::ir_stream::deserialize_tag(ir_buf, tag)};
IRErrorCode::IRErrorCode_Success != err)
{
return err;
}
if (clp::ffi::ir_stream::cProtocol::Payload::UtcOffsetChange != tag) {
break;
}
clp::UtcOffset deserialized_utc_offset{0};
if (IRErrorCode const err{clp::ffi::ir_stream::deserialize_utc_offset_change(
ir_buf,
deserialized_utc_offset
)};
IRErrorCode::IRErrorCode_Success != err)
{
return err;
}
utc_offset.emplace(deserialized_utc_offset);
}

if (clp::ffi::ir_stream::cProtocol::Eof == tag) {
return IRErrorCode::IRErrorCode_Eof;
}

IRErrorCode err{};
epoch_time_ms_t timestamp{};
if constexpr (std::is_same_v<encoded_variable_t, eight_byte_encoded_variable_t>) {
err = clp::ffi::ir_stream::eight_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp
);
} else {
epoch_time_ms_t timestamp_delta{};
err = clp::ffi::ir_stream::four_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp_delta
);
timestamp = deserializer->m_timestamp + timestamp_delta;
}

if (IRErrorCode::IRErrorCode_Success != err) {
return err;
}

deserializer->m_timestamp = timestamp;
if (utc_offset.has_value()) {
deserializer->m_utc_offset = utc_offset.value();
}
return IRErrorCode::IRErrorCode_Success;
}
} // namespace

CLP_FFI_GO_METHOD auto ir_deserializer_close(void* ir_deserializer) -> void {
Expand All @@ -261,16 +284,19 @@ CLP_FFI_GO_METHOD auto ir_deserializer_new_deserializer_with_preamble(
BufferReader ir_buf{static_cast<char const*>(ir_view.m_data), ir_view.m_size};

bool four_byte_encoding{};
if (IRErrorCode const err{get_encoding_type(ir_buf, four_byte_encoding)};
if (IRErrorCode const err{clp::ffi::ir_stream::get_encoding_type(ir_buf, four_byte_encoding)};
IRErrorCode::IRErrorCode_Success != err)
{
return static_cast<int>(err);
}
*ir_encoding = four_byte_encoding ? 1 : 0;

if (IRErrorCode const err{
deserialize_preamble(ir_buf, *metadata_type, *metadata_pos, *metadata_size)
};
if (IRErrorCode const err{clp::ffi::ir_stream::deserialize_preamble(
ir_buf,
*metadata_type,
*metadata_pos,
*metadata_size
)};
IRErrorCode::IRErrorCode_Success != err)
{
return static_cast<int>(err);
Expand Down
23 changes: 16 additions & 7 deletions cpp/src/ffi_go/ir/serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ffi/ir_stream/encoding_methods.hpp>
#include <clp/ir/types.hpp>
#include <clp/time_types.hpp>

#include "ffi_go/api_decoration.h"
#include "ffi_go/defs.h"
Expand Down Expand Up @@ -45,7 +46,6 @@ template <class encoded_variable_t>
auto new_serializer_with_preamble(
StringView ts_pattern,
StringView ts_pattern_syntax,
StringView time_zone_id,
[[maybe_unused]] epoch_time_ms_t reference_ts,
void** ir_serializer_ptr,
ByteSpan* ir_view
Expand All @@ -64,14 +64,14 @@ auto new_serializer_with_preamble(
success = clp::ffi::ir_stream::eight_byte_encoding::serialize_preamble(
std::string_view{ts_pattern.m_data, ts_pattern.m_size},
std::string_view{ts_pattern_syntax.m_data, ts_pattern_syntax.m_size},
std::string_view{time_zone_id.m_data, time_zone_id.m_size},
"", // ignore timezone ID until removed from CLP core
serializer->m_ir_buf
);
} else if constexpr (std::is_same_v<encoded_variable_t, four_byte_encoded_variable_t>) {
success = clp::ffi::ir_stream::four_byte_encoding::serialize_preamble(
std::string_view{ts_pattern.m_data, ts_pattern.m_size},
std::string_view{ts_pattern_syntax.m_data, ts_pattern_syntax.m_size},
std::string_view{time_zone_id.m_data, time_zone_id.m_size},
"", // ignore timezone ID until removed from CLP core
reference_ts,
serializer->m_ir_buf
);
Expand Down Expand Up @@ -137,14 +137,12 @@ CLP_FFI_GO_METHOD auto ir_serializer_close(void* ir_serializer) -> void {
CLP_FFI_GO_METHOD auto ir_serializer_new_eight_byte_serializer_with_preamble(
StringView ts_pattern,
StringView ts_pattern_syntax,
StringView time_zone_id,
void** ir_serializer_ptr,
ByteSpan* ir_view
) -> int {
return new_serializer_with_preamble<eight_byte_encoded_variable_t>(
ts_pattern,
ts_pattern_syntax,
time_zone_id,
0,
ir_serializer_ptr,
ir_view
Expand All @@ -154,15 +152,13 @@ CLP_FFI_GO_METHOD auto ir_serializer_new_eight_byte_serializer_with_preamble(
CLP_FFI_GO_METHOD auto ir_serializer_new_four_byte_serializer_with_preamble(
StringView ts_pattern,
StringView ts_pattern_syntax,
StringView time_zone_id,
epoch_time_ms_t reference_ts,
void** ir_serializer_ptr,
ByteSpan* ir_view
) -> int {
return new_serializer_with_preamble<four_byte_encoded_variable_t>(
ts_pattern,
ts_pattern_syntax,
time_zone_id,
reference_ts,
ir_serializer_ptr,
ir_view
Expand Down Expand Up @@ -196,4 +192,17 @@ CLP_FFI_GO_METHOD auto ir_serializer_serialize_four_byte_log_event(
ir_view
);
}

CLP_FFI_GO_METHOD auto ir_serializer_serialize_utc_offset_change(
epoch_time_ms_t utc_offset_change,
void* ir_serializer,
ByteSpan* ir_view
) -> void {
Serializer* serializer{static_cast<Serializer*>(ir_serializer)};
clp::UtcOffset const utc_offset{utc_offset_change};
serializer->m_ir_buf.clear();
clp::ffi::ir_stream::serialize_utc_offset_change(utc_offset, serializer->m_ir_buf);
ir_view->m_data = serializer->m_ir_buf.data();
ir_view->m_size = serializer->m_ir_buf.size();
}
} // namespace ffi_go::ir
Loading
Loading