Skip to content

Commit

Permalink
refactor(source): separate key and payload accessor building (#10969)
Browse files Browse the repository at this point in the history
  • Loading branch information
wugouzi authored Jul 18, 2023
1 parent 5543898 commit ddc7c27
Show file tree
Hide file tree
Showing 35 changed files with 901 additions and 475 deletions.
5 changes: 4 additions & 1 deletion src/connector/benches/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ async fn parse(parser: JsonParser, column_desc: Vec<SourceColumnDesc>, input: Ve
SourceStreamChunkBuilder::with_capacity(column_desc.clone(), input_inner.len());
for payload in input_inner {
let row_writer = builder.row_writer();
parser.parse_inner(payload, row_writer).await.unwrap();
parser
.parse_inner(None, Some(payload), row_writer)
.await
.unwrap();
}
builder.finish();
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/aws_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use aws_types::region::Region;
use aws_types::SdkConfig;

/// A flatten cofig map for aws auth.
#[derive(Debug, Clone)]
pub struct AwsAuthProps {
pub region: Option<String>,
pub endpoint: Option<String>,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ macro_rules! impl_common_split_reader_logic {
})
.boxed();
let parser =
$crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx)?;
$crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
#[for_await]
for msg_batch in parser.into_stream(data_stream) {
yield msg_batch?;
Expand Down
111 changes: 80 additions & 31 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;

Expand All @@ -26,17 +25,79 @@ use url::Url;

use super::schema_resolver::*;
use super::util::avro_schema_to_column_descs;
use crate::common::UpsertMessage;
use crate::parser::schema_registry::{extract_schema_id, Client};
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
use crate::parser::unified::upsert::UpsertChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::unified::AccessImpl;
use crate::parser::{
ByteStreamSourceParser, EncodingProperties, ParserProperties, SourceStreamChunkRowWriter,
WriteGuard,
AccessBuilder, ByteStreamSourceParser, EncodingProperties, EncodingType,
SourceStreamChunkRowWriter, WriteGuard,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

// Default avro access builder
#[derive(Debug)]
pub struct AvroAccessBuilder {
schema: Arc<Schema>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
value: Option<Value>,
}

impl AccessBuilder for AvroAccessBuilder {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> Result<AccessImpl<'_, '_>> {
self.value = self.parse_avro_value(&payload, Some(&*self.schema)).await?;
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_ref().unwrap(),
AvroParseOptions::default().with_schema(&self.schema),
)))
}
}

impl AvroAccessBuilder {
pub fn new(config: AvroParserConfig, _encoding_type: EncodingType) -> Result<Self> {
let AvroParserConfig {
schema,
schema_resolver,
..
} = config;
Ok(Self {
schema,
schema_resolver,
value: None,
})
}

async fn parse_avro_value(
&self,
payload: &[u8],
reader_schema: Option<&Schema>,
) -> anyhow::Result<Option<Value>> {
// parse payload to avro value
// if use confluent schema, get writer schema from confluent schema registry
if let Some(resolver) = &self.schema_resolver {
let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
let writer_schema = resolver.get(schema_id).await?;
Ok(Some(from_avro_datum(
writer_schema.as_ref(),
&mut raw_payload,
reader_schema,
)?))
} else if let Some(schema) = reader_schema {
let mut reader = Reader::with_schema(schema, payload)?;
match reader.next() {
Some(Ok(v)) => Ok(Some(v)),
Some(Err(e)) => Err(e)?,
None => {
anyhow::bail!("avro parse unexpected eof")
}
}
} else {
unreachable!("both schema_resolver and reader_schema not exist");
}
}
}

#[derive(Debug)]
pub struct AvroParser {
schema: Arc<Schema>,
Expand All @@ -56,9 +117,8 @@ pub struct AvroParserConfig {
}

impl AvroParserConfig {
pub async fn new(parser_properties: ParserProperties) -> Result<Self> {
let avro_config =
try_match_expand!(parser_properties.encoding_config, EncodingProperties::Avro)?;
pub async fn new(encoding_properties: EncodingProperties) -> Result<Self> {
let avro_config = try_match_expand!(encoding_properties, EncodingProperties::Avro)?;
let schema_location = &avro_config.row_schema_location;
let enable_upsert = avro_config.enable_upsert;
let url = Url::parse(schema_location).map_err(|e| {
Expand Down Expand Up @@ -192,32 +252,17 @@ impl AvroParser {

pub(crate) async fn parse_inner(
&self,
payload: Vec<u8>,
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
let (raw_key, raw_value) = if self.is_enable_upsert() {
let msg: UpsertMessage<'_> = bincode::deserialize(&payload).map_err(|e| {
RwError::from(ProtocolError(format!(
"extract payload err {:?}, you may need to check the 'upsert' parameter",
e
)))
})?;
if !msg.record.is_empty() {
(Some(msg.primary_key), Some(msg.record))
} else {
(Some(msg.primary_key), None)
}
} else {
(None, Some(Cow::from(&payload)))
};

let avro_value = if let Some(payload) = raw_value {
let avro_value = if let Some(payload) = payload {
self.parse_avro_value(payload.as_ref(), Some(&*self.schema))
.await?
} else {
None
};
let avro_key = if let Some(payload) = raw_key {
let avro_key = if let Some(payload) = key {
self.parse_avro_value(payload.as_ref(), self.key_schema.as_deref())
.await?
} else {
Expand Down Expand Up @@ -262,10 +307,11 @@ impl ByteStreamSourceParser for AvroParser {

async fn parse_one<'a>(
&'a mut self,
payload: Vec<u8>,
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
self.parse_inner(payload, writer).await
self.parse_inner(key, payload, writer).await
}
}

Expand Down Expand Up @@ -371,7 +417,7 @@ mod test {
..Default::default()
};
let parser_config = ParserProperties::new(SourceFormat::Avro, &HashMap::new(), &info)?;
AvroParserConfig::new(parser_config).await
AvroParserConfig::new(parser_config.encoding_config).await
}

async fn new_avro_parser_from_local(file_name: &str) -> error::Result<AvroParser> {
Expand All @@ -391,12 +437,15 @@ mod test {
writer.append(record.clone()).unwrap();
let flush = writer.flush().unwrap();
assert!(flush > 0);
let input_data = writer.into_inner().unwrap();
let input_data = Some(writer.into_inner().unwrap());
let columns = build_rw_columns();
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 1);
{
let writer = builder.row_writer();
avro_parser.parse_inner(input_data, writer).await.unwrap();
avro_parser
.parse_inner(None, input_data, writer)
.await
.unwrap();
}
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
Expand Down
29 changes: 24 additions & 5 deletions src/connector/src/parser/bytes_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,31 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::error::Result;
use risingwave_common::error::ErrorCode::{self};
use risingwave_common::error::{Result, RwError};

use super::unified::bytes::{BytesAccess, BytesChangeEvent};
use super::unified::ChangeEvent;
use super::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard};
use super::unified::{AccessImpl, ChangeEvent};
use super::{AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard};
use crate::only_parse_payload;
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
pub struct BytesAccessBuilder {}

impl AccessBuilder for BytesAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> Result<AccessImpl<'_, '_>> {
Ok(AccessImpl::Bytes(BytesAccess::new(payload)))
}
}

impl BytesAccessBuilder {
pub fn new() -> Result<Self> {
Ok(Self {})
}
}

/// Parser for BYTES format
#[derive(Debug)]
pub struct BytesParser {
Expand Down Expand Up @@ -72,10 +90,11 @@ impl ByteStreamSourceParser for BytesParser {

async fn parse_one<'a>(
&'a mut self,
payload: Vec<u8>,
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
self.parse_inner(payload, writer).await
only_parse_payload!(self, payload, writer)
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use risingwave_common::error::ErrorCode::{self, ProtocolError};
use risingwave_common::error::{Result, RwError};
use simd_json::{BorrowedValue, Mutable, ValueAccess};

use crate::only_parse_payload;
use crate::parser::canal::operators::*;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
Expand Down Expand Up @@ -115,10 +116,11 @@ impl ByteStreamSourceParser for CanalJsonParser {

async fn parse_one<'a>(
&'a mut self,
payload: Vec<u8>,
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
self.parse_inner(payload, writer).await
only_parse_payload!(self, payload, writer)
}
}

Expand Down
15 changes: 8 additions & 7 deletions src/connector/src/parser/csv_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ use std::str::FromStr;
use anyhow::anyhow;
use risingwave_common::cast::{str_to_date, str_to_timestamp};
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::try_match_expand;
use risingwave_common::types::{Datum, Decimal, ScalarImpl, Timestamptz};

use super::{ByteStreamSourceParser, EncodingProperties, ParserProperties};
use super::{ByteStreamSourceParser, EncodingProperties};
use crate::only_parse_payload;
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef};

Expand All @@ -38,9 +39,8 @@ pub struct CsvParserConfig {
}

impl CsvParserConfig {
pub fn new(parser_properties: ParserProperties) -> Result<Self> {
let csv_config =
try_match_expand!(parser_properties.encoding_config, EncodingProperties::Csv)?;
pub fn new(encoding_properties: EncodingProperties) -> Result<Self> {
let csv_config = try_match_expand!(encoding_properties, EncodingProperties::Csv)?;
Ok(Self {
delimiter: csv_config.delimiter,
has_header: csv_config.has_header,
Expand Down Expand Up @@ -171,10 +171,11 @@ impl ByteStreamSourceParser for CsvParser {

async fn parse_one<'a>(
&'a mut self,
payload: Vec<u8>,
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
self.parse_inner(payload, writer).await
only_parse_payload!(self, payload, writer)
}
}

Expand Down
Loading

0 comments on commit ddc7c27

Please sign in to comment.