bugfix: handle format errors in KafkaSource (#513)
zliang-min authored Jan 22, 2024
1 parent ef51197 commit 6110219
Showing 4 changed files with 34 additions and 6 deletions.
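
The change threads a new std::optional<String> format_error member through KafkaSource: the StreamingFormatExecutor error callback records the parse error instead of silently returning 0, and parseFormat then logs the error together with the message offset and clears it before moving on. Below is a minimal, self-contained sketch of that capture-log-reset pattern; MiniFormatExecutor and MiniKafkaSource are simplified stand-ins for the real proton classes (the real callback receives a DB::Exception and the real code logs via LOG_ERROR), not their actual APIs.

```cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>

/// Simplified stand-in for StreamingFormatExecutor: it parses one payload and,
/// on failure, asks an error callback how many rows to keep from it.
struct MiniFormatExecutor
{
    using ErrorCallback = std::function<std::size_t(const std::exception &)>;

    explicit MiniFormatExecutor(ErrorCallback on_error_) : on_error(std::move(on_error_)) {}

    std::size_t execute(const std::string & payload)
    {
        try
        {
            if (payload.empty())
                throw std::runtime_error("empty payload"); /// stand-in for a real format error
            return 1;                                      /// pretend one row was parsed
        }
        catch (const std::exception & ex)
        {
            return on_error(ex); /// the caller decides how many rows survive the error
        }
    }

    ErrorCallback on_error;
};

/// Simplified stand-in for KafkaSource::parseFormat: capture, log, reset.
struct MiniKafkaSource
{
    std::optional<std::string> format_error; /// mirrors the new member in KafkaSource.h

    MiniFormatExecutor executor{[this](const std::exception & ex) -> std::size_t
    {
        format_error = ex.what(); /// capture the error instead of dropping it
        return 0;                 /// keep no rows from the broken message
    }};

    void parseFormat(const std::string & payload, long offset)
    {
        std::size_t new_rows = executor.execute(payload);

        if (format_error)
        {
            /// The real code logs via LOG_ERROR(log, "Failed to parse message at {}: {}", ...).
            std::cerr << "Failed to parse message at " << offset << ": " << *format_error << '\n';
            format_error.reset(); /// clear so the next message starts clean
        }

        if (new_rows == 0)
            return;
        /// ... build a result block from the parsed columns ...
    }
};

int main()
{
    MiniKafkaSource source;
    source.parseFormat("good message", 41);
    source.parseFormat("", 42); /// triggers the error path and logs it
}
```
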
12 changes: 10 additions & 2 deletions src/Formats/ProtobufReader.cpp
@@ -3,7 +3,6 @@
#if USE_PROTOBUF
# include <IO/ReadHelpers.h>


namespace DB
{
namespace ErrorCodes
@@ -437,12 +436,21 @@ void ProtobufReader::ignoreGroup()
ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
}


/// proton: starts
void ProtobufReader::setReadBuffer(ReadBuffer & buf)
{
in.swap(buf);
/// reset states
cursor = 0;
current_message_level = 0;
current_message_end = 0;
parent_message_ends.clear();
field_number = 0;
next_field_number = 0;
field_end = 0;
}
/// proton: ends

}

#endif
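
The new ProtobufReader::setReadBuffer lets one reader be re-pointed at a fresh buffer (for example, the next message's payload) without being reconstructed; swapping the buffer alone would leave stale cursor and message-nesting state behind, which is why every parsing field is reset. A small stand-in sketch of that reuse pattern follows; MiniReader and its std::string buffer are simplified placeholders, not the real DB::ReadBuffer machinery.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

/// Simplified stand-in for a streaming protobuf reader that tracks a cursor
/// and nested-message state while it parses a single buffer.
class MiniReader
{
public:
    explicit MiniReader(std::string buf) : in(std::move(buf)) {}

    /// Mirrors ProtobufReader::setReadBuffer: take a new buffer and drop every
    /// piece of parsing state that referred to the old one.
    void setReadBuffer(std::string buf)
    {
        in = std::move(buf);        /// the real code swaps in the new ReadBuffer
        cursor = 0;                 /// positions in the old buffer are meaningless now
        current_message_level = 0;
        current_message_end = 0;
        parent_message_ends.clear();
        field_number = 0;
        next_field_number = 0;
        field_end = 0;
    }

private:
    std::string in;
    std::size_t cursor = 0;
    std::size_t current_message_level = 0;
    std::size_t current_message_end = 0;
    std::vector<std::size_t> parent_message_ends;
    int field_number = 0;
    int next_field_number = 0;
    std::size_t field_end = 0;
};

int main()
{
    MiniReader reader("payload of message 1");
    /// ... parse message 1 ...
    reader.setReadBuffer("payload of message 2"); /// reuse the same reader for the next payload
    /// ... parse message 2 ...
}
```
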
2 changes: 2 additions & 0 deletions src/Formats/ProtobufReader.h
@@ -33,7 +33,9 @@ class ProtobufReader
void readStringAndAppend(PaddedPODArray<UInt8> & str);

bool eof() const { return in.eof(); }
/// proton: starts
void setReadBuffer(ReadBuffer & buf);
/// proton: ends

private:
void readBinary(void * data, size_t size);
25 changes: 21 additions & 4 deletions src/Storages/ExternalStream/Kafka/KafkaSource.cpp
@@ -145,10 +145,16 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage)
external_stream_counter->addToReadBytes(kmessage->len);
external_stream_counter->addToReadCounts(new_rows);

if (format_error)
{
LOG_ERROR(log, "Failed to parse message at {}: {}", kmessage->offset, format_error.value());
format_error.reset();
}

if (!new_rows)
return;

auto result_block = non_virtual_header.cloneWithColumns(format_executor->getResultColumns());
convert_non_virtual_to_physical_action->execute(result_block);

MutableColumns new_data(result_block.mutateColumns());
@@ -234,11 +240,22 @@ void KafkaSource::initFormatExecutor(const Kafka * kafka)
{
const auto & data_format = kafka->dataFormat();

auto input_format
= FormatFactory::instance().getInputFormat(data_format, read_buffer, non_virtual_header, query_context, max_block_size);
auto input_format = FormatFactory::instance().getInputFormat(
data_format,
read_buffer,
non_virtual_header,
query_context,
max_block_size,
kafka->getFormatSettings(query_context));

format_executor = std::make_unique<StreamingFormatExecutor>(
non_virtual_header, std::move(input_format), [](const MutableColumns &, Exception &) -> size_t { return 0; });
non_virtual_header,
std::move(input_format),
[this](const MutableColumns &, Exception & ex) -> size_t
{
format_error = ex.what();
return 0;
});

auto converting_dag = ActionsDAG::makeConvertingActions(
non_virtual_header.cloneEmpty().getColumnsWithTypeAndName(),
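
Judging from the diff, two things change in initFormatExecutor: the input format is now created with the stream's format settings (kafka->getFormatSettings(query_context)), and the executor's error callback captures ex.what() into format_error instead of discarding it. Returning 0 from the callback still means a malformed message contributes no rows, so the stream keeps running; the difference is that the failure now surfaces in the log with the Kafka offset (see the sketch after the file summary above).
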
1 change: 1 addition & 0 deletions src/Storages/ExternalStream/Kafka/KafkaSource.h
@@ -82,6 +82,7 @@ class KafkaSource final : public ISource

bool request_virtual_columns = false;

std::optional<String> format_error;
std::vector<Chunk> result_chunks;
std::vector<Chunk>::iterator iter;
MutableColumns current_batch;
