Skip to content

Commit

Permalink
test(stream): add ci for split change mutation in source (risingwavelabs#3039)
Browse files Browse the repository at this point in the history

* stage

Signed-off-by: tabVersion <[email protected]>

* stage

Signed-off-by: tabVersion <[email protected]>

* add test

Signed-off-by: tabVersion <[email protected]>

* change e2e to datagen

Signed-off-by: tabVersion <[email protected]>

* stage

Signed-off-by: tabVersion <[email protected]>

* some bug to fix

Signed-off-by: tabVersion <[email protected]>

* fix async issue

Signed-off-by: tabVersion <[email protected]>

* add assert

Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Jun 20, 2022
1 parent 04fe6d6 commit 5d2bb42
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 73 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ itertools = "0.10"
lazy_static = "1"
log = "0.4"
madsim = "=0.2.0-alpha.3"
maplit = "1.0.2"
memcomparable = { path = "../utils/memcomparable" }
num-traits = "0.2"
paste = "1"
Expand All @@ -29,6 +30,7 @@ prost = "0.10"
rdkafka = { version = "0.28", features = ["cmake-build"] }
risingwave_batch = { path = "../batch" }
risingwave_common = { path = "../common" }
risingwave_connector = { path = "../connector" }
risingwave_pb = { path = "../prost" }
risingwave_rpc_client = { path = "../rpc_client" }
risingwave_source = { path = "../source" }
Expand Down
14 changes: 12 additions & 2 deletions src/connector/src/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,20 @@ impl SplitReader for DatagenSplitReader {
.get(&random_seed_key)
.map(|s| s.to_string())
{
Some(seed) => seed.parse::<u64>().unwrap_or(split_index),
Some(seed) => {
match seed.parse::<u64>() {
// we use given seed xor split_index to make sure every split has different
// seed
Ok(seed) => seed ^ split_index,
Err(e) => {
log::warn!("cannot parse {:?} to u64 due to {:?}, will use {:?} as random seed", seed, e, split_index);
split_index
}
}
}
None => split_index,
};
match column.data_type{
match column.data_type {
DataType::Timestamp => {
let max_past_key = format!("fields.{}.max_past", name);
let max_past_value =
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/datagen/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use crate::base::SplitMetaData;

#[derive(Clone, Serialize, Deserialize, Debug, Default, PartialEq, Hash)]
pub struct DatagenSplit {
pub(crate) split_index: i32,
pub(crate) split_num: i32,
pub(crate) start_offset: Option<u64>,
pub split_index: i32,
pub split_num: i32,
pub start_offset: Option<u64>,
}

impl SplitMetaData for DatagenSplit {
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
extern crate core;

pub mod base;
mod datagen;
pub mod datagen;
mod filesystem;
mod kafka;
pub mod kafka;
pub mod kinesis;
mod nexmark;
mod pulsar;
Expand Down
1 change: 1 addition & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ itertools = "0.10"
lazy_static = "1"
log = "0.4"
madsim = "=0.2.0-alpha.3"
maplit = "1.0.2"
memcomparable = { path = "../utils/memcomparable" }
num-traits = "0.2"
parking_lot = "0.12"
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ pub trait ExprFn = Fn(&DataChunk) -> Result<Bitmap> + Send + Sync + 'static;

#[derive(Debug, PartialEq, Clone, Default)]
pub struct AddOutput {
map: HashMap<(ActorId, DispatcherId), Vec<ActorInfo>>,
splits: HashMap<ActorId, Vec<SplitImpl>>,
pub map: HashMap<(ActorId, DispatcherId), Vec<ActorInfo>>,
pub splits: HashMap<ActorId, Vec<SplitImpl>>,
}

#[derive(Debug, Clone, PartialEq)]
Expand Down
Loading

0 comments on commit 5d2bb42

Please sign in to comment.