From ddc7c276e610553ac6e64bfb35f4cac50b106157 Mon Sep 17 00:00:00 2001 From: wu Date: Tue, 18 Jul 2023 15:04:53 +0800 Subject: [PATCH] refactor(source): separate key and payload accessor building (#10969) --- src/connector/benches/parser.rs | 5 +- src/connector/src/aws_auth.rs | 1 + src/connector/src/macros.rs | 2 +- src/connector/src/parser/avro/parser.rs | 111 +++++-- src/connector/src/parser/bytes_parser.rs | 29 +- .../src/parser/canal/simd_json_parser.rs | 6 +- src/connector/src/parser/csv_parser.rs | 15 +- .../src/parser/debezium/avro_parser.rs | 182 +++++------ .../src/parser/debezium/debezium_parser.rs | 117 +++++++ src/connector/src/parser/debezium/mod.rs | 6 +- .../src/parser/debezium/mongo_json_parser.rs | 8 +- .../src/parser/debezium/simd_json_parser.rs | 131 ++++---- src/connector/src/parser/json_parser.rs | 92 ++++-- .../src/parser/maxwell/maxwell_parser.rs | 87 ++++++ src/connector/src/parser/maxwell/mod.rs | 4 +- .../src/parser/maxwell/simd_json_parser.rs | 81 +---- src/connector/src/parser/mod.rs | 286 ++++++++++++------ src/connector/src/parser/protobuf/parser.rs | 65 +++- src/connector/src/parser/unified/debezium.rs | 2 +- src/connector/src/parser/unified/mod.rs | 24 ++ src/connector/src/parser/unified/protobuf.rs | 51 ++++ src/connector/src/parser/util.rs | 14 + src/connector/src/source/base.rs | 4 +- .../src/source/cdc/source/message.rs | 1 + .../src/source/datagen/source/generator.rs | 1 + .../src/source/filesystem/nd_streaming.rs | 2 + .../src/source/filesystem/s3/source/reader.rs | 3 +- .../source/google_pubsub/source/message.rs | 1 + .../src/source/kafka/source/message.rs | 19 +- .../src/source/kafka/source/reader.rs | 8 +- .../src/source/kinesis/source/message.rs | 1 + .../src/source/nexmark/source/message.rs | 1 + .../src/source/pulsar/source/message.rs | 1 + src/frontend/src/handler/create_source.rs | 10 +- src/source/benches/json_parser.rs | 5 +- 35 files changed, 901 insertions(+), 475 deletions(-) create mode 100644 src/connector/src/parser/debezium/debezium_parser.rs create mode 100644 src/connector/src/parser/maxwell/maxwell_parser.rs create mode 100644 src/connector/src/parser/unified/protobuf.rs diff --git a/src/connector/benches/parser.rs b/src/connector/benches/parser.rs index 5a2390c3e4fe8..92b9b3b874fa8 100644 --- a/src/connector/benches/parser.rs +++ b/src/connector/benches/parser.rs @@ -81,7 +81,10 @@ async fn parse(parser: JsonParser, column_desc: Vec, input: Ve SourceStreamChunkBuilder::with_capacity(column_desc.clone(), input_inner.len()); for payload in input_inner { let row_writer = builder.row_writer(); - parser.parse_inner(payload, row_writer).await.unwrap(); + parser + .parse_inner(None, Some(payload), row_writer) + .await + .unwrap(); } builder.finish(); } diff --git a/src/connector/src/aws_auth.rs b/src/connector/src/aws_auth.rs index 92e34976a6e46..701931f2bbb39 100644 --- a/src/connector/src/aws_auth.rs +++ b/src/connector/src/aws_auth.rs @@ -20,6 +20,7 @@ use aws_types::region::Region; use aws_types::SdkConfig; /// A flatten cofig map for aws auth. +#[derive(Debug, Clone)] pub struct AwsAuthProps { pub region: Option, pub endpoint: Option, diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 04d730ee25ed3..96931edeeade2 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -215,7 +215,7 @@ macro_rules! 
impl_common_split_reader_logic { }) .boxed(); let parser = - $crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx)?; + $crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?; #[for_await] for msg_batch in parser.into_stream(data_stream) { yield msg_batch?; diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index bcf7b0ebd1f38..6f52e62f281e1 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::borrow::Cow; use std::fmt::Debug; use std::sync::Arc; @@ -26,17 +25,79 @@ use url::Url; use super::schema_resolver::*; use super::util::avro_schema_to_column_descs; -use crate::common::UpsertMessage; use crate::parser::schema_registry::{extract_schema_id, Client}; use crate::parser::unified::avro::{AvroAccess, AvroParseOptions}; use crate::parser::unified::upsert::UpsertChangeEvent; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; +use crate::parser::unified::AccessImpl; use crate::parser::{ - ByteStreamSourceParser, EncodingProperties, ParserProperties, SourceStreamChunkRowWriter, - WriteGuard, + AccessBuilder, ByteStreamSourceParser, EncodingProperties, EncodingType, + SourceStreamChunkRowWriter, WriteGuard, }; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; +// Default avro access builder +#[derive(Debug)] +pub struct AvroAccessBuilder { + schema: Arc, + pub schema_resolver: Option>, + value: Option, +} + +impl AccessBuilder for AvroAccessBuilder { + async fn generate_accessor(&mut self, payload: Vec) -> Result> { + self.value = self.parse_avro_value(&payload, Some(&*self.schema)).await?; + Ok(AccessImpl::Avro(AvroAccess::new( + self.value.as_ref().unwrap(), + AvroParseOptions::default().with_schema(&self.schema), + ))) + } +} + +impl AvroAccessBuilder { + pub fn new(config: AvroParserConfig, _encoding_type: EncodingType) -> Result { + let AvroParserConfig { + schema, + schema_resolver, + .. 
+ } = config; + Ok(Self { + schema, + schema_resolver, + value: None, + }) + } + + async fn parse_avro_value( + &self, + payload: &[u8], + reader_schema: Option<&Schema>, + ) -> anyhow::Result> { + // parse payload to avro value + // if use confluent schema, get writer schema from confluent schema registry + if let Some(resolver) = &self.schema_resolver { + let (schema_id, mut raw_payload) = extract_schema_id(payload)?; + let writer_schema = resolver.get(schema_id).await?; + Ok(Some(from_avro_datum( + writer_schema.as_ref(), + &mut raw_payload, + reader_schema, + )?)) + } else if let Some(schema) = reader_schema { + let mut reader = Reader::with_schema(schema, payload)?; + match reader.next() { + Some(Ok(v)) => Ok(Some(v)), + Some(Err(e)) => Err(e)?, + None => { + anyhow::bail!("avro parse unexpected eof") + } + } + } else { + unreachable!("both schema_resolver and reader_schema not exist"); + } + } +} + #[derive(Debug)] pub struct AvroParser { schema: Arc, @@ -56,9 +117,8 @@ pub struct AvroParserConfig { } impl AvroParserConfig { - pub async fn new(parser_properties: ParserProperties) -> Result { - let avro_config = - try_match_expand!(parser_properties.encoding_config, EncodingProperties::Avro)?; + pub async fn new(encoding_properties: EncodingProperties) -> Result { + let avro_config = try_match_expand!(encoding_properties, EncodingProperties::Avro)?; let schema_location = &avro_config.row_schema_location; let enable_upsert = avro_config.enable_upsert; let url = Url::parse(schema_location).map_err(|e| { @@ -192,32 +252,17 @@ impl AvroParser { pub(crate) async fn parse_inner( &self, - payload: Vec, + key: Option>, + payload: Option>, mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result { - let (raw_key, raw_value) = if self.is_enable_upsert() { - let msg: UpsertMessage<'_> = bincode::deserialize(&payload).map_err(|e| { - RwError::from(ProtocolError(format!( - "extract payload err {:?}, you may need to check the 'upsert' parameter", - e - ))) - })?; - if !msg.record.is_empty() { - (Some(msg.primary_key), Some(msg.record)) - } else { - (Some(msg.primary_key), None) - } - } else { - (None, Some(Cow::from(&payload))) - }; - - let avro_value = if let Some(payload) = raw_value { + let avro_value = if let Some(payload) = payload { self.parse_avro_value(payload.as_ref(), Some(&*self.schema)) .await? } else { None }; - let avro_key = if let Some(payload) = raw_key { + let avro_key = if let Some(payload) = key { self.parse_avro_value(payload.as_ref(), self.key_schema.as_deref()) .await? 
} else { @@ -262,10 +307,11 @@ impl ByteStreamSourceParser for AvroParser { async fn parse_one<'a>( &'a mut self, - payload: Vec, + key: Option>, + payload: Option>, writer: SourceStreamChunkRowWriter<'a>, ) -> Result { - self.parse_inner(payload, writer).await + self.parse_inner(key, payload, writer).await } } @@ -371,7 +417,7 @@ mod test { ..Default::default() }; let parser_config = ParserProperties::new(SourceFormat::Avro, &HashMap::new(), &info)?; - AvroParserConfig::new(parser_config).await + AvroParserConfig::new(parser_config.encoding_config).await } async fn new_avro_parser_from_local(file_name: &str) -> error::Result { @@ -391,12 +437,15 @@ mod test { writer.append(record.clone()).unwrap(); let flush = writer.flush().unwrap(); assert!(flush > 0); - let input_data = writer.into_inner().unwrap(); + let input_data = Some(writer.into_inner().unwrap()); let columns = build_rw_columns(); let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 1); { let writer = builder.row_writer(); - avro_parser.parse_inner(input_data, writer).await.unwrap(); + avro_parser + .parse_inner(None, input_data, writer) + .await + .unwrap(); } let chunk = builder.finish(); let (op, row) = chunk.rows().next().unwrap(); diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs index 8bfd10c43687d..a8a47ae228fa5 100644 --- a/src/connector/src/parser/bytes_parser.rs +++ b/src/connector/src/parser/bytes_parser.rs @@ -12,13 +12,31 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::error::Result; +use risingwave_common::error::ErrorCode::{self}; +use risingwave_common::error::{Result, RwError}; use super::unified::bytes::{BytesAccess, BytesChangeEvent}; -use super::unified::ChangeEvent; -use super::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; +use super::unified::{AccessImpl, ChangeEvent}; +use super::{AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; +use crate::only_parse_payload; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; +#[derive(Debug)] +pub struct BytesAccessBuilder {} + +impl AccessBuilder for BytesAccessBuilder { + #[allow(clippy::unused_async)] + async fn generate_accessor(&mut self, payload: Vec) -> Result> { + Ok(AccessImpl::Bytes(BytesAccess::new(payload))) + } +} + +impl BytesAccessBuilder { + pub fn new() -> Result { + Ok(Self {}) + } +} + /// Parser for BYTES format #[derive(Debug)] pub struct BytesParser { @@ -72,10 +90,11 @@ impl ByteStreamSourceParser for BytesParser { async fn parse_one<'a>( &'a mut self, - payload: Vec, + _key: Option>, + payload: Option>, writer: SourceStreamChunkRowWriter<'a>, ) -> Result { - self.parse_inner(payload, writer).await + only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/canal/simd_json_parser.rs b/src/connector/src/parser/canal/simd_json_parser.rs index 4397f65e62425..17740e4541714 100644 --- a/src/connector/src/parser/canal/simd_json_parser.rs +++ b/src/connector/src/parser/canal/simd_json_parser.rs @@ -16,6 +16,7 @@ use risingwave_common::error::ErrorCode::{self, ProtocolError}; use risingwave_common::error::{Result, RwError}; use simd_json::{BorrowedValue, Mutable, ValueAccess}; +use crate::only_parse_payload; use crate::parser::canal::operators::*; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; @@ -115,10 +116,11 @@ impl 
ByteStreamSourceParser for CanalJsonParser { async fn parse_one<'a>( &'a mut self, - payload: Vec, + _key: Option>, + payload: Option>, writer: SourceStreamChunkRowWriter<'a>, ) -> Result { - self.parse_inner(payload, writer).await + only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index 072f29afb57e5..ef3e2cdd7244f 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -17,11 +17,12 @@ use std::str::FromStr; use anyhow::anyhow; use risingwave_common::cast::{str_to_date, str_to_timestamp}; use risingwave_common::error::ErrorCode::{InternalError, ProtocolError}; -use risingwave_common::error::{Result, RwError}; +use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::try_match_expand; use risingwave_common::types::{Datum, Decimal, ScalarImpl, Timestamptz}; -use super::{ByteStreamSourceParser, EncodingProperties, ParserProperties}; +use super::{ByteStreamSourceParser, EncodingProperties}; +use crate::only_parse_payload; use crate::parser::{SourceStreamChunkRowWriter, WriteGuard}; use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef}; @@ -38,9 +39,8 @@ pub struct CsvParserConfig { } impl CsvParserConfig { - pub fn new(parser_properties: ParserProperties) -> Result { - let csv_config = - try_match_expand!(parser_properties.encoding_config, EncodingProperties::Csv)?; + pub fn new(encoding_properties: EncodingProperties) -> Result { + let csv_config = try_match_expand!(encoding_properties, EncodingProperties::Csv)?; Ok(Self { delimiter: csv_config.delimiter, has_header: csv_config.has_header, @@ -171,10 +171,11 @@ impl ByteStreamSourceParser for CsvParser { async fn parse_one<'a>( &'a mut self, - payload: Vec, + _key: Option>, + payload: Option>, writer: SourceStreamChunkRowWriter<'a>, ) -> Result { - self.parse_inner(payload, writer).await + only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 111caf1ac5cf4..f7841f69f71a1 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -15,6 +15,7 @@ use std::fmt::Debug; use std::sync::Arc; +use apache_avro::types::Value; use apache_avro::{from_avro_datum, Schema}; use reqwest::Url; use risingwave_common::error::ErrorCode::{InternalError, ProtocolError}; @@ -22,35 +23,77 @@ use risingwave_common::error::{Result, RwError}; use risingwave_common::try_match_expand; use risingwave_pb::plan_common::ColumnDesc; -use crate::common::UpsertMessage; use crate::parser::avro::schema_resolver::ConfluentSchemaResolver; use crate::parser::avro::util::avro_schema_to_column_descs; use crate::parser::schema_registry::{extract_schema_id, Client}; use crate::parser::unified::avro::{ avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions, }; -use crate::parser::unified::debezium::DebeziumChangeEvent; -use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; -use crate::parser::{ - ByteStreamSourceParser, EncodingProperties, ParserProperties, SourceStreamChunkRowWriter, - WriteGuard, -}; -use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; +use crate::parser::unified::AccessImpl; +use crate::parser::{AccessBuilder, EncodingProperties, EncodingType}; const BEFORE: &str = "before"; const AFTER: &str = "after"; const OP: &str = "op"; const PAYLOAD: &str 
= "payload"; -// TODO: avoid duplicated codes with `AvroParser` #[derive(Debug)] -pub struct DebeziumAvroParser { +pub struct DebeziumAvroAccessBuilder { schema: Schema, schema_resolver: Arc, - rw_columns: Vec, - source_ctx: SourceContextRef, + key_schema: Option>, + value: Option, + encoding_type: EncodingType, +} + +// TODO: reduce encodingtype match +impl AccessBuilder for DebeziumAvroAccessBuilder { + async fn generate_accessor(&mut self, payload: Vec) -> Result> { + let (schema_id, mut raw_payload) = extract_schema_id(&payload)?; + let schema = self.schema_resolver.get(schema_id).await?; + self.value = Some( + from_avro_datum(schema.as_ref(), &mut raw_payload, None) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?, + ); + self.key_schema = match self.encoding_type { + EncodingType::Key => Some(schema), + EncodingType::Value => None, + }; + Ok(AccessImpl::Avro(AvroAccess::new( + self.value.as_mut().unwrap(), + AvroParseOptions::default().with_schema(match self.encoding_type { + EncodingType::Key => self.key_schema.as_mut().unwrap(), + EncodingType::Value => &self.schema, + }), + ))) + } +} + +impl DebeziumAvroAccessBuilder { + pub fn new(config: DebeziumAvroParserConfig, encoding_type: EncodingType) -> Result { + let DebeziumAvroParserConfig { + outer_schema, + schema_resolver, + .. + } = config; + + let resolver = apache_avro::schema::ResolvedSchema::try_from(&*outer_schema) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + // todo: to_resolved may cause stackoverflow if there's a loop in the schema + let schema = resolver + .to_resolved(&outer_schema) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + Ok(Self { + schema, + schema_resolver, + key_schema: None, + value: None, + encoding_type, + }) + } } +// TODO: avoid duplicated codes with `AvroParser` #[derive(Debug, Clone)] pub struct DebeziumAvroParserConfig { pub key_schema: Arc, @@ -59,9 +102,8 @@ pub struct DebeziumAvroParserConfig { } impl DebeziumAvroParserConfig { - pub async fn new(parser_properties: ParserProperties) -> Result { - let avro_config = - try_match_expand!(parser_properties.encoding_config, EncodingProperties::Avro)?; + pub async fn new(encoding_config: EncodingProperties) -> Result { + let avro_config = try_match_expand!(encoding_config, EncodingProperties::Avro)?; let schema_location = &avro_config.row_schema_location; let client_config = &avro_config.client_config; let kafka_topic = &avro_config.topic; @@ -98,97 +140,6 @@ impl DebeziumAvroParserConfig { } } -impl DebeziumAvroParser { - pub fn new( - rw_columns: Vec, - config: DebeziumAvroParserConfig, - source_ctx: SourceContextRef, - ) -> Result { - let DebeziumAvroParserConfig { - outer_schema, - schema_resolver, - .. 
- } = config; - let resolver = apache_avro::schema::ResolvedSchema::try_from(&*outer_schema) - .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - // todo: to_resolved may cause stackoverflow if there's a loop in the schema - let schema = resolver - .to_resolved(&outer_schema) - .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - Ok(Self { - schema, - schema_resolver, - rw_columns, - source_ctx, - }) - } - - pub(crate) async fn parse_inner( - &self, - payload: Vec, - mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { - // https://debezium.io/documentation/reference/stable/transformations/event-flattening.html#event-flattening-behavior: - // - // A database DELETE operation causes Debezium to generate two Kafka records: - // - A record that contains "op": "d", the before row data, and some other fields. - // - A tombstone record that has the same key as the deleted row and a value of null. This - // record is a marker for Apache Kafka. It indicates that log compaction can remove - // all records that have this key. - - let UpsertMessage { - primary_key: key, - record: payload, - } = bincode::deserialize(&payload[..]).unwrap(); - - // If message value == null, it must be a tombstone message. Emit DELETE to downstream using - // message key as the DELETE row. Throw an error if message key is empty. - if payload.is_empty() { - let (schema_id, mut raw_payload) = extract_schema_id(&key)?; - let key_schema = self.schema_resolver.get(schema_id).await?; - let key = from_avro_datum(key_schema.as_ref(), &mut raw_payload, None) - .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - - let row_op = DebeziumChangeEvent::with_key(AvroAccess::new( - &key, - AvroParseOptions::default().with_schema(&key_schema), - )); - - apply_row_operation_on_stream_chunk_writer(row_op, &mut writer) - } else { - let (schema_id, mut raw_payload) = extract_schema_id(&payload)?; - let writer_schema = self.schema_resolver.get(schema_id).await?; - let avro_value = from_avro_datum(writer_schema.as_ref(), &mut raw_payload, None) - .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - - let row_op = DebeziumChangeEvent::with_value(AvroAccess::new( - &avro_value, - AvroParseOptions::default().with_schema(&self.schema), - )); - - apply_row_operation_on_stream_chunk_writer(row_op, &mut writer) - } - } -} - -impl ByteStreamSourceParser for DebeziumAvroParser { - fn columns(&self) -> &[SourceColumnDesc] { - &self.rw_columns - } - - fn source_ctx(&self) -> &SourceContext { - &self.source_ctx - } - - async fn parse_one<'a>( - &'a mut self, - payload: Vec, - writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { - self.parse_inner(payload, writer).await - } -} - #[cfg(test)] mod tests { use std::io::Read; @@ -204,8 +155,10 @@ mod tests { use risingwave_pb::catalog::StreamSourceInfo; use super::*; - use crate::parser::{DebeziumAvroParserConfig, SourceStreamChunkBuilder}; - use crate::source::SourceFormat; + use crate::parser::{ + DebeziumAvroParserConfig, DebeziumParser, ParserProperties, SourceStreamChunkBuilder, + }; + use crate::source::{SourceColumnDesc, SourceFormat}; const DEBEZIUM_AVRO_DATA: &[u8] = 
b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00"; @@ -218,14 +171,17 @@ mod tests { } async fn parse_one( - parser: DebeziumAvroParser, + mut parser: DebeziumParser, columns: Vec, payload: Vec, ) -> Vec<(Op, OwnedRow)> { let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); { let writer = builder.row_writer(); - parser.parse_inner(payload, writer).await.unwrap(); + parser + .parse_inner(None, Some(payload), writer) + .await + .unwrap(); } let chunk = builder.finish(); chunk @@ -347,16 +303,16 @@ mod tests { ..Default::default() }; let parser_config = ParserProperties::new(SourceFormat::DebeziumAvro, &props, &info)?; - let config = DebeziumAvroParserConfig::new(parser_config).await?; + let config = DebeziumAvroParserConfig::new(parser_config.clone().encoding_config).await?; let columns = config .map_to_columns()? .into_iter() .map(CatColumnDesc::from) .map(|c| SourceColumnDesc::from(&c)) .collect_vec(); - let parser = - DebeziumAvroParser::new(columns.clone(), config, Arc::new(Default::default()))?; + DebeziumParser::new(parser_config, columns.clone(), Arc::new(Default::default())) + .await?; let [(op, row)]: [_; 1] = parse_one(parser, columns, DEBEZIUM_AVRO_DATA.to_vec()) .await .try_into() diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs new file mode 100644 index 0000000000000..25b55f38b3f2b --- /dev/null +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -0,0 +1,117 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use risingwave_common::error::ErrorCode::ProtocolError;
+use risingwave_common::error::{Result, RwError};
+
+use super::simd_json_parser::DebeziumJsonAccessBuilder;
+use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
+use crate::parser::unified::debezium::DebeziumChangeEvent;
+use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
+use crate::parser::{
+    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParserProperties,
+    SourceStreamChunkRowWriter, WriteGuard,
+};
+use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
+
+#[derive(Debug)]
+pub struct DebeziumParser {
+    key_builder: AccessBuilderImpl,
+    payload_builder: AccessBuilderImpl,
+    pub(crate) rw_columns: Vec<SourceColumnDesc>,
+    source_ctx: SourceContextRef,
+}
+
+async fn build_accessor_builder(
+    config: EncodingProperties,
+    encoding_type: EncodingType,
+) -> Result<AccessBuilderImpl> {
+    match config {
+        EncodingProperties::Avro(_) => {
+            let config = DebeziumAvroParserConfig::new(config).await?;
+            Ok(AccessBuilderImpl::DebeziumAvro(
+                DebeziumAvroAccessBuilder::new(config, encoding_type)?,
+            ))
+        }
+        EncodingProperties::Json(_) => Ok(AccessBuilderImpl::DebeziumJson(
+            DebeziumJsonAccessBuilder::new()?,
+        )),
+        EncodingProperties::Protobuf(_) => {
+            Ok(AccessBuilderImpl::new_default(config, encoding_type).await?)
+        }
+        _ => Err(RwError::from(ProtocolError(
+            "unsupported encoding for Debezium".to_string(),
+        ))),
+    }
+}
+
+impl DebeziumParser {
+    pub async fn new(
+        props: ParserProperties,
+        rw_columns: Vec<SourceColumnDesc>,
+        source_ctx: SourceContextRef,
+    ) -> Result<Self> {
+        let key_config = props
+            .key_encoding_config
+            .unwrap_or(props.encoding_config.clone());
+        let key_builder = build_accessor_builder(key_config, EncodingType::Key).await?;
+        let payload_builder =
+            build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
+        Ok(Self {
+            key_builder,
+            payload_builder,
+            rw_columns,
+            source_ctx,
+        })
+    }
+
+    pub async fn parse_inner(
+        &mut self,
+        key: Option<Vec<u8>>,
+        payload: Option<Vec<u8>>,
+        mut writer: SourceStreamChunkRowWriter<'_>,
+    ) -> Result<WriteGuard> {
+        // tombstone messages are handled implicitly by these accessors
+        let key_accessor = match key {
+            None => None,
+            Some(data) => Some(self.key_builder.generate_accessor(data).await?),
+        };
+        let payload_accessor = match payload {
+            None => None,
+            Some(data) => Some(self.payload_builder.generate_accessor(data).await?),
+        };
+        let row_op = DebeziumChangeEvent::new(key_accessor, payload_accessor);
+
+        apply_row_operation_on_stream_chunk_writer(row_op, &mut writer)
+    }
+}
+
+impl ByteStreamSourceParser for DebeziumParser {
+    fn columns(&self) -> &[SourceColumnDesc] {
+        &self.rw_columns
+    }
+
+    fn source_ctx(&self) -> &SourceContext {
+        &self.source_ctx
+    }
+
+    async fn parse_one<'a>(
+        &'a mut self,
+        key: Option<Vec<u8>>,
+        payload: Option<Vec<u8>>,
+        writer: SourceStreamChunkRowWriter<'a>,
+    ) -> Result<WriteGuard> {
+        self.parse_inner(key, payload, writer).await
+    }
+}
diff --git a/src/connector/src/parser/debezium/mod.rs b/src/connector/src/parser/debezium/mod.rs
index b7af514a72ed6..b71484bc4141b 100644
--- a/src/connector/src/parser/debezium/mod.rs
+++ b/src/connector/src/parser/debezium/mod.rs
@@ -12,11 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-pub use simd_json_parser::*; +// pub use simd_json_parser::*; mod avro_parser; +mod debezium_parser; mod mongo_json_parser; mod operators; -mod simd_json_parser; +pub mod simd_json_parser; pub use avro_parser::*; +pub use debezium_parser::*; pub use mongo_json_parser::DebeziumMongoJsonParser; diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index 77455a155731e..910b5a3e0131d 100644 --- a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -14,11 +14,12 @@ use std::fmt::Debug; -use risingwave_common::error::ErrorCode::ProtocolError; +use risingwave_common::error::ErrorCode::{self, ProtocolError}; use risingwave_common::error::{Result, RwError}; use risingwave_common::types::DataType; use simd_json::{BorrowedValue, Mutable}; +use crate::only_parse_payload; use crate::parser::unified::debezium::{DebeziumChangeEvent, MongoProjeciton}; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; @@ -113,10 +114,11 @@ impl ByteStreamSourceParser for DebeziumMongoJsonParser { async fn parse_one<'a>( &'a mut self, - payload: Vec, + _key: Option>, + payload: Option>, writer: SourceStreamChunkRowWriter<'a>, ) -> Result { - self.parse_inner(payload, writer).await + only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 91fb2c13a111b..a937edbaa140f 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -14,38 +14,31 @@ use std::fmt::Debug; -use risingwave_common::error::ErrorCode::ProtocolError; -use risingwave_common::error::{Result, RwError}; +use risingwave_common::error::{ErrorCode, Result, RwError}; use simd_json::{BorrowedValue, Mutable}; -use crate::parser::unified::debezium::DebeziumChangeEvent; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; -use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; -use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; -use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; +use crate::parser::unified::AccessImpl; +use crate::parser::AccessBuilder; #[derive(Debug)] -pub struct DebeziumJsonParser { - pub(crate) rw_columns: Vec, - source_ctx: SourceContextRef, +pub struct DebeziumJsonAccessBuilder { + value: Option>, } -impl DebeziumJsonParser { - pub fn new(rw_columns: Vec, source_ctx: SourceContextRef) -> Result { - Ok(Self { - rw_columns, - source_ctx, - }) +impl DebeziumJsonAccessBuilder { + pub fn new() -> Result { + Ok(Self { value: None }) } +} +impl AccessBuilder for DebeziumJsonAccessBuilder { #[allow(clippy::unused_async)] - pub async fn parse_inner( - &self, - mut payload: Vec, - mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { - let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload) - .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + async fn generate_accessor(&mut self, payload: Vec) -> Result> { + self.value = Some(payload); + let mut event: BorrowedValue<'_> = + simd_json::to_borrowed_value(self.value.as_mut().unwrap()) + .map_err(|e| RwError::from(ErrorCode::ProtocolError(e.to_string())))?; let payload = if let Some(payload) = event.get_mut("payload") { std::mem::take(payload) @@ -53,29 +46,10 @@ 
impl DebeziumJsonParser { event }; - let accessor = JsonAccess::new_with_options(payload, &JsonParseOptions::DEBEZIUM); - - let row_op = DebeziumChangeEvent::with_value(accessor); - - apply_row_operation_on_stream_chunk_writer(row_op, &mut writer) - } -} - -impl ByteStreamSourceParser for DebeziumJsonParser { - fn columns(&self) -> &[SourceColumnDesc] { - &self.rw_columns - } - - fn source_ctx(&self) -> &SourceContext { - &self.source_ctx - } - - async fn parse_one<'a>( - &'a mut self, - payload: Vec, - writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { - self.parse_inner(payload, writer).await + Ok(AccessImpl::Json(JsonAccess::new_with_options( + payload, + &JsonParseOptions::DEBEZIUM, + ))) } } @@ -92,9 +66,11 @@ mod tests { }; use serde_json::Value; - use super::*; - use crate::parser::{SourceColumnDesc, SourceStreamChunkBuilder}; - + use crate::parser::{ + DebeziumParser, EncodingProperties, JsonProperties, ParserProperties, ProtocolProperties, + SourceColumnDesc, SourceStreamChunkBuilder, + }; + use crate::source::SourceContextRef; fn assert_json_eq(parse_result: &Option, json_str: &str) { if let Some(ScalarImpl::Jsonb(json_val)) = parse_result { let mut json_string = String::new(); @@ -108,15 +84,32 @@ mod tests { } } + async fn build_parser( + rw_columns: Vec, + source_ctx: SourceContextRef, + ) -> DebeziumParser { + let props = ParserProperties { + key_encoding_config: None, + encoding_config: EncodingProperties::Json(JsonProperties {}), + protocol_config: ProtocolProperties::Debezium, + }; + DebeziumParser::new(props, rw_columns, source_ctx) + .await + .unwrap() + } + async fn parse_one( - parser: DebeziumJsonParser, + mut parser: DebeziumParser, columns: Vec, payload: Vec, ) -> Vec<(Op, OwnedRow)> { let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); { let writer = builder.row_writer(); - parser.parse_inner(payload, writer).await.unwrap(); + parser + .parse_inner(None, Some(payload), writer) + .await + .unwrap(); } let chunk = builder.finish(); chunk @@ -155,7 +148,7 @@ mod tests { let columns = get_test1_columns(); for data in input { - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let parser = build_parser(columns.clone(), Default::default()).await; let [(_op, row)]: [_; 1] = parse_one(parser, columns.clone(), data) .await .try_into() @@ -186,7 +179,7 @@ mod tests { let columns = get_test1_columns(); for data in input { - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let parser = build_parser(columns.clone(), Default::default()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data) .await .try_into() @@ -217,7 +210,7 @@ mod tests { for data in input { let columns = get_test1_columns(); - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let parser = build_parser(columns.clone(), Default::default()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data) .await .try_into() @@ -255,7 +248,7 @@ mod tests { let columns = get_test1_columns(); for data in input { - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let parser = build_parser(columns.clone(), Default::default()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data) .await .try_into() @@ -314,7 +307,7 @@ mod tests { let columns = get_test2_columns(); - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let parser = build_parser(columns.clone(), 
Default::default()).await; let [(_op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await @@ -349,7 +342,7 @@ mod tests { let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678088861000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":789,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678088861249,"transaction":null}}"#; let columns = get_test2_columns(); - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let parser = build_parser(columns.clone(), Default::default()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() @@ -384,7 +377,7 @@ mod tests { let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\":\"v1_updated\",\"k2\":33}"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090653000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1643,"row":0,"thread":4,"query":null},"op":"d","ts_ms":1678090653611,"transaction":null}}"#; let columns = get_test2_columns(); - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let parser = build_parser(columns.clone(), Default::default()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() @@ -421,7 +414,7 @@ mod tests { let columns = get_test2_columns(); - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let parser = build_parser(columns.clone(), Default::default()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() @@ -462,13 +455,13 @@ mod tests { SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)), SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)), ]; - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let mut parser = build_parser(columns.clone(), Default::default()).await; let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); // i64 overflow let data0 = br#"{"payload":{"before":null,"after":{"O_KEY":9223372036854775808,"O_BOOL":1,"O_TINY":33,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; if let Err(e) = parser - .parse_inner(data0.to_vec(), builder.row_writer()) + .parse_inner(None, Some(data0.to_vec()), builder.row_writer()) .await { println!("{:?}", e.to_string()); @@ -478,7 +471,7 @@ mod tests { // bool incorrect value let data1 = 
br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":2,"O_TINY":33,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; if let Err(e) = parser - .parse_inner(data1.to_vec(), builder.row_writer()) + .parse_inner(None, Some(data1.to_vec()), builder.row_writer()) .await { println!("{:?}", e.to_string()); @@ -488,7 +481,7 @@ mod tests { // i16 overflow let data2 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":32768,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; if let Err(e) = parser - .parse_inner(data2.to_vec(), builder.row_writer()) + .parse_inner(None, Some(data2.to_vec()), builder.row_writer()) .await { println!("{:?}", e.to_string()); @@ -498,7 +491,7 @@ mod tests { // i32 overflow let data3 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":33,"O_INT":2147483648,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; if let Err(e) = parser - .parse_inner(data3.to_vec(), builder.row_writer()) + .parse_inner(None, Some(data3.to_vec()), builder.row_writer()) .await { println!("{:?}", e.to_string()); @@ -508,7 +501,7 @@ mod tests { // float32 overflow let data4 = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":33,"O_INT":444,"O_REAL":3.80282347E38,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; if let Err(e) = parser - .parse_inner(data4.to_vec(), builder.row_writer()) + .parse_inner(None, Some(data4.to_vec()), builder.row_writer()) .await { println!("{:?}", e.to_string()); @@ -527,11 +520,11 @@ mod tests { DataType::Float64, ColumnId::from(0), )]; - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let mut parser = build_parser(columns.clone(), Default::default()).await; let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); let data = br#"{"payload":{"before":null,"after":{"O_DOUBLE":1.797695E308},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678174483000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":563,"row":0,"thread":3,"query":null},"op":"c","ts_ms":1678174483866,"transaction":null}}"#; if let Err(e) = parser - .parse_inner(data.to_vec(), builder.row_writer()) + .parse_inner(None, Some(data.to_vec()), builder.row_writer()) .await 
{ println!("{:?}", e.to_string()); @@ -639,7 +632,7 @@ mod tests { // this test covers an insert event on the table above let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_time_0":40271000000,"o_time_6":40271000010,"o_timez_0":"11:11:11Z","o_timez_6":"11:11:11.00001Z","o_timestamp_0":1321009871000,"o_timestamp_6":1321009871123456,"o_timestampz_0":"2011-11-11T03:11:11Z","o_timestampz_6":"2011-11-11T03:11:11.123456Z","o_interval":"P1Y2M3DT4H5M6.78S","o_date":"1999-09-09"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684733351963,"snapshot":"last","db":"test","sequence":"[null,\"26505352\"]","schema":"public","table":"orders","txId":729,"lsn":26505352,"xmin":null},"op":"r","ts_ms":1684733352110,"transaction":null}}"#; let columns = get_temporal_test_columns(); - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let parser = build_parser(columns.clone(), Default::default()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() @@ -694,7 +687,7 @@ mod tests { // this test covers an insert event on the table above let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_smallint":32767,"o_integer":2147483647,"o_bigint":9223372036854775807,"o_real":9.999,"o_double":9.999999,"o_numeric":123456.789,"o_numeric_6_3":123.456,"o_money":123.12},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684404343201,"snapshot":"last","db":"test","sequence":"[null,\"26519216\"]","schema":"public","table":"orders","txId":729,"lsn":26519216,"xmin":null},"op":"r","ts_ms":1684404343349,"transaction":null}}"#; let columns = get_numeric_test_columns(); - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let parser = build_parser(columns.clone(), Default::default()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() @@ -733,7 +726,7 @@ mod tests { // this test covers an insert event on the table above let data = br#"{"payload":{"before":null,"after":{"o_key":1,"o_boolean":false,"o_bit":true,"o_bytea":"ASNFZ4mrze8=","o_json":"{\"k1\": \"v1\", \"k2\": 11}","o_xml":"","o_uuid":"60f14fe2-f857-404a-b586-3b5375b3259f","o_point":{"x":1.0,"y":2.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAABA","srid":null},"o_enum":"polar","o_char":"h","o_varchar":"ha","o_character":"h","o_character_varying":"hahaha"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684743927178,"snapshot":"last","db":"test","sequence":"[null,\"26524528\"]","schema":"public","table":"orders","txId":730,"lsn":26524528,"xmin":null},"op":"r","ts_ms":1684743927343,"transaction":null}}"#; let columns = get_other_types_test_columns(); - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let parser = build_parser(columns.clone(), Default::default()).await; let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 82f565ef4928f..049839691b0e1 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -15,8 +15,9 @@ use risingwave_common::error::ErrorCode::{self, ProtocolError}; use risingwave_common::error::{Result, RwError}; -use super::ByteStreamSourceParser; -use crate::common::UpsertMessage; +use 
super::unified::json::JsonParseOptions; +use super::unified::AccessImpl; +use super::{AccessBuilder, ByteStreamSourceParser}; use crate::parser::unified::json::JsonAccess; use crate::parser::unified::upsert::UpsertChangeEvent; use crate::parser::unified::util::{ @@ -26,6 +27,32 @@ use crate::parser::unified::ChangeEventOperation; use crate::parser::{SourceStreamChunkRowWriter, WriteGuard}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; +#[derive(Debug)] +pub struct JsonAccessBuilder { + value: Option>, +} + +impl AccessBuilder for JsonAccessBuilder { + #[allow(clippy::unused_async)] + async fn generate_accessor(&mut self, payload: Vec) -> Result> { + self.value = Some(payload); + let value = simd_json::to_borrowed_value(self.value.as_mut().unwrap()) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + Ok(AccessImpl::Json(JsonAccess::new_with_options( + value, + // Debezium and Canal have their special json access builder and will not + // use this + &JsonParseOptions::DEFAULT, + ))) + } +} + +impl JsonAccessBuilder { + pub fn new() -> Result { + Ok(Self { value: None }) + } +} + /// Parser for JSON format #[derive(Debug)] pub struct JsonParser { @@ -65,15 +92,14 @@ impl JsonParser { #[allow(clippy::unused_async)] pub async fn parse_inner( &self, - mut payload: Vec, + key: Option>, + mut payload: Option>, mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result { if self.enable_upsert { - let msg: UpsertMessage<'_> = bincode::deserialize(&payload) - .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + let mut primary_key = key.unwrap_or(vec![]); + let mut record = payload.unwrap_or(vec![]); - let mut primary_key = msg.primary_key.to_vec(); - let mut record = msg.record.to_vec(); let change_event_op = if record.is_empty() { ChangeEventOperation::Delete } else { @@ -102,7 +128,12 @@ impl JsonParser { change_event_op, ) } else { - let value = simd_json::to_borrowed_value(&mut payload) + if payload.is_none() { + return Err(RwError::from(ErrorCode::InternalError( + "Empty payload with nonempty key for non-upsert".into(), + ))); + } + let value = simd_json::to_borrowed_value(payload.as_mut().unwrap()) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; let values = if let simd_json::BorrowedValue::Array(arr) = value { arr @@ -147,10 +178,11 @@ impl ByteStreamSourceParser for JsonParser { async fn parse_one<'a>( &'a mut self, - payload: Vec, + key: Option>, + payload: Option>, writer: SourceStreamChunkRowWriter<'a>, ) -> Result { - self.parse_inner(payload, writer).await + self.parse_inner(key, payload, writer).await } } @@ -167,7 +199,6 @@ mod tests { use risingwave_common::test_prelude::StreamChunkTestExt; use risingwave_common::types::{DataType, Decimal, ScalarImpl, ToOwnedDatum}; - use crate::common::UpsertMessage; use crate::parser::{JsonParser, SourceColumnDesc, SourceStreamChunkBuilder}; fn get_payload() -> Vec> { @@ -203,7 +234,10 @@ mod tests { for payload in get_payload() { let writer = builder.row_writer(); - parser.parse_inner(payload, writer).await.unwrap(); + parser + .parse_inner(None, Some(payload), writer) + .await + .unwrap(); } let chunk = builder.finish(); @@ -303,7 +337,10 @@ mod tests { { let writer = builder.row_writer(); let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(); - parser.parse_inner(payload, writer).await.unwrap(); + parser + .parse_inner(None, Some(payload), writer) + .await + .unwrap(); } // Parse an incorrect record. 
@@ -311,14 +348,20 @@ mod tests { let writer = builder.row_writer(); // `v2` overflowed. let payload = br#"{"v1": 1, "v2": 65536, "v3": "3"}"#.to_vec(); - parser.parse_inner(payload, writer).await.unwrap_err(); + assert!(parser + .parse_inner(None, Some(payload), writer) + .await + .is_err()); } // Parse a correct record. { let writer = builder.row_writer(); let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(); - parser.parse_inner(payload, writer).await.unwrap(); + parser + .parse_inner(None, Some(payload), writer) + .await + .unwrap(); } let chunk = builder.finish(); @@ -377,7 +420,10 @@ mod tests { let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1); { let writer = builder.row_writer(); - parser.parse_inner(payload, writer).await.unwrap(); + parser + .parse_inner(None, Some(payload), writer) + .await + .unwrap(); } let chunk = builder.finish(); let (op, row) = chunk.rows().next().unwrap(); @@ -412,13 +458,6 @@ mod tests { (r#"{"a":2}"#, r#"{"a":2,"b":2}"#), (r#"{"a":2}"#, r#""#), ] - .map(|(k, v)| { - bincode::serialize(&UpsertMessage { - primary_key: k.as_bytes().into(), - record: v.as_bytes().into(), - }) - .unwrap() - }) .to_vec(); let descs = vec![ SourceColumnDesc::simple("a", DataType::Int32, 0.into()), @@ -428,10 +467,15 @@ mod tests { let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); for item in items { parser - .parse_inner(item, builder.row_writer()) + .parse_inner( + Some(item.0.as_bytes().to_vec()), + Some(item.1.as_bytes().to_vec()), + builder.row_writer(), + ) .await .unwrap(); } + let chunk = builder.finish(); let mut rows = chunk.rows(); diff --git a/src/connector/src/parser/maxwell/maxwell_parser.rs b/src/connector/src/parser/maxwell/maxwell_parser.rs new file mode 100644 index 0000000000000..895900fd80ab1 --- /dev/null +++ b/src/connector/src/parser/maxwell/maxwell_parser.rs @@ -0,0 +1,87 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use risingwave_common::error::{ErrorCode, Result, RwError};
+
+use crate::only_parse_payload;
+use crate::parser::unified::maxwell::MaxwellChangeEvent;
+use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
+use crate::parser::{
+    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParserProperties,
+    SourceStreamChunkRowWriter, WriteGuard,
+};
+use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
+
+#[derive(Debug)]
+pub struct MaxwellParser {
+    payload_builder: AccessBuilderImpl,
+    pub(crate) rw_columns: Vec<SourceColumnDesc>,
+    source_ctx: SourceContextRef,
+}
+
+impl MaxwellParser {
+    pub async fn new(
+        props: ParserProperties,
+        rw_columns: Vec<SourceColumnDesc>,
+        source_ctx: SourceContextRef,
+    ) -> Result<Self> {
+        match props.encoding_config {
+            EncodingProperties::Json(_) => {
+                let payload_builder =
+                    AccessBuilderImpl::new_default(props.encoding_config, EncodingType::Value)
+                        .await?;
+                Ok(Self {
+                    payload_builder,
+                    rw_columns,
+                    source_ctx,
+                })
+            }
+            _ => Err(RwError::from(ErrorCode::ProtocolError(
+                "unsupported encoding for Maxwell".to_string(),
+            ))),
+        }
+    }
+
+    pub async fn parse_inner(
+        &mut self,
+        payload: Vec<u8>,
+        mut writer: SourceStreamChunkRowWriter<'_>,
+    ) -> Result<WriteGuard> {
+        let payload_accessor = self.payload_builder.generate_accessor(payload).await?;
+        let row_op = MaxwellChangeEvent::new(payload_accessor);
+
+        apply_row_operation_on_stream_chunk_writer(row_op, &mut writer)
+    }
+}
+
+impl ByteStreamSourceParser for MaxwellParser {
+    fn columns(&self) -> &[SourceColumnDesc] {
+        &self.rw_columns
+    }
+
+    fn source_ctx(&self) -> &SourceContext {
+        &self.source_ctx
+    }
+
+    async fn parse_one<'a>(
+        &'a mut self,
+        _key: Option<Vec<u8>>,
+        payload: Option<Vec<u8>>,
+        writer: SourceStreamChunkRowWriter<'a>,
+    ) -> Result<WriteGuard> {
+        // restrict the behaviours since there is no corresponding
+        // key/value test for maxwell yet.
+        only_parse_payload!(self, payload, writer)
+    }
+}
diff --git a/src/connector/src/parser/maxwell/mod.rs b/src/connector/src/parser/maxwell/mod.rs
index f21a7dd647ac7..787b1f235f637 100644
--- a/src/connector/src/parser/maxwell/mod.rs
+++ b/src/connector/src/parser/maxwell/mod.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod maxwell_parser;
 mod operators;
+pub use maxwell_parser::*;
 mod simd_json_parser;
-
-pub use simd_json_parser::*;
diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs
index 963f19a26ff5c..fa71ffb5143fc 100644
--- a/src/connector/src/parser/maxwell/simd_json_parser.rs
+++ b/src/connector/src/parser/maxwell/simd_json_parser.rs
@@ -12,71 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::fmt::Debug; - -use risingwave_common::error::ErrorCode::ProtocolError; -use risingwave_common::error::{Result, RwError}; -use simd_json::BorrowedValue; - -use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; -use crate::parser::unified::maxwell::MaxwellChangeEvent; -use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; -use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; -use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; - -const AFTER: &str = "data"; -const BEFORE: &str = "old"; -const OP: &str = "type"; - -#[derive(Debug)] -pub struct MaxwellParser { - pub(crate) rw_columns: Vec, - source_ctx: SourceContextRef, -} - -impl MaxwellParser { - pub fn new(rw_columns: Vec, source_ctx: SourceContextRef) -> Result { - Ok(Self { - rw_columns, - source_ctx, - }) - } - - #[allow(clippy::unused_async)] - pub async fn parse_inner( - &self, - mut payload: Vec, - mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { - let event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload) - .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - - let accessor = JsonAccess::new_with_options(event, &JsonParseOptions::DEFAULT); - - let row_op = MaxwellChangeEvent::new(accessor); - - apply_row_operation_on_stream_chunk_writer(row_op, &mut writer) - } -} - -impl ByteStreamSourceParser for MaxwellParser { - fn columns(&self) -> &[SourceColumnDesc] { - &self.rw_columns - } - - fn source_ctx(&self) -> &SourceContext { - &self.source_ctx - } - - async fn parse_one<'a>( - &'a mut self, - payload: Vec, - writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { - self.parse_inner(payload, writer).await - } -} - #[cfg(test)] mod tests { use risingwave_common::array::Op; @@ -84,8 +19,11 @@ mod tests { use risingwave_common::row::Row; use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum}; - use super::*; - use crate::parser::{SourceColumnDesc, SourceStreamChunkBuilder}; + use crate::parser::maxwell::MaxwellParser; + use crate::parser::{ + EncodingProperties, JsonProperties, ParserProperties, ProtocolProperties, SourceColumnDesc, + SourceStreamChunkBuilder, + }; #[tokio::test] async fn test_json_parser() { let descs = vec![ @@ -95,7 +33,14 @@ mod tests { SourceColumnDesc::simple("birthday", DataType::Timestamp, 3.into()), ]; - let parser = MaxwellParser::new(descs.clone(), Default::default()).unwrap(); + let props = ParserProperties { + key_encoding_config: None, + encoding_config: EncodingProperties::Json(JsonProperties {}), + protocol_config: ProtocolProperties::Maxwell, + }; + let mut parser = MaxwellParser::new(props, descs.clone(), Default::default()) + .await + .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); let payloads = vec![ diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index ebae881019209..0c6781a7d62fb 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -32,8 +32,11 @@ use risingwave_common::types::Datum; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::catalog::StreamSourceInfo; -use self::bytes_parser::BytesParser; +use self::avro::AvroAccessBuilder; +use self::bytes_parser::{BytesAccessBuilder, BytesParser}; pub use self::csv_parser::CsvParserConfig; +use self::simd_json_parser::DebeziumJsonAccessBuilder; +use self::unified::AccessImpl; use self::util::get_kafka_topic; use crate::aws_auth::AwsAuthProps; use crate::parser::maxwell::MaxwellParser; @@ -312,7 
+315,8 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { /// Parse one record from the given `payload` and write it to the `writer`. fn parse_one<'a>( &'a mut self, - payload: Vec, + key: Option>, + payload: Option>, writer: SourceStreamChunkRowWriter<'a>, ) -> impl Future> + Send + 'a; @@ -330,6 +334,8 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { } } +// TODO: when upsert is disabled, how to filter those empty payload +// Currently, an err is returned for non upsert with empty payload #[try_stream(ok = StreamChunkWithState, error = RwError)] async fn into_chunk_stream(mut parser: P, data_stream: BoxSourceStream) { #[for_await] @@ -340,43 +346,46 @@ async fn into_chunk_stream(mut parser: P, data_stream let mut split_offset_mapping: HashMap = HashMap::new(); for msg in batch { - if let Some(content) = msg.payload { - split_offset_mapping.insert(msg.split_id, msg.offset); + if msg.key.is_none() && msg.payload.is_none() { + continue; + } - let old_op_num = builder.op_num(); + split_offset_mapping.insert(msg.split_id, msg.offset); - if let Err(e) = parser.parse_one(content, builder.row_writer()).await { - tracing::warn!("message parsing failed {}, skipping", e.to_string()); - // This will throw an error for batch - parser.source_ctx().report_user_source_error(e)?; - continue; - } + let old_op_num = builder.op_num(); + + if let Err(e) = parser + .parse_one(msg.key, msg.payload, builder.row_writer()) + .await + { + tracing::warn!("message parsing failed {}, skipping", e.to_string()); + // This will throw an error for batch + parser.source_ctx().report_user_source_error(e)?; + continue; + } - let new_op_num = builder.op_num(); - - // new_op_num - old_op_num is the number of rows added to the builder - for _ in old_op_num..new_op_num { - // TODO: support more kinds of SourceMeta - if let SourceMeta::Kafka(kafka_meta) = &msg.meta { - let f = - |desc: &SourceColumnDesc| -> Option { - if !desc.is_meta { - return None; - } - match desc.name.as_str() { - "_rw_kafka_timestamp" => Some(kafka_meta.timestamp.map(|ts| { - risingwave_common::cast::i64_to_timestamptz(ts) - .unwrap() - .into() - })), - _ => unreachable!( - "kafka will not have this meta column: {}", - desc.name - ), - } - }; - builder.row_writer().fulfill_meta_column(f)?; - } + let new_op_num = builder.op_num(); + + // new_op_num - old_op_num is the number of rows added to the builder + for _ in old_op_num..new_op_num { + // TODO: support more kinds of SourceMeta + if let SourceMeta::Kafka(kafka_meta) = &msg.meta { + let f = |desc: &SourceColumnDesc| -> Option { + if !desc.is_meta { + return None; + } + match desc.name.as_str() { + "_rw_kafka_timestamp" => Some(kafka_meta.timestamp.map(|ts| { + risingwave_common::cast::i64_to_timestamptz(ts) + .unwrap() + .into() + })), + _ => { + unreachable!("kafka will not have this meta column: {}", desc.name) + } + } + }; + builder.row_writer().fulfill_meta_column(f)?; } } } @@ -388,17 +397,77 @@ async fn into_chunk_stream(mut parser: P, data_stream } } +// TODO: +// 1. Debezium parser: tombstone, +// 2. Avro: specific behaviour for debezium +// 3. Maxwell +// 4. Canal +// 5. Overall performance #10840 +// 6. dispatch +// 7. 
correct parser trait + +pub trait AccessBuilder { + async fn generate_accessor(&mut self, payload: Vec) -> Result>; +} + +#[derive(Debug)] +pub enum EncodingType { + Key, + Value, +} + +#[derive(Debug)] +pub enum AccessBuilderImpl { + Avro(AvroAccessBuilder), + Protobuf(ProtobufAccessBuilder), + Json(JsonAccessBuilder), + Bytes(BytesAccessBuilder), + DebeziumAvro(DebeziumAvroAccessBuilder), + DebeziumJson(DebeziumJsonAccessBuilder), +} + +impl AccessBuilderImpl { + pub async fn new_default(config: EncodingProperties, kv: EncodingType) -> Result { + let accessor = match config { + EncodingProperties::Avro(_) => { + let config = AvroParserConfig::new(config).await?; + AccessBuilderImpl::Avro(AvroAccessBuilder::new(config, kv)?) + } + EncodingProperties::Protobuf(_) => { + let config = ProtobufParserConfig::new(config).await?; + AccessBuilderImpl::Protobuf(ProtobufAccessBuilder::new(config)?) + } + EncodingProperties::Bytes => AccessBuilderImpl::Bytes(BytesAccessBuilder::new()?), + EncodingProperties::Json(_) => AccessBuilderImpl::Json(JsonAccessBuilder::new()?), + EncodingProperties::Csv(_) => unreachable!(), + EncodingProperties::None => unreachable!(), + }; + Ok(accessor) + } + + pub async fn generate_accessor(&mut self, payload: Vec) -> Result> { + let accessor = match self { + Self::Avro(builder) => builder.generate_accessor(payload).await?, + Self::Protobuf(builder) => builder.generate_accessor(payload).await?, + Self::Json(builder) => builder.generate_accessor(payload).await?, + Self::Bytes(builder) => builder.generate_accessor(payload).await?, + Self::DebeziumAvro(builder) => builder.generate_accessor(payload).await?, + Self::DebeziumJson(builder) => builder.generate_accessor(payload).await?, + }; + Ok(accessor) + } +} + #[derive(Debug)] pub enum ByteStreamSourceParserImpl { Csv(CsvParser), Json(JsonParser), Protobuf(ProtobufParser), - DebeziumJson(DebeziumJsonParser), + Debezium(DebeziumParser), DebeziumMongoJson(DebeziumMongoJsonParser), Avro(AvroParser), Maxwell(MaxwellParser), CanalJson(CanalJsonParser), - DebeziumAvro(DebeziumAvroParser), Bytes(BytesParser), } @@ -412,12 +481,11 @@ impl ByteStreamSourceParserImpl { Self::Csv(parser) => parser.into_stream(msg_stream), Self::Json(parser) => parser.into_stream(msg_stream), Self::Protobuf(parser) => parser.into_stream(msg_stream), - Self::DebeziumJson(parser) => parser.into_stream(msg_stream), + Self::Debezium(parser) => parser.into_stream(msg_stream), Self::DebeziumMongoJson(parser) => parser.into_stream(msg_stream), Self::Avro(parser) => parser.into_stream(msg_stream), Self::Maxwell(parser) => parser.into_stream(msg_stream), Self::CanalJson(parser) => parser.into_stream(msg_stream), - Self::DebeziumAvro(parser) => parser.into_stream(msg_stream), Self::Bytes(parser) => parser.into_stream(msg_stream), }; Box::pin(stream) @@ -425,7 +493,7 @@ impl ByteStreamSourceParserImpl { } impl ByteStreamSourceParserImpl { - pub fn create(parser_config: ParserConfig, source_ctx: SourceContextRef) -> Result { + pub async fn create(parser_config: ParserConfig, source_ctx: SourceContextRef) -> Result { let CommonParserConfig { rw_columns } = parser_config.common; match parser_config.specific { SpecificParserConfig::Csv(config) => { @@ -444,24 +512,23 @@ impl ByteStreamSourceParserImpl { SpecificParserConfig::CanalJson => { CanalJsonParser::new(rw_columns, source_ctx).map(Self::CanalJson) } - SpecificParserConfig::DebeziumJson => { - DebeziumJsonParser::new(rw_columns, source_ctx).map(Self::DebeziumJson) - } SpecificParserConfig::DebeziumMongoJson => 
{ DebeziumMongoJsonParser::new(rw_columns, source_ctx).map(Self::DebeziumMongoJson) } - SpecificParserConfig::Maxwell => { - MaxwellParser::new(rw_columns, source_ctx).map(Self::Maxwell) - } - SpecificParserConfig::DebeziumAvro(config) => { - DebeziumAvroParser::new(rw_columns, config, source_ctx).map(Self::DebeziumAvro) - } SpecificParserConfig::Bytes => { BytesParser::new(rw_columns, source_ctx).map(Self::Bytes) } SpecificParserConfig::Native => { unreachable!("Native parser should not be created") } + SpecificParserConfig::Debezium(config) => { + let parser = DebeziumParser::new(config, rw_columns, source_ctx).await?; + Ok(Self::Debezium(parser)) + } + SpecificParserConfig::Maxwell(config) => { + let parser = MaxwellParser::new(config, rw_columns, source_ctx).await?; + Ok(Self::Maxwell(parser)) + } } } } @@ -485,13 +552,12 @@ pub enum SpecificParserConfig { Protobuf(ProtobufParserConfig), Json, UpsertJson, - DebeziumJson, + Debezium(ParserProperties), + Maxwell(ParserProperties), DebeziumMongoJson, - Maxwell, CanalJson, #[default] Native, - DebeziumAvro(DebeziumAvroParserConfig), Bytes, } @@ -513,7 +579,7 @@ impl From<&HashMap> for SchemaRegistryAuth { } } -#[derive(Default)] +#[derive(Debug, Default, Clone)] pub struct AvroProperties { pub use_schema_registry: bool, pub row_schema_location: String, @@ -524,7 +590,7 @@ pub struct AvroProperties { pub enable_upsert: bool, } -#[derive(Default)] +#[derive(Debug, Default, Clone)] pub struct ProtobufProperties { pub message_name: String, pub use_schema_registry: bool, @@ -534,23 +600,27 @@ pub struct ProtobufProperties { pub topic: String, } -#[derive(Default)] +#[derive(Debug, Default, Clone)] pub struct CsvProperties { pub delimiter: u8, pub has_header: bool, } -#[derive(Default)] +#[derive(Debug, Default, Clone)] +pub struct JsonProperties {} + +#[derive(Debug, Default, Clone)] pub enum EncodingProperties { Avro(AvroProperties), Protobuf(ProtobufProperties), Csv(CsvProperties), + Json(JsonProperties), Bytes, #[default] None, } -#[derive(Default)] +#[derive(Debug, Default, Clone)] pub enum ProtocolProperties { Debezium, Maxwell, @@ -559,7 +629,9 @@ pub enum ProtocolProperties { Plain, } +#[derive(Debug, Default, Clone)] pub struct ParserProperties { + pub key_encoding_config: Option, pub encoding_config: EncodingProperties, pub protocol_config: ProtocolProperties, } @@ -579,11 +651,14 @@ impl ParserProperties { props: &HashMap, info: &StreamSourceInfo, ) -> Result { - let encoding_config = match format { - SourceFormat::Csv => EncodingProperties::Csv(CsvProperties { - delimiter: info.csv_delimiter as u8, - has_header: info.csv_has_header, - }), + let (encoding_config, protocol_config) = match format { + SourceFormat::Csv => ( + EncodingProperties::Csv(CsvProperties { + delimiter: info.csv_delimiter as u8, + has_header: info.csv_has_header, + }), + ProtocolProperties::Plain, + ), SourceFormat::Avro | SourceFormat::UpsertAvro => { let mut config = AvroProperties { use_schema_registry: info.use_schema_registry, @@ -602,7 +677,7 @@ impl ParserProperties { props.iter().map(|(k, v)| (k.as_str(), v.as_str())), )); } - EncodingProperties::Avro(config) + (EncodingProperties::Avro(config), ProtocolProperties::Plain) } SourceFormat::Protobuf => { let mut config = ProtobufProperties { @@ -619,18 +694,37 @@ impl ParserProperties { props.iter().map(|(k, v)| (k.as_str(), v.as_str())), )); } - EncodingProperties::Protobuf(config) + ( + EncodingProperties::Protobuf(config), + ProtocolProperties::Plain, + ) } - SourceFormat::DebeziumAvro => 
EncodingProperties::Avro(AvroProperties { - row_schema_location: info.row_schema_location.clone(), - topic: get_kafka_topic(props).unwrap().clone(), - client_config: SchemaRegistryAuth::from(props), - ..Default::default() - }), - _ => EncodingProperties::None, + SourceFormat::DebeziumAvro => ( + EncodingProperties::Avro(AvroProperties { + row_schema_location: info.row_schema_location.clone(), + topic: get_kafka_topic(props).unwrap().clone(), + client_config: SchemaRegistryAuth::from(props), + ..Default::default() + }), + ProtocolProperties::Debezium, + ), + SourceFormat::DebeziumJson => ( + EncodingProperties::Json(JsonProperties {}), + ProtocolProperties::Debezium, + ), + SourceFormat::DebeziumMongoJson => { + (EncodingProperties::None, ProtocolProperties::Debezium) + } + SourceFormat::Maxwell => ( + EncodingProperties::Json(JsonProperties {}), + ProtocolProperties::Maxwell, + ), + SourceFormat::CanalJson => (EncodingProperties::None, ProtocolProperties::Canal), + _ => (EncodingProperties::None, ProtocolProperties::Plain), }; - let protocol_config = ProtocolProperties::Plain; + // TODO: need to build correct key encoding config Ok(ParserProperties { + key_encoding_config: None, encoding_config, protocol_config, }) @@ -646,25 +740,19 @@ impl SpecificParserConfig { SpecificParserConfig::Protobuf(_) => SourceFormat::Protobuf, SpecificParserConfig::Json => SourceFormat::Json, SpecificParserConfig::UpsertJson => SourceFormat::UpsertJson, - SpecificParserConfig::DebeziumJson => SourceFormat::DebeziumJson, - SpecificParserConfig::Maxwell => SourceFormat::Maxwell, SpecificParserConfig::CanalJson => SourceFormat::CanalJson, SpecificParserConfig::Native => SourceFormat::Native, - SpecificParserConfig::DebeziumAvro(_) => SourceFormat::DebeziumAvro, SpecificParserConfig::DebeziumMongoJson => SourceFormat::DebeziumMongoJson, SpecificParserConfig::Bytes => SourceFormat::Bytes, + SpecificParserConfig::Maxwell(_) => SourceFormat::Maxwell, + SpecificParserConfig::Debezium(config) => match config.encoding_config { + EncodingProperties::Avro(_) => SourceFormat::DebeziumAvro, + EncodingProperties::Json(_) => SourceFormat::DebeziumJson, + _ => unreachable!(), + }, } } - pub fn is_upsert(&self) -> bool { - matches!( - self, - SpecificParserConfig::UpsertJson - | SpecificParserConfig::UpsertAvro(_) - | SpecificParserConfig::DebeziumAvro(_) - ) - } - pub async fn new( format: SourceFormat, info: &StreamSourceInfo, @@ -673,28 +761,26 @@ impl SpecificParserConfig { let parser_properties = ParserProperties::new(format, props, info)?; let conf = match format { SourceFormat::Csv => { - SpecificParserConfig::Csv(CsvParserConfig::new(parser_properties)?) - } - SourceFormat::Avro => { - SpecificParserConfig::Avro(AvroParserConfig::new(parser_properties).await?) - } - SourceFormat::UpsertAvro => { - SpecificParserConfig::UpsertAvro(AvroParserConfig::new(parser_properties).await?) - } - SourceFormat::Protobuf => { - SpecificParserConfig::Protobuf(ProtobufParserConfig::new(parser_properties).await?) + SpecificParserConfig::Csv(CsvParserConfig::new(parser_properties.encoding_config)?) 
} + SourceFormat::Avro => SpecificParserConfig::Avro( + AvroParserConfig::new(parser_properties.encoding_config).await?, + ), + SourceFormat::UpsertAvro => SpecificParserConfig::UpsertAvro( + AvroParserConfig::new(parser_properties.encoding_config).await?, + ), + SourceFormat::Protobuf => SpecificParserConfig::Protobuf( + ProtobufParserConfig::new(parser_properties.encoding_config).await?, + ), SourceFormat::Json => SpecificParserConfig::Json, SourceFormat::UpsertJson => SpecificParserConfig::UpsertJson, - SourceFormat::DebeziumJson => SpecificParserConfig::DebeziumJson, SourceFormat::DebeziumMongoJson => SpecificParserConfig::DebeziumMongoJson, - SourceFormat::Maxwell => SpecificParserConfig::Maxwell, SourceFormat::CanalJson => SpecificParserConfig::CanalJson, SourceFormat::Native => SpecificParserConfig::Native, SourceFormat::Bytes => SpecificParserConfig::Bytes, - SourceFormat::DebeziumAvro => SpecificParserConfig::DebeziumAvro( - DebeziumAvroParserConfig::new(parser_properties).await?, - ), + SourceFormat::Maxwell => SpecificParserConfig::Maxwell(parser_properties), + SourceFormat::DebeziumJson => SpecificParserConfig::Debezium(parser_properties), + SourceFormat::DebeziumAvro => SpecificParserConfig::Debezium(parser_properties), _ => { return Err(RwError::from(ProtocolError( "invalid source format".to_string(), diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 1d2672929bb05..baedea9f92170 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -20,7 +20,7 @@ use prost_reflect::{ ReflectMessage, Value, }; use risingwave_common::array::{ListValue, StructValue}; -use risingwave_common::error::ErrorCode::{InternalError, NotImplemented, ProtocolError}; +use risingwave_common::error::ErrorCode::{self, InternalError, NotImplemented, ProtocolError}; use risingwave_common::error::{Result, RwError}; use risingwave_common::try_match_expand; use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl, F32, F64}; @@ -29,13 +29,51 @@ use url::Url; use super::schema_resolver::*; use crate::aws_utils::load_file_descriptor_from_s3; +use crate::only_parse_payload; use crate::parser::schema_registry::{extract_schema_id, Client}; +use crate::parser::unified::protobuf::ProtobufAccess; +use crate::parser::unified::AccessImpl; use crate::parser::{ - ByteStreamSourceParser, EncodingProperties, ParserProperties, SourceStreamChunkRowWriter, + AccessBuilder, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter, WriteGuard, }; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; +#[derive(Debug)] +pub struct ProtobufAccessBuilder { + confluent_wire_type: bool, + message_descriptor: MessageDescriptor, +} + +impl AccessBuilder for ProtobufAccessBuilder { + #[allow(clippy::unused_async)] + async fn generate_accessor(&mut self, payload: Vec) -> Result> { + let payload = if self.confluent_wire_type { + resolve_pb_header(&payload)? 
+ } else { + &payload + }; + + let message = DynamicMessage::decode(self.message_descriptor.clone(), payload) + .map_err(|e| ProtocolError(format!("parse message failed: {}", e)))?; + + Ok(AccessImpl::Protobuf(ProtobufAccess::new(message))) + } +} + +impl ProtobufAccessBuilder { + pub fn new(config: ProtobufParserConfig) -> Result { + let ProtobufParserConfig { + confluent_wire_type, + message_descriptor, + } = config; + Ok(Self { + confluent_wire_type, + message_descriptor, + }) + } +} + #[derive(Debug, Clone)] pub struct ProtobufParser { message_descriptor: MessageDescriptor, @@ -51,11 +89,8 @@ pub struct ProtobufParserConfig { } impl ProtobufParserConfig { - pub async fn new(parser_properties: ParserProperties) -> Result { - let protobuf_config = try_match_expand!( - parser_properties.encoding_config, - EncodingProperties::Protobuf - )?; + pub async fn new(encoding_properties: EncodingProperties) -> Result { + let protobuf_config = try_match_expand!(encoding_properties, EncodingProperties::Protobuf)?; let location = &protobuf_config.row_schema_location; let message_name = &protobuf_config.message_name; let url = Url::parse(location) @@ -258,14 +293,15 @@ impl ByteStreamSourceParser for ProtobufParser { async fn parse_one<'a>( &'a mut self, - payload: Vec, + _key: Option>, + payload: Option>, writer: SourceStreamChunkRowWriter<'a>, ) -> Result { - self.parse_inner(payload, writer).await + only_parse_payload!(self, payload, writer) } } -fn from_protobuf_value(field_desc: &FieldDescriptor, value: &Value) -> Result { +pub fn from_protobuf_value(field_desc: &FieldDescriptor, value: &Value) -> Result { let v = match value { Value::Bool(v) => ScalarImpl::Bool(*v), Value::I32(i) => ScalarImpl::Int32(*i), @@ -396,6 +432,7 @@ mod test { use risingwave_pb::data::data_type::PbTypeName; use super::*; + use crate::parser::ParserProperties; use crate::source::SourceFormat; fn schema_dir() -> String { @@ -426,7 +463,7 @@ mod test { ..Default::default() }; let parser_config = ParserProperties::new(SourceFormat::Protobuf, &HashMap::new(), &info)?; - let conf = ProtobufParserConfig::new(parser_config).await?; + let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?; let parser = ProtobufParser::new(Vec::default(), conf, Default::default())?; let value = DynamicMessage::decode(parser.message_descriptor, PRE_GEN_PROTO_DATA).unwrap(); @@ -470,7 +507,7 @@ mod test { ..Default::default() }; let parser_config = ParserProperties::new(SourceFormat::Protobuf, &HashMap::new(), &info)?; - let conf = ProtobufParserConfig::new(parser_config).await?; + let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?; let columns = conf.map_to_columns().unwrap(); assert_eq!(columns[0].name, "id".to_string()); @@ -518,7 +555,9 @@ mod test { }; let parser_config = ParserProperties::new(SourceFormat::Protobuf, &HashMap::new(), &info).unwrap(); - let conf = ProtobufParserConfig::new(parser_config).await.unwrap(); + let conf = ProtobufParserConfig::new(parser_config.encoding_config) + .await + .unwrap(); let columns = conf.map_to_columns(); // expect error message: // "Err(Protocol error: circular reference detected: diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index e0ec9c8559a7d..9f32c70e08e24 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -43,7 +43,7 @@ where } /// Panic: one of the `key_accessor` or `value_accessor` must be provided. 
- fn new(key_accessor: Option, value_accessor: Option) -> Self { + pub fn new(key_accessor: Option, value_accessor: Option) -> Self { assert!(key_accessor.is_some() || value_accessor.is_some()); Self { value_accessor, diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs index e0a2692cfdd6f..aff6c420da019 100644 --- a/src/connector/src/parser/unified/mod.rs +++ b/src/connector/src/parser/unified/mod.rs @@ -17,11 +17,17 @@ use risingwave_common::types::{DataType, Datum}; use thiserror::Error; +use self::avro::AvroAccess; +use self::bytes::BytesAccess; +use self::json::JsonAccess; +use self::protobuf::ProtobufAccess; + pub mod avro; pub mod bytes; pub mod debezium; pub mod json; pub mod maxwell; +pub mod protobuf; pub mod upsert; pub mod util; @@ -32,6 +38,24 @@ pub trait Access { fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> AccessResult; } +pub enum AccessImpl<'a, 'b> { + Avro(AvroAccess<'a, 'b>), + Bytes(BytesAccess), + Protobuf(ProtobufAccess), + Json(JsonAccess<'a, 'b>), +} + +impl Access for AccessImpl<'_, '_> { + fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> AccessResult { + match self { + Self::Avro(accessor) => accessor.access(path, type_expected), + Self::Bytes(accessor) => accessor.access(path, type_expected), + Self::Protobuf(accessor) => accessor.access(path, type_expected), + Self::Json(accessor) => accessor.access(path, type_expected), + } + } +} + #[derive(Debug, Clone, Copy)] pub enum ChangeEventOperation { Upsert, // Insert or Update diff --git a/src/connector/src/parser/unified/protobuf.rs b/src/connector/src/parser/unified/protobuf.rs new file mode 100644 index 0000000000000..0afe0cde52d34 --- /dev/null +++ b/src/connector/src/parser/unified/protobuf.rs @@ -0,0 +1,51 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
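+
+// Editorial sketch, not part of the original patch: `ProtobufAccess` wraps a
+// `DynamicMessage` already decoded by `ProtobufAccessBuilder::generate_accessor`
+// and resolves exactly one top-level field per call. Illustrative use, where the
+// field name "id" and the expected `DataType::Int32` are assumed examples:
+//
+//     let access = ProtobufAccess::new(message);
+//     let datum = access.access(&["id"], Some(&DataType::Int32))?;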
+ +use anyhow::anyhow; +use prost_reflect::{DynamicMessage, ReflectMessage}; +use risingwave_common::error::ErrorCode::ProtocolError; +use risingwave_common::error::RwError; +use risingwave_common::types::DataType; + +use super::{Access, AccessResult}; +use crate::parser::from_protobuf_value; +use crate::parser::unified::AccessError; + +pub struct ProtobufAccess { + message: DynamicMessage, +} + +impl ProtobufAccess { + pub fn new(message: DynamicMessage) -> Self { + Self { message } + } +} + +impl Access for ProtobufAccess { + fn access(&self, path: &[&str], _type_expected: Option<&DataType>) -> AccessResult { + debug_assert_eq!(1, path.len()); + let field_desc = self + .message + .descriptor() + .get_field_by_name(path[0]) + .ok_or_else(|| { + let err_msg = format!("protobuf schema don't have field {}", path[0]); + tracing::error!(err_msg); + RwError::from(ProtocolError(err_msg)) + }) + .map_err(|e| AccessError::Other(anyhow!(e)))?; + let value = self.message.get_field(&field_desc); + from_protobuf_value(&field_desc, &value).map_err(|e| AccessError::Other(anyhow!(e))) + } +} diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index cc9af7880f0c9..d3c26c5e74661 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -88,3 +88,17 @@ pub(super) fn at_least_one_ok(mut results: Vec>) -> Result { + if $payload.is_some() { + $self.parse_inner($payload.unwrap(), $writer).await + } else { + Err(RwError::from(ErrorCode::InternalError( + "Empty payload with nonempty key".into(), + ))) + } + }; +} diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index bcd20db1a8310..2b8dcd24cee4c 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -184,8 +184,9 @@ pub struct SourceInfo { pub fragment_id: u32, } -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] pub enum SourceFormat { + #[default] Invalid, Json, UpsertJson, @@ -433,6 +434,7 @@ pub type SplitId = Arc; /// The third-party message structs will eventually be transformed into this struct. 
#[derive(Debug, Clone)] pub struct SourceMessage { + pub key: Option>, pub payload: Option>, pub offset: String, pub split_id: SplitId, diff --git a/src/connector/src/source/cdc/source/message.rs b/src/connector/src/source/cdc/source/message.rs index a14653bdff8ff..71046df258d63 100644 --- a/src/connector/src/source/cdc/source/message.rs +++ b/src/connector/src/source/cdc/source/message.rs @@ -20,6 +20,7 @@ use crate::source::SourceMeta; impl From for SourceMessage { fn from(message: CdcMessage) -> Self { SourceMessage { + key: None, payload: Some(message.payload.as_bytes().to_vec()), offset: message.offset, split_id: message.partition.into(), diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs index 544205911711a..5f011bef7d267 100644 --- a/src/connector/src/source/datagen/source/generator.rs +++ b/src/connector/src/source/datagen/source/generator.rs @@ -127,6 +127,7 @@ impl DatagenEventGenerator { } }; msgs.push(SourceMessage { + key: None, payload: Some(payload), offset: self.offset.to_string(), split_id: self.split_id.clone(), diff --git a/src/connector/src/source/filesystem/nd_streaming.rs b/src/connector/src/source/filesystem/nd_streaming.rs index ffb59ab6f52ad..e1f5e88eda01e 100644 --- a/src/connector/src/source/filesystem/nd_streaming.rs +++ b/src/connector/src/source/filesystem/nd_streaming.rs @@ -74,6 +74,7 @@ pub async fn split_stream(data_stream: BoxSourceStream) { let len = line.as_bytes().len(); msgs.push(SourceMessage { + key: None, payload: Some(line.into()), offset: (offset + len).to_string(), split_id: split_id.clone(), @@ -124,6 +125,7 @@ mod tests { Ok(e.chunks(N3) .enumerate() .map(|(j, buf)| SourceMessage { + key: None, payload: Some(buf.to_owned()), offset: (i * N2 + j * N3).to_string(), split_id: split_id.clone(), diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 29cb7f3d62170..5e51a49fd67bc 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -84,6 +84,7 @@ impl S3FileReader { let bytes = read?; let len = bytes.len(); let msg = SourceMessage { + key: None, payload: Some(bytes.as_ref().to_vec()), offset: offset.to_string(), split_id: split.id(), @@ -201,7 +202,7 @@ impl S3FileReader { ); let parser = - ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)?; + ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?; let msg_stream = if matches!( parser, ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_) diff --git a/src/connector/src/source/google_pubsub/source/message.rs b/src/connector/src/source/google_pubsub/source/message.rs index 93c1e1ed0daf1..490dc86234348 100644 --- a/src/connector/src/source/google_pubsub/source/message.rs +++ b/src/connector/src/source/google_pubsub/source/message.rs @@ -42,6 +42,7 @@ impl From for SourceMessage { .unwrap_or_default(); Self { + key: None, payload: { let payload = message.message.data; match payload.len() { diff --git a/src/connector/src/source/kafka/source/message.rs b/src/connector/src/source/kafka/source/message.rs index 53644dc59fbea..bc3d4e38cbd2b 100644 --- a/src/connector/src/source/kafka/source/message.rs +++ b/src/connector/src/source/kafka/source/message.rs @@ -15,7 +15,6 @@ use rdkafka::message::BorrowedMessage; use rdkafka::Message; -use crate::common::UpsertMessage; use crate::source::base::SourceMessage; use 
crate::source::SourceMeta; @@ -26,26 +25,10 @@ pub struct KafkaMeta { } impl SourceMessage { - pub fn from_kafka_message_upsert(message: &BorrowedMessage<'_>) -> Self { - let encoded = bincode::serialize(&UpsertMessage { - primary_key: message.key().unwrap_or_default().into(), - record: message.payload().unwrap_or_default().into(), - }) - .unwrap(); - SourceMessage { - // TODO(TaoWu): Possible performance improvement: avoid memory copying here. - payload: Some(encoded), - offset: message.offset().to_string(), - split_id: message.partition().to_string().into(), - meta: SourceMeta::Kafka(KafkaMeta { - timestamp: message.timestamp().to_millis(), - }), - } - } - pub fn from_kafka_message(message: &BorrowedMessage<'_>) -> Self { SourceMessage { // TODO(TaoWu): Possible performance improvement: avoid memory copying here. + key: message.key().map(|p| p.to_vec()), payload: message.payload().map(|p| p.to_vec()), offset: message.offset().to_string(), split_id: message.partition().to_string().into(), diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 4aa42e0f37f58..f75f5167e84a5 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -41,7 +41,6 @@ pub struct KafkaSplitReader { stop_offset: Option, bytes_per_second: usize, max_num_messages: usize, - enable_upsert: bool, split_id: SplitId, parser_config: ParserConfig, @@ -138,7 +137,6 @@ impl SplitReader for KafkaSplitReader { bytes_per_second, max_num_messages, split_id, - enable_upsert: parser_config.specific.is_upsert(), parser_config, source_ctx, }) @@ -200,11 +198,7 @@ impl KafkaSplitReader { Some(payload) => payload.len(), }; num_messages += 1; - if self.enable_upsert { - res.push(SourceMessage::from_kafka_message_upsert(&msg)); - } else { - res.push(SourceMessage::from_kafka_message(&msg)); - } + res.push(SourceMessage::from_kafka_message(&msg)); if let Some(stop_offset) = self.stop_offset { if cur_offset == stop_offset - 1 { diff --git a/src/connector/src/source/kinesis/source/message.rs b/src/connector/src/source/kinesis/source/message.rs index 7c74ccb2b22e7..56de6963b0d71 100644 --- a/src/connector/src/source/kinesis/source/message.rs +++ b/src/connector/src/source/kinesis/source/message.rs @@ -27,6 +27,7 @@ pub struct KinesisMessage { impl From for SourceMessage { fn from(msg: KinesisMessage) -> Self { SourceMessage { + key: None, payload: Some(msg.payload), offset: msg.sequence_number.clone(), split_id: msg.shard_id, diff --git a/src/connector/src/source/nexmark/source/message.rs b/src/connector/src/source/nexmark/source/message.rs index d6af93862731d..605391765e668 100644 --- a/src/connector/src/source/nexmark/source/message.rs +++ b/src/connector/src/source/nexmark/source/message.rs @@ -31,6 +31,7 @@ pub struct NexmarkMessage { impl From for SourceMessage { fn from(msg: NexmarkMessage) -> Self { SourceMessage { + key: None, payload: Some(msg.payload), offset: msg.sequence_number.clone(), split_id: msg.split_id, diff --git a/src/connector/src/source/pulsar/source/message.rs b/src/connector/src/source/pulsar/source/message.rs index 10ffb75f0c40e..26f3e81773967 100644 --- a/src/connector/src/source/pulsar/source/message.rs +++ b/src/connector/src/source/pulsar/source/message.rs @@ -21,6 +21,7 @@ impl From>> for SourceMessage { let message_id = msg.message_id.id; SourceMessage { + key: None, payload: Some(msg.payload.data), offset: format!( "{}:{}:{}:{}", diff --git a/src/frontend/src/handler/create_source.rs 
b/src/frontend/src/handler/create_source.rs index 0d5b6797c0b86..bf69b147024f9 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -71,7 +71,7 @@ async fn extract_avro_table_schema( ..Default::default() }; let parser_config = ParserProperties::new(SourceFormat::Avro, with_properties, &info)?; - let conf = AvroParserConfig::new(parser_config).await?; + let conf = AvroParserConfig::new(parser_config.encoding_config).await?; let vec_column_desc = conf.map_to_columns()?; Ok(vec_column_desc .into_iter() @@ -93,7 +93,7 @@ async fn extract_upsert_avro_table_schema( ..Default::default() }; let parser_config = ParserProperties::new(SourceFormat::UpsertAvro, with_properties, &info)?; - let conf = AvroParserConfig::new(parser_config).await?; + let conf = AvroParserConfig::new(parser_config.encoding_config).await?; let vec_column_desc = conf.map_to_columns()?; let vec_pk_desc = conf.extract_pks().map_err(|e| RwError::from(ErrorCode::InternalError( @@ -135,7 +135,7 @@ async fn extract_debezium_avro_table_pk_columns( ..Default::default() }; let parser_config = ParserProperties::new(SourceFormat::DebeziumAvro, with_properties, &info)?; - let conf = DebeziumAvroParserConfig::new(parser_config).await?; + let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?; Ok(conf.extract_pks()?.drain(..).map(|c| c.name).collect()) } @@ -149,7 +149,7 @@ async fn extract_debezium_avro_table_schema( ..Default::default() }; let parser_config = ParserProperties::new(SourceFormat::DebeziumAvro, with_properties, &info)?; - let conf = DebeziumAvroParserConfig::new(parser_config).await?; + let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?; let vec_column_desc = conf.map_to_columns()?; let column_catalog = vec_column_desc .into_iter() @@ -173,7 +173,7 @@ async fn extract_protobuf_table_schema( ..Default::default() }; let parser_config = ParserProperties::new(SourceFormat::Protobuf, &with_properties, &info)?; - let conf = ProtobufParserConfig::new(parser_config).await?; + let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?; let column_descs = conf.map_to_columns()?; diff --git a/src/source/benches/json_parser.rs b/src/source/benches/json_parser.rs index e26bdbc686086..2b9cbce07fdc9 100644 --- a/src/source/benches/json_parser.rs +++ b/src/source/benches/json_parser.rs @@ -81,7 +81,10 @@ fn bench_json_parser(c: &mut Criterion) { SourceStreamChunkBuilder::with_capacity(descs.clone(), NUM_RECORDS); for record in records { let writer = builder.row_writer(); - parser.parse_inner(record, writer).await.unwrap(); + parser + .parse_inner(None, Some(record), writer) + .await + .unwrap(); } }, BatchSize::SmallInput,