From 45d266424ef4040795d97c6f744e021b2a6a2b8a Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Thu, 5 Dec 2024 17:29:38 +0800 Subject: [PATCH] feat: connection for schema registry (#19685) Co-authored-by: tabversion --- .../connection/schema_registry.slt | 56 +++++++++++++++++++ .../src/connector_common/connection.rs | 20 ++++++- src/connector/src/connector_common/mod.rs | 3 +- src/connector/src/macros.rs | 2 +- .../src/schema/schema_registry/client.rs | 30 ++++++++++ src/frontend/src/handler/create_connection.rs | 2 + src/frontend/src/utils/with_options.rs | 16 ++++++ 7 files changed, 124 insertions(+), 5 deletions(-) create mode 100644 e2e_test/source_inline/connection/schema_registry.slt diff --git a/e2e_test/source_inline/connection/schema_registry.slt b/e2e_test/source_inline/connection/schema_registry.slt new file mode 100644 index 0000000000000..e97f868cfec88 --- /dev/null +++ b/e2e_test/source_inline/connection/schema_registry.slt @@ -0,0 +1,56 @@ +control substitution on + +statement ok +SET streaming_use_shared_source TO false; + +statement error unknown field `aws.region`, expected one of `schema.registry`, `schema.registry.username`, `schema.registry.password` +CREATE CONNECTION schema_registry_conn with ( type = 'schema_registry', aws.region = 'us-east-1'); + +statement ok +CREATE CONNECTION schema_registry_conn with ( type = 'schema_registry', schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); + +# cleanup +system ok +rpk topic delete 'sr_conn_test' || true; \ +(rpk sr subject delete 'sr_conn_test-value' && rpk sr subject delete 'sr_conn_test-value' --permanent) || true; + +# create topic and sr subject +system ok +rpk topic create 'sr_conn_test' + +system ok +sr_register sr_conn_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' + +system ok +echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic sr_conn_test + +statement error Invalid Parameter Value: Glue related options/secrets are not allowed when using schema registry connection +create table t +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_conn_test' +) +FORMAT PLAIN ENCODE AVRO ( + connection = schema_registry_conn, + aws.region = 'us-east-1' +); + +statement ok +create table t +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_conn_test' +) +FORMAT PLAIN ENCODE AVRO ( + connection = schema_registry_conn +); + +# clean up + +statement ok +drop table t; + +statement ok +drop connection schema_registry_conn; + + diff --git a/src/connector/src/connector_common/connection.rs b/src/connector/src/connector_common/connection.rs index fa1f420544677..dd89dc4104f99 100644 --- a/src/connector/src/connector_common/connection.rs +++ b/src/connector/src/connector_common/connection.rs @@ -25,9 +25,12 @@ use with_options::WithOptions; use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon}; use crate::error::ConnectorResult; +use crate::schema::schema_registry::Client as ConfluentSchemaRegistryClient; use crate::source::kafka::{KafkaContextCommon, RwConsumerContext}; use crate::{dispatch_connection_impl, ConnectionImpl}; +pub const SCHEMA_REGISTRY_CONNECTION_TYPE: &str = "schema_registry"; + #[async_trait] pub trait Connection { async fn test_connection(&self) -> ConnectorResult<()>; @@ -118,11 +121,22 @@ impl Connection for IcebergConnection { #[serde_as] #[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)] #[serde(deny_unknown_fields)] -pub struct SchemaRegistryConnection {} +pub struct ConfluentSchemaRegistryConnection { + #[serde(rename = "schema.registry")] + pub url: String, + // ref `SchemaRegistryAuth` + #[serde(rename = "schema.registry.username")] + pub username: Option, + #[serde(rename = "schema.registry.password")] + pub password: Option, +} #[async_trait] -impl Connection for SchemaRegistryConnection { +impl Connection for ConfluentSchemaRegistryConnection { async fn test_connection(&self) -> ConnectorResult<()> { - todo!() + // GET /config to validate the connection + let client = ConfluentSchemaRegistryClient::try_from(self)?; + client.validate_connection().await?; + Ok(()) } } diff --git a/src/connector/src/connector_common/mod.rs b/src/connector/src/connector_common/mod.rs index 11f5946bf98e1..223f4ecd60dd2 100644 --- a/src/connector/src/connector_common/mod.rs +++ b/src/connector/src/connector_common/mod.rs @@ -25,7 +25,8 @@ pub use common::{ }; mod connection; pub use connection::{ - validate_connection, Connection, IcebergConnection, KafkaConnection, SchemaRegistryConnection, + validate_connection, ConfluentSchemaRegistryConnection, Connection, IcebergConnection, + KafkaConnection, SCHEMA_REGISTRY_CONNECTION_TYPE, }; mod iceberg; diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index e5e4140f3a2f2..ff30485b546b4 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -57,7 +57,7 @@ macro_rules! for_all_connections { { { Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType }, { Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType }, - { SchemaRegistry, $crate::connector_common::SchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType } + { SchemaRegistry, $crate::connector_common::ConfluentSchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType } } $(,$extra_args)* } diff --git a/src/connector/src/schema/schema_registry/client.rs b/src/connector/src/schema/schema_registry/client.rs index 21b0d4a7f6586..1545597851f38 100644 --- a/src/connector/src/schema/schema_registry/client.rs +++ b/src/connector/src/schema/schema_registry/client.rs @@ -20,9 +20,11 @@ use futures::future::select_all; use itertools::Itertools; use reqwest::{Method, Url}; use serde::de::DeserializeOwned; +use serde::Deserialize; use thiserror_ext::AsReport as _; use super::util::*; +use crate::connector_common::ConfluentSchemaRegistryConnection; use crate::schema::{invalid_option_error, InvalidOptionError}; pub const SCHEMA_REGISTRY_USERNAME: &str = "schema.registry.username"; @@ -70,6 +72,22 @@ pub struct ConcurrentRequestError { type SrResult = Result; +impl TryFrom<&ConfluentSchemaRegistryConnection> for Client { + type Error = InvalidOptionError; + + fn try_from(value: &ConfluentSchemaRegistryConnection) -> Result { + let urls = handle_sr_list(value.url.as_str())?; + + Client::new( + urls, + &SchemaRegistryAuth { + username: value.username.clone(), + password: value.password.clone(), + }, + ) + } +} + impl Client { pub(crate) fn new( url: Vec, @@ -160,6 +178,18 @@ impl Client { self.get_subject(subject).await.map(|s| s.schema) } + // used for connection validate, just check if request is ok + pub async fn validate_connection(&self) -> SrResult<()> { + #[derive(Debug, Deserialize)] + struct GetConfigResp { + #[serde(rename = "compatibilityLevel")] + _compatibility_level: String, + } + + let _: GetConfigResp = self.concurrent_req(Method::GET, &["config"]).await?; + Ok(()) + } + /// get the latest version of the subject pub async fn get_subject(&self, subject: &str) -> SrResult { let res: GetBySubjectResp = self diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs index 54ca0f3d28520..ec727f5a63792 100644 --- a/src/frontend/src/handler/create_connection.rs +++ b/src/frontend/src/handler/create_connection.rs @@ -15,6 +15,7 @@ use std::collections::BTreeMap; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_connector::connector_common::SCHEMA_REGISTRY_CONNECTION_TYPE; use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; use risingwave_connector::source::kafka::{KAFKA_CONNECTOR, PRIVATELINK_CONNECTION}; use risingwave_pb::catalog::connection_params::ConnectionType; @@ -69,6 +70,7 @@ fn resolve_create_connection_payload( } KAFKA_CONNECTOR => ConnectionType::Kafka, ICEBERG_CONNECTOR => ConnectionType::Iceberg, + SCHEMA_REGISTRY_CONNECTION_TYPE => ConnectionType::SchemaRegistry, _ => { return Err(RwError::from(ProtocolError(format!( "Connection type \"{connection_type}\" is not supported" diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 2d5d53fe93c0b..3280370952375 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -304,6 +304,22 @@ pub(crate) fn resolve_connection_ref_and_secret_ref( )))); } } + + { + // check if not mess up with schema registry connection and glue connection + if connection_type == PbConnectionType::SchemaRegistry { + // Check no AWS related options when using schema registry connection + if options + .keys() + .chain(inner_secret_refs.keys()) + .any(|k| k.starts_with("aws")) + { + return Err(RwError::from(ErrorCode::InvalidParameterValue( + "Glue related options/secrets are not allowed when using schema registry connection".to_string() + ))); + } + } + } } // connection_params is None means the connection is not retrieved, so the connection type should be unspecified