Skip to content

Commit

Permalink
feat: create source with avro row format (#5059)
Browse files Browse the repository at this point in the history
* stash

Signed-off-by: tabVersion <[email protected]>

* create with avro format

Signed-off-by: tabVersion <[email protected]>

* fix

Signed-off-by: tabVersion <[email protected]>

* fix

Signed-off-by: tabVersion <[email protected]>

* trigger

Signed-off-by: tabVersion <[email protected]>

* resolve comment

Signed-off-by: tabVersion <[email protected]>

* fix

Signed-off-by: tabVersion <[email protected]>

* add e2e

Signed-off-by: tabVersion <[email protected]>

* another try

Signed-off-by: tabVersion <[email protected]>

* another try

Signed-off-by: tabVersion <[email protected]>

Signed-off-by: tabVersion <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tabVersion and mergify[bot] authored Sep 6, 2022
1 parent ede7432 commit d0a377d
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 6 deletions.
4 changes: 4 additions & 0 deletions ci/scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ buildkite-agent artifact upload risingwave-"$profile"
buildkite-agent artifact upload risedev-dev-"$profile"
buildkite-agent artifact upload risingwave_regress_test-"$profile"
buildkite-agent artifact upload ./sqlsmith-"$profile"

echo "--- upload misc"
cp src/source/src/test_data/simple-schema.avsc ./avro-simple-schema.avsc
buildkite-agent artifact upload ./avro-simple-schema.avsc
3 changes: 3 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ buildkite-agent artifact download risedev-dev-"$profile" target/debug/
mv target/debug/risingwave-"$profile" target/debug/risingwave
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev

# Fetch the misc artifacts uploaded by build.sh (see "--- upload misc" there);
# the Avro schema file is needed by the e2e source tests.
echo "--- Download misc"
buildkite-agent artifact download avro-simple-schema.avsc ./

echo "--- Adjust permission"
chmod +x ./target/debug/risingwave
chmod +x ./target/debug/risedev-dev
Expand Down
13 changes: 13 additions & 0 deletions e2e_test/source/basic_test.slt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ create materialized source s8 (
statement ok
select * from s8

statement ok
create materialized source s9 with (
connector = 'kafka', topic = 'avro_bin',
properties.bootstrap.server = '127.0.0.1:29092',
scan.startup.mode = 'earliest'
) row format avro message 'test_student' row schema location 'file:///risingwave/avro-simple-schema.avsc'

statement ok
select * from s9

statement ok
flush;

Expand Down Expand Up @@ -242,3 +252,6 @@ drop source s6

statement ok
drop source s8

statement ok
drop source s9
Binary file added scripts/source/test_data/avro_bin.1
Binary file not shown.
38 changes: 36 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::DEFAULT_SCHEMA_NAME;
Expand All @@ -22,8 +24,10 @@ use risingwave_pb::catalog::{
};
use risingwave_pb::plan_common::{ColumnCatalog as ProstColumnCatalog, RowFormatType};
use risingwave_pb::user::grant_privilege::{Action, Object};
use risingwave_source::ProtobufParser;
use risingwave_sqlparser::ast::{CreateSourceStatement, ObjectName, ProtobufSchema, SourceSchema};
use risingwave_source::{AvroParser, ProtobufParser};
use risingwave_sqlparser::ast::{
AvroSchema, CreateSourceStatement, ObjectName, ProtobufSchema, SourceSchema,
};

use super::create_table::{
bind_sql_columns, bind_sql_table_constraints, gen_materialized_source_plan,
Expand Down Expand Up @@ -72,6 +76,22 @@ pub(crate) fn make_prost_source(
})
}

/// Map an Avro schema to a relational schema.
///
/// Loads and parses the Avro schema found at `schema.row_schema_location`
/// (via [`AvroParser`], which may fetch it remotely — hence `async`) and
/// converts every resulting column descriptor into a non-hidden
/// [`ProstColumnCatalog`] entry.
async fn extract_avro_table_schema(
    schema: &AvroSchema,
    with_properties: HashMap<String, String>,
) -> Result<Vec<ProstColumnCatalog>> {
    let location = schema.row_schema_location.0.as_str();
    let parser = AvroParser::new(location, with_properties).await?;
    let catalogs: Vec<ProstColumnCatalog> = parser
        .map_to_columns()?
        .into_iter()
        .map(|column_desc| ProstColumnCatalog {
            column_desc: Some(column_desc),
            // Columns derived from the user-supplied schema are visible.
            is_hidden: false,
        })
        .collect();
    Ok(catalogs)
}

/// Map a protobuf schema to a relational schema.
fn extract_protobuf_table_schema(schema: &ProtobufSchema) -> Result<Vec<ProstColumnCatalog>> {
let parser = ProtobufParser::new(&schema.row_schema_location.0, &schema.message_name.0)?;
Expand Down Expand Up @@ -112,6 +132,20 @@ pub async fn handle_create_source(
pk_column_ids: pk_column_ids.into_iter().map(Into::into).collect(),
}
}
SourceSchema::Avro(avro_schema) => {
assert_eq!(columns.len(), 1);
assert_eq!(pk_column_ids, vec![0.into()]);
assert_eq!(row_id_index, Some(0));
columns.extend(extract_avro_table_schema(avro_schema, with_properties.clone()).await?);
StreamSourceInfo {
properties: with_properties.clone(),
row_format: RowFormatType::Avro as i32,
row_schema_location: avro_schema.row_schema_location.0.clone(),
row_id_index: row_id_index.map(|index| ProstColumnIndex { index: index as _ }),
columns,
pk_column_ids: pk_column_ids.into_iter().map(Into::into).collect(),
}
}
SourceSchema::Json => StreamSourceInfo {
properties: with_properties.clone(),
row_format: RowFormatType::Json as i32,
Expand Down
90 changes: 90 additions & 0 deletions src/source/src/parser/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::path::Path;
use apache_avro::types::Value;
use apache_avro::{Reader, Schema};
use chrono::{Datelike, NaiveDate};
use itertools::Itertools;
use num_traits::FromPrimitive;
use risingwave_common::array::Op;
use risingwave_common::error::ErrorCode::{InternalError, InvalidConfigValue, ProtocolError};
Expand All @@ -28,6 +29,7 @@ use risingwave_common::types::{
DataType, Datum, Decimal, NaiveDateTimeWrapper, NaiveDateWrapper, ScalarImpl,
};
use risingwave_connector::aws_utils::{default_conn_config, s3_client, AwsConfigV2};
use risingwave_pb::plan_common::ColumnDesc;
use url::Url;

use crate::{Event, SourceColumnDesc, SourceParser};
Expand Down Expand Up @@ -77,6 +79,86 @@ impl AvroParser {
Err(arvo_schema.err().unwrap())
}
}

/// Map the parsed Avro schema to a list of relational column descriptors.
///
/// The root schema must be an Avro record; each of its fields becomes one
/// `ColumnDesc`. Column ids come from a single counter shared across the
/// recursive walk (see `avro_field_to_column_desc`), so they reflect a
/// depth-first traversal order.
///
/// # Errors
/// Returns an `InternalError` if the root schema is not a record.
pub fn map_to_columns(&self) -> Result<Vec<ColumnDesc>> {
    if let Schema::Record { fields, .. } = &self.schema {
        let mut index = 0;
        // Collect into Result directly: short-circuits on the first failing
        // field. (The original `Ok(... .collect::<Result<_>>()?)` was a
        // redundant unwrap-and-rewrap — clippy::needless_question_mark.)
        fields
            .iter()
            .map(|field| {
                Self::avro_field_to_column_desc(&field.name, &field.schema, &mut index)
            })
            .collect()
    } else {
        Err(RwError::from(InternalError(
            "schema invalid, record required".into(),
        )))
    }
}

/// Convert one Avro field to a `ColumnDesc`, recursing into nested records.
///
/// `index` is a counter shared across the whole traversal and is
/// pre-incremented before being used as a column id, so assigned ids start
/// at 1. For a record field, the children are converted (and numbered)
/// first and the record itself is numbered after them — i.e. ids are
/// assigned in post-order. NOTE(review): this ordering looks intentional
/// (parent id > all child ids) but is not documented upstream — confirm
/// before changing.
fn avro_field_to_column_desc(
    name: &str,
    schema: &Schema,
    index: &mut i32,
) -> Result<ColumnDesc> {
    // Resolve the RisingWave type first; fails fast on unsupported Avro types.
    let data_type = Self::avro_type_mapping(schema)?;
    if let Schema::Record {
        name: schema_name,
        fields,
        ..
    } = schema
    {
        // Nested record: convert children first so they receive lower ids.
        let vec_column = fields
            .iter()
            .map(|f| Self::avro_field_to_column_desc(&f.name, &f.schema, index))
            .collect::<Result<Vec<_>>>()?;
        *index += 1;
        Ok(ColumnDesc {
            column_type: Some(data_type.to_protobuf()),
            column_id: *index,
            name: name.to_owned(),
            field_descs: vec_column,
            type_name: schema_name.to_string(),
        })
    } else {
        // Leaf (non-record) field: no child descriptors, default type_name.
        *index += 1;
        Ok(ColumnDesc {
            column_type: Some(data_type.to_protobuf()),
            column_id: *index,
            name: name.to_owned(),
            ..Default::default()
        })
    }
}

/// Translate a single Avro [`Schema`] node into a RisingWave [`DataType`].
///
/// Records map to struct types (recursively); any Avro type not listed
/// below is rejected with an `InternalError`.
fn avro_type_mapping(schema: &Schema) -> Result<DataType> {
    // Handle the recursive case up front: a record becomes a struct whose
    // member order follows the Avro field order.
    if let Schema::Record { fields, .. } = schema {
        let mut member_types = Vec::with_capacity(fields.len());
        let mut member_names = Vec::with_capacity(fields.len());
        for field in fields {
            member_types.push(Self::avro_type_mapping(&field.schema)?);
            member_names.push(field.name.clone());
        }
        return Ok(DataType::new_struct(member_types, member_names));
    }

    // Scalar mappings.
    match schema {
        Schema::String => Ok(DataType::Varchar),
        Schema::Int => Ok(DataType::Int32),
        Schema::Long => Ok(DataType::Int64),
        Schema::Boolean => Ok(DataType::Boolean),
        Schema::Float => Ok(DataType::Float32),
        Schema::Double => Ok(DataType::Float64),
        Schema::Date => Ok(DataType::Date),
        Schema::TimestampMillis => Ok(DataType::Timestamp),
        _ => Err(RwError::from(InternalError(format!(
            "unsupported type in Avro: {:?}",
            schema
        )))),
    }
}
}

macro_rules! from_avro_datetime {
Expand Down Expand Up @@ -579,6 +661,14 @@ mod test {
record
}

/// `map_to_columns` on the bundled simple schema should succeed and produce
/// at least one column. (The original test only printed the result and so
/// could not fail on an empty/garbled mapping.)
#[tokio::test]
async fn test_map_to_columns() {
    let avro_parser_rs = new_avro_parser_from_local("simple-schema.avsc")
        .await
        .unwrap();
    let columns = avro_parser_rs.map_to_columns().unwrap();
    // The fixture is a record with fields, so the mapping must be non-empty.
    assert!(!columns.is_empty());
    println!("{:?}", columns);
}

#[tokio::test]
async fn test_new_avro_parser() {
let avro_parser_rs = new_avro_parser_from_local("simple-schema.avsc").await;
Expand Down
2 changes: 1 addition & 1 deletion src/source/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

pub use avro_parser::*;
pub use debezium::*;
pub use json_parser::*;
pub use protobuf_parser::*;
Expand All @@ -24,7 +25,6 @@ use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::Datum;

use crate::parser::avro_parser::AvroParser;
use crate::{SourceColumnDesc, SourceFormat};

mod avro_parser;
Expand Down
48 changes: 45 additions & 3 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ pub struct CreateSourceStatement {
pub enum SourceSchema {
Protobuf(ProtobufSchema),
// Keyword::PROTOBUF ProtobufSchema
Json, // Keyword::JSON
DebeziumJson, // Keyword::DEBEZIUM_JSON
Json, // Keyword::JSON
DebeziumJson, // Keyword::DEBEZIUM_JSON
Avro(AvroSchema), // Keyword::AVRO
}

impl ParseTo for SourceSchema {
Expand All @@ -98,9 +99,12 @@ impl ParseTo for SourceSchema {
SourceSchema::Protobuf(protobuf_schema)
} else if p.parse_keywords(&[Keyword::DEBEZIUM_JSON]) {
SourceSchema::DebeziumJson
} else if p.parse_keywords(&[Keyword::AVRO]) {
impl_parse_to!(avro_schema: AvroSchema, p);
SourceSchema::Avro(avro_schema)
} else {
return Err(ParserError::ParserError(
"expected JSON | PROTOBUF after ROW FORMAT".to_string(),
"expected JSON | PROTOBUF | DEBEZIUMJSON | AVRO after ROW FORMAT".to_string(),
));
};
Ok(schema)
Expand All @@ -113,6 +117,7 @@ impl fmt::Display for SourceSchema {
SourceSchema::Protobuf(protobuf_schema) => write!(f, "PROTOBUF {}", protobuf_schema),
SourceSchema::Json => write!(f, "JSON"),
SourceSchema::DebeziumJson => write!(f, "DEBEZIUM JSON"),
SourceSchema::Avro(avro_schema) => write!(f, "AVRO {}", avro_schema),
}
}
}
Expand Down Expand Up @@ -154,6 +159,43 @@ impl fmt::Display for ProtobufSchema {
}
}

// Grammar sketch (kept for reference; the macros below implement it):
// sql_grammar!(AvroSchema {
//     [Keyword::MESSAGE],
//     message_name: AstString,
//     [Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION],
//     row_schema_location: AstString,
// });
/// The `MESSAGE <name> ROW SCHEMA LOCATION <location>` clause of
/// `CREATE SOURCE ... ROW FORMAT AVRO`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AvroSchema {
    // Name of the Avro record (message) to decode.
    pub message_name: AstString,
    // Location string of the schema file, e.g. a `file://...` URI.
    // NOTE(review): supported URI schemes are decided by the parser side,
    // not here — confirm against `AvroParser::new`.
    pub row_schema_location: AstString,
}

impl ParseTo for AvroSchema {
    // Parses `MESSAGE <name> ROW SCHEMA LOCATION <location>` in order;
    // each impl_parse_to! consumes the listed keywords/value or errors.
    fn parse_to(p: &mut Parser) -> Result<Self, ParserError> {
        impl_parse_to!([Keyword::MESSAGE], p);
        impl_parse_to!(message_name: AstString, p);
        impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
        impl_parse_to!(row_schema_location: AstString, p);
        Ok(Self {
            message_name,
            row_schema_location,
        })
    }
}

impl fmt::Display for AvroSchema {
    // Round-trips the clause in the same keyword order the parser accepts.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut v: Vec<String> = vec![];
        impl_fmt_display!([Keyword::MESSAGE], v);
        impl_fmt_display!(message_name, v, self);
        impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
        impl_fmt_display!(row_schema_location, v, self);
        v.iter().join(" ").fmt(f)
    }
}

impl ParseTo for CreateSourceStatement {
fn parse_to(p: &mut Parser) -> Result<Self, ParserError> {
impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
Expand Down

0 comments on commit d0a377d

Please sign in to comment.