feat: read Kafka message headers (#900)
zliang-min authored Feb 19, 2025
1 parent 5645f78 commit 202b7e4
Showing 3 changed files with 80 additions and 39 deletions.
2 changes: 2 additions & 0 deletions src/Common/ProtonCommon.h
@@ -41,6 +41,8 @@ const String RESERVED_EVENT_SEQUENCE_ID = "_tp_sn";
const String RESERVED_DELTA_FLAG = "_tp_delta";
const String RESERVED_SHARD = "_tp_shard";
const String RESERVED_MESSAGE_KEY = "_tp_message_key";
const String RESERVED_MESSAGE_HEADERS = "_tp_message_headers";
const String RESERVED_ERROR = "_tp_error";
const String RESERVED_EVENT_TIME_API_NAME = "event_time_column";
const std::vector<String> RESERVED_COLUMN_NAMES = {RESERVED_EVENT_TIME, RESERVED_INDEX_TIME};
const String DEFAULT_EVENT_TIME = "now64(3, 'UTC')";
11 changes: 7 additions & 4 deletions src/Storages/ExternalStream/Kafka/Kafka.cpp
@@ -1,4 +1,5 @@
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteBufferFromFile.h>
@@ -206,8 +207,6 @@ Kafka::ConfPtr createConfFromSettings(const KafkaExternalStreamSettings & settin

}

const String Kafka::VIRTUAL_COLUMN_MESSAGE_KEY = "_message_key";

Kafka::ConfPtr Kafka::createRdConf(KafkaExternalStreamSettings settings_)
{
if (const auto & ca_pem = settings_.ssl_ca_pem.value; !ca_pem.empty())
@@ -305,7 +304,11 @@ void Kafka::cacheVirtualColumnNamesAndTypes()
NameAndTypePair(ProtonConsts::RESERVED_PROCESS_TIME, std::make_shared<DataTypeDateTime64>(3, "UTC")));
virtual_column_names_and_types.push_back(NameAndTypePair(ProtonConsts::RESERVED_SHARD, std::make_shared<DataTypeInt32>()));
virtual_column_names_and_types.push_back(NameAndTypePair(ProtonConsts::RESERVED_EVENT_SEQUENCE_ID, std::make_shared<DataTypeInt64>()));
virtual_column_names_and_types.push_back(NameAndTypePair(VIRTUAL_COLUMN_MESSAGE_KEY, std::make_shared<DataTypeString>()));
virtual_column_names_and_types.push_back(NameAndTypePair(ProtonConsts::RESERVED_MESSAGE_KEY, std::make_shared<DataTypeString>()));

DataTypes header_types{/*key_type*/ std::make_shared<DataTypeString>(), /*value_type*/ std::make_shared<DataTypeString>()};
virtual_column_names_and_types.push_back(
NameAndTypePair(ProtonConsts::RESERVED_MESSAGE_HEADERS, std::make_shared<DataTypeMap>(header_types)));
}

std::vector<Int64> Kafka::getOffsets(
@@ -428,7 +431,7 @@ std::optional<UInt64> Kafka::totalRows(const Settings & settings_ref) const

std::vector<int64_t> Kafka::getLastSNs() const
{
auto produce = const_cast<DB::Kafka*>(this)->getProducer();
auto produce = const_cast<DB::Kafka *>(this)->getProducer();
RdKafka::Topic topic{*produce->getHandle(), topicName()};
auto partitions = topic.getPartitionCount();

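Note on the new virtual column type: `_tp_message_headers` is registered above as `map(string, string)`. A minimal sketch of how such a value is represented, assuming standard proton/ClickHouse field semantics (the header names and values below are hypothetical, not from this commit): a `Map` field is a list of `(key, value)` tuples, which is exactly the shape the header-reading lambda in KafkaSource.cpp produces.

#include <Core/Field.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeString.h>

void exampleHeadersField()
{
    using namespace DB;

    /// Same type as registered in cacheVirtualColumnNamesAndTypes(): map(string, string)
    DataTypes kv{std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()};
    auto headers_type = std::make_shared<DataTypeMap>(kv);

    /// Hypothetical header values; each entry is a (key, value) tuple
    Map headers;
    headers.push_back(Tuple{"trace-id", "abc123"});
    headers.push_back(Tuple{"source", "gateway-1"});

    auto column = headers_type->createColumn(); /// ColumnMap
    column->insert(headers);                    /// one row holding the whole header map
}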
106 changes: 71 additions & 35 deletions src/Storages/ExternalStream/Kafka/KafkaSource.cpp
@@ -1,15 +1,16 @@
#include <Checkpoint/CheckpointContext.h>
#include <Checkpoint/CheckpointCoordinator.h>
#include <Common/ProtonCommon.h>
#include <Core/Field.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Storages/ExternalStream/Kafka/Kafka.h>
#include <Storages/ExternalStream/Kafka/KafkaSource.h>
#include <Storages/ExternalStream/Kafka/Topic.h>
#include <Common/ProtonCommon.h>

#include <base/ClockUtils.h>

@@ -114,7 +115,8 @@ Chunk KafkaSource::generate()

if (!consume_started.test_and_set())
{
LOG_INFO(logger, "Start consuming from topic={} shard={} offset={} high_watermark={}", topic->name(), shard, offset, high_watermark);
LOG_INFO(
logger, "Start consuming from topic={} shard={} offset={} high_watermark={}", topic->name(), shard, offset, high_watermark);
consumer->startConsume(*topic, shard, offset, /*check_offset=*/is_streaming);
}

@@ -143,17 +145,15 @@ void KafkaSource::readAndProcess()
current_batch.clear();
current_batch.reserve(header.columns());

auto callback = [this](void * rkmessage, size_t total_count, void * data) {
parseMessage(rkmessage, total_count, data);
};
auto callback = [this](void * rkmessage, size_t total_count, void * data) { parseMessage(rkmessage, total_count, data); };

auto error_callback = [this](rd_kafka_resp_err_t err)
{
auto error_callback = [this](rd_kafka_resp_err_t err) {
LOG_ERROR(logger, "Failed to consume topic={} shard={} err={}", topic->name(), shard, rd_kafka_err2str(err));
external_stream_counter->addToReadFailed(1);
};

auto current_batch_last_sn = consumer->consumeBatch(*topic, shard, record_consume_batch_count, record_consume_timeout_ms, callback, error_callback);
auto current_batch_last_sn
= consumer->consumeBatch(*topic, shard, record_consume_batch_count, record_consume_timeout_ms, callback, error_callback);

if (current_batch_last_sn >= 0) /// There are new messages
{
@@ -176,7 +176,7 @@
iter = result_chunks_with_sns.begin();
}

void KafkaSource::parseMessage(void * rkmessage, size_t /*total_count*/, void * /*data*/)
void KafkaSource::parseMessage(void * rkmessage, size_t /*total_count*/, void * /*data*/)
{
auto * message = static_cast<rd_kafka_message_t *>(rkmessage);
parseFormat(message);
@@ -267,18 +267,10 @@ void KafkaSource::initFormatExecutor()
const auto & data_format = kafka.dataFormat();

auto input_format = FormatFactory::instance().getInputFormat(
data_format,
read_buffer,
physical_header,
query_context,
max_block_size,
kafka.getFormatSettings(query_context));
data_format, read_buffer, physical_header, query_context, max_block_size, kafka.getFormatSettings(query_context));

format_executor = std::make_unique<StreamingFormatExecutor>(
physical_header,
std::move(input_format),
[this](const MutableColumns &, Exception & ex) -> size_t
{
physical_header, std::move(input_format), [this](const MutableColumns &, Exception & ex) -> size_t {
format_error = ex.what();
return 0;
});
@@ -288,23 +280,72 @@ void KafkaSource::calculateColumnPositions()
{
for (size_t pos = 0; const auto & column : header)
{
/// _tp_message_key and _tp_message_headers always have the same meaning; users are not allowed to override them.
if (column.name == ProtonConsts::RESERVED_MESSAGE_KEY)
{
virtual_col_value_functions[pos]
= [](const rd_kafka_message_t * kmessage) -> String { return {static_cast<char *>(kmessage->key), kmessage->key_len}; };
virtual_col_types[pos] = column.type;
}
else if (column.name == ProtonConsts::RESERVED_MESSAGE_HEADERS)
{
virtual_col_value_functions[pos] = [](const rd_kafka_message_t * kmessage) -> Field {
/// The returned pointer in *hdrsp is associated with the rkmessage and must not be used after destruction of the message object.
rd_kafka_headers_t * hdrs;
auto err = rd_kafka_message_headers(kmessage, &hdrs);

Map result;
if (err == RD_KAFKA_RESP_ERR__NOENT)
return result;

if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
{
result.reserve(1);
result.push_back(
Tuple{ProtonConsts::RESERVED_ERROR, fmt::format("Failed to parse headers, error: {}", rd_kafka_err2str(err))});
return result;
}

size_t idx = 0;
const char * name;
const void * val;
size_t size;

result.reserve(rd_kafka_header_cnt(hdrs));

while (rd_kafka_header_get_all(hdrs, idx++, &name, &val, &size) == RD_KAFKA_RESP_ERR_NO_ERROR)
{
if (val != nullptr)
{
const auto * val_str = static_cast<const char *>(val);
result.push_back(Tuple{name, val_str});
}
else
result.push_back(Tuple{name, "null"});
}

return result;
};
virtual_col_types[pos] = column.type;
}
/// If a virtual column is explicitly defined as a physical column in the stream definition, we should honor it,
/// just as the virtual columns documentation says; users are not recommended to do this (but they still can).
if (std::any_of(non_virtual_header.begin(), non_virtual_header.end(), [&column](auto & non_virtual_column) { return non_virtual_column.name == column.name; }))
else if (std::any_of(non_virtual_header.begin(), non_virtual_header.end(), [&column](auto & non_virtual_column) {
return non_virtual_column.name == column.name;
}))
{
physical_header.insert(column);
}
else if (column.name == ProtonConsts::RESERVED_APPEND_TIME)
{
virtual_col_value_functions[pos]
= [](const rd_kafka_message_t * kmessage) {
rd_kafka_timestamp_type_t ts_type;
auto ts = rd_kafka_message_timestamp(kmessage, &ts_type);
/// Only set the append time when the timestamp is actually an append time.
if (ts_type == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
return Decimal64(ts);
return Decimal64();
};
virtual_col_value_functions[pos] = [](const rd_kafka_message_t * kmessage) {
rd_kafka_timestamp_type_t ts_type;
auto ts = rd_kafka_message_timestamp(kmessage, &ts_type);
/// Only set the append time when the timestamp is actually an append time.
if (ts_type == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
return Decimal64(ts);
return Decimal64();
};
/// We are assuming all virtual timestamp columns have the same data type
virtual_col_types[pos] = column.type;
}
@@ -335,11 +376,6 @@ void KafkaSource::calculateColumnPositions()
virtual_col_value_functions[pos] = [](const rd_kafka_message_t * kmessage) -> Int64 { return kmessage->offset; };
virtual_col_types[pos] = column.type;
}
else if (column.name == Kafka::VIRTUAL_COLUMN_MESSAGE_KEY)
{
virtual_col_value_functions[pos] = [](const rd_kafka_message_t * kmessage) -> String { return {static_cast<char *>(kmessage->key), kmessage->key_len}; };
virtual_col_types[pos] = column.type;
}
else
{
physical_header.insert(column);
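For context (not part of this commit): the headers that the new `_tp_message_headers` lambda reads are attached on the producer side through librdkafka's headers API. A minimal, hypothetical producer sketch; the topic name, header names, and values are made up, and error handling is abbreviated.

#include <librdkafka/rdkafka.h>

void produceWithHeaders(rd_kafka_t * rk, const char * payload, size_t len)
{
    /// Build a header list; -1 means "treat the value as a NUL-terminated string"
    rd_kafka_headers_t * hdrs = rd_kafka_headers_new(2);
    rd_kafka_header_add(hdrs, "trace-id", -1, "abc123", -1);
    rd_kafka_header_add(hdrs, "source", -1, "gateway-1", -1);

    auto err = rd_kafka_producev(
        rk,
        RD_KAFKA_V_TOPIC("my_topic"),
        RD_KAFKA_V_VALUE(const_cast<char *>(payload), len),
        RD_KAFKA_V_HEADERS(hdrs),
        RD_KAFKA_V_END);

    if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
        rd_kafka_headers_destroy(hdrs); /// producev takes ownership of hdrs only on success
}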
