Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prometheus metrics for external stream sink #485

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/Storages/ExternalStream/ExternalStreamCounter.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,36 @@ class ExternalStreamCounter
inline uint64_t getReadBytes() const { return read_bytes.load(); }
inline uint64_t getReadCounts() const { return read_counts.load(); }
inline uint64_t getReadFailed() const { return read_failed.load(); }
inline uint64_t getWriteBytes() const { return write_bytes.load(); }
inline uint64_t getWriteCounts() const { return write_counts.load(); }
inline uint64_t getWriteFailed() const { return write_failed.load(); }

inline void addToReadBytes(uint64_t bytes) { read_bytes.fetch_add(bytes); }
inline void addToReadCounts(uint64_t counts) { read_counts.fetch_add(counts); }
inline void addToReadFailed(uint64_t amount) { read_failed.fetch_add(amount); }
inline void addToWriteBytes(uint64_t bytes) { write_bytes.fetch_add(bytes); }
inline void addToWriteCounts(uint64_t counts) { write_counts.fetch_add(counts); }
inline void addToWriteFailed(uint64_t amount) { write_failed.fetch_add(amount); }

std::map<String, uint64_t> getCounters() const
{
return {
{"ReadBytes", read_bytes.load()},
{"ReadCounts", read_counts.load()},
{"ReadFailed", read_failed.load()},
{"WriteBytes", write_bytes.load()},
{"WriteCounts", write_counts.load()},
{"WriteFailed", write_failed.load()},
};
}

private:
std::atomic<uint64_t> read_bytes;
std::atomic<uint64_t> read_counts;
std::atomic<uint64_t> read_failed;
std::atomic<uint64_t> write_bytes;
std::atomic<uint64_t> write_counts;
std::atomic<uint64_t> write_failed;
};

using ExternalStreamCounterPtr = std::shared_ptr<ExternalStreamCounter>;
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/ExternalStream/Kafka/Kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ SinkToStoragePtr Kafka::write(const ASTPtr & /*query*/, const StorageMetadataPtr
{
/// always validate before actual use
validate();
return std::make_shared<KafkaSink>(this, metadata_snapshot->getSampleBlock(), shards, message_key_ast, context, logger);
return std::make_shared<KafkaSink>(
this, metadata_snapshot->getSampleBlock(), shards, message_key_ast, context, logger, external_stream_counter);
}
}
21 changes: 20 additions & 1 deletion src/Storages/ExternalStream/Kafka/KafkaSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,19 @@ IColumn::Selector ChunkSharder::createSelector(Block block, Int32 shard_cnt) con
}
}

KafkaSink::KafkaSink(const Kafka * kafka, const Block & header, Int32 initial_partition_cnt, const ASTPtr & message_key_ast, ContextPtr context, Poco::Logger * logger_)
KafkaSink::KafkaSink(
const Kafka * kafka,
const Block & header,
Int32 initial_partition_cnt,
const ASTPtr & message_key_ast,
ContextPtr context,
Poco::Logger * logger_,
ExternalStreamCounterPtr external_stream_counter_)
: SinkToStorage(header, ProcessorID::ExternalTableDataSinkID)
, partition_cnt(initial_partition_cnt)
, one_message_per_row(kafka->produceOneMessagePerRow())
, logger(logger_)
, external_stream_counter(external_stream_counter_)
{
/// default values
std::vector<std::pair<String, String>> producer_params{
Expand Down Expand Up @@ -292,6 +300,7 @@ void KafkaSink::consume(Chunk chunk)
if (!chunk.hasRows())
return;

auto total_rows = chunk.rows();
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
auto blocks = partitioner->shard(std::move(block), partition_cnt);

Expand Down Expand Up @@ -371,10 +380,18 @@ void KafkaSink::consume(Chunk chunk)

rd_kafka_resp_err_t err {RD_KAFKA_RESP_ERR_NO_ERROR};
for (size_t i = 0; i < current_batch.size(); ++i)
{
if (current_batch[i].err)
{
err = current_batch[i].err;
external_stream_counter->addToWriteFailed(1);
}
else
{
batch_payload[i].release(); /// payload of messages which are succesfully handled by rd_kafka_produce_batch will be free'ed by librdkafka
external_stream_counter->addToWriteBytes(current_batch[i].len);
}
}

/// Clean up all the bookkeepings for the batch.
std::vector<rd_kafka_message_t> batch;
Expand All @@ -391,6 +408,8 @@ void KafkaSink::consume(Chunk chunk)

if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
throw Exception(klog::mapErrorCode(err), rd_kafka_err2str(err));
else
external_stream_counter->addToWriteCounts(total_rows);
}

void KafkaSink::onFinish()
Expand Down
11 changes: 10 additions & 1 deletion src/Storages/ExternalStream/Kafka/KafkaSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Core/BlockWithShard.h>
#include <Formats/FormatFactory.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/ExternalStream/ExternalStreamCounter.h>
#include <Storages/ExternalStream/Kafka/Kafka.h>
#include <Storages/ExternalStream/Kafka/WriteBufferFromKafkaSink.h>
#include <Common/ThreadPool.h>
Expand Down Expand Up @@ -46,7 +47,14 @@ class ChunkSharder
class KafkaSink final : public SinkToStorage
{
public:
KafkaSink(const Kafka * kafka, const Block & header, Int32 initial_partition_cnt, const ASTPtr & message_key, ContextPtr context, Poco::Logger * logger_);
KafkaSink(
const Kafka * kafka,
const Block & header,
Int32 initial_partition_cnt,
const ASTPtr & message_key,
ContextPtr context,
Poco::Logger * logger_,
ExternalStreamCounterPtr external_stream_counter_);
~KafkaSink() override;

String getName() const override { return "KafkaSink"; }
Expand Down Expand Up @@ -113,5 +121,6 @@ class KafkaSink final : public SinkToStorage
State state;

Poco::Logger * logger;
ExternalStreamCounterPtr external_stream_counter;
};
}
Loading