Skip to content

Commit

Permalink
Merge branch 'main' into zhidong/refactor-full-key
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 11, 2022
2 parents d1e975c + 78fd8a0 commit c733124
Show file tree
Hide file tree
Showing 17 changed files with 271 additions and 139 deletions.
16 changes: 8 additions & 8 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ pub fn spawn_data_generation_stream<T: Send + 'static>(
stream: impl Stream<Item = T> + Send + 'static,
buffer_size: usize,
) -> impl Stream<Item = T> + Send + 'static {
static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("risingwave-data-generation")
.enable_all()
.build()
.expect("failed to build data-generation runtime")
});

let (generation_tx, generation_rx) = mpsc::channel(buffer_size);
RUNTIME.spawn(async move {
pin_mut!(stream);
Expand All @@ -209,14 +217,6 @@ pub fn spawn_data_generation_stream<T: Send + 'static>(
tokio_stream::wrappers::ReceiverStream::new(generation_rx)
}

/// Lazily-initialized, multi-threaded Tokio runtime whose threads are named
/// `risingwave-data-generation`.
///
/// The runtime is built the first time `RUNTIME` is accessed. Initialization panics
/// with "failed to build data-generation runtime" if the builder returns an error.
static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("risingwave-data-generation")
.enable_all()
.build()
.expect("failed to build data-generation runtime")
});

#[cfg(test)]
mod tests {
use maplit::*;
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/nexmark/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,6 @@ impl NexmarkEventGenerator {
}
}

tracing::debug!(?self.event_type, "nexmark generator finished");
tracing::debug!(?self.event_type, self.split_index, "nexmark generator finished");
}
}
47 changes: 25 additions & 22 deletions src/connector/src/source/nexmark/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use futures::StreamExt;
use itertools::Itertools;

use crate::source::nexmark::config::NexmarkConfig;
use crate::source::nexmark::source::event::EventType;
use crate::source::nexmark::source::generator::NexmarkEventGenerator;
use crate::source::nexmark::{NexmarkProperties, NexmarkSplit};
use crate::source::{
spawn_data_generation_stream, BoxSourceStream, Column, ConnectorState, SplitImpl,
SplitMetaData, SplitReader,
spawn_data_generation_stream, BoxSourceStream, Column, ConnectorState, SplitMetaData,
SplitReader,
};

#[derive(Debug)]
Expand Down Expand Up @@ -83,20 +84,18 @@ impl SplitReader for NexmarkSplitReader {

if let Some(splits) = state {
tracing::debug!("Splits for nexmark found! {:?}", splits);
for split in splits {
// TODO: currently, assume there's only one split in one reader
let split_id = split.id();
if let SplitImpl::Nexmark(n) = split {
generator.split_index = n.split_index;
generator.split_num = n.split_num;
if let Some(s) = n.start_offset {
generator.events_so_far = s;
};
generator.split_id = split_id;
assigned_split = n;
break;
}
}
// TODO: currently, assume there's only one split in one reader
let split = splits.into_iter().exactly_one().unwrap();
let split_id = split.id();
let split = split.into_nexmark().unwrap();

generator.split_index = split.split_index;
generator.split_num = split.split_num;
if let Some(s) = split.start_offset {
generator.events_so_far = s;
};
generator.split_id = split_id;
assigned_split = split;
}

Ok(Self {
Expand Down Expand Up @@ -131,18 +130,22 @@ mod tests {
});

let mut enumerator = NexmarkSplitEnumerator::new(props.clone()).await?;
let list_splits_resp = enumerator
let list_splits_resp: Vec<SplitImpl> = enumerator
.list_splits()
.await?
.into_iter()
.map(SplitImpl::Nexmark)
.collect();

let state = Some(list_splits_resp);
let mut reader = NexmarkSplitReader::new(props, state, None)
.await?
.into_stream();
let _chunk = reader.next().await.unwrap()?;
assert_eq!(list_splits_resp.len(), 2);

for split in list_splits_resp {
let state = Some(vec![split]);
let mut reader = NexmarkSplitReader::new(props.clone(), state, None)
.await?
.into_stream();
let _chunk = reader.next().await.unwrap()?;
}

Ok(())
}
Expand Down
4 changes: 1 addition & 3 deletions src/storage/src/hummock/sstable/backward_sstable_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ impl HummockIterator for BackwardSstableIterator {
self.stats.total_key_count += 1;
async move {
let block_iter = self.block_iter.as_mut().expect("no block iter");
block_iter.prev();

if block_iter.is_valid() {
if block_iter.try_prev() {
Ok(())
} else {
// seek to the previous block
Expand Down
48 changes: 40 additions & 8 deletions src/storage/src/hummock/sstable/block_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,21 @@ impl BlockIterator {
self.next_inner();
}

/// Tries to move to the next entry and reports whether the move happened.
///
/// Delegates to `try_next_inner` and returns its result unchanged.
///
/// # Panics
///
/// Panics if the iterator is not valid when this is called.
pub fn try_next(&mut self) -> bool {
// Precondition: only a valid iterator may be advanced.
assert!(self.is_valid());
self.try_next_inner()
}

/// Moves to the previous entry by delegating to `prev_inner`.
///
/// # Panics
///
/// Panics if the iterator is not valid when this is called.
pub fn prev(&mut self) {
// Precondition: only a valid iterator may be moved backwards.
assert!(self.is_valid());
self.prev_inner();
}

/// Tries to move to the previous entry and reports whether the move happened.
///
/// Delegates to `try_prev_inner` and returns its result unchanged.
///
/// # Panics
///
/// Panics if the iterator is not valid when this is called.
pub fn try_prev(&mut self) -> bool {
// Precondition: only a valid iterator may be moved backwards.
assert!(self.is_valid());
self.try_prev_inner()
}

pub fn key(&self) -> &[u8] {
assert!(self.is_valid());
&self.key[..]
Expand Down Expand Up @@ -107,14 +117,24 @@ impl BlockIterator {
self.entry_len = 0;
}

/// Moves to the next entry.
/// Moving to the next entry
///
/// Note: Ensures that the current state is valid.
/// Note: The current state may be invalid if there is no more data to read
fn next_inner(&mut self) {
// `try_next_inner` returns false when it could not advance; only in that case
// is `invalidate` called on the iterator.
if !self.try_next_inner() {
self.invalidate();
}
}

/// Try moving to the next entry.
///
/// The current state will still be valid if there is no more data to read.
///
/// Return: true if the iterator is advanced and false otherwise.
fn try_next_inner(&mut self) -> bool {
let offset = self.offset + self.entry_len;
if offset >= self.block.len() {
self.invalidate();
return;
return false;
}
let prefix = self.decode_prefix_at(offset);
self.key.truncate(prefix.overlap_len());
Expand All @@ -128,6 +148,7 @@ impl BlockIterator {
{
self.restart_point_index += 1;
}
true
}

/// Moves forward until reaching the first entry that is equal to or larger than the given `key`.
Expand Down Expand Up @@ -156,20 +177,31 @@ impl BlockIterator {
}
}

/// Moves to the previous entry.
/// Moving to the previous entry
///
/// Note: Ensure that the current state is valid.
/// Note: The current state may be invalid if there is no more data to read
fn prev_inner(&mut self) {
if self.offset == 0 {
if !self.try_prev_inner() {
self.invalidate();
return;
}
}

/// Attempts to step back to the previous entry.
///
/// If the iterator is already at offset 0, nothing is modified and the current state
/// stays valid.
///
/// Returns `true` if the iterator was moved and `false` otherwise.
fn try_prev_inner(&mut self) -> bool {
    let current_offset = self.offset;
    if current_offset == 0 {
        return false;
    }

    // When the current entry starts exactly at its restart point, step the restart
    // index back by one before re-seeking.
    let restart_offset = self.block.restart_point(self.restart_point_index) as usize;
    if restart_offset == current_offset {
        self.restart_point_index -= 1;
    }

    // Re-seek to the selected restart point, then hand the original offset to
    // `next_until_prev_offset` (presumably it stops on the entry just before that
    // offset; confirm against its definition).
    self.seek_restart_point_by_index(self.restart_point_index);
    self.next_until_prev_offset(current_offset);
    true
}

/// Decodes [`KeyPrefix`] at given offset.
Expand Down
4 changes: 1 addition & 3 deletions src/storage/src/hummock/sstable/forward_sstable_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ impl HummockIterator for SstableIterator {
self.stats.total_key_count += 1;
async move {
let block_iter = self.block_iter.as_mut().expect("no block iter");
block_iter.next();

if block_iter.is_valid() {
if block_iter.try_next() {
Ok(())
} else {
// seek to next block
Expand Down
38 changes: 14 additions & 24 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,20 +115,6 @@ const fn is_right_semi_or_anti(join_type: JoinTypePrimitive) -> bool {
join_type == JoinType::RightSemi || join_type == JoinType::RightAnti
}

/// Returns `true` when either `only_forward_matched_side` or `outer_side_null` holds
/// for the given join type and side.
///
/// Callers use the result to decide whether to call `inc_degree` / `dec_degree` on the
/// matched side's rows.
const fn need_update_side_matched_degree(
join_type: JoinTypePrimitive,
side_type: SideTypePrimitive,
) -> bool {
only_forward_matched_side(join_type, side_type) || outer_side_null(join_type, side_type)
}

/// Returns `true` when either `forward_exactly_once` or `is_outer_side` holds for the
/// given join type and side.
///
/// Callers use the result to decide whether rows on the update side are inserted or
/// deleted as a `JoinRow` carrying a degree, or as plain rows.
const fn need_update_side_update_degree(
join_type: JoinTypePrimitive,
side_type: SideTypePrimitive,
) -> bool {
forward_exactly_once(join_type, side_type) || is_outer_side(join_type, side_type)
}

const fn need_left_degree(join_type: JoinTypePrimitive) -> bool {
join_type == FullOuter
|| join_type == LeftOuter
Expand Down Expand Up @@ -176,6 +162,8 @@ struct JoinSide<K: HashKey, S: StateStore> {
start_pos: usize,
/// The mapping from input indices of a side to output columns.
i2o_mapping: Vec<(usize, usize)>,
/// Whether degree table is needed for this side.
need_degree_table: bool,
}

impl<K: HashKey, S: StateStore> std::fmt::Debug for JoinSide<K, S> {
Expand Down Expand Up @@ -538,8 +526,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
null_matched
};

let need_degree_table_l = need_left_degree(T) && !pk_contained_l;
let need_degree_table_r = need_right_degree(T) && !pk_contained_r;
let need_degree_table_l = need_left_degree(T) && !pk_contained_r;
let need_degree_table_r = need_right_degree(T) && !pk_contained_l;

let (left_to_output, right_to_output) = {
let (left_len, right_len) = if is_left_semi_or_anti(T) {
Expand Down Expand Up @@ -580,6 +568,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
pk_indices: state_pk_indices_l,
start_pos: 0,
i2o_mapping: left_to_output,
need_degree_table: need_degree_table_l,
},
side_r: JoinSide {
ht: JoinHashMap::new(
Expand All @@ -603,6 +592,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
pk_indices: state_pk_indices_r,
start_pos: side_l_column_n,
i2o_mapping: right_to_output,
need_degree_table: need_degree_table_r,
},
pk_indices,
cond,
Expand Down Expand Up @@ -777,8 +767,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
async fn eq_join_oneside<'a, const SIDE: SideTypePrimitive>(
ctx: &'a ActorContextRef,
identity: &'a str,
mut side_l: &'a mut JoinSide<K, S>,
mut side_r: &'a mut JoinSide<K, S>,
side_l: &'a mut JoinSide<K, S>,
side_r: &'a mut JoinSide<K, S>,
actual_output_data_types: &'a [DataType],
cond: &'a mut Option<BoxedExpression>,
chunk: StreamChunk,
Expand All @@ -788,9 +778,9 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
let chunk = chunk.compact();

let (side_update, side_match) = if SIDE == SideType::Left {
(&mut side_l, &mut side_r)
(side_l, side_r)
} else {
(&mut side_r, &mut side_l)
(side_r, side_l)
};

let mut hashjoin_chunk_builder = HashJoinChunkBuilder::<T, SIDE> {
Expand Down Expand Up @@ -847,7 +837,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
yield Message::Chunk(chunk);
}
}
if need_update_side_matched_degree(T, SIDE) {
if side_match.need_degree_table {
side_match.ht.inc_degree(matched_row_ref)?;
matched_row.inc_degree();
}
Expand Down Expand Up @@ -883,7 +873,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
// one row if matched
let [row]: [_; 1] = append_only_matched_rows.try_into().unwrap();
side_match.ht.delete(key, row);
} else if need_update_side_update_degree(T, SIDE) {
} else if side_update.need_degree_table {
side_update.ht.insert(key, JoinRow::new(value, degree));
} else {
side_update.ht.insert_row(key, value);
Expand All @@ -898,7 +888,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
let mut matched_row = matched_row?;
if check_join_condition(&row, &matched_row.row)? {
degree += 1;
if need_update_side_matched_degree(T, SIDE) {
if side_match.need_degree_table {
side_match.ht.dec_degree(matched_row_ref)?;
matched_row.dec_degree()?;
}
Expand Down Expand Up @@ -929,7 +919,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
{
yield Message::Chunk(chunk);
}
if need_update_side_update_degree(T, SIDE) {
if side_update.need_degree_table {
side_update.ht.delete(key, JoinRow::new(value, degree));
} else {
side_update.ht.delete_row(key, value);
Expand Down
Loading

0 comments on commit c733124

Please sign in to comment.