Feature/kafka properties for source #618

Merged · 11 commits · Mar 21, 2024
Changes from all commits
2 changes: 1 addition & 1 deletion src/KafkaLog/KafkaWALCommon.cpp
@@ -130,7 +130,7 @@ initRdKafkaHandle(rd_kafka_type_t type, KConfParams & params, KafkaWALStats * st
if (!kafka_handle)
{
LOG_ERROR(stats->log, "Failed to create kafka handle, error={}", errstr);
throw DB::Exception("Failed to create kafka handle", mapErrorCode(rd_kafka_last_error()));
throw DB::Exception(mapErrorCode(rd_kafka_last_error()), "Failed to create kafka handle, error={}", errstr);
}

return kafka_handle;
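With this change initRdKafkaHandle receives the caller-supplied KConfParams (defined in KafkaWALCommon.h below), so per-stream Kafka properties can be applied to the librdkafka configuration before the handle is created. A minimal sketch of how such params might be applied, following the error-handling style above; the actual application code is outside the shown hunk, and the error code used here is illustrative:

/// Sketch only: apply name/value pairs from KConfParams to an rd_kafka_conf_t
/// (here `conf` is assumed to be a KConfPtr) before calling rd_kafka_new().
for (const auto & [name, value] : params)
{
    char errstr[512] = {'\0'};
    if (rd_kafka_conf_set(conf.get(), name.c_str(), value.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        LOG_ERROR(stats->log, "Failed to set kafka property {}={}, error={}", name, value, errstr);
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to set kafka property {}, error={}", name, errstr);
    }
}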
3 changes: 2 additions & 1 deletion src/KafkaLog/KafkaWALCommon.h
@@ -1,6 +1,5 @@
#pragma once

#include "KafkaWALProperties.h"
#include "Results.h"

#include <NativeLog/Record/Record.h>
@@ -30,6 +29,8 @@ using KConfPtr = std::unique_ptr<rd_kafka_conf_t, decltype(rd_kafka_conf_destroy
using KTopicConfPtr = std::unique_ptr<rd_kafka_topic_conf_t, decltype(rd_kafka_topic_conf_destroy) *>;
using KConfCallback = std::function<void(rd_kafka_conf_t *)>;

using KConfParams = std::vector<std::pair<std::string, std::string>>;

struct PartitionTimestamp
{
PartitionTimestamp(int32_t partition_, int64_t timestamp_) : partition(partition_), timestamp(timestamp_) { }
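KConfParams takes over from the types in the now-deleted KafkaWALProperties.{h,cpp}: it is simply an ordered list of name/value pairs. A minimal sketch, not the PR's actual parser, of turning the external stream's semicolon-separated `properties` setting (e.g. 'client.id=my-client-id;group.id=my-group-id') into KConfParams:

#include <boost/algorithm/string.hpp>
#include <string>
#include <vector>

/// Sketch only: split "k1=v1;k2=v2" into KConfParams; the real parsing in this
/// PR lives with the Kafka external stream code and is not shown in this diff.
inline KConfParams parsePropertiesSketch(const std::string & properties)
{
    KConfParams params;
    std::vector<std::string> kv_pairs;
    boost::split(kv_pairs, properties, boost::is_any_of(";"));
    for (const auto & kv : kv_pairs)
    {
        auto pos = kv.find('=');
        if (pos == std::string::npos)
            continue; /// this sketch silently skips malformed entries
        params.emplace_back(boost::trim_copy(kv.substr(0, pos)), boost::trim_copy(kv.substr(pos + 1)));
    }
    return params;
}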
95 changes: 0 additions & 95 deletions src/KafkaLog/KafkaWALPool.cpp
@@ -24,33 +24,6 @@ namespace
/// Globals
const std::string SYSTEM_WALS_KEY = "cluster_settings.logstore";
const std::string SYSTEM_WALS_KEY_PREFIX = SYSTEM_WALS_KEY + ".";


int32_t bucketFetchWaitMax(int32_t fetch_wait_max_ms)
{
if (fetch_wait_max_ms <= 10)
return 10;
else if (fetch_wait_max_ms <= 20)
return 20;
else if (fetch_wait_max_ms <= 30)
return 30;
else if (fetch_wait_max_ms <= 40)
return 40;
else if (fetch_wait_max_ms <= 50)
return 50;
else if (fetch_wait_max_ms <= 100)
return 100;
else if (fetch_wait_max_ms <= 200)
return 200;
else if (fetch_wait_max_ms <= 300)
return 300;
else if (fetch_wait_max_ms <= 400)
return 400;
else if (fetch_wait_max_ms <= 500)
return 500;

return 500;
}
}

KafkaWALPool & KafkaWALPool::instance(const DB::ContextPtr & global_context)
@@ -397,74 +370,6 @@ KafkaWALConsumerMultiplexerPtr KafkaWALPool::getOrCreateConsumerMultiplexer(cons
return *best_multiplexer;
}

KafkaWALSimpleConsumerPtr KafkaWALPool::getOrCreateStreamingExternal(const String & brokers, const KafkaWALAuth & auth, int32_t fetch_wait_max_ms)
{
assert(!brokers.empty());

fetch_wait_max_ms = bucketFetchWaitMax(fetch_wait_max_ms);

std::lock_guard lock{external_streaming_lock};

if (!external_streaming_consumers.contains(brokers))
/// FIXME, configurable max cached consumers
external_streaming_consumers.emplace(brokers, std::pair<size_t, KafkaWALSimpleConsumerPtrs>(100, KafkaWALSimpleConsumerPtrs{}));

/// FIXME, remove cached cluster
if (external_streaming_consumers.size() > 1000)
throw DB::Exception("Too many external Kafka cluster registered", DB::ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);

auto & consumers = external_streaming_consumers[brokers];

for (const auto & consumer : consumers.second)
{
const auto & consumer_settings = consumer->getSettings();
if (consumer.use_count() == 1 && consumer_settings.fetch_wait_max_ms == fetch_wait_max_ms && consumer_settings.auth == auth)
{
LOG_INFO(log, "Reusing external Kafka consume with settings={}", consumer_settings.string());
return consumer;
}
}

/// consumer is used up and if we didn't reach maximum
if (consumers.second.size() < consumers.first)
{
if (!boost::iequals(auth.security_protocol, "plaintext")
&& !boost::iequals(auth.security_protocol, "sasl_plaintext")
&& !boost::iequals(auth.security_protocol, "sasl_ssl"))
throw DB::Exception(
DB::ErrorCodes::NOT_IMPLEMENTED,
"Invalid logstore kafka settings security_protocol: {}. Only plaintext, sasl_plaintext or sasl_ssl are supported",
auth.security_protocol);

/// Create one
auto ksettings = std::make_unique<KafkaWALSettings>();

ksettings->fetch_wait_max_ms = fetch_wait_max_ms;

ksettings->brokers = brokers;

/// Streaming WALs have a different group ID
ksettings->group_id += "-tp-external-streaming-query-" + std::to_string(consumers.second.size() + 1);
ksettings->auth = auth;

/// We don't care offset checkpointing for WALs used for streaming processing,
/// No auto commit
ksettings->enable_auto_commit = false;

LOG_INFO(log, "Create new external Kafka consume with settings={{{}}}", ksettings->string());

auto consumer = std::make_shared<KafkaWALSimpleConsumer>(std::move(ksettings));
consumer->startup();
consumers.second.push_back(consumer);
return consumer;
}
else
{
LOG_ERROR(log, "External streaming processing pool in cluster={} is used up, size={}", brokers, consumers.first);
throw DB::Exception("Max external streaming processing pool size has been reached", DB::ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
}
}

std::vector<KafkaWALClusterPtr> KafkaWALPool::clusters(const KafkaWALContext & ctx) const
{
std::vector<KafkaWALClusterPtr> results;
2 changes: 0 additions & 2 deletions src/KafkaLog/KafkaWALPool.h
@@ -31,8 +31,6 @@ class KafkaWALPool : private boost::noncopyable

KafkaWALSimpleConsumerPtr getOrCreateStreaming(const String & cluster_id);

KafkaWALSimpleConsumerPtr getOrCreateStreamingExternal(const String & brokers, const KafkaWALAuth & auth, int32_t fetch_wait_max_ms = 200);

std::vector<KafkaWALClusterPtr> clusters(const KafkaWALContext & ctx) const;

bool enabled() const { return meta_wal != nullptr; }
50 changes: 0 additions & 50 deletions src/KafkaLog/KafkaWALProperties.cpp

This file was deleted.

13 changes: 0 additions & 13 deletions src/KafkaLog/KafkaWALProperties.h

This file was deleted.

2 changes: 1 addition & 1 deletion src/KafkaLog/KafkaWALSettings.h
@@ -1,8 +1,8 @@
#pragma once

#include <boost/algorithm/string/join.hpp>

#include <boost/algorithm/string/predicate.hpp>

#include <fmt/format.h>

namespace klog
82 changes: 0 additions & 82 deletions src/KafkaLog/tests/gtest_parse_properties.cpp

This file was deleted.

48 changes: 42 additions & 6 deletions src/Storages/ExternalStream/ExternalStreamSettings.h
@@ -5,13 +5,13 @@
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>

#include <boost/algorithm/string/predicate.hpp>

namespace DB
{
class ASTStorage;

#define EXTERNAL_STREAM_RELATED_SETTINGS(M) \
M(String, type, "", "External stream type", 0) \
/* those are kafka related settings */ \
#define KAFKA_EXTERNAL_STREAM_SETTINGS(M) \
M(String, brokers, "", "A comma-separated list of brokers, for example Kafka brokers.", 0) \
M(String, topic, "", "topic, for example Kafka topic name.", 0) \
M(String, security_protocol, "plaintext", "The protocol used to connect to the external logstore", 0) \
@@ -20,10 +20,12 @@ class ASTStorage;
M(String, sasl_mechanism, "PLAIN", "SASL mechanism to use for authentication. Supported: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. Default to PLAIN when SASL is enabled.", 0) \
M(String, ssl_ca_cert_file, "", "The path of ssl ca cert file", 0) \
M(String, properties, "", "A semicolon-separated list of key-value pairs for configuring the Kafka client used by the external stream. Key and value are separated by an equal sign. Example: 'client.id=my-client-id;group.id=my-group-id'. Note that not all properties are supported; please check the documentation for the supported properties.", 0) \
M(UInt64, poll_waittime_ms, 500, "How long (in milliseconds) the poll should wait.", 0) \
M(String, sharding_expr, "", "An expression which will be evaluated on each row of data returned by the query to calculate an integer which will be used to determine the ID of the partition to which the row of data will be sent. If not set, data are sent to any partition randomly.", 0) \
M(String, message_key, "", "An expression which will be evaluated on each row of data returned by the query to compute a string which will be used as the message key.", 0) \
M(Bool, one_message_per_row, false, "If set to true, when send data to the Kafka external stream with row-based data format like `JSONEachRow`, it will produce one message per row.", 0) \
/* those are log related settings */ \
M(Bool, one_message_per_row, false, "If set to true, when send data to the Kafka external stream with row-based data format like `JSONEachRow`, it will produce one message per row.", 0)

#define LOG_FILE_EXTERNAL_STREAM_SETTINGS(M) \
M(String, log_files, "", "A comma-separated list of log files", 0) \
M(String, log_dir, "", "log root directory", 0) \
M(String, timestamp_regex, "", "Regex to extract log timestamp", 0) \
@@ -33,10 +35,31 @@ class ASTStorage;
M(String, row_delimiter, "\n", "The string to be considered as a delimiter in raw message.", 0) \
M(UInt64, max_row_length, 4096, "Max row length", 0)

#define ALL_EXTERNAL_STREAM_SETTINGS(M) \
M(String, type, "", "External stream type", 0) \
KAFKA_EXTERNAL_STREAM_SETTINGS(M) \
LOG_FILE_EXTERNAL_STREAM_SETTINGS(M)

#define LIST_OF_EXTERNAL_STREAM_SETTINGS(M) \
EXTERNAL_STREAM_RELATED_SETTINGS(M) \
ALL_EXTERNAL_STREAM_SETTINGS(M) \
FORMAT_FACTORY_SETTINGS(M)

DECLARE_SETTINGS_TRAITS(KafkaExternalStreamSettingsTraits, KAFKA_EXTERNAL_STREAM_SETTINGS)

struct KafkaExternalStreamSettings : public BaseSettings<KafkaExternalStreamSettingsTraits>
{
bool usesSASL() const
{
return boost::istarts_with(security_protocol.value, "SASL_");
}

/// "SASL_SSL" or "SSL"
bool usesSecureConnection() const
{
return boost::iends_with(security_protocol.value, "SSL");
}
};

DECLARE_SETTINGS_TRAITS(ExternalStreamSettingsTraits, LIST_OF_EXTERNAL_STREAM_SETTINGS)

/** Settings for the ExternalStream engine.
@@ -46,6 +69,19 @@ struct ExternalStreamSettings : public BaseSettings<ExternalStreamSettingsTraits
{
void loadFromQuery(ASTStorage & storage_def);

KafkaExternalStreamSettings getKafkaSettings()
{
KafkaExternalStreamSettings settings {};
#define SET_CHANGED_SETTINGS(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) \
if ((NAME).changed) \
settings.NAME = (NAME);

KAFKA_EXTERNAL_STREAM_SETTINGS(SET_CHANGED_SETTINGS)

#undef SET_CHANGED_SETTINGS
return settings;
}

FormatSettings getFormatSettings(const ContextPtr & context)
{
FormatFactorySettings settings {};
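getKafkaSettings() copies only the changed Kafka-related settings into the narrower KafkaExternalStreamSettings, and usesSASL()/usesSecureConnection() inspect security_protocol so callers do not have to repeat the string comparisons. A short usage sketch; the surrounding variables are assumptions for illustration and not part of this diff:

/// Sketch only: how a Kafka external stream might consume the new API.
/// `settings` is assumed to be the ExternalStreamSettings parsed from a
/// CREATE EXTERNAL STREAM statement; `conf_params` is a KConfParams destined
/// for the librdkafka configuration.
auto kafka_settings = settings.getKafkaSettings();

if (kafka_settings.usesSASL())
    conf_params.emplace_back("sasl.mechanism", kafka_settings.sasl_mechanism.value);

if (kafka_settings.usesSecureConnection() && !kafka_settings.ssl_ca_cert_file.value.empty())
    conf_params.emplace_back("ssl.ca.location", kafka_settings.ssl_ca_cert_file.value);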