diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp
index ecb7b1cce9..321706fb40 100644
--- a/src/Common/ErrorCodes.cpp
+++ b/src/Common/ErrorCodes.cpp
@@ -688,6 +688,8 @@
     M(2534, UDA_NOT_APPLICABLE) \
     M(2600, CANNOT_WRITE_TO_KAFKA) \
     M(2601, NO_AVAILABLE_KAFKA_CONSUMER) \
+    M(2603, KAFKA_CONSUMER_STOPPED) \
+    M(2604, KAFKA_PRODUCER_STOPPED) \
     M(2610, FORMAT_SCHEMA_ALREADY_EXISTS) \
     M(2611, UNKNOWN_FORMAT_SCHEMA) \
     M(2612, AMBIGUOUS_FORMAT_SCHEMA) \
diff --git a/src/Common/PoolBase.h b/src/Common/PoolBase.h
index c9941fc333..41e04fe333 100644
--- a/src/Common/PoolBase.h
+++ b/src/Common/PoolBase.h
@@ -197,15 +197,15 @@ class PoolBase : private boost::noncopyable
     /** The maximum size of the pool. */
     unsigned max_items;
 
-    /** Pool. */
-    Objects items;
-
     /** Lock to access the pool. */
     std::mutex mutex;
     std::condition_variable available;
 
 protected:
+    /** Pool. */
+    Objects items; /// proton: updated
+
     Poco::Logger * log;
 
     PoolBase(unsigned max_items_, Poco::Logger * log_)
diff --git a/src/Storages/ExternalStream/Kafka/Consumer.cpp b/src/Storages/ExternalStream/Kafka/Consumer.cpp
index b62d269496..1f9c26dc50 100644
--- a/src/Storages/ExternalStream/Kafka/Consumer.cpp
+++ b/src/Storages/ExternalStream/Kafka/Consumer.cpp
@@ -7,6 +7,7 @@ namespace DB
 
 namespace ErrorCodes
 {
+extern const int KAFKA_CONSUMER_STOPPED;
 extern const int RESOURCE_NOT_FOUND;
 }
 
@@ -70,6 +71,9 @@ void Consumer::stopConsume(Topic & topic, Int32 parition)
 void Consumer::consumeBatch(Topic & topic, Int32 partition, uint32_t count, int32_t timeout_ms, Consumer::Callback callback, ErrorCallback error_callback) const
 {
+    if (unlikely(stopped.test()))
+        throw Exception(ErrorCodes::KAFKA_CONSUMER_STOPPED, "Cannot consume from a stopped consumer");
+
     std::unique_ptr<rd_kafka_message_t *, decltype(free) *> rkmessages
     {
         static_cast<rd_kafka_message_t **>(malloc(sizeof(rd_kafka_message_t *) * count)),
         free
diff --git a/src/Storages/ExternalStream/Kafka/Consumer.h b/src/Storages/ExternalStream/Kafka/Consumer.h
index 05a7c0dfad..08dc5616ad 100644
--- a/src/Storages/ExternalStream/Kafka/Consumer.h
+++ b/src/Storages/ExternalStream/Kafka/Consumer.h
@@ -16,10 +16,7 @@ class Consumer : boost::noncopyable
 {
 public:
     Consumer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, const String & logger_name_prefix);
-    ~Consumer()
-    {
-        stopped.test_and_set();
-    }
+    ~Consumer() { shutdown(); }
 
     rd_kafka_t * getHandle() const { return rk.get(); }
 
@@ -33,6 +30,8 @@ class Consumer : boost::noncopyable
     void consumeBatch(Topic & topic, Int32 partition, uint32_t count, int32_t timeout_ms, Callback callback, ErrorCallback error_callback) const;
 
+    void shutdown() { stopped.test_and_set(); }
+
     std::string name() const { return rd_kafka_name(rk.get()); }
 
 private:
@@ -40,8 +39,9 @@ class Consumer : boost::noncopyable
     klog::KafkaPtr rk {nullptr, rd_kafka_destroy};
     ThreadPool poller;
 
-    std::atomic_flag stopped;
     Poco::Logger * logger;
+
+    std::atomic_flag stopped = ATOMIC_FLAG_INIT;
 };
 
 }
diff --git a/src/Storages/ExternalStream/Kafka/ConsumerPool.h b/src/Storages/ExternalStream/Kafka/ConsumerPool.h
index a8fa886fd6..c87dfc4eea 100644
--- a/src/Storages/ExternalStream/Kafka/ConsumerPool.h
+++ b/src/Storages/ExternalStream/Kafka/ConsumerPool.h
@@ -59,6 +59,9 @@ class ConsumerPool : public IConsumerPool, private PoolBase<Consumer>
         if (stopped.test_and_set())
             return;
 
+        for (const auto & pooled_consumer : items)
+            pooled_consumer->object->shutdown();
+
         LOG_INFO(log, "Shutting down consumer pool, waiting for all consumers to be freed");
         waitForNoMoreInUse();
         LOG_INFO(log, "All consumers are freed");
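The pieces above share one idea: a `Consumer` is never destroyed while in use; `shutdown()` merely raises an `std::atomic_flag`, and every subsequent operation fails fast. A minimal standalone sketch of that pattern (class and method names here are illustrative, not from the patch):

```cpp
#include <atomic>
#include <stdexcept>

/// Cooperative-shutdown sketch: shutdown() only raises a flag, it does not
/// destroy anything; users of the object observe the flag on their next call.
class Worker
{
public:
    /// Idempotent. test_and_set() also returns the previous value, which is
    /// how ConsumerPool::shutdown() above detects a repeated shutdown request.
    void shutdown() noexcept { stopped.test_and_set(); }

    bool isStopped() const noexcept { return stopped.test(); }

    void doWork() const
    {
        /// Mirrors Consumer::consumeBatch(): fail fast instead of touching
        /// a handle that is about to go away.
        if (stopped.test())
            throw std::runtime_error("cannot work on a stopped worker");
        /// ... real work ...
    }

private:
    /// Explicit init matters: a default-constructed std::atomic_flag is in an
    /// unspecified state before C++20.
    std::atomic_flag stopped = ATOMIC_FLAG_INIT;
};
```

This is also why the headers switch from `std::atomic_flag stopped;` to `std::atomic_flag stopped = ATOMIC_FLAG_INIT;`: the flag is now read via `test()` on hot paths, so its initial state must be well defined.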
diff --git a/src/Storages/ExternalStream/Kafka/Kafka.cpp b/src/Storages/ExternalStream/Kafka/Kafka.cpp
index 94e0464641..83dff26540 100644
--- a/src/Storages/ExternalStream/Kafka/Kafka.cpp
+++ b/src/Storages/ExternalStream/Kafka/Kafka.cpp
@@ -497,36 +497,36 @@ Pipe Kafka::read(
     return pipe;
 }
 
-RdKafka::Producer & Kafka::getProducer()
+std::shared_ptr<RdKafka::Producer> Kafka::getProducer()
 {
     if (producer)
-        return *producer;
+        return producer;
 
     std::scoped_lock lock(producer_mutex);
     /// Check again in case of losing the race
     if (producer)
-        return *producer;
+        return producer;
 
-    auto producer_ptr = std::make_unique<RdKafka::Producer>(*conf, settings->poll_waittime_ms.value, getLoggerName());
+    auto producer_ptr = std::make_shared<RdKafka::Producer>(*conf, settings->poll_waittime_ms.value, getLoggerName());
     producer.swap(producer_ptr);
 
-    return *producer;
+    return producer;
 }
 
-RdKafka::Topic & Kafka::getProducerTopic()
+std::shared_ptr<RdKafka::Topic> Kafka::getProducerTopic()
 {
     if (producer_topic)
-        return *producer_topic;
+        return producer_topic;
 
     std::scoped_lock lock(producer_mutex);
     /// Check again in case of losing the race
     if (producer_topic)
-        return *producer_topic;
+        return producer_topic;
 
-    auto topic_ptr = std::make_unique<RdKafka::Topic>(*getProducer().getHandle(), topicName());
+    auto topic_ptr = std::make_shared<RdKafka::Topic>(*getProducer()->getHandle(), topicName());
     producer_topic.swap(topic_ptr);
 
-    return *producer_topic;
+    return producer_topic;
 }
 
 SinkToStoragePtr Kafka::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
diff --git a/src/Storages/ExternalStream/Kafka/Kafka.h b/src/Storages/ExternalStream/Kafka/Kafka.h
index 812abc2a61..037813c549 100644
--- a/src/Storages/ExternalStream/Kafka/Kafka.h
+++ b/src/Storages/ExternalStream/Kafka/Kafka.h
@@ -43,14 +43,26 @@ class Kafka final : public StorageExternalStreamImpl
     void startup() override { LOG_INFO(logger, "Starting Kafka External Stream"); }
     void shutdown() override
     {
         LOG_INFO(logger, "Shutting down Kafka External Stream");
+        consumer_pool->shutdown();
+        if (producer)
+            producer->shutdown();
+
         /// Must release all resources here rather than relying on the deconstructor.
         /// Because the `Kafka` instance will not be destroyed immediately when the external stream gets dropped.
         consumer_pool.reset();
         if (producer_topic)
-            producer_topic.reset();
+        {
+            // producer_topic.reset();
+            std::shared_ptr<RdKafka::Topic> empty_topic_ptr;
+            producer_topic.swap(empty_topic_ptr);
+        }
         if (producer)
-            producer.reset();
+        {
+            // producer.reset();
+            std::shared_ptr<RdKafka::Producer> empty_producer_ptr;
+            producer.swap(empty_producer_ptr);
+        }
 
         tryRemoveTempDir(logger);
     }
     bool supportsSubcolumns() const override { return true; }
@@ -76,8 +88,8 @@ class Kafka final : public StorageExternalStreamImpl
     const ASTPtr & shardingExprAst() const { assert(!engine_args.empty()); return engine_args[0]; }
     bool hasCustomShardingExpr() const;
 
-    RdKafka::Producer & getProducer();
-    RdKafka::Topic & getProducerTopic();
+    std::shared_ptr<RdKafka::Producer> getProducer();
+    std::shared_ptr<RdKafka::Topic> getProducerTopic();
 
     RdKafka::ConsumerPool::Entry getConsumer() const
     {
@@ -112,8 +124,8 @@ class Kafka final : public StorageExternalStreamImpl
     ConfPtr conf;
     /// The Producer instance and Topic instance can be used by multiple sinks at the same time, thus we only need one of each.
     std::mutex producer_mutex;
-    std::unique_ptr<RdKafka::Producer> producer;
-    std::unique_ptr<RdKafka::Topic> producer_topic;
+    std::shared_ptr<RdKafka::Producer> producer;
+    std::shared_ptr<RdKafka::Topic> producer_topic;
     /// A Consumer can only be used by one source at the same time (technically speaking, it can be used by multple sources as long as each source read from a different topic,
     /// but we will leave this as an enhancement later, probably when we introduce the `Connection` concept), thus we need a consumer pool.
     RdKafka::ConsumerPoolPtr consumer_pool;
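`getProducer()`/`getProducerTopic()` keep their lazy, double-checked initialization but now hand out `std::shared_ptr`, so a sink can keep the producer alive even after `Kafka::shutdown()` drops the storage's own reference. A condensed sketch of the pattern, with `Producer` standing in for `RdKafka::Producer` and the constructor arguments elided:

```cpp
#include <memory>
#include <mutex>

struct Producer { /* stands in for RdKafka::Producer */ };

class LazyProducerHolder
{
public:
    std::shared_ptr<Producer> get()
    {
        /// Fast path: already initialized, no locking needed.
        if (producer)
            return producer;

        std::scoped_lock lock(mutex);
        /// Check again in case another thread won the race while we
        /// were waiting for the lock.
        if (producer)
            return producer;

        auto producer_ptr = std::make_shared<Producer>();
        producer.swap(producer_ptr);
        return producer;
    }

private:
    std::mutex mutex;
    std::shared_ptr<Producer> producer;
};
```

Returning a copy of the `shared_ptr` (rather than a reference) is what makes the explicit swap-with-empty release in `Kafka::shutdown()` safe: sinks that are still flushing hold their own reference, and the `Producer` is destroyed only when the last of them lets go.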
diff --git a/src/Storages/ExternalStream/Kafka/KafkaSink.cpp b/src/Storages/ExternalStream/Kafka/KafkaSink.cpp
index 2ff86cdac0..b4b53d6758 100644
--- a/src/Storages/ExternalStream/Kafka/KafkaSink.cpp
+++ b/src/Storages/ExternalStream/Kafka/KafkaSink.cpp
@@ -19,8 +19,9 @@ namespace DB
 namespace ErrorCodes
 {
 extern const int CANNOT_WRITE_TO_KAFKA;
-extern const int TYPE_MISMATCH;
 extern const int INVALID_SETTING_VALUE;
+extern const int KAFKA_PRODUCER_STOPPED;
+extern const int TYPE_MISMATCH;
 }
 
 namespace
@@ -131,11 +132,11 @@ KafkaSink::KafkaSink(
     : SinkToStorage(header, ProcessorID::ExternalTableDataSinkID)
     , producer(kafka.getProducer())
     , topic(kafka.getProducerTopic())
-    , partition_cnt(topic.getPartitionCount())
+    , partition_cnt(topic->getPartitionCount())
     , one_message_per_row(kafka.produceOneMessagePerRow())
     , topic_refresh_interval_ms(kafka.topicRefreshIntervalMs())
     , external_stream_counter(external_stream_counter_)
-    , logger(&Poco::Logger::get(fmt::format("{}.{}", kafka.getLoggerName(), producer.name())))
+    , logger(&Poco::Logger::get(fmt::format("{}.{}", kafka.getLoggerName(), producer->name())))
 {
     wb = std::make_unique([this](char * pos, size_t len) { addMessageToBatch(pos, len); });
@@ -178,8 +179,17 @@
         auto metadata_refresh_stopwatch = Stopwatch();
         /// Use a small sleep interval to avoid blocking operation for a long just (in case refresh_interval_ms is big).
         auto sleep_ms = std::min(UInt64(500), refresh_interval_ms);
-        while (!is_finished.test())
+        while (true)
         {
+            if (unlikely(producer->isStopped()))
+            {
+                is_finished.test_and_set();
+                LOG_WARNING(logger, "Producer {} has stopped, stopping sink", producer->name());
+            }
+
+            if (is_finished.test())
+                break;
+
             std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
             /// Fetch topic metadata for partition updates
             if (metadata_refresh_stopwatch.elapsedMilliseconds() < refresh_interval_ms)
@@ -189,7 +199,7 @@
 
             try
             {
-                partition_cnt = topic.getPartitionCount();
+                partition_cnt = topic->getPartitionCount();
             }
             catch (...) /// do not break the loop until finished
             {
@@ -227,6 +237,9 @@ void KafkaSink::consume(Chunk chunk)
     if (!chunk.hasRows())
         return;
 
+    if (unlikely(is_finished.test()))
+        throw Exception(ErrorCodes::KAFKA_PRODUCER_STOPPED, "KafkaSink cannot consume data because the producer has stopped, likely the underlying external stream is gone");
+
     auto total_rows = chunk.rows();
     auto block = getHeader().cloneWithColumns(chunk.detachColumns());
     auto blocks = partitioner->shard(std::move(block), partition_cnt);
@@ -299,7 +312,7 @@
 
     /// With `wb->setAutoFlush()`, it makes sure that all messages are generated for the chunk at this point.
     rd_kafka_produce_batch(
-        topic.getHandle(),
+        topic->getHandle(),
         RD_KAFKA_PARTITION_UA,
         RD_KAFKA_MSG_F_FREE | RD_KAFKA_MSG_F_PARTITION | RD_KAFKA_MSG_F_BLOCK,
         current_batch.data(),
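The rewritten background loop above separates "how often do we check for a stop request" (every `sleep_ms`, at most 500ms) from "how often do we refresh topic metadata" (every `refresh_interval_ms`). A self-contained sketch of the same structure; `producer_stopped` and `refresh_partitions` are placeholder callables standing in for `producer->isStopped()` and `partition_cnt = topic->getPartitionCount()`:

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

/// Wake up frequently so a stop request is noticed quickly, but run the
/// expensive metadata refresh only once per refresh_interval_ms.
void pollLoop(
    std::atomic_flag & is_finished,
    uint64_t refresh_interval_ms,
    const std::function<bool()> & producer_stopped,
    const std::function<void()> & refresh_partitions)
{
    const auto sleep_ms = std::min<uint64_t>(500, refresh_interval_ms);
    auto last_refresh = std::chrono::steady_clock::now();

    while (true)
    {
        /// Propagate an external stop (e.g. the stream was dropped) into
        /// the sink's own finished flag.
        if (producer_stopped())
            is_finished.test_and_set();

        if (is_finished.test())
            break;

        std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));

        auto now = std::chrono::steady_clock::now();
        if (now - last_refresh < std::chrono::milliseconds(refresh_interval_ms))
            continue;
        last_refresh = now;

        try
        {
            refresh_partitions();
        }
        catch (...)
        {
            /// Swallow errors: the loop must keep running until finished.
        }
    }
}
```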
@@ -355,7 +368,7 @@ void KafkaSink::onFinish()
     /// Make sure all outstanding requests are transmitted and handled.
     /// It should not block for ever here, otherwise, it will block proton from stopping the job
     /// or block proton from terminating.
-    if (auto err = rd_kafka_flush(producer.getHandle(), 15000 /* time_ms */); err)
+    if (auto err = rd_kafka_flush(producer->getHandle(), 15000 /* time_ms */); err)
         LOG_ERROR(logger, "Failed to flush kafka producer, error={}", rd_kafka_err2str(err));
 
     if (auto err = lastSeenError(); err != RD_KAFKA_RESP_ERR_NO_ERROR)
@@ -405,7 +418,7 @@ void KafkaSink::checkpoint(CheckpointContextPtr context)
     if (is_finished.test())
     {
         /// for a final check, it should not wait for too long
-        if (auto err = rd_kafka_flush(producer.getHandle(), 15000 /* time_ms */); err)
+        if (auto err = rd_kafka_flush(producer->getHandle(), 15000 /* time_ms */); err)
             throw Exception(klog::mapErrorCode(err), "Failed to flush kafka producer, error={}", rd_kafka_err2str(err));
 
         if (auto err = lastSeenError(); err != RD_KAFKA_RESP_ERR_NO_ERROR)
diff --git a/src/Storages/ExternalStream/Kafka/KafkaSink.h b/src/Storages/ExternalStream/Kafka/KafkaSink.h
index edcc433a28..08951cfeed 100644
--- a/src/Storages/ExternalStream/Kafka/KafkaSink.h
+++ b/src/Storages/ExternalStream/Kafka/KafkaSink.h
@@ -78,9 +78,8 @@ class KafkaSink final : public SinkToStorage
     /// for all out-go messages, regardless if a message is successfully delivered or not)
     size_t outstandingMessages() const noexcept { return state.outstandings - (state.acked + state.error_count); }
 
-    RdKafka::Producer & producer;
-    RdKafka::Topic & topic;
-    // std::unique_ptr<RdKafka::Topic> topic;
+    std::shared_ptr<RdKafka::Producer> producer;
+    std::shared_ptr<RdKafka::Topic> topic;
 
     Int32 partition_cnt {0};
     bool one_message_per_row {false};
diff --git a/src/Storages/ExternalStream/Kafka/Producer.h b/src/Storages/ExternalStream/Kafka/Producer.h
index 3263ead86e..291af6b894 100644
--- a/src/Storages/ExternalStream/Kafka/Producer.h
+++ b/src/Storages/ExternalStream/Kafka/Producer.h
@@ -15,22 +15,24 @@ class Producer : boost::noncopyable
 {
 public:
     Producer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, const String & logger_name_prefix);
-    ~Producer()
-    {
-        stopped.test_and_set();
-    }
+    ~Producer() { shutdown(); }
 
     rd_kafka_t * getHandle() const { return rk.get(); }
 
     std::string name() const { return rd_kafka_name(rk.get()); }
 
+    void shutdown() { stopped.test_and_set(); }
+
+    bool isStopped() const { return stopped.test(); }
+
 private:
     void backgroundPoll(UInt64 poll_timeout_ms) const;
 
     klog::KafkaPtr rk {nullptr, rd_kafka_destroy};
     ThreadPool poller;
 
-    std::atomic_flag stopped;
     Poco::Logger * logger;
+
+    std::atomic_flag stopped = ATOMIC_FLAG_INIT;
 };
 
 }
diff --git a/src/Storages/ExternalStream/StorageExternalStreamImpl.cpp b/src/Storages/ExternalStream/StorageExternalStreamImpl.cpp
index 2e73b50a37..b13f10a671 100644
--- a/src/Storages/ExternalStream/StorageExternalStreamImpl.cpp
+++ b/src/Storages/ExternalStream/StorageExternalStreamImpl.cpp
@@ -20,7 +20,8 @@ void StorageExternalStreamImpl::tryRemoveTempDir(Poco::Logger * logger) const
 {
     LOG_INFO(logger, "Trying to remove external stream temproary directory {}", tmpdir.string());
     std::error_code err;
-    if (!fs::remove_all(tmpdir, err))
+    fs::remove_all(tmpdir, err);
+    if (err)
         LOG_ERROR(logger, "Failed to remove the temporary directory, error_code={}, error_message={}", err.value(), err.message());
 }
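The last hunk fixes a real bug, not a style issue: the `std::error_code` overload of `fs::remove_all` returns the number of entries removed (`std::uintmax_t`), or `static_cast<std::uintmax_t>(-1)` on error. Branching on `!fs::remove_all(...)` therefore logged an error when the directory was simply already gone (return value 0) and stayed silent on a real failure (-1 is truthy). A small demonstration, with an illustrative path:

```cpp
#include <filesystem>
#include <iostream>

namespace fs = std::filesystem;

int main()
{
    const fs::path tmpdir = "/tmp/external-stream-demo"; /// illustrative path

    std::error_code err;
    const auto removed = fs::remove_all(tmpdir, err); /// never throws; reports via err

    /// Correct check: branch on the error_code, not on the return value.
    if (err)
        std::cerr << "failed to remove " << tmpdir << ": " << err.message() << '\n';
    else
        std::cout << "removed " << removed << " filesystem entries\n"; /// 0 is a success, too
}
```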