feat(source): Default key for UpsertAvro (#11125)

wugouzi authored Jul 25, 2023
1 parent a4a5dc4 commit cafbd0c

Showing 11 changed files with 170 additions and 103 deletions.
21 changes: 18 additions & 3 deletions e2e_test/source/basic/nosim_kafka.slt
@@ -1,9 +1,9 @@
 # Start with nosim to avoid running in deterministic test


-# key schema should be a subset of value schema
-statement error
-CREATE TABLE upsert_avro_json ()
+# If we cannot extract key schema, use message key as varchar primary key
+statement ok
+CREATE TABLE upsert_avro_json_default_key ()
 WITH (
   connector = 'kafka',
   properties.bootstrap.server = 'message_queue:29092',
@@ -74,6 +74,18 @@ flush;
 # Wait enough time to ensure SourceExecutor consumes all Kafka data.
 sleep 5s

+query II
+SELECT
+*
+FROM
+upsert_avro_json_default_key
+ORDER BY
+"ID";
+----
+update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
+delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
+delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
+delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
+
 query II
 SELECT
@@ -129,6 +141,9 @@ select count(*) from debezium_compact;
 ----
 2

+statement ok
+DROP TABLE upsert_avro_json_default_key;
+
 statement ok
 DROP TABLE upsert_avro_json;
21 changes: 18 additions & 3 deletions e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt
@@ -1,9 +1,9 @@
 # Start with nosim to avoid running in deterministic test


-# key schema should be a subset of value schema
-statement error
-CREATE TABLE upsert_avro_json ()
+# If we cannot extract key schema, use message key as varchar primary key
+statement ok
+CREATE TABLE upsert_avro_json_default_key ()
 WITH (
   connector = 'kafka',
   properties.bootstrap.server = 'message_queue:29092',
@@ -80,6 +80,18 @@ flush;
 # Wait enough time to ensure SourceExecutor consumes all Kafka data.
 sleep 5s

+query II
+SELECT
+*
+FROM
+upsert_avro_json_default_key
+ORDER BY
+"ID";
+----
+update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
+delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
+delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
+delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
+
 query II
 SELECT
@@ -135,6 +147,9 @@ select count(*) from debezium_compact;
 ----
 2

+statement ok
+DROP TABLE upsert_avro_json_default_key;
+
 statement ok
 DROP TABLE upsert_avro_json;
4 changes: 4 additions & 0 deletions src/common/src/catalog/mod.rs
@@ -63,6 +63,10 @@ pub const SYSTEM_SCHEMAS: [&str; 3] = [
     RW_CATALOG_SCHEMA_NAME,
 ];

+// When no primary key is specified while creating a source, the message key
+// will be used as the primary key, in `BYTEA` type, with this name.
+pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
+
 pub fn is_system_schema(schema_name: &str) -> bool {
     SYSTEM_SCHEMAS.iter().any(|s| *s == schema_name)
 }
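For context, the fallback that `DEFAULT_KEY_COLUMN_NAME` enables can be sketched as below. This is an illustrative stand-in, not the actual binder code: `ColumnSpec` and `with_default_key` are hypothetical names rather than RisingWave's real catalog types.

```rust
// Illustrative sketch only -- `ColumnSpec` and `with_default_key` are
// hypothetical stand-ins for the real catalog plumbing. The rule they
// demonstrate: if the user declared no primary key, synthesize one from
// the message key under the reserved name `_rw_key`.
const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";

#[derive(Debug)]
struct ColumnSpec {
    name: String,
    is_pk: bool,
}

fn with_default_key(mut columns: Vec<ColumnSpec>) -> Vec<ColumnSpec> {
    // A user-declared primary key wins; leave the schema untouched.
    if columns.iter().any(|c| c.is_pk) {
        return columns;
    }
    // Otherwise prepend the message key as the primary key column.
    columns.insert(
        0,
        ColumnSpec {
            name: DEFAULT_KEY_COLUMN_NAME.to_owned(),
            is_pk: true,
        },
    );
    columns
}

fn main() {
    let cols = with_default_key(vec![ColumnSpec {
        name: "id".to_owned(),
        is_pk: false,
    }]);
    assert_eq!(cols[0].name, DEFAULT_KEY_COLUMN_NAME);
}
```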
2 changes: 1 addition & 1 deletion src/connector/src/parser/avro/parser.rs
@@ -320,7 +320,7 @@ mod test {
         writer.append(record.clone()).unwrap();
         let flush = writer.flush().unwrap();
         assert!(flush > 0);
-        let input_data = Some(writer.into_inner().unwrap());
+        let input_data = writer.into_inner().unwrap();
         let columns = build_rw_columns();
         let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 1);
         {
27 changes: 18 additions & 9 deletions src/connector/src/parser/bytes_parser.rs
@@ -13,24 +13,33 @@
 // limitations under the License.

 use risingwave_common::error::Result;
+use risingwave_common::try_match_expand;

 use super::unified::bytes::BytesAccess;
 use super::unified::AccessImpl;
-use super::AccessBuilder;
+use super::{AccessBuilder, EncodingProperties};

 #[derive(Debug)]
-pub struct BytesAccessBuilder {}
+pub struct BytesAccessBuilder {
+    column_name: Option<String>,
+}

 impl AccessBuilder for BytesAccessBuilder {
     #[allow(clippy::unused_async)]
     async fn generate_accessor(&mut self, payload: Vec<u8>) -> Result<AccessImpl<'_, '_>> {
-        Ok(AccessImpl::Bytes(BytesAccess::new(payload)))
+        Ok(AccessImpl::Bytes(BytesAccess::new(
+            &self.column_name,
+            payload,
+        )))
     }
 }

 impl BytesAccessBuilder {
-    pub fn new() -> Result<Self> {
-        Ok(Self {})
+    pub fn new(encoding_properties: EncodingProperties) -> Result<Self> {
+        let config = try_match_expand!(encoding_properties, EncodingProperties::Bytes)?;
+        Ok(Self {
+            column_name: config.column_name,
+        })
     }
 }

@@ -42,8 +51,8 @@ mod tests {

     use crate::parser::plain_parser::PlainParser;
     use crate::parser::{
-        EncodingProperties, ParserProperties, ProtocolProperties, SourceColumnDesc,
-        SourceStreamChunkBuilder,
+        BytesProperties, EncodingProperties, ParserProperties, ProtocolProperties,
+        SourceColumnDesc, SourceStreamChunkBuilder,
     };

     fn get_payload() -> Vec<Vec<u8>> {
@@ -54,7 +63,7 @@
         let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
         let props = ParserProperties {
             key_encoding_config: None,
-            encoding_config: EncodingProperties::Bytes,
+            encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }),
             protocol_config: ProtocolProperties::Plain,
         };
         let mut parser = PlainParser::new(props, descs.clone(), Default::default())
@@ -65,7 +74,7 @@

         for payload in get_payload() {
             let writer = builder.row_writer();
-            parser.parse_inner(Some(payload), writer).await.unwrap();
+            parser.parse_inner(payload, writer).await.unwrap();
         }

         let chunk = builder.finish();
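The `try_match_expand!` call in `BytesAccessBuilder::new` above does variant extraction: pull the payload out of one expected enum variant, or turn a mismatch into an error. A minimal sketch of that pattern, assuming simplified types and a plain `String` error where the real macro in `risingwave_common` produces an `RwError`:

```rust
// Simplified sketch of the `try_match_expand!` pattern: match one enum
// variant and hand back its contents, or report which variant was expected.
// The enum and error type here are stand-ins, not the connector's real ones.
macro_rules! try_match_expand {
    ($expr:expr, $variant:path) => {
        match $expr {
            $variant(inner) => Ok(inner),
            other => Err(format!(
                "expected {}, got {:?}",
                stringify!($variant),
                other
            )),
        }
    };
}

#[derive(Debug)]
enum EncodingProperties {
    Bytes(Option<String>), // stand-in for `BytesProperties { column_name }`
    Json,
}

fn main() {
    // Matching the expected variant yields its payload.
    let props = EncodingProperties::Bytes(Some("_rw_key".to_owned()));
    let column_name = try_match_expand!(props, EncodingProperties::Bytes);
    assert_eq!(column_name.unwrap().as_deref(), Some("_rw_key"));

    // Any other variant becomes an error instead of a panic.
    let err = try_match_expand!(EncodingProperties::Json, EncodingProperties::Bytes);
    assert!(err.is_err());
}
```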
19 changes: 15 additions & 4 deletions src/connector/src/parser/mod.rs
@@ -61,6 +61,7 @@ mod schema_registry;
 mod unified;
 mod upsert_parser;
 mod util;
+
 /// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
 pub struct SourceStreamChunkBuilder {
     descs: Vec<SourceColumnDesc>,
@@ -432,7 +433,9 @@ impl AccessBuilderImpl {
                 let config = ProtobufParserConfig::new(config).await?;
                 AccessBuilderImpl::Protobuf(ProtobufAccessBuilder::new(config)?)
             }
-            EncodingProperties::Bytes => AccessBuilderImpl::Bytes(BytesAccessBuilder::new()?),
+            EncodingProperties::Bytes(_) => {
+                AccessBuilderImpl::Bytes(BytesAccessBuilder::new(config)?)
+            }
             EncodingProperties::Json(_) => AccessBuilderImpl::Json(JsonAccessBuilder::new()?),
             EncodingProperties::Csv(_) => unreachable!(),
             EncodingProperties::None => unreachable!(),
@@ -597,13 +600,18 @@ pub struct CsvProperties {
 #[derive(Debug, Default, Clone)]
 pub struct JsonProperties {}

+#[derive(Debug, Default, Clone)]
+pub struct BytesProperties {
+    pub column_name: Option<String>,
+}
+
 #[derive(Debug, Default, Clone)]
 pub enum EncodingProperties {
     Avro(AvroProperties),
     Protobuf(ProtobufProperties),
     Csv(CsvProperties),
     Json(JsonProperties),
-    Bytes,
+    Bytes(BytesProperties),
     #[default]
     None,
 }
@@ -729,7 +737,10 @@ impl ParserProperties {
                 EncodingProperties::Json(JsonProperties {}),
                 ProtocolProperties::Upsert,
             ),
-            SourceFormat::Bytes => (EncodingProperties::Bytes, ProtocolProperties::Plain),
+            SourceFormat::Bytes => (
+                EncodingProperties::Bytes(BytesProperties { column_name: None }),
+                ProtocolProperties::Plain,
+            ),
             SourceFormat::Native | SourceFormat::Invalid => {
                 (EncodingProperties::None, ProtocolProperties::Plain)
             }
@@ -762,7 +773,7 @@ impl SpecificParserConfig {
                 EncodingProperties::Avro(_) => SourceFormat::Avro,
                 EncodingProperties::Protobuf(_) => SourceFormat::Protobuf,
                 EncodingProperties::Csv(_) => SourceFormat::Csv,
-                EncodingProperties::Bytes => SourceFormat::Bytes,
+                EncodingProperties::Bytes(_) => SourceFormat::Bytes,
                 _ => unreachable!(),
             },
             SpecificParserConfig::Maxwell(_) => SourceFormat::Maxwell,
14 changes: 6 additions & 8 deletions src/connector/src/parser/plain_parser.rs
@@ -13,13 +13,14 @@
 // limitations under the License.

 use risingwave_common::error::ErrorCode::ProtocolError;
-use risingwave_common::error::{Result, RwError};
+use risingwave_common::error::{ErrorCode, Result, RwError};

 use super::unified::util::apply_row_accessor_on_stream_chunk_writer;
 use super::{
     AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParserProperties,
     SourceStreamChunkRowWriter, WriteGuard,
 };
+use crate::only_parse_payload;
 use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

 #[derive(Debug)]
@@ -38,7 +39,7 @@ impl PlainParser {
         let payload_builder = match props.encoding_config {
             EncodingProperties::Protobuf(_)
             | EncodingProperties::Avro(_)
-            | EncodingProperties::Bytes => {
+            | EncodingProperties::Bytes(_) => {
                 AccessBuilderImpl::new_default(props.encoding_config, EncodingType::Value).await?
             }
             _ => {
@@ -56,13 +57,10 @@ impl PlainParser {

     pub async fn parse_inner(
         &mut self,
-        payload: Option<Vec<u8>>,
+        payload: Vec<u8>,
         mut writer: SourceStreamChunkRowWriter<'_>,
     ) -> Result<WriteGuard> {
-        let accessor = self
-            .payload_builder
-            .generate_accessor(payload.unwrap())
-            .await?;
+        let accessor = self.payload_builder.generate_accessor(payload).await?;

         apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer)
     }
@@ -83,6 +81,6 @@ impl ByteStreamSourceParser for PlainParser {
         payload: Option<Vec<u8>>,
         writer: SourceStreamChunkRowWriter<'a>,
     ) -> Result<WriteGuard> {
-        self.parse_inner(payload, writer).await
+        only_parse_payload!(self, payload, writer)
    }
 }
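The `only_parse_payload!` guard replaces the old `payload.unwrap()`, so a `None` payload becomes a proper error instead of a panic. A plausible, simplified shape for such a macro is sketched below; it is a synchronous stand-in with `String` errors, whereas the real macro awaits `parse_inner` and builds an `RwError`:

```rust
// Hypothetical, simplified stand-in for `only_parse_payload!`: reject a
// missing payload up front, then delegate to `parse_inner`. The real macro
// awaits the call and produces an `RwError`; this sketch stays synchronous.
macro_rules! only_parse_payload {
    ($self:ident, $payload:ident, $writer:ident) => {
        if let Some(payload) = $payload {
            $self.parse_inner(payload, $writer)
        } else {
            Err("empty payload for plain format".to_owned())
        }
    };
}

struct PlainParser;

impl PlainParser {
    // Stand-in for the real row writer: just collect payloads.
    fn parse_inner(&mut self, payload: Vec<u8>, out: &mut Vec<Vec<u8>>) -> Result<(), String> {
        out.push(payload);
        Ok(())
    }

    fn parse(&mut self, payload: Option<Vec<u8>>, out: &mut Vec<Vec<u8>>) -> Result<(), String> {
        only_parse_payload!(self, payload, out)
    }
}

fn main() {
    let mut parser = PlainParser;
    let mut rows = Vec::new();
    assert!(parser.parse(Some(b"abc".to_vec()), &mut rows).is_ok());
    assert!(parser.parse(None, &mut rows).is_err());
    assert_eq!(rows.len(), 1);
}
```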
55 changes: 18 additions & 37 deletions src/connector/src/parser/unified/bytes.rs
@@ -14,53 +14,34 @@

 use risingwave_common::types::{DataType, ScalarImpl};

-use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
+use super::{Access, AccessError, AccessResult};

-// where do we put data
-
-pub struct BytesAccess {
+pub struct BytesAccess<'a> {
+    column_name: &'a Option<String>,
     bytes: Vec<u8>,
 }

-impl BytesAccess {
-    pub fn new(bytes: Vec<u8>) -> Self {
-        Self { bytes }
+impl<'a> BytesAccess<'a> {
+    pub fn new(column_name: &'a Option<String>, bytes: Vec<u8>) -> Self {
+        Self { column_name, bytes }
     }
 }

-pub struct BytesChangeEvent {
-    value_accessor: BytesAccess,
-    key_accessor: Option<BytesAccess>,
-}
-
-impl BytesChangeEvent {
-    pub fn with_value(value_accessor: BytesAccess) -> Self {
-        Self::new(None, value_accessor)
-    }
-
-    pub fn new(key_accessor: Option<BytesAccess>, value_accessor: BytesAccess) -> Self {
-        Self {
-            value_accessor,
-            key_accessor,
-        }
-    }
-}
-
-impl ChangeEvent for BytesChangeEvent {
-    fn op(&self) -> std::result::Result<ChangeEventOperation, super::AccessError> {
-        Ok(ChangeEventOperation::Upsert)
-    }
-
-    fn access_field(&self, name: &str, type_expected: &DataType) -> super::AccessResult {
-        self.value_accessor.access(&[name], Some(type_expected))
-    }
-}
-
-impl Access for BytesAccess {
+impl<'a> Access for BytesAccess<'a> {
     /// path is empty currently, `type_expected` should be `Bytea`
-    fn access(&self, _path: &[&str], type_expected: Option<&DataType>) -> AccessResult {
-        if let Some(DataType::Bytea) = type_expected {
-            return Ok(Some(ScalarImpl::Bytea(Box::from(self.bytes.as_slice()))));
+    fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> AccessResult {
+        if let DataType::Bytea = type_expected.unwrap() {
+            if self.column_name.is_none()
+                || (path.len() == 1 && self.column_name.as_ref().unwrap() == path[0])
+            {
+                return Ok(Some(ScalarImpl::Bytea(Box::from(self.bytes.as_slice()))));
+            }
+            return Err(AccessError::Undefined {
+                name: path[0].to_string(),
+                path: self.column_name.as_ref().unwrap().to_string(),
+            });
        }
         Err(AccessError::TypeError {
             expected: "Bytea".to_string(),
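The rewritten `access` boils down to one lookup rule: with no configured column name, any `BYTEA` access yields the raw message bytes; with a configured name (e.g. the default `_rw_key`), only a one-element path equal to that name does. A standalone sketch of just that rule, with the `DataType`/`ScalarImpl` machinery stripped away and simplified error strings:

```rust
// Sketch of the lookup rule in `BytesAccess::access`, minus the Bytea type
// check. Types and errors are simplified stand-ins for the real ones.
fn access(column_name: &Option<String>, bytes: &[u8], path: &[&str]) -> Result<Vec<u8>, String> {
    match column_name {
        // No configured column: every access returns the raw bytes.
        None => Ok(bytes.to_vec()),
        // Configured column: only the exact single-element path matches.
        Some(name) if path.len() == 1 && name.as_str() == path[0] => Ok(bytes.to_vec()),
        Some(name) => Err(format!("undefined field {:?}, expected [{}]", path, name)),
    }
}

fn main() {
    let key = b"id42".to_vec();
    assert!(access(&None, &key, &["anything"]).is_ok());

    let col = Some("_rw_key".to_owned());
    assert!(access(&col, &key, &["_rw_key"]).is_ok());
    assert!(access(&col, &key, &["other"]).is_err());
}
```

One detail worth noting: the real code reads `path[0]` when building the `Undefined` error, so it assumes a non-empty path; the sketch formats the whole path instead.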
2 changes: 1 addition & 1 deletion src/connector/src/parser/unified/mod.rs
@@ -40,7 +40,7 @@ pub trait Access {

 pub enum AccessImpl<'a, 'b> {
     Avro(AvroAccess<'a, 'b>),
-    Bytes(BytesAccess),
+    Bytes(BytesAccess<'a>),
     Protobuf(ProtobufAccess),
     Json(JsonAccess<'a, 'b>),
 }