From d0a377d2e4266bb1eb2239ab47f80b9fc2f757b6 Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Tue, 6 Sep 2022 14:38:54 +0800 Subject: [PATCH] feat: create source with avro row format (#5059) * stash Signed-off-by: tabVersion * create with avro format Signed-off-by: tabVersion * fix Signed-off-by: tabVersion * fix Signed-off-by: tabVersion * trigger Signed-off-by: tabVersion * resolve comment Signed-off-by: tabVersion * fix Signed-off-by: tabVersion * add e2e Signed-off-by: tabVersion * another try Signed-off-by: tabVersion * another try Signed-off-by: tabVersion Signed-off-by: tabVersion Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- ci/scripts/build.sh | 4 + ci/scripts/e2e-source-test.sh | 3 + e2e_test/source/basic_test.slt | 13 ++++ scripts/source/test_data/avro_bin.1 | Bin 0 -> 614 bytes src/frontend/src/handler/create_source.rs | 38 ++++++++- src/source/src/parser/avro_parser.rs | 90 ++++++++++++++++++++++ src/source/src/parser/mod.rs | 2 +- src/sqlparser/src/ast/statement.rs | 48 +++++++++++- 8 files changed, 192 insertions(+), 6 deletions(-) create mode 100644 scripts/source/test_data/avro_bin.1 diff --git a/ci/scripts/build.sh b/ci/scripts/build.sh index 9aa4ccf166a6a..54623a5ca87d0 100755 --- a/ci/scripts/build.sh +++ b/ci/scripts/build.sh @@ -56,3 +56,7 @@ buildkite-agent artifact upload risingwave-"$profile" buildkite-agent artifact upload risedev-dev-"$profile" buildkite-agent artifact upload risingwave_regress_test-"$profile" buildkite-agent artifact upload ./sqlsmith-"$profile" + +echo "--- upload misc" +cp src/source/src/test_data/simple-schema.avsc ./avro-simple-schema.avsc +buildkite-agent artifact upload ./avro-simple-schema.avsc diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 1d5a2798cbc4d..d093cb0b781d8 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -28,6 +28,9 @@ buildkite-agent artifact download risedev-dev-"$profile" target/debug/ mv target/debug/risingwave-"$profile" target/debug/risingwave mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev +echo "--- Download mise" +buildkite-agent artifact download avro-simple-schema.avsc ./ + echo "--- Adjust permission" chmod +x ./target/debug/risingwave chmod +x ./target/debug/risedev-dev diff --git a/e2e_test/source/basic_test.slt b/e2e_test/source/basic_test.slt index 81e99b5bfb122..19e90433a9f3e 100644 --- a/e2e_test/source/basic_test.slt +++ b/e2e_test/source/basic_test.slt @@ -106,6 +106,16 @@ create materialized source s8 ( statement ok select * from s8 +statement ok +create materialized source s9 with ( + connector = 'kafka', topic = 'avro_bin', + properties.bootstrap.server = '127.0.0.1:29092', + scan.startup.mode = 'earliest' +) row format avro message 'test_student' row schema location 'file:///risingwave/avro-simple-schema.avsc' + +statement ok +select * from s9 + statement ok flush; @@ -242,3 +252,6 @@ drop source s6 statement ok drop source s8 + +statement ok +drop source s9 diff --git a/scripts/source/test_data/avro_bin.1 b/scripts/source/test_data/avro_bin.1 new file mode 100644 index 0000000000000000000000000000000000000000..97b87a4884deba45200225197e85ba2e2260c01e GIT binary patch literal 614 zcmZ`%!Ait16l_KCA|6CMdCGZR#A8p3cvsMih?LjnwHtX&YLgbqQqT|ZGdz0n6ZS0r zh2kf8H0`c!DjLWkVP-NjFK5xMzqt=pp{6>XV+LPa3ufvZS-=X6Rl(VWNyyr>L2W{9 zN{-Ul+e9F7&4TO24?wiwHCc2RjtP`xSa8%#dYHz$5@n2`izg*h%l?zlz27TqQ;0E^ zF<0H1#=%EI3R<^=@YMAJRT}CsgR3&4R^cCt&>?{~=!A`wA_VDj5+IF^E!$ZEZDk&HxP<{auiY_GO1b1Y{>W(NpaewYa&p>1hH#7_FA z+3S8ntT@KoT0)-J-t)WvcJ=u2w0!w&mJb)N8>a^}yZ3kP7{UsuM9=e1?6&KC=6l|; J+L`|5d;@mH(aQh; literal 0 HcmV?d00001 diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 2b35dc2e81481..5941d8383b835 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::DEFAULT_SCHEMA_NAME; @@ -22,8 +24,10 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::plan_common::{ColumnCatalog as ProstColumnCatalog, RowFormatType}; use risingwave_pb::user::grant_privilege::{Action, Object}; -use risingwave_source::ProtobufParser; -use risingwave_sqlparser::ast::{CreateSourceStatement, ObjectName, ProtobufSchema, SourceSchema}; +use risingwave_source::{AvroParser, ProtobufParser}; +use risingwave_sqlparser::ast::{ + AvroSchema, CreateSourceStatement, ObjectName, ProtobufSchema, SourceSchema, +}; use super::create_table::{ bind_sql_columns, bind_sql_table_constraints, gen_materialized_source_plan, @@ -72,6 +76,22 @@ pub(crate) fn make_prost_source( }) } +/// Map an Avro schema to a relational schema. +async fn extract_avro_table_schema( + schema: &AvroSchema, + with_properties: HashMap, +) -> Result> { + let parser = AvroParser::new(schema.row_schema_location.0.as_str(), with_properties).await?; + let vec_column_desc = parser.map_to_columns()?; + Ok(vec_column_desc + .into_iter() + .map(|c| ProstColumnCatalog { + column_desc: Some(c), + is_hidden: false, + }) + .collect_vec()) +} + /// Map a protobuf schema to a relational schema. fn extract_protobuf_table_schema(schema: &ProtobufSchema) -> Result> { let parser = ProtobufParser::new(&schema.row_schema_location.0, &schema.message_name.0)?; @@ -112,6 +132,20 @@ pub async fn handle_create_source( pk_column_ids: pk_column_ids.into_iter().map(Into::into).collect(), } } + SourceSchema::Avro(avro_schema) => { + assert_eq!(columns.len(), 1); + assert_eq!(pk_column_ids, vec![0.into()]); + assert_eq!(row_id_index, Some(0)); + columns.extend(extract_avro_table_schema(avro_schema, with_properties.clone()).await?); + StreamSourceInfo { + properties: with_properties.clone(), + row_format: RowFormatType::Avro as i32, + row_schema_location: avro_schema.row_schema_location.0.clone(), + row_id_index: row_id_index.map(|index| ProstColumnIndex { index: index as _ }), + columns, + pk_column_ids: pk_column_ids.into_iter().map(Into::into).collect(), + } + } SourceSchema::Json => StreamSourceInfo { properties: with_properties.clone(), row_format: RowFormatType::Json as i32, diff --git a/src/source/src/parser/avro_parser.rs b/src/source/src/parser/avro_parser.rs index 82eee08e09a2b..2c0501a88d62c 100644 --- a/src/source/src/parser/avro_parser.rs +++ b/src/source/src/parser/avro_parser.rs @@ -20,6 +20,7 @@ use std::path::Path; use apache_avro::types::Value; use apache_avro::{Reader, Schema}; use chrono::{Datelike, NaiveDate}; +use itertools::Itertools; use num_traits::FromPrimitive; use risingwave_common::array::Op; use risingwave_common::error::ErrorCode::{InternalError, InvalidConfigValue, ProtocolError}; @@ -28,6 +29,7 @@ use risingwave_common::types::{ DataType, Datum, Decimal, NaiveDateTimeWrapper, NaiveDateWrapper, ScalarImpl, }; use risingwave_connector::aws_utils::{default_conn_config, s3_client, AwsConfigV2}; +use risingwave_pb::plan_common::ColumnDesc; use url::Url; use crate::{Event, SourceColumnDesc, SourceParser}; @@ -77,6 +79,86 @@ impl AvroParser { Err(arvo_schema.err().unwrap()) } } + + pub fn map_to_columns(&self) -> Result> { + if let Schema::Record { fields, .. } = &self.schema { + let mut index = 0; + Ok(fields + .iter() + .map(|field| { + Self::avro_field_to_column_desc(&field.name, &field.schema, &mut index) + }) + .collect::>>()?) + } else { + Err(RwError::from(InternalError( + "schema invalid, record required".into(), + ))) + } + } + + fn avro_field_to_column_desc( + name: &str, + schema: &Schema, + index: &mut i32, + ) -> Result { + let data_type = Self::avro_type_mapping(schema)?; + if let Schema::Record { + name: schema_name, + fields, + .. + } = schema + { + let vec_column = fields + .iter() + .map(|f| Self::avro_field_to_column_desc(&f.name, &f.schema, index)) + .collect::>>()?; + *index += 1; + Ok(ColumnDesc { + column_type: Some(data_type.to_protobuf()), + column_id: *index, + name: name.to_owned(), + field_descs: vec_column, + type_name: schema_name.to_string(), + }) + } else { + *index += 1; + Ok(ColumnDesc { + column_type: Some(data_type.to_protobuf()), + column_id: *index, + name: name.to_owned(), + ..Default::default() + }) + } + } + + fn avro_type_mapping(schema: &Schema) -> Result { + let data_type = match schema { + Schema::String => DataType::Varchar, + Schema::Int => DataType::Int32, + Schema::Long => DataType::Int64, + Schema::Boolean => DataType::Boolean, + Schema::Float => DataType::Float32, + Schema::Double => DataType::Float64, + Schema::Date => DataType::Date, + Schema::TimestampMillis => DataType::Timestamp, + Schema::Record { fields, .. } => { + let struct_fields = fields + .iter() + .map(|f| Self::avro_type_mapping(&f.schema)) + .collect::>>()?; + let struct_names = fields.iter().map(|f| f.name.clone()).collect_vec(); + DataType::new_struct(struct_fields, struct_names) + } + _ => { + return Err(RwError::from(InternalError(format!( + "unsupported type in Avro: {:?}", + schema + )))); + } + }; + + Ok(data_type) + } } macro_rules! from_avro_datetime { @@ -579,6 +661,14 @@ mod test { record } + #[tokio::test] + async fn test_map_to_columns() { + let avro_parser_rs = new_avro_parser_from_local("simple-schema.avsc") + .await + .unwrap(); + println!("{:?}", avro_parser_rs.map_to_columns().unwrap()); + } + #[tokio::test] async fn test_new_avro_parser() { let avro_parser_rs = new_avro_parser_from_local("simple-schema.avsc").await; diff --git a/src/source/src/parser/mod.rs b/src/source/src/parser/mod.rs index 15ebd198affb0..9c4cdfc8c76df 100644 --- a/src/source/src/parser/mod.rs +++ b/src/source/src/parser/mod.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; +pub use avro_parser::*; pub use debezium::*; pub use json_parser::*; pub use protobuf_parser::*; @@ -24,7 +25,6 @@ use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; use risingwave_common::types::Datum; -use crate::parser::avro_parser::AvroParser; use crate::{SourceColumnDesc, SourceFormat}; mod avro_parser; diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 1b8d7eb02ab70..8bae2892f7625 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -85,8 +85,9 @@ pub struct CreateSourceStatement { pub enum SourceSchema { Protobuf(ProtobufSchema), // Keyword::PROTOBUF ProtobufSchema - Json, // Keyword::JSON - DebeziumJson, // Keyword::DEBEZIUM_JSON + Json, // Keyword::JSON + DebeziumJson, // Keyword::DEBEZIUM_JSON + Avro(AvroSchema), // Keyword::AVRO } impl ParseTo for SourceSchema { @@ -98,9 +99,12 @@ impl ParseTo for SourceSchema { SourceSchema::Protobuf(protobuf_schema) } else if p.parse_keywords(&[Keyword::DEBEZIUM_JSON]) { SourceSchema::DebeziumJson + } else if p.parse_keywords(&[Keyword::AVRO]) { + impl_parse_to!(avro_schema: AvroSchema, p); + SourceSchema::Avro(avro_schema) } else { return Err(ParserError::ParserError( - "expected JSON | PROTOBUF after ROW FORMAT".to_string(), + "expected JSON | PROTOBUF | DEBEZIUMJSON | AVRO after ROW FORMAT".to_string(), )); }; Ok(schema) @@ -113,6 +117,7 @@ impl fmt::Display for SourceSchema { SourceSchema::Protobuf(protobuf_schema) => write!(f, "PROTOBUF {}", protobuf_schema), SourceSchema::Json => write!(f, "JSON"), SourceSchema::DebeziumJson => write!(f, "DEBEZIUM JSON"), + SourceSchema::Avro(avro_schema) => write!(f, "AVRO {}", avro_schema), } } } @@ -154,6 +159,43 @@ impl fmt::Display for ProtobufSchema { } } +// sql_grammar!(AvroSchema { +// [Keyword::MESSAGE], +// message_name: AstString, +// [Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], +// row_schema_location: AstString, +// }); +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct AvroSchema { + pub message_name: AstString, + pub row_schema_location: AstString, +} + +impl ParseTo for AvroSchema { + fn parse_to(p: &mut Parser) -> Result { + impl_parse_to!([Keyword::MESSAGE], p); + impl_parse_to!(message_name: AstString, p); + impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p); + impl_parse_to!(row_schema_location: AstString, p); + Ok(Self { + message_name, + row_schema_location, + }) + } +} + +impl fmt::Display for AvroSchema { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut v: Vec = vec![]; + impl_fmt_display!([Keyword::MESSAGE], v); + impl_fmt_display!(message_name, v, self); + impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v); + impl_fmt_display!(row_schema_location, v, self); + v.iter().join(" ").fmt(f) + } +} + impl ParseTo for CreateSourceStatement { fn parse_to(p: &mut Parser) -> Result { impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);