From a4c62ff3fb5046c7176edfbc660f7a07ec00ad2f Mon Sep 17 00:00:00 2001 From: waruto Date: Fri, 10 Feb 2023 15:51:55 +0800 Subject: [PATCH] fix row_id of nexmark --- e2e_test/compaction/ingest_rows.slt | 6 +- e2e_test/nexmark/create_sources.slt.part | 2 +- e2e_test/source/basic/datagen.slt | 6 +- .../source/nexmark/source/combined_event.rs | 32 +++++++--- .../src/source/nexmark/source/reader.rs | 16 +++-- .../tests/testdata/nexmark_source.yaml | 64 +++++++++---------- .../planner_test/tests/testdata/share.yaml | 8 +-- src/source/src/source_desc.rs | 1 + src/sqlparser/tests/testdata/create.yaml | 4 +- .../src/executor/source/source_executor.rs | 8 +-- .../simulation/src/nexmark/create_source.sql | 6 +- 11 files changed, 86 insertions(+), 67 deletions(-) diff --git a/e2e_test/compaction/ingest_rows.slt b/e2e_test/compaction/ingest_rows.slt index ed48470ed7df8..3b59de609f674 100644 --- a/e2e_test/compaction/ingest_rows.slt +++ b/e2e_test/compaction/ingest_rows.slt @@ -6,7 +6,7 @@ with ( nexmark.table.type = 'Person', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0' -) ROW FORMAT JSON; +); statement ok CREATE SOURCE auction (id BIGINT, "item_name" VARCHAR, description VARCHAR, "initial_bid" BIGINT, reserve BIGINT, "date_time" TIMESTAMP, expires TIMESTAMP, seller BIGINT, category BIGINT, "extra" VARCHAR) @@ -15,7 +15,7 @@ with ( nexmark.table.type = 'Auction', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0' -) ROW FORMAT JSON; +); statement ok CREATE SOURCE bid (auction BIGINT, bidder BIGINT, price BIGINT, "channel" VARCHAR, "url" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR) @@ -24,7 +24,7 @@ with ( nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0' -) ROW FORMAT JSON; +); statement ok CREATE MATERIALIZED VIEW nexmark_q7 AS diff --git a/e2e_test/nexmark/create_sources.slt.part b/e2e_test/nexmark/create_sources.slt.part index 1b852ea5a410d..88e62229efe96 100644 --- a/e2e_test/nexmark/create_sources.slt.part +++ b/e2e_test/nexmark/create_sources.slt.part @@ -30,7 +30,7 @@ CREATE SOURCE nexmark ( connector = 'nexmark', nexmark.split.num = '2', nexmark.min.event.gap.in.ns = '100' -) ROW FORMAT JSON; +); statement ok CREATE VIEW PERSON as select (person).* from nexmark where event_type = 0; diff --git a/e2e_test/source/basic/datagen.slt b/e2e_test/source/basic/datagen.slt index c32e95f33b06a..d238f1baa9e75 100644 --- a/e2e_test/source/basic/datagen.slt +++ b/e2e_test/source/basic/datagen.slt @@ -9,7 +9,7 @@ create table s1 (v1 int, v2 float) with ( fields.v2.end = '20', datagen.rows.per.second='15', datagen.split.num = '1' -) row format json; +); # Wait enough time to ensure Datagen connector generate data sleep 2s @@ -19,7 +19,7 @@ flush; # Will only generate 10 records since `fields.v1.end` is 10 query II rowsort -select v1, v2 from s1 where v1 is not null limit 15; +select v1, v2 from s1 limit 15; ---- 1 11 10 20 @@ -43,7 +43,7 @@ create table s1 (v1 int) with ( fields.v1.end = '100', datagen.rows.per.second = '10', datagen.split.num = '5' -) row format json; +); # Wait enough time to ensure Datagen connector generate data sleep 2s diff --git a/src/connector/src/source/nexmark/source/combined_event.rs b/src/connector/src/source/nexmark/source/combined_event.rs index e43de301b7cb6..290ee1596c26b 100644 --- a/src/connector/src/source/nexmark/source/combined_event.rs +++ b/src/connector/src/source/nexmark/source/combined_event.rs @@ -66,7 +66,10 @@ pub fn new_combined_event(event: Event) -> CombinedEvent { } } -pub(crate) fn get_event_data_types(event_type: Option) -> Vec { +pub(crate) fn get_event_data_types( + event_type: Option, + row_id_index: Option, +) -> Vec { let mut fields = match event_type { None => { vec![ @@ -80,8 +83,12 @@ pub(crate) fn get_event_data_types(event_type: Option) -> Vec get_auction_struct_type().fields, Some(EventType::Bid) => get_bid_struct_type().fields, }; - // _row_id - fields.push(DataType::Int64); + + if let Some(row_id_index) = row_id_index { + // _row_id + fields.insert(row_id_index, DataType::Int64); + } + fields } @@ -179,8 +186,8 @@ pub(crate) fn get_bid_struct_type() -> StructType { } } -pub(crate) fn combined_event_to_row(e: CombinedEvent) -> OwnedRow { - let fields = vec![ +pub(crate) fn combined_event_to_row(e: CombinedEvent, row_id_index: Option) -> OwnedRow { + let mut fields = vec![ Some(ScalarImpl::Int64(e.event_type as i64)), e.person .map(person_to_datum) @@ -191,21 +198,26 @@ pub(crate) fn combined_event_to_row(e: CombinedEvent) -> OwnedRow { e.bid .map(bid_to_datum) .map(|fields| StructValue::new(fields).into()), - // _row_id - None, ]; + if let Some(row_id_index) = row_id_index { + // _row_id + fields.insert(row_id_index, None); + } + OwnedRow::new(fields) } -pub(crate) fn event_to_row(e: Event) -> OwnedRow { +pub(crate) fn event_to_row(e: Event, row_id_index: Option) -> OwnedRow { let mut fields = match e { Event::Person(p) => person_to_datum(p), Event::Auction(a) => auction_to_datum(a), Event::Bid(b) => bid_to_datum(b), }; - // _row_id - fields.push(None); + if let Some(row_id_index) = row_id_index { + // _row_id + fields.insert(row_id_index, None); + } OwnedRow::new(fields) } diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs index e77d4572eae0e..387b9db0843a1 100644 --- a/src/connector/src/source/nexmark/source/reader.rs +++ b/src/connector/src/source/nexmark/source/reader.rs @@ -49,8 +49,8 @@ pub struct NexmarkSplitReader { min_event_gap_in_ns: u64, max_chunk_size: u64, + row_id_index: Option, split_id: SplitId, - parser_config: ParserConfig, metrics: Arc, source_info: SourceInfo, } @@ -88,6 +88,12 @@ impl SplitReaderV2 for NexmarkSplitReader { generator = generator.with_type_filter(*event_type); } + let row_id_index = parser_config + .common + .rw_columns + .into_iter() + .position(|column| column.is_row_id); + Ok(NexmarkSplitReader { generator, assigned_split, @@ -97,7 +103,7 @@ impl SplitReaderV2 for NexmarkSplitReader { event_type: properties.table_type, use_real_time: properties.use_real_time, min_event_gap_in_ns: properties.min_event_gap_in_ns, - parser_config, + row_id_index, metrics, source_info, }) @@ -116,7 +122,7 @@ impl NexmarkSplitReader { let start_time = Instant::now(); let start_offset = self.generator.global_offset(); let start_ts = self.generator.timestamp(); - let event_dtypes = get_event_data_types(self.event_type); + let event_dtypes = get_event_data_types(self.event_type, self.row_id_index); loop { let mut rows = vec![]; while (rows.len() as u64) < self.max_chunk_size { @@ -125,8 +131,8 @@ impl NexmarkSplitReader { } let event = self.generator.next().unwrap(); let row = match self.event_type { - Some(_) => event_to_row(event), - None => combined_event_to_row(new_combined_event(event)), + Some(_) => event_to_row(event, self.row_id_index), + None => combined_event_to_row(new_combined_event(event), self.row_id_index), }; rows.push((Op::Insert, row)); } diff --git a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml index 2068dc2addb81..ce219f8e19189 100644 --- a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml +++ b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml @@ -14,7 +14,7 @@ with ( connector = 'nexmark', nexmark.table.type = 'Auction' - ) row format JSON; + ); create source bid ( auction INTEGER, @@ -27,7 +27,7 @@ with ( connector = 'nexmark', nexmark.table.type = 'Bid' - ) row format JSON; + ); create source person ( id INTEGER, @@ -40,10 +40,10 @@ with ( connector = 'nexmark', nexmark.table.type = 'Person' - ) row format JSON; + ); - id: nexmark_q0 before: - - create_sources + - create_sources sql: | SELECT auction, bidder, price, date_time FROM bid; batch_plan: | @@ -72,7 +72,7 @@ Table 4294967294 { columns: [auction, bidder, price, date_time, _row_id], primary key: [$4 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [4] } - id: nexmark_q1 before: - - create_sources + - create_sources sql: | SELECT auction, @@ -106,7 +106,7 @@ Table 4294967294 { columns: [auction, bidder, price, date_time, _row_id], primary key: [$4 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [4] } - id: nexmark_q2 before: - - create_sources + - create_sources sql: SELECT auction, price FROM bid WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087; batch_plan: | BatchExchange { order: [], dist: Single } @@ -137,7 +137,7 @@ Table 4294967294 { columns: [auction, price, _row_id], primary key: [$2 ASC], value indices: [0, 1, 2], distribution key: [2] } - id: nexmark_q3 before: - - create_sources + - create_sources sql: | SELECT P.name, P.city, P.state, A.id @@ -201,7 +201,7 @@ Table 4294967294 { columns: [name, city, state, id, _row_id, seller, _row_id#1, id#1], primary key: [$4 ASC, $6 ASC, $5 ASC, $7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [5] } - id: nexmark_q4 before: - - create_sources + - create_sources sql: | SELECT Q.category, @@ -289,7 +289,7 @@ Table 4294967294 { columns: [category, avg], primary key: [$0 ASC], value indices: [0, 1], distribution key: [0] } - id: nexmark_q5 before: - - create_sources + - create_sources sql: | SELECT AuctionBids.auction, AuctionBids.num FROM ( SELECT @@ -420,7 +420,7 @@ Table 4294967294 { columns: [auction, num, window_start, window_start#1], primary key: [$2 ASC, $0 ASC, $3 ASC], value indices: [0, 1, 2, 3], distribution key: [2] } - id: nexmark_q6 before: - - create_sources + - create_sources sql: | SELECT Q.seller, @@ -438,7 +438,7 @@ Tracking issue: https://github.com/risingwavelabs/risingwave/issues/4978 - id: nexmark_q7 before: - - create_sources + - create_sources sql: | SELECT B.auction, @@ -533,7 +533,7 @@ Table 4294967294 { columns: [auction, price, bidder, date_time, _row_id, (TumbleStart(date_time, '00:00:10':Interval) + '00:00:10':Interval), max(price)], primary key: [$4 ASC, $5 ASC, $1 ASC, $6 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [1] } - id: nexmark_q8 before: - - create_sources + - create_sources sql: | SELECT P.id, @@ -636,7 +636,7 @@ Table 4294967294 { columns: [id, name, starttime, (TumbleStart(date_time, '00:00:10':Interval) + '00:00:10':Interval), seller, TumbleStart(date_time, '00:00:10':Interval), (TumbleStart(date_time, '00:00:10':Interval) + '00:00:10':Interval)#1], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC, $5 ASC, $6 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [0, 2, 3] } - id: nexmark_q9 before: - - create_sources + - create_sources sql: | SELECT id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, @@ -718,7 +718,7 @@ Table 4294967294 { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id, _row_id#1], primary key: [$13 ASC, $14 ASC, $0 ASC, $9 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], distribution key: [0] } - id: nexmark_q10 before: - - create_sources + - create_sources sql: | SELECT auction, bidder, price, date_time, TO_CHAR(date_time, 'YYYY-MM-DD') as date, TO_CHAR(date_time, 'HH:MI') as time FROM bid; batch_plan: | @@ -747,7 +747,7 @@ Table 4294967294 { columns: [auction, bidder, price, date_time, date, time, _row_id], primary key: [$6 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [6] } - id: nexmark_q11 before: - - create_sources + - create_sources sql: | SELECT B.bidder, @@ -761,7 +761,7 @@ Tracking issue: https://github.com/risingwavelabs/risingwave/issues/112 - id: nexmark_q12 before: - - create_sources + - create_sources sql: | SELECT B.bidder, @@ -775,7 +775,7 @@ Tracking issue: https://github.com/risingwavelabs/risingwave/issues/112 - id: nexmark_q13 before: - - create_sources + - create_sources sql: | /* SELECT B.auction, @@ -787,10 +787,10 @@ JOIN side_input FOR SYSTEM_TIME AS OF B.p_time AS S ON mod(B.auction, 10000) = S.key; */ select 1; - stream_error: 'Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation' + stream_error: "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation" - id: nexmark_q14 before: - - create_sources + - create_sources sql: | SELECT auction, @@ -843,7 +843,7 @@ Table 4294967294 { columns: [auction, bidder, price, bidtimetype, date_time, extra, _row_id], primary key: [$6 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [6] } - id: nexmark_q15 before: - - create_sources + - create_sources sql: | SELECT TO_CHAR(date_time, 'yyyy-MM-dd') as day, @@ -911,7 +911,7 @@ Table 4294967294 { columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], primary key: [$0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], distribution key: [0] } - id: nexmark_q16 before: - - create_sources + - create_sources sql: | SELECT channel, @@ -982,7 +982,7 @@ Table 4294967294 { columns: [channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], distribution key: [0, 1] } - id: nexmark_q17 before: - - create_sources + - create_sources sql: | SELECT auction, @@ -1032,7 +1032,7 @@ Table 4294967294 { columns: [auction, day, total_bids, rank1_bids, rank2_bids, rank3_bids, min_price, max_price, avg_price, sum_price], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], distribution key: [0, 1] } - id: nexmark_q18 before: - - create_sources + - create_sources sql: | SELECT auction, bidder, price, channel, url, date_time, extra FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY bidder, auction ORDER BY date_time DESC) AS rank_number @@ -1080,7 +1080,7 @@ Table 4294967294 { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], primary key: [$7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [7] } - id: nexmark_q19 before: - - create_sources + - create_sources sql: | SELECT * FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY auction ORDER BY price DESC) AS rank_number FROM bid) @@ -1099,7 +1099,7 @@ └─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], time_range: [(Unbounded, Unbounded)] } - id: nexmark_q20 before: - - create_sources + - create_sources sql: | SELECT auction, bidder, price, channel, url, B.date_time as date_timeB, @@ -1160,7 +1160,7 @@ Table 4294967294 { columns: [auction, bidder, price, channel, url, date_timeb, item_name, description, initial_bid, reserve, date_timea, expires, seller, category, _row_id, _row_id#1, id], primary key: [$14 ASC, $15 ASC, $0 ASC, $16 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], distribution key: [0] } - id: nexmark_q21 before: - - create_sources + - create_sources sql: | SELECT auction, bidder, price, channel, @@ -1179,7 +1179,7 @@ Tracking issue: https://github.com/risingwavelabs/risingwave/issues/112 - id: nexmark_q22 before: - - create_sources + - create_sources sql: | SELECT auction, bidder, price, channel, @@ -1212,7 +1212,7 @@ Table 4294967294 { columns: [auction, bidder, price, channel, dir1, dir2, dir3, _row_id], primary key: [$7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [7] } - id: nexmark_q101 before: - - create_sources + - create_sources sql: | -- A self-made query that covers outer join. -- @@ -1284,7 +1284,7 @@ Table 4294967294 { columns: [auction_id, auction_item_name, current_highest_bid, _row_id, auction], primary key: [$3 ASC, $4 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [0] } - id: nexmark_q102 before: - - create_sources + - create_sources sql: | -- A self-made query that covers dynamic filter. -- @@ -1409,7 +1409,7 @@ Table 4294967294 { columns: [auction_id, auction_item_name, bid_count], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2], distribution key: [0] } - id: nexmark_q103 before: - - create_sources + - create_sources sql: | -- A self-made query that covers semi join. -- @@ -1483,7 +1483,7 @@ Table 4294967294 { columns: [auction_id, auction_item_name, _row_id], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [0] } - id: nexmark_q104 before: - - create_sources + - create_sources sql: | -- A self-made query that covers anti join. -- @@ -1557,7 +1557,7 @@ Table 4294967294 { columns: [auction_id, auction_item_name, _row_id], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [0] } - id: nexmark_q105 before: - - create_sources + - create_sources sql: | -- A self-made query that covers singleton top-n (and local-phase group top-n). -- diff --git a/src/frontend/planner_test/tests/testdata/share.yaml b/src/frontend/planner_test/tests/testdata/share.yaml index 5f2db0d1f7da5..fb85b3b225368 100644 --- a/src/frontend/planner_test/tests/testdata/share.yaml +++ b/src/frontend/planner_test/tests/testdata/share.yaml @@ -7,17 +7,17 @@ nexmark.table.type = 'Auction', nexmark.split.num = '4', nexmark.min.event.gap.in.ns = '1000' - ) row format json; + ); create source bid (auction BIGINT, bidder BIGINT, price BIGINT, "channel" VARCHAR, "url" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR) with ( connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '4', nexmark.min.event.gap.in.ns = '1000' - ) row format json; + ); - id: self_join before: - - create_sources + - create_sources sql: | select count(*) cnt from auction A join auction B on A.id = B.id where A.initial_bid = 1 and B.initial_bid = 2; batch_plan: | @@ -64,7 +64,7 @@ └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"] } - id: nexmark_q5 before: - - create_sources + - create_sources sql: | SELECT AuctionBids.auction, AuctionBids.num FROM ( SELECT diff --git a/src/source/src/source_desc.rs b/src/source/src/source_desc.rs index 29169b5a20dd6..6bea844fb9bb0 100644 --- a/src/source/src/source_desc.rs +++ b/src/source/src/source_desc.rs @@ -98,6 +98,7 @@ impl SourceDescBuilder { ProstRowFormatType::Avro => SourceFormat::Avro, ProstRowFormatType::Maxwell => SourceFormat::Maxwell, ProstRowFormatType::CanalJson => SourceFormat::CanalJson, + ProstRowFormatType::Native => SourceFormat::Native, _ => unreachable!(), }; diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index 33e011175cfe2..59faa9f1cc80a 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -41,8 +41,8 @@ formatted_ast: | CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: Protobuf(ProtobufSchema { message_name: AstString("Foo"), row_schema_location: AstString("http://"), use_schema_registry: true }), source_watermarks: [] } } -- input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') ROW FORMAT JSON - formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') ROW FORMAT JSON +- input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') + formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') formatted_ast: | CreateSource { stmt: CreateSourceStatement { if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some('"') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: Json, source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }] } } diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index ece356878c8e6..d54372bec1b20 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -503,15 +503,15 @@ mod tests { let schema = Schema { fields: vec![Field::with_name(DataType::Int32, "sequence_int")], }; - let row_id_index = Some(0); + let row_id_index = None; let pk_column_ids = vec![0]; let pk_indices = vec![0]; let source_info = StreamSourceInfo { - row_format: ProstRowFormatType::Json as i32, + row_format: ProstRowFormatType::Native as i32, ..Default::default() }; let (barrier_tx, barrier_rx) = unbounded_channel::(); - let column_ids = vec![0, 1].into_iter().map(ColumnId::from).collect(); + let column_ids = vec![0].into_iter().map(ColumnId::from).collect(); // This datagen will generate 3 rows at one time. let properties: HashMap = convert_args!(hashmap!( @@ -598,7 +598,7 @@ mod tests { let pk_column_ids = vec![0]; let pk_indices = vec![0_usize]; let source_info = StreamSourceInfo { - row_format: ProstRowFormatType::Json as i32, + row_format: ProstRowFormatType::Native as i32, ..Default::default() }; let properties = convert_args!(hashmap!( diff --git a/src/tests/simulation/src/nexmark/create_source.sql b/src/tests/simulation/src/nexmark/create_source.sql index 7ac5c2825d447..8427779980f5b 100644 --- a/src/tests/simulation/src/nexmark/create_source.sql +++ b/src/tests/simulation/src/nexmark/create_source.sql @@ -13,7 +13,7 @@ with ( connector = 'nexmark', nexmark.table.type = 'Auction' {extra_args} -) row format JSON; +); create source bid ( auction BIGINT, @@ -27,7 +27,7 @@ with ( connector = 'nexmark', nexmark.table.type = 'Bid' {extra_args} -) row format JSON; +); create source person ( id BIGINT, @@ -42,4 +42,4 @@ with ( connector = 'nexmark', nexmark.table.type = 'Person' {extra_args} -) row format JSON; +);