diff --git a/src/Storages/ExternalStream/ExternalStreamCounter.h b/src/Storages/ExternalStream/ExternalStreamCounter.h index 540b85b9adc..15deff668b0 100644 --- a/src/Storages/ExternalStream/ExternalStreamCounter.h +++ b/src/Storages/ExternalStream/ExternalStreamCounter.h @@ -11,10 +11,16 @@ class ExternalStreamCounter inline uint64_t getReadBytes() const { return read_bytes.load(); } inline uint64_t getReadCounts() const { return read_counts.load(); } inline uint64_t getReadFailed() const { return read_failed.load(); } + inline uint64_t getWriteBytes() const { return write_bytes.load(); } + inline uint64_t getWriteCounts() const { return write_counts.load(); } + inline uint64_t getWriteFailed() const { return write_failed.load(); } inline void addToReadBytes(uint64_t bytes) { read_bytes.fetch_add(bytes); } inline void addToReadCounts(uint64_t counts) { read_counts.fetch_add(counts); } inline void addToReadFailed(uint64_t amount) { read_failed.fetch_add(amount); } + inline void addToWriteBytes(uint64_t bytes) { write_bytes.fetch_add(bytes); } + inline void addToWriteCounts(uint64_t counts) { write_counts.fetch_add(counts); } + inline void addToWriteFailed(uint64_t amount) { write_failed.fetch_add(amount); } std::map<String, uint64_t> getCounters() const { @@ -22,6 +28,9 @@ class ExternalStreamCounter {"ReadBytes", read_bytes.load()}, {"ReadCounts", read_counts.load()}, {"ReadFailed", read_failed.load()}, + {"WriteBytes", write_bytes.load()}, + {"WriteCounts", write_counts.load()}, + {"WriteFailed", write_failed.load()}, }; } @@ -29,6 +38,9 @@ class ExternalStreamCounter std::atomic<uint64_t> read_bytes; std::atomic<uint64_t> read_counts; std::atomic<uint64_t> read_failed; + std::atomic<uint64_t> write_bytes; + std::atomic<uint64_t> write_counts; + std::atomic<uint64_t> write_failed; }; using ExternalStreamCounterPtr = std::shared_ptr<ExternalStreamCounter>; diff --git a/src/Storages/ExternalStream/Kafka/Kafka.cpp b/src/Storages/ExternalStream/Kafka/Kafka.cpp index a88b4493a49..c00937e6878 100644 --- a/src/Storages/ExternalStream/Kafka/Kafka.cpp +++ 
b/src/Storages/ExternalStream/Kafka/Kafka.cpp @@ -314,6 +314,7 @@ SinkToStoragePtr Kafka::write(const ASTPtr & /*query*/, const StorageMetadataPtr { /// always validate before actual use validate(); - return std::make_shared<KafkaSink>(this, metadata_snapshot->getSampleBlock(), shards, message_key_ast, context, logger); + return std::make_shared<KafkaSink>( + this, metadata_snapshot->getSampleBlock(), shards, message_key_ast, context, logger, external_stream_counter); } } diff --git a/src/Storages/ExternalStream/Kafka/KafkaSink.cpp b/src/Storages/ExternalStream/Kafka/KafkaSink.cpp index 92040c8e7b9..79a678fe69c 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSink.cpp +++ b/src/Storages/ExternalStream/Kafka/KafkaSink.cpp @@ -123,11 +123,19 @@ IColumn::Selector ChunkSharder::createSelector(Block block, Int32 shard_cnt) con } } -KafkaSink::KafkaSink(const Kafka * kafka, const Block & header, Int32 initial_partition_cnt, const ASTPtr & message_key_ast, ContextPtr context, Poco::Logger * logger_) +KafkaSink::KafkaSink( + const Kafka * kafka, + const Block & header, + Int32 initial_partition_cnt, + const ASTPtr & message_key_ast, + ContextPtr context, + Poco::Logger * logger_, + ExternalStreamCounterPtr external_stream_counter_) : SinkToStorage(header, ProcessorID::ExternalTableDataSinkID) , partition_cnt(initial_partition_cnt) , one_message_per_row(kafka->produceOneMessagePerRow()) , logger(logger_) + , external_stream_counter(external_stream_counter_) { /// default values std::vector<std::pair<String, String>> producer_params{ @@ -292,6 +300,7 @@ void KafkaSink::consume(Chunk chunk) if (!chunk.hasRows()) return; + auto total_rows = chunk.rows(); auto block = getHeader().cloneWithColumns(chunk.detachColumns()); auto blocks = partitioner->shard(std::move(block), partition_cnt); @@ -371,10 +380,18 @@ void KafkaSink::consume(Chunk chunk) rd_kafka_resp_err_t err {RD_KAFKA_RESP_ERR_NO_ERROR}; for (size_t i = 0; i < current_batch.size(); ++i) + { if (current_batch[i].err) + { err = current_batch[i].err; + 
external_stream_counter->addToWriteFailed(1); + } else + { batch_payload[i].release(); /// payload of messages which are succesfully handled by rd_kafka_produce_batch will be free'ed by librdkafka + external_stream_counter->addToWriteBytes(current_batch[i].len); + } + } /// Clean up all the bookkeepings for the batch. std::vector batch; @@ -391,6 +408,8 @@ void KafkaSink::consume(Chunk chunk) if (err != RD_KAFKA_RESP_ERR_NO_ERROR) throw Exception(klog::mapErrorCode(err), rd_kafka_err2str(err)); + else + external_stream_counter->addToWriteCounts(total_rows); } void KafkaSink::onFinish() diff --git a/src/Storages/ExternalStream/Kafka/KafkaSink.h b/src/Storages/ExternalStream/Kafka/KafkaSink.h index 3a7bc28fa04..eda284d0850 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSink.h +++ b/src/Storages/ExternalStream/Kafka/KafkaSink.h @@ -3,6 +3,7 @@ #include #include #include +#include <Storages/ExternalStream/ExternalStreamCounter.h> #include #include #include @@ -46,7 +47,14 @@ class ChunkSharder class KafkaSink final : public SinkToStorage { public: - KafkaSink(const Kafka * kafka, const Block & header, Int32 initial_partition_cnt, const ASTPtr & message_key, ContextPtr context, Poco::Logger * logger_); + KafkaSink( + const Kafka * kafka, + const Block & header, + Int32 initial_partition_cnt, + const ASTPtr & message_key, + ContextPtr context, + Poco::Logger * logger_, + ExternalStreamCounterPtr external_stream_counter_); ~KafkaSink() override; String getName() const override { return "KafkaSink"; } @@ -113,5 +121,6 @@ class KafkaSink final : public SinkToStorage State state; Poco::Logger * logger; + ExternalStreamCounterPtr external_stream_counter; }; }