Skip to content

Commit

Permalink
feat: connection for schema registry (#19685)
Browse files Browse the repository at this point in the history
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
tabVersion and tabversion authored Dec 5, 2024
1 parent 5ba9a73 commit 45d2664
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 5 deletions.
56 changes: 56 additions & 0 deletions e2e_test/source_inline/connection/schema_registry.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

statement error unknown field `aws.region`, expected one of `schema.registry`, `schema.registry.username`, `schema.registry.password`
CREATE CONNECTION schema_registry_conn with ( type = 'schema_registry', aws.region = 'us-east-1');

statement ok
CREATE CONNECTION schema_registry_conn with ( type = 'schema_registry', schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');

# cleanup
system ok
rpk topic delete 'sr_conn_test' || true; \
(rpk sr subject delete 'sr_conn_test-value' && rpk sr subject delete 'sr_conn_test-value' --permanent) || true;

# create topic and sr subject
system ok
rpk topic create 'sr_conn_test'

system ok
sr_register sr_conn_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}'

system ok
echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic sr_conn_test

statement error Invalid Parameter Value: Glue related options/secrets are not allowed when using schema registry connection
create table t
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_conn_test'
)
FORMAT PLAIN ENCODE AVRO (
connection = schema_registry_conn,
aws.region = 'us-east-1'
);

statement ok
create table t
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_conn_test'
)
FORMAT PLAIN ENCODE AVRO (
connection = schema_registry_conn
);

# clean up

statement ok
drop table t;

statement ok
drop connection schema_registry_conn;


20 changes: 17 additions & 3 deletions src/connector/src/connector_common/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon};
use crate::error::ConnectorResult;
use crate::schema::schema_registry::Client as ConfluentSchemaRegistryClient;
use crate::source::kafka::{KafkaContextCommon, RwConsumerContext};
use crate::{dispatch_connection_impl, ConnectionImpl};

pub const SCHEMA_REGISTRY_CONNECTION_TYPE: &str = "schema_registry";

#[async_trait]
pub trait Connection {
async fn test_connection(&self) -> ConnectorResult<()>;
Expand Down Expand Up @@ -118,11 +121,22 @@ impl Connection for IcebergConnection {
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
#[serde(deny_unknown_fields)]
pub struct SchemaRegistryConnection {}
pub struct ConfluentSchemaRegistryConnection {
#[serde(rename = "schema.registry")]
pub url: String,
// ref `SchemaRegistryAuth`
#[serde(rename = "schema.registry.username")]
pub username: Option<String>,
#[serde(rename = "schema.registry.password")]
pub password: Option<String>,
}

#[async_trait]
impl Connection for SchemaRegistryConnection {
impl Connection for ConfluentSchemaRegistryConnection {
async fn test_connection(&self) -> ConnectorResult<()> {
todo!()
// GET /config to validate the connection
let client = ConfluentSchemaRegistryClient::try_from(self)?;
client.validate_connection().await?;
Ok(())
}
}
3 changes: 2 additions & 1 deletion src/connector/src/connector_common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ pub use common::{
};
mod connection;
pub use connection::{
validate_connection, Connection, IcebergConnection, KafkaConnection, SchemaRegistryConnection,
validate_connection, ConfluentSchemaRegistryConnection, Connection, IcebergConnection,
KafkaConnection, SCHEMA_REGISTRY_CONNECTION_TYPE,
};

mod iceberg;
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ macro_rules! for_all_connections {
{
{ Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
{ Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
{ SchemaRegistry, $crate::connector_common::SchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType }
{ SchemaRegistry, $crate::connector_common::ConfluentSchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType }
}
$(,$extra_args)*
}
Expand Down
30 changes: 30 additions & 0 deletions src/connector/src/schema/schema_registry/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use futures::future::select_all;
use itertools::Itertools;
use reqwest::{Method, Url};
use serde::de::DeserializeOwned;
use serde::Deserialize;
use thiserror_ext::AsReport as _;

use super::util::*;
use crate::connector_common::ConfluentSchemaRegistryConnection;
use crate::schema::{invalid_option_error, InvalidOptionError};

pub const SCHEMA_REGISTRY_USERNAME: &str = "schema.registry.username";
Expand Down Expand Up @@ -70,6 +72,22 @@ pub struct ConcurrentRequestError {

type SrResult<T> = Result<T, ConcurrentRequestError>;

impl TryFrom<&ConfluentSchemaRegistryConnection> for Client {
type Error = InvalidOptionError;

fn try_from(value: &ConfluentSchemaRegistryConnection) -> Result<Self, Self::Error> {
let urls = handle_sr_list(value.url.as_str())?;

Client::new(
urls,
&SchemaRegistryAuth {
username: value.username.clone(),
password: value.password.clone(),
},
)
}
}

impl Client {
pub(crate) fn new(
url: Vec<Url>,
Expand Down Expand Up @@ -160,6 +178,18 @@ impl Client {
self.get_subject(subject).await.map(|s| s.schema)
}

// used for connection validate, just check if request is ok
pub async fn validate_connection(&self) -> SrResult<()> {
#[derive(Debug, Deserialize)]
struct GetConfigResp {
#[serde(rename = "compatibilityLevel")]
_compatibility_level: String,
}

let _: GetConfigResp = self.concurrent_req(Method::GET, &["config"]).await?;
Ok(())
}

/// get the latest version of the subject
pub async fn get_subject(&self, subject: &str) -> SrResult<Subject> {
let res: GetBySubjectResp = self
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::BTreeMap;

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_connector::connector_common::SCHEMA_REGISTRY_CONNECTION_TYPE;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::kafka::{KAFKA_CONNECTOR, PRIVATELINK_CONNECTION};
use risingwave_pb::catalog::connection_params::ConnectionType;
Expand Down Expand Up @@ -69,6 +70,7 @@ fn resolve_create_connection_payload(
}
KAFKA_CONNECTOR => ConnectionType::Kafka,
ICEBERG_CONNECTOR => ConnectionType::Iceberg,
SCHEMA_REGISTRY_CONNECTION_TYPE => ConnectionType::SchemaRegistry,
_ => {
return Err(RwError::from(ProtocolError(format!(
"Connection type \"{connection_type}\" is not supported"
Expand Down
16 changes: 16 additions & 0 deletions src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,22 @@ pub(crate) fn resolve_connection_ref_and_secret_ref(
))));
}
}

{
// check if not mess up with schema registry connection and glue connection
if connection_type == PbConnectionType::SchemaRegistry {
// Check no AWS related options when using schema registry connection
if options
.keys()
.chain(inner_secret_refs.keys())
.any(|k| k.starts_with("aws"))
{
return Err(RwError::from(ErrorCode::InvalidParameterValue(
"Glue related options/secrets are not allowed when using schema registry connection".to_string()
)));
}
}
}
}

// connection_params is None means the connection is not retrieved, so the connection type should be unspecified
Expand Down

0 comments on commit 45d2664

Please sign in to comment.