introduce native format
waruto210 committed Feb 10, 2023
1 parent df6b157 · commit bf33c8d
Showing 17 changed files with 516 additions and 140 deletions.
6 changes: 3 additions & 3 deletions e2e_test/compaction/ingest_rows.slt
@@ -1,6 +1,6 @@
statement ok
CREATE SOURCE person
(id INTEGER, name VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, city VARCHAR, state VARCHAR, "date_time" TIMESTAMP)
(id BIGINT, name VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, city VARCHAR, state VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Person',
@@ -9,7 +9,7 @@ with (
) ROW FORMAT JSON;

statement ok
CREATE SOURCE auction (id INTEGER, "item_name" VARCHAR, description VARCHAR, "initial_bid" INTEGER, reserve INTEGER, "date_time" TIMESTAMP, expires TIMESTAMP, seller INTEGER, category INTEGER)
CREATE SOURCE auction (id BIGINT, "item_name" VARCHAR, description VARCHAR, "initial_bid" BIGINT, reserve BIGINT, "date_time" TIMESTAMP, expires TIMESTAMP, seller BIGINT, category BIGINT, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Auction',
@@ -18,7 +18,7 @@ with (
) ROW FORMAT JSON;

statement ok
CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, "date_time" TIMESTAMP)
CREATE SOURCE bid (auction BIGINT, bidder BIGINT, price BIGINT, "channel" VARCHAR, "url" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Bid',
1 change: 1 addition & 0 deletions proto/plan_common.proto
@@ -79,4 +79,5 @@ enum RowFormatType {
MAXWELL = 5;
CANAL_JSON = 6;
CSV = 7;
NATIVE = 8;
}
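For orientation (an editor's note, not part of the diff): prost maps SCREAMING_SNAKE_CASE proto values to CamelCase Rust variants, which is why the Rust code later in this commit matches on `RowFormatType::Native`. A minimal sketch of the assumed generated shape, with earlier variants and the exact derives elided:

```rust
// Sketch of the prost-generated enum (shape assumed, not actual output);
// `NATIVE = 8` in the proto surfaces as the `Native` variant below.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(i32)]
pub enum RowFormatType {
    Maxwell = 5,
    CanalJson = 6,
    Csv = 7,
    Native = 8,
}
```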
1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
@@ -76,6 +76,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
RowFormatType::Avro => SourceFormat::Avro,
RowFormatType::Maxwell => SourceFormat::Maxwell,
RowFormatType::CanalJson => SourceFormat::CanalJson,
RowFormatType::Native => SourceFormat::Native,
_ => unreachable!(),
};
if format == SourceFormat::Protobuf && info.row_schema_location.is_empty() {
16 changes: 8 additions & 8 deletions src/common/src/field_generator/mod.rs
@@ -185,7 +185,7 @@ impl FieldGeneratorImpl {
Ok(FieldGeneratorImpl::List(Box::new(field), list_length))
}

pub fn generate(&mut self, offset: u64) -> Value {
pub fn generate_json(&mut self, offset: u64) -> Value {
match self {
FieldGeneratorImpl::I16Sequence(f) => f.generate(),
FieldGeneratorImpl::I32Sequence(f) => f.generate(),
@@ -202,13 +202,13 @@ impl FieldGeneratorImpl {
FieldGeneratorImpl::Struct(fields) => {
let map = fields
.iter_mut()
.map(|(name, gen)| (name.clone(), gen.generate(offset)))
.map(|(name, gen)| (name.clone(), gen.generate_json(offset)))
.collect();
Value::Object(map)
}
FieldGeneratorImpl::List(field, list_length) => {
let vec = (0..*list_length)
.map(|_| field.generate(offset))
.map(|_| field.generate_json(offset))
.collect::<Vec<_>>();
Value::Array(vec)
}
@@ -269,7 +269,7 @@ mod tests {

for step in 0..5 {
for (index, i32_field) in i32_fields.iter_mut().enumerate() {
let value = i32_field.generate(0);
let value = i32_field.generate_json(0);
assert!(value.is_number());
let num = value.as_u64();
let expected_num = split_num * step + 1 + index as u64;
@@ -298,13 +298,13 @@
_ => FieldGeneratorImpl::with_number_random(data_type, None, None, seed).unwrap(),
};

let val1 = generator.generate(1);
let val2 = generator.generate(2);
let val1 = generator.generate_json(1);
let val2 = generator.generate_json(2);

assert_ne!(val1, val2);

let val1_new = generator.generate(1);
let val2_new = generator.generate(2);
let val1_new = generator.generate_json(1);
let val2_new = generator.generate_json(2);

assert_eq!(val1_new, val1);
assert_eq!(val2_new, val2);
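The rename from `generate` to `generate_json` makes room for a typed sibling: generator.rs below calls `FieldGeneratorImpl::generate_datum`, which yields a `Datum` directly instead of going through `serde_json::Value`. A hedged sketch of that companion method; the signature is inferred from its call site, and the per-variant dispatch (including the inner `generate_datum` helper) is an assumption:

```rust
use risingwave_common::types::Datum;

impl FieldGeneratorImpl {
    /// Sketch only: produce a typed `Datum` (`Option<ScalarImpl>`) rather than
    /// JSON; `None` signals an exhausted sequence, which the native stream
    /// treats as end-of-data.
    pub fn generate_datum(&mut self, offset: u64) -> Datum {
        match self {
            // Inner helper name assumed; the real dispatch mirrors
            // `generate_json` across all variants.
            FieldGeneratorImpl::I32Sequence(f) => f.generate_datum(),
            _ => todo!("remaining variants elided in this sketch (offset: {offset})"),
        }
    }
}
```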
18 changes: 18 additions & 0 deletions src/connector/src/parser/mod.rs
@@ -350,6 +350,9 @@ impl ByteStreamSourceParserImpl {
DebeziumJsonParser::new(rw_columns).map(Self::DebeziumJson)
}
SpecificParserConfig::Maxwell => MaxwellParser::new(rw_columns).map(Self::Maxwell),
SpecificParserConfig::Native => {
unreachable!("Native parser should not be created")
}
}
}
}
@@ -375,9 +378,23 @@ pub enum SpecificParserConfig {
DebeziumJson,
Maxwell,
CanalJson,
Native,
}

impl SpecificParserConfig {
pub fn get_source_format(&self) -> SourceFormat {
match self {
SpecificParserConfig::Avro(_) => SourceFormat::Avro,
SpecificParserConfig::Csv(_) => SourceFormat::Csv,
SpecificParserConfig::Protobuf(_) => SourceFormat::Protobuf,
SpecificParserConfig::Json => SourceFormat::Json,
SpecificParserConfig::DebeziumJson => SourceFormat::DebeziumJson,
SpecificParserConfig::Maxwell => SourceFormat::Maxwell,
SpecificParserConfig::CanalJson => SourceFormat::CanalJson,
SpecificParserConfig::Native => SourceFormat::Native,
}
}

pub async fn new(
format: SourceFormat,
info: &StreamSourceInfo,
@@ -405,6 +422,7 @@ impl SpecificParserConfig {
SourceFormat::DebeziumJson => SpecificParserConfig::DebeziumJson,
SourceFormat::Maxwell => SpecificParserConfig::Maxwell,
SourceFormat::CanalJson => SpecificParserConfig::CanalJson,
SourceFormat::Native => SpecificParserConfig::Native,
_ => {
return Err(RwError::from(ProtocolError(
"invalid source format".to_string(),
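The `Native` arm above panics by design: native-format sources (datagen in native mode, for instance) emit `StreamChunk`s directly and never route bytes through a parser, so `ByteStreamSourceParserImpl::create` is never called for them. An illustrative helper, not part of the commit, showing how `get_source_format` lets callers branch before constructing a parser:

```rust
// Illustrative only: decide whether a byte-stream parser is needed at all.
// Native sources skip the parsing stage entirely.
fn needs_byte_parser(config: &SpecificParserConfig) -> bool {
    !matches!(config.get_source_format(), SourceFormat::Native)
}
```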
3 changes: 2 additions & 1 deletion src/connector/src/source/base.rs
@@ -101,6 +101,7 @@ pub enum SourceFormat {
Maxwell,
CanalJson,
Csv,
Native,
}

pub type BoxSourceStream = BoxStream<'static, Result<Vec<SourceMessage>>>;
@@ -109,7 +110,7 @@ pub type BoxSourceWithStateStream = BoxStream<'static, Result<StreamChunkWithSta
/// [`StreamChunkWithState`] carries a stream chunk together with the offset for each split. In
/// the current design, one connector source can have multiple split readers. The keys are
/// unique `split_id`s and the values are the latest offsets for each split.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub struct StreamChunkWithState {
pub chunk: StreamChunk,
pub split_offset_mapping: Option<HashMap<SplitId, String>>,
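Deriving `PartialEq` lets tests compare an entire chunk-plus-offset value with one `assert_eq!`. A hedged example of the pattern this enables; the `from_pretty` helper path, its column codes, and the split id are assumptions, not shown in the diff:

```rust
#[cfg(test)]
fn assert_expected_chunk(produced: StreamChunkWithState) {
    use maplit::hashmap;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::StreamChunk;

    let expected = StreamChunkWithState {
        // `from_pretty` is the usual test helper; "i f" (Int32, Float32
        // columns) and the import path are assumed here.
        chunk: StreamChunk::from_pretty("i f\n+ 1 1.0"),
        split_offset_mapping: Some(hashmap! { "0-0".into() => "0".to_string() }),
    };
    // A single comparison, made possible by the new PartialEq derive.
    assert_eq!(produced, expected);
}
```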
151 changes: 114 additions & 37 deletions src/connector/src/source/datagen/source/generator.rs
@@ -11,19 +11,27 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::Result;
use bytes::Bytes;
use futures_async_stream::try_stream;
use maplit::hashmap;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::error::RwError;
use risingwave_common::field_generator::FieldGeneratorImpl;
use serde_json::Value;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;

use crate::source::{SourceMessage, SourceMeta, SplitId};
use crate::source::{SourceFormat, SourceMessage, SourceMeta, SplitId, StreamChunkWithState};

pub struct DatagenEventGenerator {
fields_map: HashMap<String, FieldGeneratorImpl>,
// fields_map: HashMap<String, FieldGeneratorImpl>,
field_names: Vec<String>,
fields_vec: Vec<FieldGeneratorImpl>,
source_format: SourceFormat,
data_types: Vec<DataType>,
offset: u64,
split_id: SplitId,
partition_rows_per_second: u64,
@@ -36,8 +44,12 @@ pub struct DatagenMeta {
}

impl DatagenEventGenerator {
#[allow(clippy::too_many_arguments)]
pub fn new(
fields_map: HashMap<String, FieldGeneratorImpl>,
fields_vec: Vec<FieldGeneratorImpl>,
field_names: Vec<String>,
source_format: SourceFormat,
data_types: Vec<DataType>,
rows_per_second: u64,
offset: u64,
split_id: SplitId,
@@ -50,15 +62,18 @@ impl DatagenEventGenerator {
rows_per_second / split_num
};
Ok(Self {
fields_map,
field_names,
fields_vec,
source_format,
data_types,
offset,
split_id,
partition_rows_per_second,
})
}

#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
pub async fn into_stream(mut self) {
#[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
pub async fn into_msg_stream(mut self) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
const MAX_ROWS_PER_YIELD: u64 = 1024;
let mut reach_end = false;
@@ -73,24 +88,34 @@ impl DatagenEventGenerator {
);
let mut msgs = Vec::with_capacity(num_rows_to_generate as usize);
'outer: for _ in 0..num_rows_to_generate {
let mut fields = serde_json::Map::with_capacity(self.fields_map.len());
for (name, field_generator) in &mut self.fields_map {
let field = field_generator.generate(self.offset);
// sequence generator will return None when it reaches the end
if field.is_null() {
tracing::info!(
"datagen split {} stop generate, offset {}",
self.split_id,
self.offset
);
reach_end = true;
break 'outer;
let payload = match self.source_format {
SourceFormat::Json => {
let mut map = serde_json::Map::with_capacity(self.fields_vec.len());
for (name, field_generator) in self
.field_names
.iter()
.zip_eq_fast(self.fields_vec.iter_mut())
{
let value = field_generator.generate_json(self.offset);
if value.is_null() {
reach_end = true;
tracing::info!(
"datagen split {} stop generate, offset {}",
self.split_id,
self.offset
);
break 'outer;
}
map.insert(name.clone(), value);
}
Bytes::from(serde_json::Value::from(map).to_string())
}
fields.insert(name.to_string(), field);
}
let value = Value::Object(fields);
_ => {
unimplemented!("only json format is supported for now")
}
};
msgs.push(SourceMessage {
payload: Some(Bytes::from(value.to_string())),
payload: Some(payload),
offset: self.offset.to_string(),
split_id: self.split_id.clone(),
meta: SourceMeta::Datagen(DatagenMeta {
@@ -115,6 +140,60 @@
}
}
}

#[try_stream(ok = StreamChunkWithState, error = RwError)]
pub async fn into_native_stream(mut self) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
const MAX_ROWS_PER_YIELD: u64 = 1024;
let mut reach_end = false;
loop {
// generate `partition_rows_per_second` rows per second
interval.tick().await;
let mut rows_generated_this_second = 0;
while rows_generated_this_second < self.partition_rows_per_second {
let mut rows = vec![];
let num_rows_to_generate = std::cmp::min(
MAX_ROWS_PER_YIELD,
self.partition_rows_per_second - rows_generated_this_second,
);
'outer: for _ in 0..num_rows_to_generate {
let mut row = Vec::with_capacity(self.fields_vec.len());
for field_generator in &mut self.fields_vec {
let datum = field_generator.generate_datum(self.offset);
if datum.is_none() {
reach_end = true;
tracing::info!(
"datagen split {} stop generate, offset {}",
self.split_id,
self.offset
);
break 'outer;
}
row.push(datum);
}

rows.push((Op::Insert, OwnedRow::new(row)));
self.offset += 1;
rows_generated_this_second += 1;
}

if !rows.is_empty() {
let chunk = StreamChunk::from_rows(&rows, &self.data_types);
let mapping = hashmap! {
self.split_id.clone() => (self.offset - 1).to_string()
};
yield StreamChunkWithState {
chunk,
split_offset_mapping: Some(mapping),
};
}

if reach_end {
return Ok(());
}
}
}
}
}
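For context (driver code assumed, not in this commit): a consumer of the native path simply forwards the yielded chunks, with no parser in between. A hedged sketch of draining the stream:

```rust
use futures::StreamExt;

// Hedged consumption sketch: the native stream yields typed StreamChunks plus
// per-split offsets, so the reader forwards them without byte-level parsing.
async fn drive_native(generator: DatagenEventGenerator) -> anyhow::Result<()> {
    let mut stream = generator.into_native_stream().boxed();
    while let Some(item) = stream.next().await {
        let StreamChunkWithState { chunk, split_offset_mapping } = item?;
        // `split_offset_mapping` carries the latest offset per split for
        // checkpointing; `chunk` is ready for the downstream executor.
        tracing::debug!(?split_offset_mapping, rows = chunk.cardinality(), "native chunk");
    }
    Ok(())
}
```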

#[cfg(test)]
@@ -132,33 +211,32 @@ mod tests {
let split_id = format!("{}-{}", split_num, split_index).into();
let start = 1;
let end = 10;
let mut fields_map = HashMap::new();
fields_map.insert(
"v1".to_string(),

let data_types = vec![DataType::Int32, DataType::Float32];
let fields_vec = vec![
FieldGeneratorImpl::with_number_sequence(
risingwave_common::types::DataType::Int32,
data_types[0].clone(),
Some(start.to_string()),
Some(end.to_string()),
split_index,
split_num,
)
.unwrap(),
);

fields_map.insert(
"v2".to_string(),
FieldGeneratorImpl::with_number_sequence(
risingwave_common::types::DataType::Float32,
data_types[1].clone(),
Some(start.to_string()),
Some(end.to_string()),
split_index,
split_num,
)
.unwrap(),
);
];

let generator = DatagenEventGenerator::new(
fields_map,
fields_vec,
vec!["c1".to_owned(), "c2".to_owned()],
SourceFormat::Json,
data_types,
rows_per_second,
0,
split_id,
@@ -167,13 +245,12 @@
)
.unwrap();

let mut stream = generator.into_stream().boxed();
let mut stream = generator.into_msg_stream().boxed();

let chunk = stream.next().await.unwrap().unwrap();
assert_eq!(expected_length, chunk.len());

let empty_chunk = stream.next().await;
println!("empty_chunk: {:?}", empty_chunk);
if rows_per_second >= (end - start + 1) {
assert!(empty_chunk.is_none());
} else {
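A matching smoke test for the native path would drive `into_native_stream` instead. A hedged sketch, not part of the shown diff; the trailing constructor arguments and assertion values mirror the JSON test above and are assumptions:

```rust
#[tokio::test]
async fn test_native_stream_smoke() {
    let data_types = vec![DataType::Int32, DataType::Float32];
    let fields_vec = vec![
        FieldGeneratorImpl::with_number_sequence(
            data_types[0].clone(), Some("1".to_string()), Some("10".to_string()), 0, 1,
        )
        .unwrap(),
        FieldGeneratorImpl::with_number_sequence(
            data_types[1].clone(), Some("1".to_string()), Some("10".to_string()), 0, 1,
        )
        .unwrap(),
    ];

    let generator = DatagenEventGenerator::new(
        fields_vec,
        vec!["c1".to_owned(), "c2".to_owned()],
        SourceFormat::Native,
        data_types,
        10, // rows_per_second
        0,  // starting offset
        "1-0".into(),
        1, // split_num (trailing args assumed)
        0, // split_index
    )
    .unwrap();

    let mut stream = generator.into_native_stream().boxed();
    let first = stream.next().await.unwrap().unwrap();
    // With 10 rows/sec and a 10-element sequence, one tick yields every row.
    assert_eq!(first.chunk.cardinality(), 10);
}
```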