
gen native StreamChunk
waruto210 committed Jan 31, 2023
1 parent c52e550 commit e6bf3cc
Showing 9 changed files with 350 additions and 171 deletions.
6 changes: 3 additions & 3 deletions e2e_test/compaction/ingest_rows.slt
@@ -1,6 +1,6 @@
statement ok
CREATE SOURCE person
-(id INTEGER, name VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, city VARCHAR, state VARCHAR, "date_time" TIMESTAMP)
+(id BIGINT, name VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, city VARCHAR, state VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Person',
@@ -9,7 +9,7 @@ with (
) ROW FORMAT JSON;

statement ok
-CREATE SOURCE auction (id INTEGER, "item_name" VARCHAR, description VARCHAR, "initial_bid" INTEGER, reserve INTEGER, "date_time" TIMESTAMP, expires TIMESTAMP, seller INTEGER, category INTEGER)
+CREATE SOURCE auction (id BIGINT, "item_name" VARCHAR, description VARCHAR, "initial_bid" BIGINT, reserve BIGINT, "date_time" TIMESTAMP, expires TIMESTAMP, seller BIGINT, category BIGINT, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Auction',
@@ -18,7 +18,7 @@ with (
) ROW FORMAT JSON;

statement ok
-CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, "date_time" TIMESTAMP)
+CREATE SOURCE bid (auction BIGINT, bidder BIGINT, price BIGINT, "channel" VARCHAR, "url" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Bid',
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
@@ -109,7 +109,7 @@ pub type BoxSourceWithStateStream = BoxStream<'static, Result<StreamChunkWithState>>;
/// [`StreamChunkWithState`] returns stream chunk together with offset for each split. In the
current design, one connector source can have multiple split readers. The keys are unique
/// `split_id` and values are the latest offset for each split.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq)]
pub struct StreamChunkWithState {
pub chunk: StreamChunk,
pub split_offset_mapping: Option<HashMap<SplitId, String>>,
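With `PartialEq` derived, whole `StreamChunkWithState` values can be compared directly, which the datagen reader tests below rely on (`assert_eq!(v1, v2)`). A minimal sketch of how a split reader packages a chunk with its offsets, assuming the crate paths shown in this diff; the `with_offset` helper is hypothetical:

```rust
use std::collections::HashMap;

use risingwave_common::array::StreamChunk;

use crate::source::{SplitId, StreamChunkWithState};

// Hypothetical helper: pair a chunk with the latest offset of the split
// that produced it. `None` would mean the connector tracks no offsets.
fn with_offset(chunk: StreamChunk, split_id: SplitId, offset: u64) -> StreamChunkWithState {
    let mut split_offset_mapping = HashMap::new();
    split_offset_mapping.insert(split_id, offset.to_string());
    StreamChunkWithState {
        chunk,
        split_offset_mapping: Some(split_offset_mapping),
    }
}
```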
83 changes: 39 additions & 44 deletions src/connector/src/source/datagen/source/generator.rs
@@ -11,19 +11,23 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::collections::HashMap;
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use std::time::Duration;

use anyhow::Result;
-use bytes::Bytes;
use futures_async_stream::try_stream;
+use itertools::Itertools;
+use risingwave_common::array::{Op, StreamChunk};
+use risingwave_common::error::RwError;
use risingwave_common::field_generator::FieldGeneratorImpl;
-use serde_json::Value;
+use risingwave_common::row::OwnedRow;
+use risingwave_common::types::DataType;

-use crate::source::{SourceMessage, SourceMeta, SplitId};
+use crate::source::{SplitId, StreamChunkWithState};

pub struct DatagenEventGenerator {
-fields_map: HashMap<String, FieldGeneratorImpl>,
+// fields_map: HashMap<String, FieldGeneratorImpl>,
+fields_vec: Vec<FieldGeneratorImpl>,
+data_types: Vec<DataType>,
offset: u64,
split_id: SplitId,
partition_rows_per_second: u64,
@@ -37,7 +41,8 @@ pub struct DatagenMeta {

impl DatagenEventGenerator {
pub fn new(
-fields_map: HashMap<String, FieldGeneratorImpl>,
+fields_vec: Vec<FieldGeneratorImpl>,
+data_types: Vec<DataType>,
rows_per_second: u64,
offset: u64,
split_id: SplitId,
@@ -50,14 +55,15 @@ impl DatagenEventGenerator {
rows_per_second / split_num
};
Ok(Self {
-fields_map,
+fields_vec,
+data_types,
offset,
split_id,
partition_rows_per_second,
})
}

-#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
+#[try_stream(ok = StreamChunkWithState, error = RwError)]
pub async fn into_stream(mut self) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
const MAX_ROWS_PER_YIELD: u64 = 1024;
@@ -66,37 +72,30 @@ impl DatagenEventGenerator {
interval.tick().await;
let mut rows_generated_this_second = 0;
while rows_generated_this_second < self.partition_rows_per_second {
-let mut msgs = vec![];
+let mut rows = vec![];
let num_rows_to_generate = std::cmp::min(
MAX_ROWS_PER_YIELD,
self.partition_rows_per_second - rows_generated_this_second,
);
for _ in 0..num_rows_to_generate {
-let value = Value::Object(
-self.fields_map
-.iter_mut()
-.map(|(name, field_generator)| {
-(name.to_string(), field_generator.generate(self.offset))
-})
-.collect(),
-);
-msgs.push(SourceMessage {
-payload: Some(Bytes::from(value.to_string())),
-offset: self.offset.to_string(),
-split_id: self.split_id.clone(),
-meta: SourceMeta::Datagen(DatagenMeta {
-timestamp: Some(
-SystemTime::now()
-.duration_since(UNIX_EPOCH)
-.unwrap()
-.as_millis() as i64,
-),
-}),
-});
+let row = self
+.fields_vec
+.iter_mut()
+.map(|field_generator| field_generator.generate_datum(self.offset))
+.collect_vec();
+
+rows.push((Op::Insert, OwnedRow::new(row)));
self.offset += 1;
rows_generated_this_second += 1;
}
-yield msgs;
+let chunk = StreamChunk::from_rows(&rows, &self.data_types);
+let state = [(self.split_id.clone(), self.offset.to_string())]
+.into_iter()
+.collect();
+yield StreamChunkWithState {
+chunk,
+split_offset_mapping: Some(state),
+};
}
}
}
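The generator now emits native `StreamChunk`s directly instead of serializing each row to JSON bytes for a downstream parser: datums are collected into `OwnedRow`s and transposed into a columnar chunk in one step. A rough sketch of that conversion, with hand-written values standing in for `generate_datum` output:

```rust
use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};

// Build a two-row chunk the same way the new `into_stream` does.
fn example_chunk() -> StreamChunk {
    let data_types = vec![DataType::Int32, DataType::Float32];
    let rows = (0..2i32)
        .map(|i| {
            // Stand-in for `field_generator.generate_datum(offset)`.
            let row = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(i)),
                Some(ScalarImpl::Float32((i as f32).into())),
            ]);
            (Op::Insert, row)
        })
        .collect_vec();
    // `from_rows` transposes the row-oriented batch into columnar arrays.
    StreamChunk::from_rows(&rows, &data_types)
}
```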
@@ -115,33 +114,29 @@ mod tests {
expected_length: usize,
) {
let split_id = format!("{}-{}", split_num, split_index).into();
-let mut fields_map = HashMap::new();
-fields_map.insert(
-"v1".to_string(),
+let data_types = vec![DataType::Int32, DataType::Float32];
+let fields_vec = vec![
FieldGeneratorImpl::with_number_sequence(
-risingwave_common::types::DataType::Int32,
+data_types[0].clone(),
Some("1".to_string()),
Some("10".to_string()),
split_index,
split_num,
)
.unwrap(),
-);
-
-fields_map.insert(
-"v2".to_string(),
FieldGeneratorImpl::with_number_sequence(
-risingwave_common::types::DataType::Float32,
+data_types[1].clone(),
Some("1".to_string()),
Some("10".to_string()),
split_index,
split_num,
)
.unwrap(),
-);
+];

let generator = DatagenEventGenerator::new(
-fields_map,
+fields_vec,
+data_types,
rows_per_second,
0,
split_id,
@@ -157,7 +152,7 @@
.await
.unwrap()
.unwrap();
-assert_eq!(expected_length, chunk.len());
+assert_eq!(expected_length, chunk.chunk.cardinality());
}

#[tokio::test]
57 changes: 39 additions & 18 deletions src/connector/src/source/datagen/source/reader.rs
@@ -17,23 +17,22 @@ use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
-use futures::{StreamExt, TryStreamExt};
-use futures_async_stream::try_stream;
+use futures::StreamExt;
use itertools::zip_eq;
use risingwave_common::field_generator::FieldGeneratorImpl;

use super::generator::DatagenEventGenerator;
use crate::impl_common_split_reader_logic;
use crate::parser::ParserConfig;
use crate::source::data_gen_util::spawn_data_generation_stream;
use crate::source::datagen::source::SEQUENCE_FIELD_KIND;
use crate::source::datagen::{DatagenProperties, DatagenSplit};
use crate::source::monitor::SourceMetrics;
use crate::source::{
-BoxSourceStream, Column, DataType, SourceInfo, SplitId, SplitImpl, SplitMetaData,
+BoxSourceWithStateStream, Column, DataType, SourceInfo, SplitId, SplitImpl, SplitMetaData,
+SplitReaderV2,
};

-impl_common_split_reader_logic!(DatagenSplitReader, DatagenProperties);
+// impl_common_split_reader_logic!(DatagenSplitReader, DatagenProperties);

pub struct DatagenSplitReader {
generator: DatagenEventGenerator,
@@ -45,7 +44,10 @@ pub struct DatagenSplitReader {
source_info: SourceInfo,
}

-impl DatagenSplitReader {
+#[async_trait]
+impl SplitReaderV2 for DatagenSplitReader {
+type Properties = DatagenProperties;
+
#[allow(clippy::unused_async)]
async fn new(
properties: DatagenProperties,
Expand Down Expand Up @@ -77,11 +79,12 @@ impl DatagenSplitReader {

let rows_per_second = properties.rows_per_second;
let fields_option_map = properties.fields;
-let mut fields_map = HashMap::<String, FieldGeneratorImpl>::new();

// check columns
assert!(columns.as_ref().is_some());
let columns = columns.unwrap();
+let mut fields_vec = Vec::with_capacity(columns.len());
+let mut data_types = Vec::with_capacity(columns.len());

// parse field connector option to build FieldGeneratorImpl
// for example:
Expand All @@ -102,19 +105,22 @@ impl DatagenSplitReader {
// )

for column in columns {
-let name = column.name.clone();
+// let name = column.name.clone();
+let data_type = column.data_type.clone();
let gen = generator_from_data_type(
column.data_type,
&fields_option_map,
-&name,
+&column.name,
split_index,
split_num,
)?;
-fields_map.insert(name, gen);
+fields_vec.push(gen);
+data_types.push(data_type);
}

let generator = DatagenEventGenerator::new(
-fields_map,
+fields_vec,
+data_types,
rows_per_second,
events_so_far,
split_id.clone(),
@@ -132,7 +138,7 @@
})
}

-pub(crate) fn into_data_stream(self) -> BoxSourceStream {
+fn into_stream(self) -> BoxSourceWithStateStream {
// Will buffer at most 4 event chunks.
const BUFFER_SIZE: usize = 4;
spawn_data_generation_stream(self.generator.into_stream(), BUFFER_SIZE).boxed()
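`into_stream` still delegates to `spawn_data_generation_stream`, which runs generation on its own task behind a bounded buffer so that a slow consumer back-pressures the generator. Roughly, under assumed signatures (the real helper lives in `crate::source::data_gen_util`):

```rust
use futures::stream::{BoxStream, Stream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;

// Assumed shape: move `stream` onto its own tokio task and expose the
// receiving end; at most `buffer_size` items are queued at once.
fn spawn_generation<T: Send + 'static>(
    stream: impl Stream<Item = T> + Send + 'static,
    buffer_size: usize,
) -> BoxStream<'static, T> {
    let (tx, rx) = tokio::sync::mpsc::channel(buffer_size);
    tokio::spawn(async move {
        futures::pin_mut!(stream);
        while let Some(item) = stream.next().await {
            if tx.send(item).await.is_err() {
                break; // receiver dropped; stop generating
            }
        }
    });
    ReceiverStream::new(rx).boxed()
}
```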
@@ -248,7 +254,10 @@ mod tests {
use std::sync::Arc;

use maplit::{convert_args, hashmap};
+use risingwave_common::array::{Op, StructValue};
+use risingwave_common::row::Row;
use risingwave_common::types::struct_type::StructType;
+use risingwave_common::types::{ScalarImpl, ToDatumRef};

use super::*;

@@ -311,12 +320,23 @@
Some(mock_datum),
)
.await?
-.into_data_stream();
+.into_stream();

-let msg = reader.next().await.unwrap().unwrap();
+let stream_chunk = reader.next().await.unwrap().unwrap();
+let (op, row) = stream_chunk.chunk.rows().next().unwrap();
+assert_eq!(op, Op::Insert);
+assert_eq!(row.datum_at(0), Some(ScalarImpl::Int32(533)).to_datum_ref(),);
+assert_eq!(
+row.datum_at(1),
+Some(ScalarImpl::Float32(533.148_86.into())).to_datum_ref(),
+);
+assert_eq!(row.datum_at(2), Some(ScalarImpl::Int32(1)).to_datum_ref());
assert_eq!(
-std::str::from_utf8(msg[0].payload.as_ref().unwrap().as_ref()).unwrap(),
-"{\"random_float\":533.1488647460938,\"random_int\":533,\"sequence_int\":1,\"struct\":{\"random_int\":1533}}"
+row.datum_at(3),
+Some(ScalarImpl::Struct(StructValue::new(vec![Some(
+ScalarImpl::Int32(1533)
+)])))
+.to_datum_ref()
);

Ok(())
@@ -353,7 +373,7 @@
Some(mock_datum.clone()),
)
.await?
-.into_data_stream();
+.into_stream();

let v1 = stream.skip(1).next().await.unwrap()?;

@@ -371,10 +391,11 @@
Some(mock_datum),
)
.await?
-.into_data_stream();
+.into_stream();
let v2 = stream.next().await.unwrap()?;

assert_eq!(v1, v2);
+
Ok(())
}
}
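For reference, the approximate shape of the `SplitReaderV2` trait this reader now implements, reconstructed from the impl above; the full parameter list of `new` is collapsed in this diff, so everything beyond `properties` is an assumption:

```rust
use async_trait::async_trait;

use crate::source::BoxSourceWithStateStream;

// Reconstructed sketch, not copied from the commit.
#[async_trait]
pub trait SplitReaderV2: Sized + Send {
    type Properties;

    // The remaining arguments (splits, parser config, metrics, source
    // info, columns) are collapsed in this diff view and therefore elided.
    async fn new(properties: Self::Properties /* , ... */) -> anyhow::Result<Self>;

    // Replaces `into_data_stream`: yields chunks plus per-split offsets.
    fn into_stream(self) -> BoxSourceWithStateStream;
}
```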