Kafka with protobuf format #4808
Conversation
Apply query settings only for the containing query (client).
Force-pushed from 01d8a3e to c1413cb.
Force-pushed from c1413cb to ba58fbb.
@@ -36,7 +36,7 @@ class DelimitedReadBuffer : public ReadBuffer
return false;

BufferBase::set(buffer->position(), buffer->available(), 0);
- put_delimiter = true;
+ put_delimiter = (delimiter != 0);
When using protobuf messages, a single zero byte means a message with all its fields set to their default values. There seems to be no format where a single zero byte would be useful as a row delimiter, so I've decided not to put a row delimiter if it's set to '\0'.
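A minimal sketch of the rule described above, using a hypothetical helper rather than the actual DelimitedReadBuffer code: the delimiter byte is emitted only when it is non-zero, so '\0' effectively disables the row delimiter.

#include <string>

/// Hypothetical helper illustrating the '\0'-means-no-delimiter rule.
void appendRowDelimiter(std::string & out, char delimiter)
{
    /// A single zero byte is itself a valid Protobuf message (all fields at
    /// their default values), so it is never emitted as a row delimiter.
    if (delimiter != 0)
        out.push_back(delimiter);
}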
checkStringIsACharacter(x);
value = x[0];
changed = true;
if (x.size() > 1)
A minor change to allow setting a SettingChar to '\0' by just specifying '' (a sketch of this parsing rule follows the example below).
For example,
create table simple (
t UInt64,
url String
) ENGINE=Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'mygroup',
kafka_format = 'Protobuf',
kafka_row_delimiter = '',
kafka_schema = 'simple:AccessLog';
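A rough sketch of the parsing rule mentioned above, assuming a simplified SettingChar and a standard exception in place of ClickHouse's own Exception type; both '' and '\0' end up as the zero character.

#include <stdexcept>
#include <string>

struct SettingChar
{
    char value = '\0';
    bool changed = false;

    /// Accepts "" and single-character strings, rejects anything longer.
    void set(const std::string & x)
    {
        if (x.size() > 1)
            throw std::invalid_argument("A setting's value string has to be exactly one character long");
        value = x.empty() ? '\0' : x[0];
        changed = true;
    }
};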
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ADVERTISED_LISTENERS: INSIDE://localhost:9092,OUTSIDE://kafka1:19092
These changes were made to allow the Python test test_kafka_protobuf to publish messages to Kafka.
@@ -796,14 +796,33 @@ class Client : public Poco::Util::Application
written_progress_chars = 0;
written_first_block = false;

connection->forceConnected();
{
    /// Temporarily apply query settings to context.
Why do we need to apply all server-side settings on the client side here?
It doesn't seem trivial to distinguish at this point which settings are client-side and which are server-side. It's easier to just apply all the settings.
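A simplified sketch of the pattern under discussion, with hypothetical types (the real code works on ClickHouse's Context and Settings classes): the query's settings are applied to the context for the duration of a single query and the previous values are restored when the scope ends.

#include <map>
#include <string>

struct Context
{
    std::map<std::string, std::string> settings;
};

/// RAII guard: applies the query settings on construction and restores the
/// previous values on destruction.
class TemporarySettingsScope
{
public:
    TemporarySettingsScope(Context & context_, const std::map<std::string, std::string> & query_settings)
        : context(context_), saved(context_.settings)
    {
        /// No attempt is made to tell client-side and server-side settings
        /// apart; everything from the query is applied.
        for (const auto & [name, value] : query_settings)
            context.settings[name] = value;
    }

    ~TemporarySettingsScope() { context.settings = saved; }

private:
    Context & context;
    std::map<std::string, std::string> saved;
};

Because everything is rolled back when the scope ends, applying all settings uniformly on the client stays safe even if some of them only matter on the server.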
changed = true;
if (x.size() > 1)
    throw Exception("A setting's value string has to be an exactly one character long", ErrorCodes::SIZE_OF_FIXED_STRING_DOESNT_MATCH);
char c = (x.size() == 1) ? x[0] : '\0';
How can we now distinguish the empty setting value from the 'zero'-char setting value?
We don't distinguish them. The following two declarations mean the same:
create table simple (
t UInt64,
url String
) ENGINE=Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'mygroup',
kafka_format = 'Protobuf',
kafka_row_delimiter = '',
kafka_schema = 'simple:AccessLog';
and
create table simple (
t UInt64,
url String
) ENGINE=Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'mygroup',
kafka_format = 'Protobuf',
kafka_row_delimiter = '\0',
kafka_schema = 'simple:AccessLog';
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
#4710
#4744
Category (leave one):