Skip to content

Commit

Permalink
fix row_id of nexmark
Browse files Browse the repository at this point in the history
  • Loading branch information
waruto210 committed Feb 10, 2023
1 parent 1268f9a commit 4ca4d8d
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 76 deletions.
6 changes: 3 additions & 3 deletions e2e_test/compaction/ingest_rows.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ with (
nexmark.table.type = 'Person',
nexmark.split.num = '12',
nexmark.min.event.gap.in.ns = '0'
) ROW FORMAT JSON;
);

statement ok
CREATE SOURCE auction (id BIGINT, "item_name" VARCHAR, description VARCHAR, "initial_bid" BIGINT, reserve BIGINT, "date_time" TIMESTAMP, expires TIMESTAMP, seller BIGINT, category BIGINT, "extra" VARCHAR)
Expand All @@ -15,7 +15,7 @@ with (
nexmark.table.type = 'Auction',
nexmark.split.num = '12',
nexmark.min.event.gap.in.ns = '0'
) ROW FORMAT JSON;
);

statement ok
CREATE SOURCE bid (auction BIGINT, bidder BIGINT, price BIGINT, "channel" VARCHAR, "url" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR)
Expand All @@ -24,7 +24,7 @@ with (
nexmark.table.type = 'Bid',
nexmark.split.num = '12',
nexmark.min.event.gap.in.ns = '0'
) ROW FORMAT JSON;
);

statement ok
CREATE MATERIALIZED VIEW nexmark_q7 AS
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/nexmark/create_sources.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ CREATE SOURCE nexmark (
connector = 'nexmark',
nexmark.split.num = '2',
nexmark.min.event.gap.in.ns = '100'
) ROW FORMAT JSON;
);

statement ok
CREATE VIEW PERSON as select (person).* from nexmark where event_type = 0;
Expand Down
6 changes: 3 additions & 3 deletions e2e_test/source/basic/datagen.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ create table s1 (v1 int, v2 float) with (
fields.v2.end = '20',
datagen.rows.per.second='15',
datagen.split.num = '1'
) row format json;
);

# Wait enough time to ensure Datagen connector generate data
sleep 2s
Expand All @@ -19,7 +19,7 @@ flush;

# Will only generate 10 records since `fields.v1.end` is 10
query II rowsort
select v1, v2 from s1 where v1 is not null limit 15;
select v1, v2 from s1 limit 15;
----
1 11
10 20
Expand All @@ -43,7 +43,7 @@ create table s1 (v1 int) with (
fields.v1.end = '100',
datagen.rows.per.second = '10',
datagen.split.num = '5'
) row format json;
);

# Wait enough time to ensure Datagen connector generate data
sleep 2s
Expand Down
32 changes: 22 additions & 10 deletions src/connector/src/source/nexmark/source/combined_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ pub fn new_combined_event(event: Event) -> CombinedEvent {
}
}

pub(crate) fn get_event_data_types(event_type: Option<EventType>) -> Vec<DataType> {
pub(crate) fn get_event_data_types(
event_type: Option<EventType>,
row_id_index: Option<usize>,
) -> Vec<DataType> {
let mut fields = match event_type {
None => {
vec![
Expand All @@ -80,8 +83,12 @@ pub(crate) fn get_event_data_types(event_type: Option<EventType>) -> Vec<DataTyp
Some(EventType::Auction) => get_auction_struct_type().fields,
Some(EventType::Bid) => get_bid_struct_type().fields,
};
// _row_id
fields.push(DataType::Int64);

if let Some(row_id_index) = row_id_index {
// _row_id
fields.insert(row_id_index, DataType::Int64);
}

fields
}

Expand Down Expand Up @@ -179,8 +186,8 @@ pub(crate) fn get_bid_struct_type() -> StructType {
}
}

pub(crate) fn combined_event_to_row(e: CombinedEvent) -> OwnedRow {
let fields = vec![
pub(crate) fn combined_event_to_row(e: CombinedEvent, row_id_index: Option<usize>) -> OwnedRow {
let mut fields = vec![
Some(ScalarImpl::Int64(e.event_type as i64)),
e.person
.map(person_to_datum)
Expand All @@ -191,21 +198,26 @@ pub(crate) fn combined_event_to_row(e: CombinedEvent) -> OwnedRow {
e.bid
.map(bid_to_datum)
.map(|fields| StructValue::new(fields).into()),
// _row_id
None,
];

if let Some(row_id_index) = row_id_index {
// _row_id
fields.insert(row_id_index, None);
}

OwnedRow::new(fields)
}

pub(crate) fn event_to_row(e: Event) -> OwnedRow {
pub(crate) fn event_to_row(e: Event, row_id_index: Option<usize>) -> OwnedRow {
let mut fields = match e {
Event::Person(p) => person_to_datum(p),
Event::Auction(a) => auction_to_datum(a),
Event::Bid(b) => bid_to_datum(b),
};
// _row_id
fields.push(None);
if let Some(row_id_index) = row_id_index {
// _row_id
fields.insert(row_id_index, None);
}
OwnedRow::new(fields)
}

Expand Down
16 changes: 11 additions & 5 deletions src/connector/src/source/nexmark/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ pub struct NexmarkSplitReader {
min_event_gap_in_ns: u64,
max_chunk_size: u64,

row_id_index: Option<usize>,
split_id: SplitId,
parser_config: ParserConfig,
metrics: Arc<SourceMetrics>,
source_info: SourceInfo,
}
Expand Down Expand Up @@ -88,6 +88,12 @@ impl SplitReaderV2 for NexmarkSplitReader {
generator = generator.with_type_filter(*event_type);
}

let row_id_index = parser_config
.common
.rw_columns
.into_iter()
.position(|column| column.is_row_id);

Ok(NexmarkSplitReader {
generator,
assigned_split,
Expand All @@ -97,7 +103,7 @@ impl SplitReaderV2 for NexmarkSplitReader {
event_type: properties.table_type,
use_real_time: properties.use_real_time,
min_event_gap_in_ns: properties.min_event_gap_in_ns,
parser_config,
row_id_index,
metrics,
source_info,
})
Expand All @@ -116,7 +122,7 @@ impl NexmarkSplitReader {
let start_time = Instant::now();
let start_offset = self.generator.global_offset();
let start_ts = self.generator.timestamp();
let event_dtypes = get_event_data_types(self.event_type);
let event_dtypes = get_event_data_types(self.event_type, self.row_id_index);
loop {
let mut rows = vec![];
while (rows.len() as u64) < self.max_chunk_size {
Expand All @@ -125,8 +131,8 @@ impl NexmarkSplitReader {
}
let event = self.generator.next().unwrap();
let row = match self.event_type {
Some(_) => event_to_row(event),
None => combined_event_to_row(new_combined_event(event)),
Some(_) => event_to_row(event, self.row_id_index),
None => combined_event_to_row(new_combined_event(event), self.row_id_index),
};
rows.push((Op::Insert, row));
}
Expand Down
Loading

0 comments on commit 4ca4d8d

Please sign in to comment.