Skip to content

Commit

Permalink
logging enhancement for kafka external stream
Browse files Browse the repository at this point in the history
  • Loading branch information
zliang-min authored Apr 6, 2024
1 parent b38f2ce commit 3f14c1d
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 19 deletions.
5 changes: 3 additions & 2 deletions src/Storages/ExternalStream/Kafka/Consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace RdKafka
{

/// Consumer will take the ownership of `rk_conf`.
Consumer::Consumer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, Poco::Logger * logger_) : logger(logger_)
Consumer::Consumer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, const String & logger_name_prefix)
{
char errstr[512];
auto * conf = rd_kafka_conf_dup(&rk_conf);
Expand All @@ -27,7 +27,8 @@ Consumer::Consumer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, Poco
throw Exception(klog::mapErrorCode(rd_kafka_last_error()), "Failed to create kafka handle: {}", errstr);
}

LOG_INFO(logger, "Created consumer {}", name());
logger = &Poco::Logger::get(fmt::format("{}.{}", logger_name_prefix, name()));
LOG_INFO(logger, "Created consumer");

poller.scheduleOrThrowOnError([this, poll_timeout_ms] { backgroundPoll(poll_timeout_ms); });
}
Expand Down
5 changes: 3 additions & 2 deletions src/Storages/ExternalStream/Kafka/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace RdKafka
class Consumer : boost::noncopyable
{
public:
Consumer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, Poco::Logger * logger_);
Consumer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, const String & logger_name_prefix);
~Consumer()
{
stopped.test_and_set();
Expand All @@ -33,8 +33,9 @@ class Consumer : boost::noncopyable

void consumeBatch(Topic & topic, Int32 partition, uint32_t count, int32_t timeout_ms, Callback callback, ErrorCallback error_callback) const;

private:
/// Returns the instance name librdkafka assigned to this consumer handle;
/// the Consumer ctor appends it to the logger name prefix. TODO confirm uniqueness per handle.
std::string name() const { return rd_kafka_name(rk.get()); }

private:
void backgroundPoll(UInt64 poll_timeout_ms) const;

klog::KafkaPtr rk {nullptr, rd_kafka_destroy};
Expand Down
8 changes: 4 additions & 4 deletions src/Storages/ExternalStream/Kafka/ConsumerPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ class ConsumerPool : public IConsumerPool, private PoolBase<Consumer>
using Entry = IConsumerPool::Entry;
using Base = PoolBase<Consumer>;

ConsumerPool(unsigned size, const StorageID & storage_id, rd_kafka_conf_t & conf, UInt64 poll_timeout_ms_, Poco::Logger * consumer_logger_)
ConsumerPool(unsigned size, const StorageID & storage_id, rd_kafka_conf_t & conf, UInt64 poll_timeout_ms_, const String & consumer_logger_name_prefix_)
: Base(size,
&Poco::Logger::get("RdKafkaConsumerPool (" + storage_id.getFullNameNotQuoted() + ")"))
, rd_conf(conf)
, poll_timeout_ms(poll_timeout_ms_)
, consumer_logger(consumer_logger_)
, consumer_logger_name_prefix(consumer_logger_name_prefix_)
{
}

Expand All @@ -68,13 +68,13 @@ class ConsumerPool : public IConsumerPool, private PoolBase<Consumer>
/** Creates a new object to put in the pool. */
std::shared_ptr<Consumer> allocObject() override
{
return std::make_shared<Consumer>(rd_conf, poll_timeout_ms, consumer_logger);
return std::make_shared<Consumer>(rd_conf, poll_timeout_ms, consumer_logger_name_prefix);
}

private:
rd_kafka_conf_t & rd_conf;
UInt64 poll_timeout_ms {0};
Poco::Logger * consumer_logger;
String consumer_logger_name_prefix;
};

}
Expand Down
21 changes: 17 additions & 4 deletions src/Storages/ExternalStream/Kafka/Kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,13 @@ Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> setting

cacheVirtualColumnNamesAndTypes();

rd_kafka_conf_set_log_cb(conf.get(), &Kafka::onLog);
rd_kafka_conf_set_error_cb(conf.get(), &Kafka::onError);
rd_kafka_conf_set_stats_cb(conf.get(), &Kafka::onStats);
rd_kafka_conf_set_throttle_cb(conf.get(), &Kafka::onThrottle);
rd_kafka_conf_set_dr_msg_cb(conf.get(), &KafkaSink::onMessageDelivery);

consumer_pool = std::make_unique<RdKafka::ConsumerPool>(/*size=*/100, storage_id, *conf, settings->poll_waittime_ms.value, logger);
consumer_pool = std::make_unique<RdKafka::ConsumerPool>(/*size=*/100, storage_id, *conf, settings->poll_waittime_ms.value, getLoggerName());

if (!attach)
/// Only validate cluster / topic for external stream creation
Expand Down Expand Up @@ -482,9 +483,11 @@ Pipe Kafka::read(

LOG_INFO(
logger,
"Starting reading {} streams by seeking to {} in dedicated resource group",
"Starting reading {} streams by seeking to {} with {} in dedicated resource group",
pipes.size(),
query_info.seek_to_info->getSeekTo());
query_info.seek_to_info->getSeekTo(),
consumer->name()
);

auto pipe = Pipe::unitePipes(std::move(pipes));
auto min_threads = context->getSettingsRef().min_threads.value;
Expand All @@ -504,7 +507,7 @@ RdKafka::Producer & Kafka::getProducer()
if (producer)
return *producer;

auto producer_ptr = std::make_unique<RdKafka::Producer>(*conf, settings->poll_waittime_ms.value, logger);
auto producer_ptr = std::make_unique<RdKafka::Producer>(*conf, settings->poll_waittime_ms.value, getLoggerName());
producer.swap(producer_ptr);

return *producer;
Expand Down Expand Up @@ -542,6 +545,16 @@ int Kafka::onStats(struct rd_kafka_s * rk, char * json, size_t json_len, void *
return 0;
}

/// librdkafka log callback: maps librdkafka's syslog-style level to our log macros.
/// Levels above 4 are informational, exactly 4 is a warning, anything lower is an error.
void Kafka::onLog(const struct rd_kafka_s * rk, int level, const char * fac, const char * buf)
{
    if (level > 4)
        LOG_INFO(cbLogger(), "{}|{} buf={}", rd_kafka_name(rk), fac, buf);
    else if (level == 4)
        LOG_WARNING(cbLogger(), "{}|{} buf={}", rd_kafka_name(rk), fac, buf);
    else
        LOG_ERROR(cbLogger(), "{}|{} buf={}", rd_kafka_name(rk), fac, buf);
}

void Kafka::onError(struct rd_kafka_s * rk, int err, const char * reason, void * /*opaque*/)
{
if (err == RD_KAFKA_RESP_ERR__FATAL)
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ExternalStream/Kafka/Kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class Kafka final : public StorageExternalStreamImpl
static int onStats(struct rd_kafka_s * rk, char * json, size_t json_len, void * opaque);
static void onError(struct rd_kafka_s * rk, int err, const char * reason, void * opaque);
static void onThrottle(struct rd_kafka_s * rk, const char * broker_name, int32_t broker_id, int throttle_time_ms, void * opaque);
static void onLog(const struct rd_kafka_s * rk, int level, const char * fac, const char * buf);

Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_, ContextPtr context);
~Kafka() override = default;
Expand Down
6 changes: 5 additions & 1 deletion src/Storages/ExternalStream/Kafka/KafkaSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ KafkaSink::KafkaSink(
, one_message_per_row(kafka.produceOneMessagePerRow())
, topic_refresh_interval_ms(kafka.topicRefreshIntervalMs())
, external_stream_counter(external_stream_counter_)
, logger(&Poco::Logger::get(fmt::format("{}(sink-{})", kafka.getLoggerName(), context->getCurrentQueryId())))
, logger(&Poco::Logger::get(fmt::format("{}.{}", kafka.getLoggerName(), producer.name())))
{
wb = std::make_unique<WriteBufferFromKafkaSink>([this](char * pos, size_t len) { addMessageToBatch(pos, len); });

Expand Down Expand Up @@ -174,6 +174,7 @@ KafkaSink::KafkaSink(

/// Polling message deliveries.
background_jobs.scheduleOrThrowOnError([this, refresh_interval_ms = static_cast<UInt64>(topic_refresh_interval_ms)]() {
LOG_INFO(logger, "Start topic partition count refreshing job");
auto metadata_refresh_stopwatch = Stopwatch();
/// Use a small sleep interval to avoid blocking operation for a long just (in case refresh_interval_ms is big).
auto sleep_ms = std::min(UInt64(500), refresh_interval_ms);
Expand All @@ -195,6 +196,7 @@ KafkaSink::KafkaSink(
LOG_WARNING(logger, "Failed to describe topic, error code: {}", getCurrentExceptionMessage(true, true));
}
}
LOG_INFO(logger, "Stopped topic partition count refreshing job");
});
}

Expand Down Expand Up @@ -342,6 +344,8 @@ void KafkaSink::onFinish()
if (is_finished.test_and_set())
return;

LOG_INFO(logger, "Stopping producing messages");

background_jobs.wait();

/// if there are no outstandings, no need to do flushing
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/ExternalStream/Kafka/KafkaSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ KafkaSource::KafkaSource(
Kafka & kafka_,
const Block & header_,
const StorageSnapshotPtr & storage_snapshot_,
RdKafka::ConsumerPool::Entry consumer_,
const RdKafka::ConsumerPool::Entry & consumer_,
RdKafka::TopicPtr topic_,
Int32 shard_,
Int64 offset_,
Expand All @@ -50,7 +50,7 @@ KafkaSource::KafkaSource(
, ckpt_data(kafka.topicName(), shard)
, external_stream_counter(external_stream_counter_)
, query_context(std::move(query_context_))
, logger(&Poco::Logger::get(fmt::format("{}(source-{})", kafka.getLoggerName(), query_context->getCurrentQueryId())))
, logger(&Poco::Logger::get(fmt::format("{}.{}", kafka.getLoggerName(), consumer->name())))
{
assert(external_stream_counter);

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ExternalStream/Kafka/KafkaSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class KafkaSource final : public Streaming::ISource
Kafka & kafka_,
const Block & header_,
const StorageSnapshotPtr & storage_snapshot_,
RdKafka::ConsumerPool::Entry consumer_,
const RdKafka::ConsumerPool::Entry & consumer_,
RdKafka::TopicPtr topic_,
Int32 shard_,
Int64 offset_,
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/ExternalStream/Kafka/Producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace RdKafka
{

/// Producer will take the ownership of `rk_conf`.
Producer::Producer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, Poco::Logger * logger_) : logger(logger_)
Producer::Producer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, const String & logger_name_prefix)
{
char errstr[512];
auto * conf = rd_kafka_conf_dup(&rk_conf);
Expand All @@ -27,6 +27,9 @@ Producer::Producer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, Poco
throw Exception(klog::mapErrorCode(rd_kafka_last_error()), "Failed to create kafka handle: {}", errstr);
}

logger = &Poco::Logger::get(fmt::format("{}.{}", logger_name_prefix, name()));
LOG_INFO(logger, "Created producer");

poller.scheduleOrThrowOnError([this, poll_timeout_ms] { backgroundPoll(poll_timeout_ms); });
}

Expand Down
5 changes: 3 additions & 2 deletions src/Storages/ExternalStream/Kafka/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ namespace RdKafka
class Producer : boost::noncopyable
{
public:
Producer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, Poco::Logger * logger_);
Producer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, const String & logger_name_prefix);
/// Raises the stop flag — presumably observed by backgroundPoll() to exit its
/// loop (TODO confirm); the rd_kafka handle itself is released by the
/// KafkaPtr member's rd_kafka_destroy deleter.
~Producer()
{
stopped.test_and_set();
}

rd_kafka_t * getHandle() const { return rk.get(); }

private:
/// Returns the instance name librdkafka assigned to this producer handle;
/// the Producer ctor appends it to the logger name prefix. TODO confirm uniqueness per handle.
std::string name() const { return rd_kafka_name(rk.get()); }

private:
void backgroundPoll(UInt64 poll_timeout_ms) const;

klog::KafkaPtr rk {nullptr, rd_kafka_destroy};
Expand Down

0 comments on commit 3f14c1d

Please sign in to comment.