Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(source): intro native row format #7612

Merged
7 commits merged on Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dashboard/proto/gen/plan_common.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions e2e_test/compaction/ingest_rows.slt
Original file line number | Diff line number | Diff line change
@@ -1,30 +1,30 @@
statement ok
CREATE SOURCE person
(id INTEGER, name VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, city VARCHAR, state VARCHAR, "date_time" TIMESTAMP)
(id BIGINT, name VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, city VARCHAR, state VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Person',
nexmark.split.num = '12',
nexmark.min.event.gap.in.ns = '0'
) ROW FORMAT JSON;
);

statement ok
CREATE SOURCE auction (id INTEGER, "item_name" VARCHAR, description VARCHAR, "initial_bid" INTEGER, reserve INTEGER, "date_time" TIMESTAMP, expires TIMESTAMP, seller INTEGER, category INTEGER)
CREATE SOURCE auction (id BIGINT, "item_name" VARCHAR, description VARCHAR, "initial_bid" BIGINT, reserve BIGINT, "date_time" TIMESTAMP, expires TIMESTAMP, seller BIGINT, category BIGINT, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Auction',
nexmark.split.num = '12',
nexmark.min.event.gap.in.ns = '0'
) ROW FORMAT JSON;
);

statement ok
CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, "date_time" TIMESTAMP)
CREATE SOURCE bid (auction BIGINT, bidder BIGINT, price BIGINT, "channel" VARCHAR, "url" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Bid',
nexmark.split.num = '12',
nexmark.min.event.gap.in.ns = '0'
) ROW FORMAT JSON;
);

statement ok
CREATE MATERIALIZED VIEW nexmark_q7 AS
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/nexmark/create_sources.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ CREATE SOURCE nexmark (
connector = 'nexmark',
nexmark.split.num = '2',
nexmark.min.event.gap.in.ns = '100'
) ROW FORMAT JSON;
);

statement ok
CREATE VIEW PERSON as select (person).* from nexmark where event_type = 0;
Expand Down
6 changes: 3 additions & 3 deletions e2e_test/source/basic/datagen.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ create table s1 (v1 int, v2 float) with (
fields.v2.end = '20',
datagen.rows.per.second='15',
datagen.split.num = '1'
) row format json;
);

# Wait enough time to ensure Datagen connector generate data
sleep 2s
Expand All @@ -19,7 +19,7 @@ flush;

# Will only generate 10 records since `fields.v1.end` is 10
query II rowsort
select v1, v2 from s1 where v1 is not null limit 15;
select v1, v2 from s1 limit 15;
----
1 11
10 20
Expand All @@ -43,7 +43,7 @@ create table s1 (v1 int) with (
fields.v1.end = '100',
datagen.rows.per.second = '10',
datagen.split.num = '5'
) row format json;
);

# Wait enough time to ensure Datagen connector generate data
sleep 2s
Expand Down
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,5 @@ enum RowFormatType {
MAXWELL = 5;
CANAL_JSON = 6;
CSV = 7;
NATIVE = 8;
}
1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
RowFormatType::Avro => SourceFormat::Avro,
RowFormatType::Maxwell => SourceFormat::Maxwell,
RowFormatType::CanalJson => SourceFormat::CanalJson,
RowFormatType::Native => SourceFormat::Native,
_ => unreachable!(),
};
if format == SourceFormat::Protobuf && info.row_schema_location.is_empty() {
Expand Down
16 changes: 8 additions & 8 deletions src/common/src/field_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl FieldGeneratorImpl {
Ok(FieldGeneratorImpl::List(Box::new(field), list_length))
}

pub fn generate(&mut self, offset: u64) -> Value {
pub fn generate_json(&mut self, offset: u64) -> Value {
match self {
FieldGeneratorImpl::I16Sequence(f) => f.generate(),
FieldGeneratorImpl::I32Sequence(f) => f.generate(),
Expand All @@ -202,13 +202,13 @@ impl FieldGeneratorImpl {
FieldGeneratorImpl::Struct(fields) => {
let map = fields
.iter_mut()
.map(|(name, gen)| (name.clone(), gen.generate(offset)))
.map(|(name, gen)| (name.clone(), gen.generate_json(offset)))
.collect();
Value::Object(map)
}
FieldGeneratorImpl::List(field, list_length) => {
let vec = (0..*list_length)
.map(|_| field.generate(offset))
.map(|_| field.generate_json(offset))
.collect::<Vec<_>>();
Value::Array(vec)
}
Expand Down Expand Up @@ -269,7 +269,7 @@ mod tests {

for step in 0..5 {
for (index, i32_field) in i32_fields.iter_mut().enumerate() {
let value = i32_field.generate(0);
let value = i32_field.generate_json(0);
assert!(value.is_number());
let num = value.as_u64();
let expected_num = split_num * step + 1 + index as u64;
Expand Down Expand Up @@ -298,13 +298,13 @@ mod tests {
_ => FieldGeneratorImpl::with_number_random(data_type, None, None, seed).unwrap(),
};

let val1 = generator.generate(1);
let val2 = generator.generate(2);
let val1 = generator.generate_json(1);
let val2 = generator.generate_json(2);

assert_ne!(val1, val2);

let val1_new = generator.generate(1);
let val2_new = generator.generate(2);
let val1_new = generator.generate_json(1);
let val2_new = generator.generate_json(2);

assert_eq!(val1_new, val1);
assert_eq!(val2_new, val2);
Expand Down
20 changes: 19 additions & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ impl ByteStreamSourceParserImpl {
DebeziumJsonParser::new(rw_columns).map(Self::DebeziumJson)
}
SpecificParserConfig::Maxwell => MaxwellParser::new(rw_columns).map(Self::Maxwell),
SpecificParserConfig::Native => {
unreachable!("Native parser should not be created")
}
}
}
}
Expand All @@ -370,14 +373,28 @@ pub enum SpecificParserConfig {
Csv(CsvParserConfig),
Avro(AvroParserConfig),
Protobuf(ProtobufParserConfig),
#[default]
Json,
DebeziumJson,
Maxwell,
CanalJson,
#[default]
Native,
}

impl SpecificParserConfig {
pub fn get_source_format(&self) -> SourceFormat {
match self {
SpecificParserConfig::Avro(_) => SourceFormat::Avro,
SpecificParserConfig::Csv(_) => SourceFormat::Csv,
SpecificParserConfig::Protobuf(_) => SourceFormat::Protobuf,
SpecificParserConfig::Json => SourceFormat::Json,
SpecificParserConfig::DebeziumJson => SourceFormat::DebeziumJson,
SpecificParserConfig::Maxwell => SourceFormat::Maxwell,
SpecificParserConfig::CanalJson => SourceFormat::CanalJson,
SpecificParserConfig::Native => SourceFormat::Native,
}
}

pub async fn new(
format: SourceFormat,
info: &StreamSourceInfo,
Expand Down Expand Up @@ -405,6 +422,7 @@ impl SpecificParserConfig {
SourceFormat::DebeziumJson => SpecificParserConfig::DebeziumJson,
SourceFormat::Maxwell => SpecificParserConfig::Maxwell,
SourceFormat::CanalJson => SpecificParserConfig::CanalJson,
SourceFormat::Native => SpecificParserConfig::Native,
_ => {
return Err(RwError::from(ProtocolError(
"invalid source format".to_string(),
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub enum SourceFormat {
Maxwell,
CanalJson,
Csv,
Native,
}

pub type BoxSourceStream = BoxStream<'static, Result<Vec<SourceMessage>>>;
Expand All @@ -109,7 +110,7 @@ pub type BoxSourceWithStateStream = BoxStream<'static, Result<StreamChunkWithSta
/// [`StreamChunkWithState`] returns stream chunk together with offset for each split. In the
/// current design, one connector source can have multiple split reader. The keys are unique
/// `split_id` and values are the latest offset for each split.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub struct StreamChunkWithState {
pub chunk: StreamChunk,
pub split_offset_mapping: Option<HashMap<SplitId, String>>,
Expand Down
Loading