perf(source): intro native row format (#7612)
- introduce a new `NATIVE` ROW FORMAT, which is the default row format of the `nexmark` and `datagen` connectors and is invisible to users (see the example below).
- `nexmark` only supports the `NATIVE` ROW FORMAT, but `datagen` can support multiple formats (currently only `native` and `json`).
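
For illustration, the user-facing effect (taken verbatim from the e2e test updates below) is that the trailing ROW FORMAT clause simply disappears, and the invisible `NATIVE` format is applied by default:

# Previously this statement ended with: ) ROW FORMAT JSON;
statement ok
CREATE SOURCE person
(id BIGINT, name VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, city VARCHAR, state VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR)
with (
    connector = 'nexmark',
    nexmark.table.type = 'Person',
    nexmark.split.num = '12',
    nexmark.min.event.gap.in.ns = '0'
);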

Approved-By: tabVersion
waruto210 authored Feb 13, 2023
1 parent 45b5e6b commit 1bbf7bd
Showing 24 changed files with 620 additions and 202 deletions.
6 changes: 6 additions & 0 deletions dashboard/proto/gen/plan_common.ts

Some generated files are not rendered by default.

12 changes: 6 additions & 6 deletions e2e_test/compaction/ingest_rows.slt
@@ -1,30 +1,30 @@
statement ok
CREATE SOURCE person
-(id INTEGER, name VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, city VARCHAR, state VARCHAR, "date_time" TIMESTAMP)
+(id BIGINT, name VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, city VARCHAR, state VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Person',
nexmark.split.num = '12',
nexmark.min.event.gap.in.ns = '0'
-) ROW FORMAT JSON;
+);

statement ok
-CREATE SOURCE auction (id INTEGER, "item_name" VARCHAR, description VARCHAR, "initial_bid" INTEGER, reserve INTEGER, "date_time" TIMESTAMP, expires TIMESTAMP, seller INTEGER, category INTEGER)
+CREATE SOURCE auction (id BIGINT, "item_name" VARCHAR, description VARCHAR, "initial_bid" BIGINT, reserve BIGINT, "date_time" TIMESTAMP, expires TIMESTAMP, seller BIGINT, category BIGINT, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Auction',
nexmark.split.num = '12',
nexmark.min.event.gap.in.ns = '0'
-) ROW FORMAT JSON;
+);

statement ok
-CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, "date_time" TIMESTAMP)
+CREATE SOURCE bid (auction BIGINT, bidder BIGINT, price BIGINT, "channel" VARCHAR, "url" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Bid',
nexmark.split.num = '12',
nexmark.min.event.gap.in.ns = '0'
-) ROW FORMAT JSON;
+);

statement ok
CREATE MATERIALIZED VIEW nexmark_q7 AS
2 changes: 1 addition & 1 deletion e2e_test/nexmark/create_sources.slt.part
@@ -30,7 +30,7 @@ CREATE SOURCE nexmark (
connector = 'nexmark',
nexmark.split.num = '2',
nexmark.min.event.gap.in.ns = '100'
-) ROW FORMAT JSON;
+);

statement ok
CREATE VIEW PERSON as select (person).* from nexmark where event_type = 0;
6 changes: 3 additions & 3 deletions e2e_test/source/basic/datagen.slt
@@ -9,7 +9,7 @@ create table s1 (v1 int, v2 float) with (
fields.v2.end = '20',
datagen.rows.per.second='15',
datagen.split.num = '1'
-) row format json;
+);

# Wait enough time to ensure Datagen connector generate data
sleep 2s
@@ -19,7 +19,7 @@ flush;

# Will only generate 10 records since `fields.v1.end` is 10
query II rowsort
-select v1, v2 from s1 where v1 is not null limit 15;
+select v1, v2 from s1 limit 15;
----
1 11
10 20
@@ -43,7 +43,7 @@ create table s1 (v1 int) with (
fields.v1.end = '100',
datagen.rows.per.second = '10',
datagen.split.num = '5'
-) row format json;
+);

# Wait enough time to ensure Datagen connector generate data
sleep 2s
1 change: 1 addition & 0 deletions proto/plan_common.proto
@@ -79,4 +79,5 @@ enum RowFormatType {
MAXWELL = 5;
CANAL_JSON = 6;
CSV = 7;
+NATIVE = 8;
}
1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
@@ -76,6 +76,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
RowFormatType::Avro => SourceFormat::Avro,
RowFormatType::Maxwell => SourceFormat::Maxwell,
RowFormatType::CanalJson => SourceFormat::CanalJson,
+RowFormatType::Native => SourceFormat::Native,
_ => unreachable!(),
};
if format == SourceFormat::Protobuf && info.row_schema_location.is_empty() {
16 changes: 8 additions & 8 deletions src/common/src/field_generator/mod.rs
@@ -185,7 +185,7 @@ impl FieldGeneratorImpl {
Ok(FieldGeneratorImpl::List(Box::new(field), list_length))
}

-pub fn generate(&mut self, offset: u64) -> Value {
+pub fn generate_json(&mut self, offset: u64) -> Value {
match self {
FieldGeneratorImpl::I16Sequence(f) => f.generate(),
FieldGeneratorImpl::I32Sequence(f) => f.generate(),
@@ -202,13 +202,13 @@
FieldGeneratorImpl::Struct(fields) => {
let map = fields
.iter_mut()
-.map(|(name, gen)| (name.clone(), gen.generate(offset)))
+.map(|(name, gen)| (name.clone(), gen.generate_json(offset)))
.collect();
Value::Object(map)
}
FieldGeneratorImpl::List(field, list_length) => {
let vec = (0..*list_length)
-.map(|_| field.generate(offset))
+.map(|_| field.generate_json(offset))
.collect::<Vec<_>>();
Value::Array(vec)
}
@@ -269,7 +269,7 @@

for step in 0..5 {
for (index, i32_field) in i32_fields.iter_mut().enumerate() {
-let value = i32_field.generate(0);
+let value = i32_field.generate_json(0);
assert!(value.is_number());
let num = value.as_u64();
let expected_num = split_num * step + 1 + index as u64;
@@ -298,13 +298,13 @@
_ => FieldGeneratorImpl::with_number_random(data_type, None, None, seed).unwrap(),
};

-let val1 = generator.generate(1);
-let val2 = generator.generate(2);
+let val1 = generator.generate_json(1);
+let val2 = generator.generate_json(2);

assert_ne!(val1, val2);

-let val1_new = generator.generate(1);
-let val2_new = generator.generate(2);
+let val1_new = generator.generate_json(1);
+let val2_new = generator.generate_json(2);

assert_eq!(val1_new, val1);
assert_eq!(val2_new, val2);
20 changes: 19 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -350,6 +350,9 @@ impl ByteStreamSourceParserImpl {
DebeziumJsonParser::new(rw_columns).map(Self::DebeziumJson)
}
SpecificParserConfig::Maxwell => MaxwellParser::new(rw_columns).map(Self::Maxwell),
+SpecificParserConfig::Native => {
+unreachable!("Native parser should not be created")
+}
}
}
}
@@ -370,14 +370,28 @@ pub enum SpecificParserConfig {
Csv(CsvParserConfig),
Avro(AvroParserConfig),
Protobuf(ProtobufParserConfig),
-#[default]
Json,
DebeziumJson,
Maxwell,
CanalJson,
+#[default]
+Native,
}

impl SpecificParserConfig {
+pub fn get_source_format(&self) -> SourceFormat {
+match self {
+SpecificParserConfig::Avro(_) => SourceFormat::Avro,
+SpecificParserConfig::Csv(_) => SourceFormat::Csv,
+SpecificParserConfig::Protobuf(_) => SourceFormat::Protobuf,
+SpecificParserConfig::Json => SourceFormat::Json,
+SpecificParserConfig::DebeziumJson => SourceFormat::DebeziumJson,
+SpecificParserConfig::Maxwell => SourceFormat::Maxwell,
+SpecificParserConfig::CanalJson => SourceFormat::CanalJson,
+SpecificParserConfig::Native => SourceFormat::Native,
+}
+}

pub async fn new(
format: SourceFormat,
info: &StreamSourceInfo,
@@ -405,6 +422,7 @@ impl SpecificParserConfig {
SourceFormat::DebeziumJson => SpecificParserConfig::DebeziumJson,
SourceFormat::Maxwell => SpecificParserConfig::Maxwell,
SourceFormat::CanalJson => SpecificParserConfig::CanalJson,
+SourceFormat::Native => SpecificParserConfig::Native,
_ => {
return Err(RwError::from(ProtocolError(
"invalid source format".to_string(),
3 changes: 2 additions & 1 deletion src/connector/src/source/base.rs
@@ -101,6 +101,7 @@ pub enum SourceFormat {
Maxwell,
CanalJson,
Csv,
+Native,
}

pub type BoxSourceStream = BoxStream<'static, Result<Vec<SourceMessage>>>;
@@ -109,7 +110,7 @@ pub type BoxSourceWithStateStream = BoxStream<'static, Result<StreamChunkWithState>>;
/// [`StreamChunkWithState`] returns stream chunk together with offset for each split. In the
/// current design, one connector source can have multiple split reader. The keys are unique
/// `split_id` and values are the latest offset for each split.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq)]
pub struct StreamChunkWithState {
pub chunk: StreamChunk,
pub split_offset_mapping: Option<HashMap<SplitId, String>>,