diff --git a/src/KafkaLog/KafkaWALCommon.cpp b/src/KafkaLog/KafkaWALCommon.cpp index 7a981001d89..14e4d88052d 100644 --- a/src/KafkaLog/KafkaWALCommon.cpp +++ b/src/KafkaLog/KafkaWALCommon.cpp @@ -130,7 +130,7 @@ initRdKafkaHandle(rd_kafka_type_t type, KConfParams & params, KafkaWALStats * st if (!kafka_handle) { LOG_ERROR(stats->log, "Failed to create kafka handle, error={}", errstr); - throw DB::Exception("Failed to create kafka handle", mapErrorCode(rd_kafka_last_error())); + throw DB::Exception(mapErrorCode(rd_kafka_last_error()), "Failed to create kafka handle, error={}", errstr); } return kafka_handle; diff --git a/src/KafkaLog/KafkaWALCommon.h b/src/KafkaLog/KafkaWALCommon.h index a1416d78c7f..488524b1710 100644 --- a/src/KafkaLog/KafkaWALCommon.h +++ b/src/KafkaLog/KafkaWALCommon.h @@ -1,6 +1,5 @@ #pragma once -#include "KafkaWALProperties.h" #include "Results.h" #include @@ -30,6 +29,8 @@ using KConfPtr = std::unique_ptr; using KConfCallback = std::function; +using KConfParams = std::vector>; + struct PartitionTimestamp { PartitionTimestamp(int32_t partition_, int64_t timestamp_) : partition(partition_), timestamp(timestamp_) { } diff --git a/src/KafkaLog/KafkaWALPool.cpp b/src/KafkaLog/KafkaWALPool.cpp index 872bc010801..fee81a6fc78 100644 --- a/src/KafkaLog/KafkaWALPool.cpp +++ b/src/KafkaLog/KafkaWALPool.cpp @@ -24,33 +24,6 @@ namespace /// Globals const std::string SYSTEM_WALS_KEY = "cluster_settings.logstore"; const std::string SYSTEM_WALS_KEY_PREFIX = SYSTEM_WALS_KEY + "."; - - -int32_t bucketFetchWaitMax(int32_t fetch_wait_max_ms) -{ - if (fetch_wait_max_ms <= 10) - return 10; - else if (fetch_wait_max_ms <= 20) - return 20; - else if (fetch_wait_max_ms <= 30) - return 30; - else if (fetch_wait_max_ms <= 40) - return 40; - else if (fetch_wait_max_ms <= 50) - return 50; - else if (fetch_wait_max_ms <= 100) - return 100; - else if (fetch_wait_max_ms <= 200) - return 200; - else if (fetch_wait_max_ms <= 300) - return 300; - else if (fetch_wait_max_ms <= 400) - return 400; - else if (fetch_wait_max_ms <= 500) - return 500; - - return 500; -} } KafkaWALPool & KafkaWALPool::instance(const DB::ContextPtr & global_context) @@ -397,74 +370,6 @@ KafkaWALConsumerMultiplexerPtr KafkaWALPool::getOrCreateConsumerMultiplexer(cons return *best_multiplexer; } -KafkaWALSimpleConsumerPtr KafkaWALPool::getOrCreateStreamingExternal(const String & brokers, const KafkaWALAuth & auth, int32_t fetch_wait_max_ms) -{ - assert(!brokers.empty()); - - fetch_wait_max_ms = bucketFetchWaitMax(fetch_wait_max_ms); - - std::lock_guard lock{external_streaming_lock}; - - if (!external_streaming_consumers.contains(brokers)) - /// FIXME, configurable max cached consumers - external_streaming_consumers.emplace(brokers, std::pair(100, KafkaWALSimpleConsumerPtrs{})); - - /// FIXME, remove cached cluster - if (external_streaming_consumers.size() > 1000) - throw DB::Exception("Too many external Kafka cluster registered", DB::ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES); - - auto & consumers = external_streaming_consumers[brokers]; - - for (const auto & consumer : consumers.second) - { - const auto & consumer_settings = consumer->getSettings(); - if (consumer.use_count() == 1 && consumer_settings.fetch_wait_max_ms == fetch_wait_max_ms && consumer_settings.auth == auth) - { - LOG_INFO(log, "Reusing external Kafka consume with settings={}", consumer_settings.string()); - return consumer; - } - } - - /// consumer is used up and if we didn't reach maximum - if (consumers.second.size() < consumers.first) - { - if 
(!boost::iequals(auth.security_protocol, "plaintext") - && !boost::iequals(auth.security_protocol, "sasl_plaintext") - && !boost::iequals(auth.security_protocol, "sasl_ssl")) - throw DB::Exception( - DB::ErrorCodes::NOT_IMPLEMENTED, - "Invalid logstore kafka settings security_protocol: {}. Only plaintext, sasl_plaintext or sasl_ssl are supported", - auth.security_protocol); - - /// Create one - auto ksettings = std::make_unique(); - - ksettings->fetch_wait_max_ms = fetch_wait_max_ms; - - ksettings->brokers = brokers; - - /// Streaming WALs have a different group ID - ksettings->group_id += "-tp-external-streaming-query-" + std::to_string(consumers.second.size() + 1); - ksettings->auth = auth; - - /// We don't care offset checkpointing for WALs used for streaming processing, - /// No auto commit - ksettings->enable_auto_commit = false; - - LOG_INFO(log, "Create new external Kafka consume with settings={{{}}}", ksettings->string()); - - auto consumer = std::make_shared(std::move(ksettings)); - consumer->startup(); - consumers.second.push_back(consumer); - return consumer; - } - else - { - LOG_ERROR(log, "External streaming processing pool in cluster={} is used up, size={}", brokers, consumers.first); - throw DB::Exception("Max external streaming processing pool size has been reached", DB::ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES); - } -} - std::vector KafkaWALPool::clusters(const KafkaWALContext & ctx) const { std::vector results; diff --git a/src/KafkaLog/KafkaWALPool.h b/src/KafkaLog/KafkaWALPool.h index 9033aa04e51..69e2067f5b2 100644 --- a/src/KafkaLog/KafkaWALPool.h +++ b/src/KafkaLog/KafkaWALPool.h @@ -31,8 +31,6 @@ class KafkaWALPool : private boost::noncopyable KafkaWALSimpleConsumerPtr getOrCreateStreaming(const String & cluster_id); - KafkaWALSimpleConsumerPtr getOrCreateStreamingExternal(const String & brokers, const KafkaWALAuth & auth, int32_t fetch_wait_max_ms = 200); - std::vector clusters(const KafkaWALContext & ctx) const; bool enabled() const { return meta_wal != nullptr; } diff --git a/src/KafkaLog/KafkaWALProperties.cpp b/src/KafkaLog/KafkaWALProperties.cpp deleted file mode 100644 index aab20e8dcdf..00000000000 --- a/src/KafkaLog/KafkaWALProperties.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include - -#include -#include - -#include -#include -#include - -namespace DB::ErrorCodes -{ -extern const int INVALID_SETTING_VALUE; -} - -namespace klog -{ -KConfParams parseProperties(const String & properties) -{ - KConfParams result; - - if (properties.empty()) - return result; - - std::vector parts; - boost::split(parts, properties, boost::is_any_of(";")); - result.reserve(parts.size()); - - for (const auto & part : parts) - { - /// skip empty part, this happens when there are redundant / trailing ';' - if (unlikely(std::all_of(part.begin(), part.end(), [](char ch) { return isspace(static_cast(ch)); }))) - continue; - - auto equal_pos = part.find('='); - if (unlikely(equal_pos == std::string::npos || equal_pos == 0 || equal_pos == part.size() - 1)) - throw DB::Exception(DB::ErrorCodes::INVALID_SETTING_VALUE, "Invalid property `{}`, expected format: =.", part); - - auto key = part.substr(0, equal_pos); - auto value = part.substr(equal_pos + 1); - - /// no spaces are supposed be around `=`, thus only need to - /// remove the leading spaces of keys and trailing spaces of values - boost::trim_left(key); - boost::trim_right(value); - result.emplace_back(std::move(key), std::move(value)); - } - - return result; -} -} diff --git a/src/KafkaLog/KafkaWALProperties.h 
b/src/KafkaLog/KafkaWALProperties.h deleted file mode 100644 index 8eee42699bd..00000000000 --- a/src/KafkaLog/KafkaWALProperties.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -#include - -namespace klog -{ -using KConfParams = std::vector>; - -/// Parses the `properties` setting for Kafka external streams. -/// properties example: -/// message.max.bytes=1024;max.in.flight=1000;group.id=my-group -KConfParams parseProperties(const String & properties); -} diff --git a/src/KafkaLog/KafkaWALSettings.h b/src/KafkaLog/KafkaWALSettings.h index ef81ed35076..f41aa252af5 100644 --- a/src/KafkaLog/KafkaWALSettings.h +++ b/src/KafkaLog/KafkaWALSettings.h @@ -1,8 +1,8 @@ #pragma once #include - #include + #include namespace klog diff --git a/src/KafkaLog/tests/gtest_parse_properties.cpp b/src/KafkaLog/tests/gtest_parse_properties.cpp deleted file mode 100644 index 5c890c16a3e..00000000000 --- a/src/KafkaLog/tests/gtest_parse_properties.cpp +++ /dev/null @@ -1,82 +0,0 @@ -#include -#include - -#include - -#include - -TEST(ParseProperties, empty) -{ - String exp; - auto p = klog::parseProperties(exp); - EXPECT_EQ(0, p.size()); -} - -TEST(ParseProperties, justSemicolons) -{ - String exp = ";;;;"; - auto p = klog::parseProperties(exp); - EXPECT_EQ(0, p.size()); -} - -TEST(ParseProperties, singleProperty) -{ - std::unordered_set cases{ - "key.1=value.1", - ";key.1=value.1", - "key.1=value.1;", - ";key.1=value.1;", - ";;key.1=value.1", - "key.1=value.1;;", - ";;key.1=value.1;;", - }; - - for (const auto & exp : cases) - { - auto p = klog::parseProperties(exp); - EXPECT_EQ(1, p.size()); - - auto kv = p.at(0); - EXPECT_EQ("key.1", kv.first); - EXPECT_EQ("value.1", kv.second); - } -} - -TEST(ParseProperties, multipleProperties) -{ - std::unordered_set cases{ - "key.1=value.1; key.2=value.2;key.3=value.3", - " key.1=value.1; ;key.2=value.2 ; key.3=value.3 ", - "key.1=value.1;key.2=value.2;key.3=value.3;", - ";;key.1=value.1;key.2=value.2;key.3=value.3;;", - }; - - for (const auto & exp : cases) - { - auto p = klog::parseProperties(exp); - EXPECT_EQ(3, p.size()); - - auto kv = p.at(0); - EXPECT_EQ("key.1", kv.first); - EXPECT_EQ("value.1", kv.second); - kv = p.at(1); - EXPECT_EQ("key.2", kv.first); - EXPECT_EQ("value.2", kv.second); - kv = p.at(2); - EXPECT_EQ("key.3", kv.first); - EXPECT_EQ("value.3", kv.second); - } -} - -TEST(ParseProperties, errorCases) -{ - std::unordered_set cases{ - "key", - "key=", - "=value", - "=", - }; - - for (const auto & exp : cases) - EXPECT_THROW(klog::parseProperties(exp), DB::Exception); -} diff --git a/src/Storages/ExternalStream/ExternalStreamSettings.h b/src/Storages/ExternalStream/ExternalStreamSettings.h index cbd72d42334..a7051c1cbac 100644 --- a/src/Storages/ExternalStream/ExternalStreamSettings.h +++ b/src/Storages/ExternalStream/ExternalStreamSettings.h @@ -5,13 +5,13 @@ #include #include +#include + namespace DB { class ASTStorage; -#define EXTERNAL_STREAM_RELATED_SETTINGS(M) \ - M(String, type, "", "External stream type", 0) \ - /* those are kafka related settings */ \ +#define KAFKA_EXTERNAL_STREAM_SETTINGS(M) \ M(String, brokers, "", "A comma-separated list of brokers, for example Kafka brokers.", 0) \ M(String, topic, "", "topic, for example Kafka topic name.", 0) \ M(String, security_protocol, "plaintext", "The protocol to connection external logstore", 0) \ @@ -20,10 +20,12 @@ class ASTStorage; M(String, sasl_mechanism, "PLAIN", "SASL mechanism to use for authentication. Supported: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. 
Default to PLAIN when SASL is enabled.", 0) \ M(String, ssl_ca_cert_file, "", "The path of ssl ca cert file", 0) \ M(String, properties, "", "A semi-colon-separated key-value pairs for configuring the kafka client used by the external stream. A key-value pair is separated by a equal sign. Example: 'client.id=my-client-id;group.id=my-group-id'. Note, not all properties are supported, please check the document for supported properties.", 0) \ + M(UInt64, poll_waittime_ms, 500, "How long (in milliseconds) should poll waits.", 0) \ M(String, sharding_expr, "", "An expression which will be evaluated on each row of data returned by the query to calculate the an integer which will be used to determine the ID of the partition to which the row of data will be sent. If not set, data are sent to any partition randomly.", 0) \ M(String, message_key, "", "An expression which will be evaluated on each row of data returned by the query to compute a string which will be used as the message key.", 0) \ - M(Bool, one_message_per_row, false, "If set to true, when send data to the Kafka external stream with row-based data format like `JSONEachRow`, it will produce one message per row.", 0) \ - /* those are log related settings */ \ + M(Bool, one_message_per_row, false, "If set to true, when send data to the Kafka external stream with row-based data format like `JSONEachRow`, it will produce one message per row.", 0) + +#define LOG_FILE_EXTERNAL_STREAM_SETTINGS(M) \ M(String, log_files, "", "A comma-separated list of log files", 0) \ M(String, log_dir, "", "log root directory", 0) \ M(String, timestamp_regex, "", "Regex to extract log timestamp", 0) \ @@ -33,10 +35,31 @@ class ASTStorage; M(String, row_delimiter, "\n", "The string to be considered as a delimiter in raw message.", 0) \ M(UInt64, max_row_length, 4096, "Max row length", 0) +#define ALL_EXTERNAL_STREAM_SETTINGS(M) \ + M(String, type, "", "External stream type", 0) \ + KAFKA_EXTERNAL_STREAM_SETTINGS(M) \ + LOG_FILE_EXTERNAL_STREAM_SETTINGS(M) + #define LIST_OF_EXTERNAL_STREAM_SETTINGS(M) \ - EXTERNAL_STREAM_RELATED_SETTINGS(M) \ + ALL_EXTERNAL_STREAM_SETTINGS(M) \ FORMAT_FACTORY_SETTINGS(M) +DECLARE_SETTINGS_TRAITS(KafkaExternalStreamSettingsTraits, KAFKA_EXTERNAL_STREAM_SETTINGS) + +struct KafkaExternalStreamSettings : public BaseSettings +{ + bool usesSASL() const + { + return boost::istarts_with(security_protocol.value, "SASL_"); + } + + /// "SASL_SSL" or "SSL" + bool usesSecureConnection() const + { + return boost::iends_with(security_protocol.value, "SSL"); + } +}; + DECLARE_SETTINGS_TRAITS(ExternalStreamSettingsTraits, LIST_OF_EXTERNAL_STREAM_SETTINGS) /** Settings for the ExternalStream engine. @@ -46,6 +69,19 @@ struct ExternalStreamSettings : public BaseSettings +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int RESOURCE_NOT_FOUND; +} + +namespace RdKafka +{ + +/// Consumer will take the ownership of `rk_conf`. +Consumer::Consumer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, Poco::Logger * logger_) : logger(logger_) +{ + char errstr[512]; + auto * conf = rd_kafka_conf_dup(&rk_conf); + rk.reset(rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr))); + if (!rk) + { + /// librdkafka only take the ownership of `rk_conf` if `rd_kafka_new` succeeds, + /// we need to free it otherwise. 
+ rd_kafka_conf_destroy(conf); + throw Exception(klog::mapErrorCode(rd_kafka_last_error()), "Failed to create kafka handle: {}", errstr); + } + + LOG_INFO(logger, "Created consumer {}", name()); + + poller.scheduleOrThrowOnError([this, poll_timeout_ms] { backgroundPoll(poll_timeout_ms); }); +} + +void Consumer::backgroundPoll(UInt64 poll_timeout_ms) const +{ + LOG_INFO(logger, "Start consumer poll"); + + while (!stopped.test()) + rd_kafka_poll(rk.get(), poll_timeout_ms); + + LOG_INFO(logger, "Consumer poll stopped"); +} + +std::vector Consumer::getOffsetsForTimestamps(const std::string & topic, const std::vector & partition_timestamps, int32_t timeout_ms) const +{ + return klog::getOffsetsForTimestamps(rk.get(), topic, partition_timestamps, timeout_ms); +} + +void Consumer::startConsume(Topic & topic, Int32 partition, Int64 offset) +{ + auto res = rd_kafka_consume_start(topic.getHandle(), partition, offset); + if (res == -1) + { + auto err = rd_kafka_last_error(); + throw Exception(klog::mapErrorCode(err), "Failed to start consuming topic={} partition={} offset={} error={}", topic.name(), partition, offset, rd_kafka_err2str(err)); + } +} + +void Consumer::stopConsume(Topic & topic, Int32 partition) +{ + auto res = rd_kafka_consume_stop(topic.getHandle(), partition); + if (res == -1) + { + auto err = rd_kafka_last_error(); + throw Exception(klog::mapErrorCode(err), "Failed to stop consuming topic={} partition={} error={}", topic.name(), partition, rd_kafka_err2str(err)); + } +} + +void Consumer::consumeBatch(Topic & topic, Int32 partition, uint32_t count, int32_t timeout_ms, Consumer::Callback callback, ErrorCallback error_callback) const +{ + std::unique_ptr rkmessages + { + static_cast(malloc(sizeof(rd_kafka_message_t *) * count)), free + }; + + auto res = rd_kafka_consume_batch(topic.getHandle(), partition, timeout_ms, rkmessages.get(), count); + + if (res < 0) + { + error_callback(rd_kafka_last_error()); + return; + } + + for (ssize_t idx = 0; idx < res; ++idx) + { + auto * rkmessage = rkmessages.get()[idx]; + try + { + if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) + error_callback(rkmessage->err); + else + callback(rkmessage, res, nullptr); + } + catch (...)
+ { + /// just log the error to make sure the messages get destroyed + LOG_ERROR( + logger, + "Uncaught exception during consuming topic={} partition={} error={}", + topic.name(), partition, DB::getCurrentExceptionMessage(true, true)); + } + + rd_kafka_message_destroy(rkmessage); + } +} + +} + +} diff --git a/src/Storages/ExternalStream/Kafka/Consumer.h b/src/Storages/ExternalStream/Kafka/Consumer.h new file mode 100644 index 00000000000..5c2e4d8231d --- /dev/null +++ b/src/Storages/ExternalStream/Kafka/Consumer.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include + +#include + +namespace DB +{ + +namespace RdKafka +{ + +class Consumer : boost::noncopyable +{ +public: + Consumer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, Poco::Logger * logger_); + ~Consumer() + { + stopped.test_and_set(); + } + + rd_kafka_t * getHandle() const { return rk.get(); } + + std::vector getOffsetsForTimestamps(const std::string & topic, const std::vector & partition_timestamps, int32_t timeout_ms = 5000) const; + + void startConsume(Topic & topic, Int32 partition, Int64 offset = RD_KAFKA_OFFSET_END); + void stopConsume(Topic & topic, Int32 partition); + + using Callback = std::function; + using ErrorCallback = std::function; + + void consumeBatch(Topic & topic, Int32 partition, uint32_t count, int32_t timeout_ms, Callback callback, ErrorCallback error_callback) const; + +private: + std::string name() const { return rd_kafka_name(rk.get()); } + void backgroundPoll(UInt64 poll_timeout_ms) const; + + klog::KafkaPtr rk {nullptr, rd_kafka_destroy}; + ThreadPool poller; + std::atomic_flag stopped; + Poco::Logger * logger; +}; + +} + +} diff --git a/src/Storages/ExternalStream/Kafka/ConsumerPool.h b/src/Storages/ExternalStream/Kafka/ConsumerPool.h new file mode 100644 index 00000000000..3e983189253 --- /dev/null +++ b/src/Storages/ExternalStream/Kafka/ConsumerPool.h @@ -0,0 +1,82 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +namespace RdKafka +{ + +/** Interface for consumer pools. + * + * Usage + * ConsumerPool pool(...); + * + * void thread() + * { + * auto consumer = pool.get(); + * consumer->startConsume(...); + * } + */ + +class IConsumerPool : private boost::noncopyable +{ +public: + using Entry = PoolBase::Entry; + + virtual ~IConsumerPool() = default; + + /// Selects a consumer to work with. If `max_wait_ms` equals -1, it's a blocking call. + virtual Entry get(Int64 max_wait_ms) = 0; +}; + +using ConsumerPoolPtr = std::unique_ptr; + +/** A common consumer pool. + */ +class ConsumerPool : public IConsumerPool, private PoolBase +{ +public: + using Entry = IConsumerPool::Entry; + using Base = PoolBase; + + ConsumerPool(unsigned size, const StorageID & storage_id, rd_kafka_conf_t & conf, UInt64 poll_timeout_ms_, Poco::Logger * consumer_logger_) + : Base(size, + &Poco::Logger::get("RdKafkaConsumerPool (" + storage_id.getFullNameNotQuoted() + ")")) + , rd_conf(conf) + , poll_timeout_ms(poll_timeout_ms_) + , consumer_logger(consumer_logger_) + { + } + + Entry get(Int64 max_wait_ms) override + { + Entry entry; + + if (max_wait_ms < 0) + entry = Base::get(-1); + else + entry = Base::get(Poco::Timespan(max_wait_ms).totalMilliseconds()); + + return entry; + } + +protected: + /** Creates a new object to put in the pool.
*/ + std::shared_ptr allocObject() override + { + return std::make_shared(rd_conf, poll_timeout_ms, consumer_logger); + } + +private: + rd_kafka_conf_t & rd_conf; + UInt64 poll_timeout_ms {0}; + Poco::Logger * consumer_logger; +}; + +} + +} diff --git a/src/Storages/ExternalStream/Kafka/Kafka.cpp b/src/Storages/ExternalStream/Kafka/Kafka.cpp index c00937e6878..1cc1e8ec25b 100644 --- a/src/Storages/ExternalStream/Kafka/Kafka.cpp +++ b/src/Storages/ExternalStream/Kafka/Kafka.cpp @@ -18,39 +18,192 @@ #include #include #include +#include #include namespace DB { + namespace ErrorCodes { -extern const int OK; +extern const int INVALID_CONFIG_PARAMETER; extern const int INVALID_SETTING_VALUE; -extern const int RESOURCE_NOT_FOUND; +} + +namespace +{ +/// Checks if a config is an unsupported global config, i.e. the config is not supposed +/// to be configured by users. +bool isUnsupportedGlobalConfig(const String & name) +{ + static std::set global_configs + { + "builtin.features", + "metadata.broker.list", + "bootstrap.servers", + "enabled_events", + "error_cb", + "throttle_cb", + "stats_cb", + "log_cb", + "log.queue", + "enable.random.seed", + "background_event_cb", + "socket_cb", + "connect_cb", + "closesocket_cb", + "open_cb", + "resolve_cb", + "opaque", + "default_topic_conf", + "internal.termination.signal", + "api.version.request", + "security.protocol", + "ssl_key", /// requires dedicated API + "ssl_certificate", /// requires dedicated API + "ssl_ca", /// requires dedicated API + "ssl_engine_callback_data", + "ssl.certificate.verify_cb", + "sasl.mechanisms", + "sasl.mechanism", + "sasl.username", + "sasl.password", + "oauthbearer_token_refresh_cb", + "plugin.library.paths", + "interceptors", + "group.id", + "group.instance.id", + "enable.auto.commit", + "enable.auto.offset.store", + "consume_cb", + "rebalance_cb", + "offset_commit_cb", + "enable.partition.eof", + "dr_cb", + "dr_msg_cb", + }; + + return global_configs.contains(name); +} + +/// Checks if a config is an unsupported topic config. +bool isUnsupportedTopicConfig(const String & name) +{ + static std::set topic_configs + { + /// producer + "partitioner", + "partitioner_cb", + "msg_order_cmp", + "produce.offset.report", + /// both + "opaque", + "auto.commit.enable", + "enable.auto.commit", + "auto.commit.interval.ms", + "auto.offset.reset", + "offset.store.path", + "offset.store.sync.interval.ms", + "offset.store.method", + }; + + return topic_configs.contains(name); +} + +bool isUnsupportedConfig(const String & name) +{ + return isUnsupportedGlobalConfig(name) || isUnsupportedTopicConfig(name); +} + +Kafka::ConfPtr createConfFromSettings(const KafkaExternalStreamSettings & settings) +{ + if (settings.brokers.value.empty()) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Empty `brokers` setting for kafka external stream"); + + Kafka::ConfPtr conf {rd_kafka_conf_new(), rd_kafka_conf_destroy}; + char errstr[512] {'\0'}; + + auto conf_set = [&](const String & name, const String & value) + { + auto err = rd_kafka_conf_set(conf.get(), name.c_str(), value.c_str(), errstr, sizeof(errstr)); + if (err != RD_KAFKA_CONF_OK) + { + throw Exception( + ErrorCodes::INVALID_CONFIG_PARAMETER, + "Failed to set kafka config `{}` with value `{}` error_code={} error_msg={}", + name, value, err, errstr); + } + }; + + /// 1.
Set default values + /// -- For Producer + conf_set("enable.idempotence", "true"); + conf_set("message.timeout.ms", "0" /* infinite */); + /// -- For Consumer + /// If the desired offset is out of range, read from the beginning to avoid data loss. + conf_set("auto.offset.reset", "earliest"); + + /// 2. Process the `properties` setting. The value of `properties` looks like this: + /// 'message.max.bytes=1024;max.in.flight=1000;group.id=my-group' + std::vector parts; + boost::split(parts, settings.properties.value, boost::is_any_of(";")); + + for (const auto & part : parts) + { + /// skip empty part, this happens when there are redundant / trailing ';' + if (unlikely(std::all_of(part.begin(), part.end(), [](char ch) { return isspace(static_cast(ch)); }))) + continue; + + auto equal_pos = part.find('='); + if (unlikely(equal_pos == std::string::npos || equal_pos == 0 || equal_pos == part.size() - 1)) + throw DB::Exception(DB::ErrorCodes::INVALID_SETTING_VALUE, "Invalid property `{}`, expected format: =.", part); + + auto key = part.substr(0, equal_pos); + auto value = part.substr(equal_pos + 1); + + /// no spaces are supposed to be around `=`, thus we only need to + /// remove the leading spaces of keys and trailing spaces of values + boost::trim_left(key); + boost::trim_right(value); + + if (isUnsupportedConfig(key)) + throw DB::Exception(DB::ErrorCodes::INVALID_SETTING_VALUE, "Unsupported property {}", key); + + conf_set(key, value); + } + + /// 3. Handle the specific settings, which have higher priority + conf_set("bootstrap.servers", settings.brokers.value); + + conf_set("security.protocol", settings.security_protocol.value); + if (settings.usesSASL()) + { + conf_set("sasl.mechanism", settings.sasl_mechanism.value); + conf_set("sasl.username", settings.username.value); + conf_set("sasl.password", settings.password.value); + } + + if (settings.usesSecureConnection() && !settings.ssl_ca_cert_file.value.empty()) + conf_set("ssl.ca.location", settings.ssl_ca_cert_file.value); + + return conf; +} + } Kafka::Kafka(IStorage * storage, std::unique_ptr settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_, ContextPtr context) : StorageExternalStreamImpl(std::move(settings_)) , storage_id(storage->getStorageID()) , engine_args(engine_args_) - , kafka_properties(klog::parseProperties(settings->properties.value)) , data_format(StorageExternalStreamImpl::dataFormat()) - , auth_info(std::make_unique( - settings->security_protocol.value, - settings->username.value, - settings->password.value, - settings->sasl_mechanism.value, - settings->ssl_ca_cert_file.value)) , external_stream_counter(external_stream_counter_) - , logger(&Poco::Logger::get("External-" + settings->topic.value)) + , conf(createConfFromSettings(settings->getKafkaSettings())) + , logger(&Poco::Logger::get(getLoggerName())) { assert(settings->type.value == StreamTypes::KAFKA || settings->type.value == StreamTypes::REDPANDA); assert(external_stream_counter); - if (settings->brokers.value.empty()) - throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Empty `brokers` setting for {} external stream", settings->type.value); - if (settings->topic.value.empty()) throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Empty `topic` setting for {} external stream", settings->type.value); @@ -64,10 +217,22 @@ Kafka::Kafka(IStorage * storage, std::unique_ptr setting settings->set("one_message_per_row", true); } + size_t value_size = 8; + char topic_refresh_interval_ms_value[8]{'\0'}; /// max: 3600000 +
rd_kafka_conf_get(conf.get(), "topic.metadata.refresh.interval.ms", topic_refresh_interval_ms_value, &value_size); + topic_refresh_interval_ms = std::stoi(topic_refresh_interval_ms_value); + calculateDataFormat(storage); cacheVirtualColumnNamesAndTypes(); + rd_kafka_conf_set_error_cb(conf.get(), &Kafka::onError); + rd_kafka_conf_set_stats_cb(conf.get(), &Kafka::onStats); + rd_kafka_conf_set_throttle_cb(conf.get(), &Kafka::onThrottle); + rd_kafka_conf_set_dr_msg_cb(conf.get(), &KafkaSink::onMessageDelivery); + + consumer_pool = std::make_unique(/*size=*/100, storage_id, *conf, settings->poll_waittime_ms.value, logger); + if (!attach) /// Only validate cluster / topic for external stream creation validate(); @@ -83,83 +248,6 @@ bool Kafka::hasCustomShardingExpr() const { return true; } -Pipe Kafka::read( - const Names & column_names, - const StorageSnapshotPtr & storage_snapshot, - SelectQueryInfo & query_info, - ContextPtr context, - QueryProcessingStage::Enum /*processed_stage*/, - size_t max_block_size, - size_t /*num_streams*/) -{ - /// User can explicitly consume specific kafka partitions by specifying `shards=` setting - /// `SELECT * FROM kafka_stream SETTINGS shards=0,3` - std::vector shards_to_query; - if (!context->getSettingsRef().shards.value.empty()) - { - shards_to_query = parseShards(context->getSettingsRef().shards.value); - validate(shards_to_query); - LOG_INFO(logger, "reading from [{}] partitions for topic={}", fmt::join(shards_to_query, ","), settings->topic.value); - } - else - { - /// We still like to validate / describe the topic if we haven't yet - validate(); - - /// Query all shards / partitions - shards_to_query.reserve(shards); - for (int32_t i = 0; i < shards; ++i) - shards_to_query.push_back(i); - } - - assert(!shards_to_query.empty()); - - Pipes pipes; - pipes.reserve(shards_to_query.size()); - - // const auto & settings_ref = context->getSettingsRef(); - /*auto share_resource_group = (settings_ref.query_resource_group.value == "shared") && (settings_ref.seek_to.value == "latest"); - if (share_resource_group) - { - for (Int32 i = 0; i < shards; ++i) - { - if (!column_names.empty()) - pipes.emplace_back(source_multiplexers->createChannel(i, column_names, metadata_snapshot, context)); - else - pipes.emplace_back(source_multiplexers->createChannel(i, {RESERVED_APPEND_TIME}, metadata_snapshot, context)); - } - } - else*/ - { - /// For queries like `SELECT count(*) FROM tumble(table, now(), 5s) GROUP BY window_end` don't have required column from table. 
- /// We will need add one - Block header; - if (!column_names.empty()) - header = storage_snapshot->getSampleBlockForColumns(column_names); - else - header = storage_snapshot->getSampleBlockForColumns({ProtonConsts::RESERVED_APPEND_TIME}); - - auto offsets = getOffsets(query_info.seek_to_info, shards_to_query); - assert(offsets.size() == shards_to_query.size()); - for (auto [shard, offset] : std::ranges::views::zip(shards_to_query, offsets)) - pipes.emplace_back( - std::make_shared(this, header, storage_snapshot, context, shard, offset, max_block_size, logger, external_stream_counter)); - } - - LOG_INFO( - logger, - "Starting reading {} streams by seeking to {} in dedicated resource group", - pipes.size(), - query_info.seek_to_info->getSeekTo()); - - auto pipe = Pipe::unitePipes(std::move(pipes)); - auto min_threads = context->getSettingsRef().min_threads.value; - if (min_threads > shards_to_query.size()) - pipe.resize(min_threads); - - return pipe; -} - NamesAndTypesList Kafka::getVirtuals() const { return virtual_column_names_and_types; @@ -174,11 +262,6 @@ void Kafka::cacheVirtualColumnNamesAndTypes() virtual_column_names_and_types.push_back(NameAndTypePair(ProtonConsts::RESERVED_EVENT_SEQUENCE_ID, std::make_shared())); } -klog::KafkaWALSimpleConsumerPtr Kafka::getConsumer(int32_t fetch_wait_max_ms) const -{ - return klog::KafkaWALPool::instance(nullptr).getOrCreateStreamingExternal(settings->brokers.value, *auth_info, fetch_wait_max_ms); -} - std::vector Kafka::getOffsets(const SeekToInfoPtr & seek_to_info, const std::vector & shards_to_query) const { assert(seek_to_info); @@ -197,7 +280,8 @@ std::vector Kafka::getOffsets(const SeekToInfoPtr & seek_to_info, const s for (auto [shard, timestamp] : std::ranges::views::zip(shards_to_query, seek_timestamps)) partition_timestamps.emplace_back(shard, timestamp); - return getConsumer()->offsetsForTimestamps(settings->topic.value, partition_timestamps); + auto consumer = getConsumer(); + return consumer->getOffsetsForTimestamps(settings->topic.value, partition_timestamps); } } @@ -221,8 +305,43 @@ void Kafka::calculateDataFormat(const IStorage * storage) data_format = "JSONEachRow"; } -/// FIXME, refactor out as util and unit test it -std::vector Kafka::parseShards(const std::string & shards_setting) +void Kafka::validateMessageKey(const String & message_key_, IStorage * storage, const ContextPtr & context) +{ + const auto & key = message_key_.c_str(); + Tokens tokens(key, key + message_key_.size(), 0); + IParser::Pos pos(tokens, 0); + Expected expected; + ParserExpression p_id; + if (!p_id.parse(pos, message_key_ast, expected)) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key was not a valid expression, parse failed at {}, expected {}", expected.max_parsed_pos, fmt::join(expected.variants, ", ")); + + if (!pos->isEnd()) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key must be a single expression, got extra characters: {}", expected.max_parsed_pos); + + auto syntax_result = TreeRewriter(context).analyze(message_key_ast, storage->getInMemoryMetadata().getColumns().getAllPhysical()); + auto analyzer = ExpressionAnalyzer(message_key_ast, syntax_result, context).getActions(true); + const auto & block = analyzer->getSampleBlock(); + if (block.columns() != 1) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key expression must return exactly one column"); + + auto type_id = block.getByPosition(0).type->getTypeId(); + if (type_id != TypeIndex::String && type_id != TypeIndex::FixedString) + throw 
Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key must have type of string"); +} + +/// Validate the topic still exists, specified partitions are still valid etc +void Kafka::validate() const +{ + auto consumer = getConsumer(); + RdKafka::Topic topic {*consumer->getHandle(), topicName()}; + auto partition_count = topic.getPartitionCount(); + if (partition_count < 1) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Topic has no partitions, topic={}", topicName()); +} + +namespace +{ +std::vector parseShards(const std::string & shards_setting) { std::vector shard_strings; boost::split(shard_strings, shards_setting, boost::is_any_of(",")); @@ -251,63 +370,138 @@ std::vector Kafka::parseShards(const std::string & shards_setting) return specified_shards; } -void Kafka::validateMessageKey(const String & message_key_, IStorage * storage, const ContextPtr & context) +std::vector getShardsToQuery(const ContextPtr & context, Int32 partition_count) { - const auto & key = message_key_.c_str(); - Tokens tokens(key, key + message_key_.size(), 0); - IParser::Pos pos(tokens, 0); - Expected expected; - ParserExpression p_id; - if (!p_id.parse(pos, message_key_ast, expected)) - throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key was not a valid expression, parse failed at {}, expected {}", expected.max_parsed_pos, fmt::join(expected.variants, ", ")); - - if (!pos->isEnd()) - throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key must be a single expression, got extra characters: {}", expected.max_parsed_pos); - - auto syntax_result = TreeRewriter(context).analyze(message_key_ast, storage->getInMemoryMetadata().getColumns().getAllPhysical()); - auto analyzer = ExpressionAnalyzer(message_key_ast, syntax_result, context).getActions(true); - const auto & block = analyzer->getSampleBlock(); - if (block.columns() != 1) - throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key expression must return exactly one column"); + std::vector ret; + if (const auto & shards_exp = context->getSettingsRef().shards.value; !shards_exp.empty()) + { + ret = parseShards(shards_exp); + /// Make sure they are valid. + for (auto shard : ret) + { + if (shard >= partition_count) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, + "Invalid topic partition {}, the topic has only {} partitions", shard, partition_count); + } + } + else + { + /// Query all available shards / partitions + ret.reserve(partition_count); + for (int32_t i = 0; i < partition_count; ++i) + ret.push_back(i); + } - auto type_id = block.getByPosition(0).type->getTypeId(); - if (type_id != TypeIndex::String && type_id != TypeIndex::FixedString) - throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key must have type of string"); + return ret; +} } -/// Validate the topic still exists, specified partitions are still valid etc -void Kafka::validate(const std::vector & shards_to_query) +Pipe Kafka::read( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum /*processed_stage*/, + size_t max_block_size, + size_t /*num_streams*/) { - if (shards == 0) + auto consumer = getConsumer(); + /// The topic_ptr can be shared between all the sources in the same pipe, because each source reads from a different partition.
+ auto topic_ptr = std::make_shared(*consumer->getHandle(), topicName()); + + /// User can explicitly consume specific kafka partitions by specifying `shards=` setting + /// `SELECT * FROM kafka_stream SETTINGS shards=0,3` + auto shards_to_query = getShardsToQuery(context, topic_ptr->getPartitionCount()); + assert(!shards_to_query.empty()); + LOG_INFO(logger, "Reading from partitions [{}] of topic {}", fmt::join(shards_to_query, ","), topicName()); + + Pipes pipes; + pipes.reserve(shards_to_query.size()); + + // const auto & settings_ref = context->getSettingsRef(); + /*auto share_resource_group = (settings_ref.query_resource_group.value == "shared") && (settings_ref.seek_to.value == "latest"); + if (share_resource_group) { - std::scoped_lock lock(shards_mutex); - /// Recheck again in case in-parallel query chimes in and init the shards already - if (shards == 0) + for (Int32 i = 0; i < shards; ++i) { - /// We haven't describe the topic yet - auto result = getConsumer()->describe(settings->topic.value); - if (result.err != ErrorCodes::OK) - throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "{} topic doesn't exist", settings->topic.value); - - shards = result.partitions; + if (!column_names.empty()) + pipes.emplace_back(source_multiplexers->createChannel(i, column_names, metadata_snapshot, context)); + else + pipes.emplace_back(source_multiplexers->createChannel(i, {RESERVED_APPEND_TIME}, metadata_snapshot, context)); } } - - if (!shards_to_query.empty()) + else*/ { - /// User specified specific partitions to consume. - /// Make sure they are valid. - for (auto shard : shards_to_query) - { - if (shard >= shards) - throw Exception( - ErrorCodes::INVALID_SETTING_VALUE, - "Invalid topic partition {} for topic {}, biggest partition is {}", + /// For queries like `SELECT count(*) FROM tumble(table, now(), 5s) GROUP BY window_end` don't have required column from table. 
+ /// We will need add one + Block header; + if (!column_names.empty()) + header = storage_snapshot->getSampleBlockForColumns(column_names); + else + header = storage_snapshot->getSampleBlockForColumns({ProtonConsts::RESERVED_EVENT_TIME}); + + auto offsets = getOffsets(query_info.seek_to_info, shards_to_query); + assert(offsets.size() == shards_to_query.size()); + + for (auto [shard, offset] : std::ranges::views::zip(shards_to_query, offsets)) + pipes.emplace_back( + std::make_shared( + *this, + header, + storage_snapshot, + consumer, + topic_ptr, shard, - settings->topic.value, - shards); - } + offset, + max_block_size, + external_stream_counter, + context)); } + + LOG_INFO( + logger, + "Starting reading {} streams by seeking to {} in dedicated resource group", + pipes.size(), + query_info.seek_to_info->getSeekTo()); + + auto pipe = Pipe::unitePipes(std::move(pipes)); + auto min_threads = context->getSettingsRef().min_threads.value; + if (min_threads > shards_to_query.size()) + pipe.resize(min_threads); + + return pipe; +} + +RdKafka::Producer & Kafka::getProducer() +{ + if (producer) + return *producer; + + std::scoped_lock lock(producer_mutex); + /// Check again in case of losing the race + if (producer) + return *producer; + + auto producer_ptr = std::make_unique(*conf, settings->poll_waittime_ms.value, logger); + producer.swap(producer_ptr); + + return *producer; +} + +RdKafka::Topic & Kafka::getProducerTopic() +{ + if (producer_topic) + return *producer_topic; + + std::scoped_lock lock(producer_mutex); + /// Check again in case of losing the race + if (producer_topic) + return *producer_topic; + + auto topic_ptr = std::make_unique(*getProducer().getHandle(), topicName()); + producer_topic.swap(topic_ptr); + + return *producer_topic; } SinkToStoragePtr Kafka::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) @@ -315,6 +509,34 @@ SinkToStoragePtr Kafka::write(const ASTPtr & /*query*/, const StorageMetadataPtr /// always validate before actual use validate(); return std::make_shared( - this, metadata_snapshot->getSampleBlock(), shards, message_key_ast, context, logger, external_stream_counter); + *this, metadata_snapshot->getSampleBlock(), message_key_ast, external_stream_counter, context); } + +int Kafka::onStats(struct rd_kafka_s * rk, char * json, size_t json_len, void * /*opaque*/) +{ + std::string s(json, json + json_len); + /// controlled by the `statistics.interval.ms` property, which by default is `0`, meaning no stats + LOG_INFO(cbLogger(), "stats of {}: {}", rd_kafka_name(rk), s); + return 0; +} + +void Kafka::onError(struct rd_kafka_s * rk, int err, const char * reason, void * /*opaque*/) +{ + if (err == RD_KAFKA_RESP_ERR__FATAL) + { + char errstr[512] = {'\0'}; + rd_kafka_fatal_error(rk, errstr, sizeof(errstr)); + LOG_ERROR(cbLogger(), "Fatal error found on {}, error={}", rd_kafka_name(rk), errstr); + } + else + { + LOG_WARNING(cbLogger(), "Error occurred on {}, error={}, reason={}", rd_kafka_name(rk), rd_kafka_err2str(static_cast(err)), reason); + } +} + +void Kafka::onThrottle(struct rd_kafka_s * /*rk*/, const char * broker_name, int32_t broker_id, int throttle_time_ms, void * /*opaque*/) +{ + LOG_WARNING(cbLogger(), "Throttled on broker={}, broker_id={}, throttle_time_ms={}", broker_name, broker_id, throttle_time_ms); +} + } diff --git a/src/Storages/ExternalStream/Kafka/Kafka.h b/src/Storages/ExternalStream/Kafka/Kafka.h index fd6ad30d824..1b97a694d21 100644 --- a/src/Storages/ExternalStream/Kafka/Kafka.h +++ 
b/src/Storages/ExternalStream/Kafka/Kafka.h @@ -5,6 +5,9 @@ #include #include #include +#include +#include +#include #include namespace DB @@ -12,14 +15,35 @@ namespace DB class IStorage; + class Kafka final : public StorageExternalStreamImpl { public: + using ConfPtr = std::unique_ptr; + + static Poco::Logger * cbLogger() { + static Poco::Logger * logger { &Poco::Logger::get("KafkaExternalStream") }; + return logger; + } + + static int onStats(struct rd_kafka_s * rk, char * json, size_t json_len, void * opaque); + static void onError(struct rd_kafka_s * rk, int err, const char * reason, void * opaque); + static void onThrottle(struct rd_kafka_s * rk, const char * broker_name, int32_t broker_id, int throttle_time_ms, void * opaque); + Kafka(IStorage * storage, std::unique_ptr settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_, ContextPtr context); ~Kafka() override = default; - void startup() override { } - void shutdown() override { } + void startup() override { LOG_INFO(logger, "Starting Kafka External Stream"); } + void shutdown() override { + LOG_INFO(logger, "Shutting down Kafka External Stream"); + /// Must release all resources here rather than relying on the destructor, + /// because the `Kafka` instance will not be destroyed immediately when the external stream gets dropped. + consumer_pool.reset(); + if (producer_topic) + producer_topic.reset(); + if (producer) + producer.reset(); + } bool supportsSubcolumns() const override { return true; } NamesAndTypesList getVirtuals() const override; ExternalStreamCounterPtr getExternalStreamCounter() const override { return external_stream_counter; } @@ -36,37 +60,52 @@ class Kafka final : public StorageExternalStreamImpl SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override; bool produceOneMessagePerRow() const { return settings->one_message_per_row; } + Int32 topicRefreshIntervalMs() const { return topic_refresh_interval_ms; } const String & brokers() const { return settings->brokers.value; } const String & dataFormat() const override { return data_format; } - const String & topic() const { return settings->topic.value; } - const klog::KConfParams & properties() const { return kafka_properties; } - const klog::KafkaWALAuth & auth() const noexcept { return *auth_info; } + const String & topicName() const { return settings->topic.value; } const ASTPtr & shardingExprAst() const { assert(!engine_args.empty()); return engine_args[0]; } bool hasCustomShardingExpr() const; - klog::KafkaWALSimpleConsumerPtr getConsumer(int32_t fetch_wait_max_ms = 200) const; + + RdKafka::Producer & getProducer(); + RdKafka::Topic & getProducerTopic(); + + RdKafka::ConsumerPool::Entry getConsumer() const + { + assert(consumer_pool); + return consumer_pool->get(/*max_wait_ms=*/1000); + } + + String getLoggerName() const { return storage_id.getDatabaseName() == "default" ?
storage_id.getTableName() : storage_id.getFullNameNotQuoted(); } private: void calculateDataFormat(const IStorage * storage); void cacheVirtualColumnNamesAndTypes(); std::vector getOffsets(const SeekToInfoPtr & seek_to_info, const std::vector & shards_to_query) const; void validateMessageKey(const String & message_key, IStorage * storage, const ContextPtr & context); - void validate(const std::vector & shards_to_query = {}); - static std::vector parseShards(const std::string & shards_setting); + void validate() const; StorageID storage_id; ASTs engine_args; - klog::KConfParams kafka_properties; String data_format; - const std::unique_ptr auth_info; ExternalStreamCounterPtr external_stream_counter; NamesAndTypesList virtual_column_names_and_types; - std::mutex shards_mutex; - int32_t shards = 0; - ASTPtr message_key_ast; + Int32 topic_refresh_interval_ms = 0; + std::vector shards_from_settings; + + ConfPtr conf; + /// The Producer instance and Topic instance can be used by multiple sinks at the same time, thus we only need one of each. + std::mutex producer_mutex; + std::unique_ptr producer; + std::unique_ptr producer_topic; + /// A Consumer can only be used by one source at the same time (technically speaking, it can be used by multiple sources as long as each source reads from a different topic, + /// but we will leave this as an enhancement later, probably when we introduce the `Connection` concept), thus we need a consumer pool. + RdKafka::ConsumerPoolPtr consumer_pool; Poco::Logger * logger; }; + } diff --git a/src/Storages/ExternalStream/Kafka/KafkaSink.cpp b/src/Storages/ExternalStream/Kafka/KafkaSink.cpp index 79a678fe69c..48fd7ad7ed0 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSink.cpp +++ b/src/Storages/ExternalStream/Kafka/KafkaSink.cpp @@ -1,3 +1,4 @@ +#include "Common/Exception.h" #include #include #include @@ -18,8 +19,6 @@ namespace DB namespace ErrorCodes { extern const int CANNOT_WRITE_TO_KAFKA; -extern const int MISSING_ACKNOWLEDGEMENT; -extern const int INVALID_CONFIG_PARAMETER; extern const int TYPE_MISMATCH; extern const int INVALID_SETTING_VALUE; } @@ -124,97 +123,23 @@ IColumn::Selector ChunkSharder::createSelector(Block block, Int32 shard_cnt) con } KafkaSink::KafkaSink( - const Kafka * kafka, + Kafka & kafka, const Block & header, - Int32 initial_partition_cnt, const ASTPtr & message_key_ast, - ContextPtr context, - Poco::Logger * logger_, - ExternalStreamCounterPtr external_stream_counter_) + ExternalStreamCounterPtr external_stream_counter_, + ContextPtr context) : SinkToStorage(header, ProcessorID::ExternalTableDataSinkID) - , partition_cnt(initial_partition_cnt) - , one_message_per_row(kafka->produceOneMessagePerRow()) - , logger(logger_) + , producer(kafka.getProducer()) + , topic(kafka.getProducerTopic()) + , partition_cnt(topic.getPartitionCount()) + , one_message_per_row(kafka.produceOneMessagePerRow()) + , topic_refresh_interval_ms(kafka.topicRefreshIntervalMs()) , external_stream_counter(external_stream_counter_) + , logger(&Poco::Logger::get(fmt::format("{}(sink-{})", kafka.getLoggerName(), context->getCurrentQueryId()))) { - /// default values - std::vector> producer_params{ - {"enable.idempotence", "true"}, - {"message.timeout.ms", "0" /* infinite */}, - }; - - static const std::unordered_set allowed_properties{ - "enable.idempotence", - "message.timeout.ms", - "queue.buffering.max.messages", - "queue.buffering.max.kbytes", - "queue.buffering.max.ms", - "message.max.bytes", - "message.send.max.retries", - "retries", - "retry.backoff.ms",
"retry.backoff.max.ms", - "batch.num.messages", - "batch.size", - "compression.codec", - "compression.type", - "compression.level", - "topic.metadata.refresh.interval.ms", - }; - - /// customization, overrides default values - for (const auto & pair : kafka->properties()) - { - if (allowed_properties.contains(pair.first)) - { - producer_params.emplace_back(pair.first, pair.second); - continue; - } - throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Unsupported property {}", pair.first); - } - - /// properies from settings have higher priority - producer_params.emplace_back("bootstrap.servers", kafka->brokers()); - kafka->auth().populateConfigs(producer_params); - - auto * conf = rd_kafka_conf_new(); - char errstr[512]{'\0'}; - for (const auto & param : producer_params) - { - auto ret = rd_kafka_conf_set(conf, param.first.c_str(), param.second.c_str(), errstr, sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - { - rd_kafka_conf_destroy(conf); - throw Exception( - ErrorCodes::INVALID_CONFIG_PARAMETER, - "Failed to set kafka config `{}` with value `{}` error={}", - param.first, - param.second, - ret); - } - } - - rd_kafka_conf_set_opaque(conf, this); /* needed by onMessageDelivery */ - rd_kafka_conf_set_dr_msg_cb(conf, &KafkaSink::onMessageDelivery); - - size_t value_size = 8; - char topic_refresh_interval_ms_value[8]{'\0'}; /// max: 3600000 - rd_kafka_conf_get(conf, "topic.metadata.refresh.interval.ms", topic_refresh_interval_ms_value, &value_size); - Int32 topic_refresh_interval_ms {std::stoi(topic_refresh_interval_ms_value)}; - - producer.reset(rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr))); - if (!producer) - { - // librdkafka will take the ownership of `conf` if `rd_kafka_new` succeeds, - // but if it does not, we need to take care of cleaning it up by ourselves. - rd_kafka_conf_destroy(conf); - throw Exception("Failed to create kafka handle", klog::mapErrorCode(rd_kafka_last_error())); - } - - topic.reset(rd_kafka_topic_new(producer.get(), kafka->topic().c_str(), nullptr)); wb = std::make_unique([this](char * pos, size_t len) { addMessageToBatch(pos, len); }); - const auto & data_format = kafka->dataFormat(); + const auto & data_format = kafka.dataFormat(); assert(!data_format.empty()); if (message_key_ast) @@ -229,19 +154,19 @@ KafkaSink::KafkaSink( { /// The callback allows `IRowOutputFormat` based formats produce one Kafka message per row. writer = FormatFactory::instance().getOutputFormat( - data_format, *wb, header, context, [this](auto & /*column*/, auto /*row*/) { wb->next(); }, kafka->getFormatSettings(context)); + data_format, *wb, header, context, [this](auto & /*column*/, auto /*row*/) { wb->next(); }, kafka.getFormatSettings(context)); if (!dynamic_cast(writer.get())) throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Data format `{}` is not a row-based foramt, it cannot be used with `one_message_per_row`", data_format); } else { - writer = FormatFactory::instance().getOutputFormat(data_format, *wb, header, context, {}, kafka->getFormatSettings(context)); + writer = FormatFactory::instance().getOutputFormat(data_format, *wb, header, context, {}, kafka.getFormatSettings(context)); } writer->setAutoFlush(); - if (kafka->hasCustomShardingExpr()) + if (kafka.hasCustomShardingExpr()) { - const auto & ast = kafka->shardingExprAst(); + const auto & ast = kafka.shardingExprAst(); partitioner = std::make_unique(buildExpression(header, ast, context), ast->getColumnName()); } else @@ -250,26 +175,25 @@ KafkaSink::KafkaSink( /// Polling message deliveries. 
background_jobs.scheduleOrThrowOnError([this, refresh_interval_ms = static_cast(topic_refresh_interval_ms)]() { auto metadata_refresh_stopwatch = Stopwatch(); - + /// Use a small sleep interval to avoid blocking the operation for too long (in case refresh_interval_ms is big). + auto sleep_ms = std::min(UInt64(500), refresh_interval_ms); while (!is_finished.test()) { - /// Firstly, poll messages - if (auto n = rd_kafka_poll(producer.get(), POLL_TIMEOUT_MS)) - LOG_TRACE(logger, "polled {} events", n); - - /// Then, fetch topic metadata for partition updates + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + /// Fetch topic metadata for partition updates if (metadata_refresh_stopwatch.elapsedMilliseconds() < refresh_interval_ms) continue; metadata_refresh_stopwatch.restart(); - auto result {klog::describeTopic(topic.get(), producer.get(), logger)}; - if (result.err) + try { - LOG_WARNING(logger, "Failed to describe topic, error code: {}", result.err); - continue; + partition_cnt = topic.getPartitionCount(); + } + catch (...) /// do not break the loop until finished + { + LOG_WARNING(logger, "Failed to describe topic, error: {}", getCurrentExceptionMessage(true, true)); } - partition_cnt = result.partitions; } }); } @@ -289,6 +213,7 @@ void KafkaSink::addMessageToBatch(char * pos, size_t len) .len = len, .key = const_cast(key.data), .key_len = key.size, + ._private = this, }); batch_payload.push_back(std::move(payload)); @@ -372,7 +297,7 @@ void KafkaSink::consume(Chunk chunk) /// With `wb->setAutoFlush()`, it makes sure that all messages are generated for the chunk at this point. rd_kafka_produce_batch( - topic.get(), + topic.getHandle(), RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE | RD_KAFKA_MSG_F_PARTITION | RD_KAFKA_MSG_F_BLOCK, current_batch.data(), @@ -420,33 +345,34 @@ void KafkaSink::onFinish() background_jobs.wait(); /// if there are no outstandings, no need to do flushing - if (!hasOutstandingMessages()) + if (outstandingMessages() == 0) return; /// Make sure all outstanding requests are transmitted and handled. /// It should not block for ever here, otherwise, it will block proton from stopping the job /// or block proton from terminating.
- if (auto err = rd_kafka_flush(producer.get(), 15000 /* time_ms */); err) + if (auto err = rd_kafka_flush(producer.getHandle(), 15000 /* time_ms */); err) LOG_ERROR(logger, "Failed to flush kafka producer, error={}", rd_kafka_err2str(err)); if (auto err = lastSeenError(); err != RD_KAFKA_RESP_ERR_NO_ERROR) LOG_ERROR(logger, "Failed to send messages, last_seen_error={}", rd_kafka_err2str(err)); /// if flush does not return an error, the delivery report queue should be empty - if (hasOutstandingMessages()) + if (outstandingMessages() > 0) LOG_ERROR(logger, "Not all messsages are sent successfully, expected={} actual={}", outstandings(), acked()); } -void KafkaSink::onMessageDelivery(rd_kafka_t * /* producer */, const rd_kafka_message_t * msg, void * opaque) +void KafkaSink::onMessageDelivery(rd_kafka_t * /* producer */, const rd_kafka_message_t * msg, void * /*opaque*/) { - static_cast(opaque)->onMessageDelivery(msg); + auto * sink = static_cast(msg->_private); + sink->onMessageDelivery(msg->err); } -void KafkaSink::onMessageDelivery(const rd_kafka_message_t * msg) +void KafkaSink::onMessageDelivery(rd_kafka_resp_err_t err) { - if (msg->err) + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { - state.last_error_code.store(msg->err); + state.last_error_code.store(err); ++state.error_count; } else @@ -466,13 +392,16 @@ void KafkaSink::checkpoint(CheckpointContextPtr context) throw Exception( klog::mapErrorCode(err), "Failed to send messages, error_cout={} last_error={}", errorCount(), rd_kafka_err2str(err)); - if (!hasOutstandingMessages()) + auto outstanding_msgs = outstandingMessages(); + if (outstanding_msgs == 0) break; + LOG_INFO(logger, "Waiting for {} outstandings on checkpointing", outstanding_msgs); + if (is_finished.test()) { /// for a final check, it should not wait for too long - if (auto err = rd_kafka_flush(producer.get(), 15000 /* time_ms */); err) + if (auto err = rd_kafka_flush(producer.getHandle(), 15000 /* time_ms */); err) throw Exception(klog::mapErrorCode(err), "Failed to flush kafka producer, error={}", rd_kafka_err2str(err)); if (auto err = lastSeenError(); err != RD_KAFKA_RESP_ERR_NO_ERROR) @@ -482,7 +411,7 @@ void KafkaSink::checkpoint(CheckpointContextPtr context) errorCount(), rd_kafka_err2str(err)); - if (hasOutstandingMessages()) + if (outstandingMessages() > 0) throw Exception( ErrorCodes::CANNOT_WRITE_TO_KAFKA, "Not all messsages are sent successfully, expected={} actual={}", @@ -495,7 +424,7 @@ void KafkaSink::checkpoint(CheckpointContextPtr context) std::this_thread::sleep_for(std::chrono::milliseconds(10)); } while (true); - resetState(); + state.reset(); IProcessor::checkpoint(context); } diff --git a/src/Storages/ExternalStream/Kafka/KafkaSink.h b/src/Storages/ExternalStream/Kafka/KafkaSink.h index eda284d0850..edcc433a283 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSink.h +++ b/src/Storages/ExternalStream/Kafka/KafkaSink.h @@ -8,16 +8,12 @@ #include #include -namespace Poco -{ -class Logger; -} - namespace DB { namespace KafkaStream { + /// Shard Chunk's to shards (or partitions in Kafka's term) by the sharding expression. 
class ChunkSharder { @@ -42,19 +38,21 @@ class ChunkSharder String sharding_key_column_name; bool random_sharding = false; }; + } class KafkaSink final : public SinkToStorage { public: + /// Callback for Kafka message delivery report + static void onMessageDelivery(rd_kafka_t * /* producer */, const rd_kafka_message_t * msg, void * /*opaque*/); + KafkaSink( - const Kafka * kafka, + Kafka & kafka, const Block & header, - Int32 initial_partition_cnt, const ASTPtr & message_key, - ContextPtr context, - Poco::Logger * logger_, - ExternalStreamCounterPtr external_stream_counter_); + ExternalStreamCounterPtr external_stream_counter_, + ContextPtr context); ~KafkaSink() override; String getName() const override { return "KafkaSink"; } @@ -64,10 +62,8 @@ class KafkaSink final : public SinkToStorage void checkpoint(CheckpointContextPtr) override; private: - /// Callback for Kafka message delivery report - static void onMessageDelivery(rd_kafka_t * /* producer */, const rd_kafka_message_t * msg, void * /*opaque*/); - void onMessageDelivery(const rd_kafka_message_t * msg); - + // void onMessageDelivery(const rd_kafka_message_t * msg); + void onMessageDelivery(rd_kafka_resp_err_t err); void addMessageToBatch(char * pos, size_t len); /// the number of acknowledgement has been received so far for the current checkpoint period @@ -80,17 +76,16 @@ class KafkaSink final : public SinkToStorage rd_kafka_resp_err_t lastSeenError() const { return static_cast(state.last_error_code.load()); } /// check if there are no more outstandings (i.e. delivery reports have been recieved /// for all out-go messages, regardless if a message is successfully delivered or not) - bool hasOutstandingMessages() const noexcept { return state.outstandings != state.acked + state.error_count; } - /// allows to reset the state after each checkpoint - void resetState() { state.reset(); } + size_t outstandingMessages() const noexcept { return state.outstandings - (state.acked + state.error_count); } - static const int POLL_TIMEOUT_MS {500}; + RdKafka::Producer & producer; + RdKafka::Topic & topic; + // std::unique_ptr topic; Int32 partition_cnt {0}; bool one_message_per_row {false}; + Int32 topic_refresh_interval_ms = 0; - klog::KafkaPtr producer {nullptr, rd_kafka_destroy}; - klog::KTopicPtr topic {nullptr, rd_kafka_topic_destroy}; ThreadPool background_jobs {1}; std::atomic_flag is_finished {false}; @@ -113,14 +108,16 @@ class KafkaSink final : public SinkToStorage std::atomic_size_t outstandings {0}; std::atomic_size_t acked {0}; std::atomic_size_t error_count {0}; - std::atomic_int last_error_code {0}; + std::atomic_int32_t last_error_code {0}; + /// allows to reset the state after each checkpoint void reset(); }; State state; - Poco::Logger * logger; ExternalStreamCounterPtr external_stream_counter; + Poco::Logger * logger; }; + } diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp index e0c21fa8abb..6ebc5714dcd 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp @@ -1,22 +1,17 @@ -#include "KafkaSource.h" -#include "Kafka.h" - #include #include +#include #include +#include #include #include #include -#include -#include -#include #include -#include -#include -#include -#include +#include +#include +#include -#include +#include namespace DB { @@ -29,34 +24,44 @@ extern const int RECOVER_CHECKPOINT_FAILED; } KafkaSource::KafkaSource( - Kafka * kafka, + Kafka & kafka_, const Block & header_, const 
diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp
index e0c21fa8abb..6ebc5714dcd 100644
--- a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp
+++ b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp
@@ -1,22 +1,17 @@
-#include "KafkaSource.h"
-#include "Kafka.h"
-
 #include
 #include
+#include
 #include
+#include
 #include
 #include
 #include
-#include
-#include
-#include
 #include
-#include
-#include
-#include
-#include
+#include
+#include
+#include

-#include
+#include

 namespace DB
 {
@@ -29,34 +24,44 @@ extern const int RECOVER_CHECKPOINT_FAILED;
 }

 KafkaSource::KafkaSource(
-    Kafka * kafka,
+    Kafka & kafka_,
     const Block & header_,
     const StorageSnapshotPtr & storage_snapshot_,
-    ContextPtr query_context_,
-    Int32 shard,
-    Int64 offset,
+    RdKafka::ConsumerPool::Entry consumer_,
+    RdKafka::TopicPtr topic_,
+    Int32 shard_,
+    Int64 offset_,
     size_t max_block_size_,
-    Poco::Logger * log_,
-    ExternalStreamCounterPtr external_stream_counter_)
+    ExternalStreamCounterPtr external_stream_counter_,
+    ContextPtr query_context_)
     : Streaming::ISource(header_, true, ProcessorID::KafkaSourceID)
+    , kafka(kafka_)
     , storage_snapshot(storage_snapshot_)
-    , query_context(std::move(query_context_))
     , max_block_size(max_block_size_)
-    , log(log_)
     , header(header_)
     , non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized())
-    , consume_ctx(kafka->topic(), shard, offset)
     , read_buffer("", 0)
     , virtual_col_value_functions(header.columns(), nullptr)
     , virtual_col_types(header.columns(), nullptr)
-    , ckpt_data(consume_ctx)
+    , consumer(consumer_)
+    , topic(topic_)
+    , shard(shard_)
+    , offset(offset_)
+    , ckpt_data(kafka.topicName(), shard)
     , external_stream_counter(external_stream_counter_)
+    , query_context(std::move(query_context_))
+    , logger(&Poco::Logger::get(fmt::format("{}(source-{})", kafka.getLoggerName(), query_context->getCurrentQueryId())))
 {
     assert(external_stream_counter);

+    if (auto batch_count = query_context->getSettingsRef().record_consume_batch_count; batch_count != 0)
+        record_consume_batch_count = static_cast<UInt32>(batch_count.value);
+
+    if (auto consume_timeout = query_context->getSettingsRef().record_consume_timeout_ms; consume_timeout != 0)
+        record_consume_timeout_ms = static_cast<Int32>(consume_timeout.value);
+
     calculateColumnPositions();
-    initConsumer(kafka);
-    initFormatExecutor(kafka);
+    initFormatExecutor();

     /// If there is no data format, physical headers shall always contain 1 column
     assert((physical_header.columns() == 1 && !format_executor) || format_executor);
@@ -67,8 +72,11 @@ KafkaSource::KafkaSource(

 KafkaSource::~KafkaSource()
 {
-    LOG_INFO(log, "Stop streaming reading from topic={} shard={}", consume_ctx.topic, consume_ctx.partition);
-    consumer->stopConsume(consume_ctx);
+    if (consume_started)
+    {
+        LOG_INFO(logger, "Stop consuming from topic={} shard={}", topic->name(), shard);
+        consumer->stopConsume(*topic, shard);
+    }
 }

 Chunk KafkaSource::generate()
@@ -76,6 +84,13 @@ Chunk KafkaSource::generate()
     if (isCancelled())
         return {};

+    if (!consume_started)
+    {
+        LOG_INFO(logger, "Start consuming from topic={} shard={} offset={}", topic->name(), shard, offset);
+        consumer->startConsume(*topic, shard, offset);
+        consume_started = true;
+    }
+
     if (result_chunks.empty() || iter == result_chunks.end())
     {
         readAndProcess();
@@ -100,13 +115,18 @@ void KafkaSource::readAndProcess()
     current_batch.clear();
     current_batch.reserve(header.columns());

-    auto res = consumer->consume(&KafkaSource::parseMessage, this, record_consume_batch_count, record_consume_timeout_ms, consume_ctx);
+    auto callback = [this](void * rkmessage, size_t total_count, void * data)
+    {
+        parseMessage(rkmessage, total_count, data);
+    };

-    if (res != ErrorCodes::OK)
+    auto error_callback = [this](rd_kafka_resp_err_t err)
     {
-        LOG_ERROR(log, "Failed to consume streaming, topic={} shard={} err={}", consume_ctx.topic, consume_ctx.partition, res);
+        LOG_ERROR(logger, "Failed to consume topic={} shard={} err={}", topic->name(), shard, rd_kafka_err2str(err));
         external_stream_counter->addToReadFailed(1);
-    }
+    };
+
+    consumer->consumeBatch(*topic, shard, record_consume_batch_count, record_consume_timeout_ms, callback, error_callback);

     if (!current_batch.empty())
     {
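`consumeBatch()` belongs to the new `RdKafka::Consumer` wrapper, which this diff does not include. Assuming it sits on top of librdkafka's legacy partition-consumer API (an assumption, not something the patch states), its core would look roughly like the sketch below; the helper name and signature are hypothetical:

    #include <cstddef>
    #include <functional>
    #include <vector>
    #include <librdkafka/rdkafka.h>

    /// Fetch up to `max_count` messages from one partition and hand each one to
    /// `on_message`; per-message and fetch errors go to `on_error`.
    /// Requires a prior rd_kafka_consume_start(rkt, partition, offset).
    inline void consumeBatchSketch(
        rd_kafka_topic_t * rkt,
        int32_t partition,
        size_t max_count,
        int timeout_ms,
        const std::function<void(rd_kafka_message_t *)> & on_message,
        const std::function<void(rd_kafka_resp_err_t)> & on_error)
    {
        std::vector<rd_kafka_message_t *> messages(max_count);
        auto n = rd_kafka_consume_batch(rkt, partition, timeout_ms, messages.data(), messages.size());
        if (n < 0)
        {
            on_error(rd_kafka_last_error());
            return;
        }

        for (ssize_t i = 0; i < n; ++i)
        {
            if (messages[i]->err == RD_KAFKA_RESP_ERR_NO_ERROR)
                on_message(messages[i]);
            else
                on_error(messages[i]->err);
            rd_kafka_message_destroy(messages[i]);
        }
    }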
@@ -117,16 +137,16 @@ void KafkaSource::readAndProcess()
     iter = result_chunks.begin();
 }

-void KafkaSource::parseMessage(void * kmessage, size_t total_count, void * data)
+void KafkaSource::parseMessage(void * rkmessage, size_t /*total_count*/, void * /*data*/)
 {
-    auto * kafka = static_cast<KafkaSource *>(data);
-    kafka->doParseMessage(static_cast<rd_kafka_message_t *>(kmessage), total_count);
-}
+    auto * message = static_cast<rd_kafka_message_t *>(rkmessage);

-void KafkaSource::doParseMessage(const rd_kafka_message_t * kmessage, size_t /*total_count*/)
-{
-    parseFormat(kmessage);
-    ckpt_data.last_sn = kmessage->offset;
+    if (unlikely(message->offset < offset))
+        /// Ignore messages whose offset is lower than the one the client asked for
+        return;
+
+    parseFormat(message);
+    ckpt_data.last_sn = message->offset;
 }

 void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage)
@@ -142,7 +162,8 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage)

     if (format_error)
     {
-        LOG_ERROR(log, "Failed to parse message at {}: {}", kmessage->offset, format_error.value());
+        LOG_ERROR(logger, "Failed to parse message at {}: {}", kmessage->offset, format_error.value());
+        external_stream_counter->addToReadFailed(1);
         format_error.reset();
     }

@@ -211,29 +232,9 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage)
     }
 }

-void KafkaSource::initConsumer(const Kafka * kafka)
-{
-    const auto & settings_ref = query_context->getSettingsRef();
-
-    if (settings_ref.record_consume_batch_count != 0)
-        record_consume_batch_count = static_cast<UInt32>(settings_ref.record_consume_batch_count.value);
-
-    if (settings_ref.record_consume_timeout_ms != 0)
-        record_consume_timeout_ms = static_cast<Int32>(settings_ref.record_consume_timeout_ms.value);
-
-    if (consume_ctx.offset == -1)
-        consume_ctx.auto_offset_reset = "latest";
-    else if (consume_ctx.offset == -2)
-        consume_ctx.auto_offset_reset = "earliest";
-
-    consume_ctx.enforce_offset = true;
-    consumer = kafka->getConsumer(record_consume_timeout_ms);
-    consumer->initTopicHandle(consume_ctx);
-}
-
-void KafkaSource::initFormatExecutor(const Kafka * kafka)
+void KafkaSource::initFormatExecutor()
 {
-    const auto & data_format = kafka->dataFormat();
+    const auto & data_format = kafka.dataFormat();

     auto input_format = FormatFactory::instance().getInputFormat(
         data_format,
@@ -241,7 +242,7 @@ void KafkaSource::initFormatExecutor(const Kafka * kafka)
         non_virtual_header,
         query_context,
         max_block_size,
-        kafka->getFormatSettings(query_context));
+        kafka.getFormatSettings(query_context));

     format_executor = std::make_unique(
         non_virtual_header,
@@ -339,6 +340,7 @@ Chunk KafkaSource::doCheckpoint(CheckpointContextPtr ckpt_ctx_)
     result.setCheckpointContext(ckpt_ctx_);

     ckpt_ctx_->coordinator->checkpoint(State::VERSION, getLogicID(), ckpt_ctx_, [&](WriteBuffer & wb) { ckpt_data.serialize(wb); });
+    LOG_INFO(logger, "Saved checkpoint topic={} partition={} offset={}", ckpt_data.topic, ckpt_data.partition, ckpt_data.last_sn);

     /// FIXME, if commit failed ?
     /// Propagate checkpoint barriers
@@ -350,13 +352,13 @@ void KafkaSource::doRecover(CheckpointContextPtr ckpt_ctx_)
     ckpt_ctx_->coordinator->recover(
         getLogicID(), ckpt_ctx_, [&](VersionType version, ReadBuffer & rb) { ckpt_data.deserialize(version, rb); });

-    LOG_INFO(log, "Recovered last_sn={}", ckpt_data.last_sn);
+    LOG_INFO(logger, "Recovered last_sn={}", ckpt_data.last_sn);
 }

 void KafkaSource::doResetStartSN(Int64 sn)
 {
     if (sn >= 0)
-        consume_ctx.offset = sn;
+        offset = sn;
 }

 void KafkaSource::State::serialize(WriteBuffer & wb) const
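`doCheckpoint()` above persists `ckpt_data` (topic, partition, last_sn) through the checkpoint coordinator, and `doRecover()` reads it back; the actual `State::serialize()` body is cut off at the end of this file's diff. A plausible sketch of such a (de)serializer using ClickHouse-style IO helpers; the helper choice and field order are assumptions, not taken from this patch:

    #include <cstdint>
    #include <string>
    #include <IO/ReadHelpers.h>
    #include <IO/WriteHelpers.h>

    /// Illustrative checkpoint state; both sides must agree on the field order.
    struct SourceStateSketch
    {
        std::string topic;
        int32_t partition = -1;
        int64_t last_sn = -1;

        void serialize(DB::WriteBuffer & wb) const
        {
            DB::writeStringBinary(topic, wb);
            DB::writeIntBinary(partition, wb);
            DB::writeIntBinary(last_sn, wb);
        }

        void deserialize(DB::ReadBuffer & rb)
        {
            DB::readStringBinary(topic, rb);
            DB::readIntBinary(partition, rb);
            DB::readIntBinary(last_sn, rb);
        }
    };

On recovery only `last_sn` really matters: resuming from the next offset is presumably handled by the framework through `doResetStartSN()`, which is why the patch keeps `offset` as a plain member instead of the old `consume_ctx`.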
diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.h b/src/Storages/ExternalStream/Kafka/KafkaSource.h
index ec70cb36824..7e06166ca5a 100644
--- a/src/Storages/ExternalStream/Kafka/KafkaSource.h
+++ b/src/Storages/ExternalStream/Kafka/KafkaSource.h
@@ -1,18 +1,13 @@
 #pragma once

-#include
-#include
+#include
 #include
 #include
+#include
+#include
+#include
 #include
 #include
-#include
-#include
-
-namespace Poco
-{
-class Logger;
-}

 struct rd_kafka_message_s;

@@ -25,21 +20,22 @@ class KafkaSource final : public Streaming::ISource
 {
 public:
     KafkaSource(
-        Kafka * kafka,
-        const Block & header,
+        Kafka & kafka_,
+        const Block & header_,
         const StorageSnapshotPtr & storage_snapshot_,
-        ContextPtr query_context_,
-        Int32 shard,
-        Int64 offset,
-        size_t max_block_size,
-        Poco::Logger * log_,
-        ExternalStreamCounterPtr external_stream_counter_);
+        RdKafka::ConsumerPool::Entry consumer_,
+        RdKafka::TopicPtr topic_,
+        Int32 shard_,
+        Int64 offset_,
+        size_t max_block_size_,
+        ExternalStreamCounterPtr external_stream_counter_,
+        ContextPtr query_context_);

     ~KafkaSource() override;

     String getName() const override { return "KafkaSource"; }

-    String description() const override { return fmt::format("topic={}, partition={}", consume_ctx.topic, consume_ctx.partition); }
+    String description() const override { return fmt::format("topic={}, partition={}", topic->name(), shard); }

     Chunk generate() override;

@@ -47,11 +43,9 @@ class KafkaSource final : public Streaming::ISource
 private:
     void calculateColumnPositions();
-    void initConsumer(const Kafka * kafka);
-    void initFormatExecutor(const Kafka * kafka);
+    void initFormatExecutor();

-    static void parseMessage(void * kmessage, size_t total_count, void * data);
-    void doParseMessage(const rd_kafka_message_s * kmessage, size_t total_count);
+    void parseMessage(void * kmessage, size_t total_count, void * data);
     void parseFormat(const rd_kafka_message_s * kmessage);

     inline void readAndProcess();
@@ -60,11 +54,9 @@ class KafkaSource final : public Streaming::ISource
     void doRecover(CheckpointContextPtr ckpt_ctx_) override;
     void doResetStartSN(Int64 sn) override;

-private:
+    Kafka & kafka;
     StorageSnapshotPtr storage_snapshot;
-    ContextPtr query_context;
     size_t max_block_size;
-    Poco::Logger * log;

     Block header;
     const Block non_virtual_header;
@@ -73,9 +65,6 @@ class KafkaSource final : public Streaming::ISource

     std::shared_ptr convert_non_virtual_to_physical_action = nullptr;

-    klog::KafkaWALSimpleConsumerPtr consumer;
-    klog::KafkaWALContext consume_ctx;
-
     std::unique_ptr format_executor;

     ReadBufferFromMemory read_buffer;
@@ -92,6 +81,13 @@ class KafkaSource final : public Streaming::ISource
     UInt32 record_consume_batch_count = 1000;
     Int32 record_consume_timeout_ms = 100;

+    RdKafka::ConsumerPool::Entry consumer;
+    RdKafka::TopicPtr topic;
+    Int32 shard;
+    Int64 offset;
+
+    bool consume_started = false;
+
     /// For checkpoint
     struct State
     {
@@ -105,10 +101,13 @@ class KafkaSource final : public Streaming::ISource
         Int32 partition;
         Int64 last_sn = -1;

-        explicit State(const klog::KafkaWALContext & consume_ctx_) : topic(consume_ctx_.topic), partition(consume_ctx_.partition) { }
+        State(const String & topic_, Int32 partition_) : topic(topic_), partition(partition_) { }
     } ckpt_data;

     ExternalStreamCounterPtr external_stream_counter;
+
+    ContextPtr query_context;
+    Poco::Logger * logger;
 };

 }
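The source now borrows its consumer as an `RdKafka::ConsumerPool::Entry`, so the consumer is handed back automatically when the source is destroyed; the pool type itself is not part of this diff. A generic sketch of that borrow-and-return idea (every name here is hypothetical):

    #include <memory>
    #include <mutex>
    #include <vector>

    /// Minimal borrow/return pool: the Entry's deleter returns the object
    /// to the pool instead of deleting it. The pool must outlive its entries.
    template <typename T>
    class SimplePool
    {
    public:
        using Entry = std::shared_ptr<T>;

        explicit SimplePool(std::vector<std::unique_ptr<T>> items) : objects(std::move(items)) { }

        Entry acquire()
        {
            std::lock_guard lock(mutex);
            if (objects.empty())
                return nullptr; /// a real pool would block, or create a new object on demand
            T * raw = objects.back().release();
            objects.pop_back();
            return Entry(raw, [this](T * obj)
            {
                std::lock_guard inner_lock(mutex);
                objects.emplace_back(obj);
            });
        }

    private:
        std::mutex mutex;
        std::vector<std::unique_ptr<T>> objects;
    };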
diff --git a/src/Storages/ExternalStream/Kafka/Producer.cpp b/src/Storages/ExternalStream/Kafka/Producer.cpp
new file mode 100644
index 00000000000..5a4d723e322
--- /dev/null
+++ b/src/Storages/ExternalStream/Kafka/Producer.cpp
@@ -0,0 +1,45 @@
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int INVALID_CONFIG_PARAMETER;
+extern const int INVALID_SETTING_VALUE;
+}
+
+namespace RdKafka
+{
+
+/// `Producer` does not take ownership of `rk_conf`; it duplicates it internally.
+Producer::Producer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, Poco::Logger * logger_) : logger(logger_)
+{
+    char errstr[512];
+    auto * conf = rd_kafka_conf_dup(&rk_conf);
+    rk.reset(rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)));
+    if (!rk)
+    {
+        /// librdkafka only takes ownership of `conf` if `rd_kafka_new` succeeds,
+        /// so we need to free it ourselves otherwise.
+        rd_kafka_conf_destroy(conf);
+        throw Exception(klog::mapErrorCode(rd_kafka_last_error()), "Failed to create kafka handle: {}", errstr);
+    }
+
+    poller.scheduleOrThrowOnError([this, poll_timeout_ms] { backgroundPoll(poll_timeout_ms); });
+}
+
+void Producer::backgroundPoll(UInt64 poll_timeout_ms) const
+{
+    LOG_INFO(logger, "Start producer poll");
+
+    while (!stopped.test())
+        rd_kafka_poll(rk.get(), poll_timeout_ms);
+
+    LOG_INFO(logger, "Producer poll stopped");
+}
+
+}
+
+}
diff --git a/src/Storages/ExternalStream/Kafka/Producer.h b/src/Storages/ExternalStream/Kafka/Producer.h
new file mode 100644
index 00000000000..97c3b200c7d
--- /dev/null
+++ b/src/Storages/ExternalStream/Kafka/Producer.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include
+#include
+
+#include
+
+namespace DB
+{
+
+namespace RdKafka
+{
+
+class Producer : boost::noncopyable
+{
+public:
+    Producer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, Poco::Logger * logger_);
+
+    ~Producer()
+    {
+        stopped.test_and_set();
+    }
+
+    rd_kafka_t * getHandle() const { return rk.get(); }
+
+private:
+    std::string name() const { return rd_kafka_name(rk.get()); }
+    void backgroundPoll(UInt64 poll_timeout_ms) const;
+
+    klog::KafkaPtr rk {nullptr, rd_kafka_destroy};
+    ThreadPool poller;
+    std::atomic_flag stopped;
+    Poco::Logger * logger;
+};
+
+}
+
+}
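`Producer` keeps a dedicated job calling `rd_kafka_poll()` so delivery reports are served continuously; its destructor only raises the stop flag and relies on the `ThreadPool` member's destructor to wait for the polling job, while actual draining is done by the sink via `rd_kafka_flush()`. For reference, a common librdkafka teardown pattern (illustrative helper, not the patch's code) flushes and then inspects the remaining out-queue length:

    #include <librdkafka/rdkafka.h>

    /// Best-effort graceful shutdown of a producer handle.
    inline void shutdownProducerSketch(rd_kafka_t * rk, int timeout_ms = 15000)
    {
        /// Trigger delivery of queued messages and wait for their reports.
        if (rd_kafka_flush(rk, timeout_ms) == RD_KAFKA_RESP_ERR__TIMED_OUT)
        {
            /// Some messages were neither delivered nor failed within the timeout.
            int still_queued = rd_kafka_outq_len(rk);
            (void)still_queued; /// e.g. log it before giving up
        }
        rd_kafka_destroy(rk);
    }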
diff --git a/src/Storages/ExternalStream/Kafka/Topic.cpp b/src/Storages/ExternalStream/Kafka/Topic.cpp
new file mode 100644
index 00000000000..698855a2464
--- /dev/null
+++ b/src/Storages/ExternalStream/Kafka/Topic.cpp
@@ -0,0 +1,52 @@
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int RESOURCE_NOT_FOUND;
+}
+
+namespace RdKafka
+{
+
+Topic::Topic(rd_kafka_t & rk, const std::string & name) : rdk(rk)
+{
+    /// rd_kafka_topic_new would take ownership of a topic conf; we pass nullptr to use the default one
+    rkt.reset(rd_kafka_topic_new(&rk, name.c_str(), /*conf=*/nullptr));
+    if (!rkt)
+    {
+        auto err_code = rd_kafka_last_error();
+        throw Exception(klog::mapErrorCode(err_code), "Failed to create topic handle for {}, err_code={}, error_msg={}", name, err_code, rd_kafka_err2str(err_code));
+    }
+}
+
+int Topic::getPartitionCount() const
+{
+    const struct rd_kafka_metadata * metadata = nullptr;
+
+    auto err = rd_kafka_metadata(&rdk, 0, rkt.get(), &metadata, /*timeout_ms=*/5000);
+    if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
+        throw Exception(klog::mapErrorCode(err), "Failed to describe topic {}, error_code={}, error_msg={}", name(), err, rd_kafka_err2str(err));
+
+    if (metadata->topic_cnt < 1)
+    {
+        rd_kafka_metadata_destroy(metadata);
+        throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Could not find topic {}", name());
+    }
+
+    assert(metadata->topic_cnt == 1);
+
+    auto partition_cnt = metadata->topics[0].partition_cnt;
+    rd_kafka_metadata_destroy(metadata);
+
+    if (partition_cnt > 0)
+        return partition_cnt;
+    else
+        throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Describe topic of {} returned 0 partitions", name());
+}
+
+}
+
+}
diff --git a/src/Storages/ExternalStream/Kafka/Topic.h b/src/Storages/ExternalStream/Kafka/Topic.h
new file mode 100644
index 00000000000..e23f31f3b63
--- /dev/null
+++ b/src/Storages/ExternalStream/Kafka/Topic.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include
+#include
+
+namespace DB
+{
+
+namespace RdKafka
+{
+
+using Properties = std::vector>;
+
+class Topic : boost::noncopyable
+{
+public:
+    Topic(rd_kafka_t & rk, const std::string & name);
+    ~Topic() = default;
+
+    rd_kafka_topic_t * getHandle() const { return rkt.get(); }
+    std::string name() const { return rd_kafka_topic_name(rkt.get()); }
+    int getPartitionCount() const;
+
+private:
+    rd_kafka_t & rdk;
+    std::unique_ptr rkt {nullptr, rd_kafka_topic_destroy};
+};
+
+using TopicPtr = std::shared_ptr<Topic>;
+
+}
+
+}
diff --git a/src/Storages/ExternalStream/StorageExternalStreamImpl.h b/src/Storages/ExternalStream/StorageExternalStreamImpl.h
index 35f35187f15..c7b92b4f661 100644
--- a/src/Storages/ExternalStream/StorageExternalStreamImpl.h
+++ b/src/Storages/ExternalStream/StorageExternalStreamImpl.h
@@ -46,7 +46,11 @@ class StorageExternalStreamImpl : public std::enable_shared_from_this
-        return settings->getFormatSettings(context);
+        auto ret = settings->getFormatSettings(context);
+        /// This is needed, otherwise using an external stream with ProtobufSingle format as the target stream
+        /// of a MV (or in `INSERT ... SELECT ...`), i.e. sending more than one row to the stream, will throw an exception.
+        ret.protobuf.allow_multiple_rows_without_delimiter = true;
+        return ret;
     }

 protected:
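One design note on `Topic::getPartitionCount()` earlier in this patch: the metadata object is freed manually on every exit path. An equivalent shape (a sketch, assuming the throwing behaviour would stay as-is) wraps the metadata in a `unique_ptr` so early exits cannot leak it:

    #include <memory>
    #include <librdkafka/rdkafka.h>

    /// Returns the partition count of `rkt`, or -1 on failure (a real
    /// implementation would throw, as Topic::getPartitionCount() does).
    inline int partitionCountSketch(rd_kafka_t * rk, rd_kafka_topic_t * rkt)
    {
        const rd_kafka_metadata_t * raw = nullptr;
        if (auto err = rd_kafka_metadata(rk, /*all_topics=*/0, rkt, &raw, /*timeout_ms=*/5000); err != RD_KAFKA_RESP_ERR_NO_ERROR)
            return -1;

        std::unique_ptr<const rd_kafka_metadata_t, decltype(&rd_kafka_metadata_destroy)> metadata(raw, rd_kafka_metadata_destroy);

        if (metadata->topic_cnt != 1)
            return -1; /// topic not found; the unique_ptr still releases the metadata

        return metadata->topics[0].partition_cnt;
    }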