-
Notifications
You must be signed in to change notification settings - Fork 613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Connection for Kafka source & sink #19270
Conversation
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabversion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
…ngwavelabs/risingwave into tab/share-kafka-client-enum
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
…ngwavelabs/risingwave into tab/share-kafka-client-enum
Does |
Co-authored-by: tabversion <[email protected]>
Signed-off-by: tabversion <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM
@@ -246,6 +269,7 @@ message Connection { | |||
string name = 4; | |||
oneof info { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, then let's place connection_params
outside the oneof
at least
@@ -156,6 +161,9 @@ message SinkFormatDesc { | |||
optional plan_common.EncodeType key_encode = 4; | |||
// Secret used for format encode options. | |||
map<string, secret.SecretRef> secret_refs = 5; | |||
|
|||
// ref connection for schema registry | |||
optional uint32 connection_id = 6; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. You keep both connection_id
and the resolved connection arguments in these message structures, right? It's acceptable to me but a bit counter-intuitive.
And to keep the design simple and align with the secret ref,
IIRC, secret ref doesn't keep the resolved plaintext secret at all. It always resolves whenever using a secret.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Common part generally LGTM.
@@ -123,6 +126,8 @@ message Source { | |||
uint32 associated_table_id = 12; | |||
} | |||
string definition = 13; | |||
|
|||
// ref connection for connector | |||
optional uint32 connection_id = 14; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we just reuse Source.connection_id
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes and same for the schema registry part and sink.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
following #18975
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
introducing a new catalog
CONNECTION
. and have integrated with SECRETplease note that the legacy
create connection
to AWS PrivateLink has been deprecated in #18975.new syntax
planned support for Kafka, iceberg (by @chenzl25 ) and FS (@wcy-fdu ) at the first stage.
when creating source/sink from a connection,
connector
must match with the connectiontype
& the ref key must beconnection
and the attrs defined in connection and source/table/sink cannot have overlap
to perform connection validate, we need a new kafka ACL:
DESCRIBE CLUSTER
(the privilege is auth via username and password, not related with consumer group. )Connection stores KV in the catalog and validation only takes a copy.
When building source/sink/table, we first fill the KVs in connection catalog to the with options and then start the create procedure.
accepted kafka connection props
(connection related)
(private link related)
handle_create_connection
will do the private link resolve and remove bothprivatelink.targets
andprivatelink.endpoint
and insert thebroker.rewrite.endpoints
to the props.so if users specify
privatelink.targets
andprivatelink.endpoint
in connection, they cannot set it again when create source/table/sink.(aws auth related: for msk)