Skip to content

Commit

Permalink
feat(streaming): call may_exist when insert cache miss in join executor
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Feb 16, 2023
1 parent ff6db4a commit 03c26b0
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 8 deletions.
6 changes: 5 additions & 1 deletion grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1320,7 +1320,11 @@ def section_streaming_actors(outer_panels):
),
panels.target(
f"rate({metric('stream_join_insert_cache_miss_count')}[$__rate_interval])",
"cache miss when insert{{actor_id}} {{side}}",
"cache miss when insert {{actor_id}} {{side}}",
),
panels.target(
f"rate({metric('stream_join_may_exist_true_count')}[$__rate_interval])",
"may_exist true when insert {{actor_id}} {{side}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

34 changes: 31 additions & 3 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ pub struct JoinHashMapMetrics {
total_lookup_count: usize,
/// How many times have we miss the cache when insert row
insert_cache_miss_count: usize,
may_exist_true_count: usize,
}

impl JoinHashMapMetrics {
Expand All @@ -171,6 +172,7 @@ impl JoinHashMapMetrics {
lookup_miss_count: 0,
total_lookup_count: 0,
insert_cache_miss_count: 0,
may_exist_true_count: 0,
}
}

Expand All @@ -187,9 +189,14 @@ impl JoinHashMapMetrics {
.join_insert_cache_miss_count
.with_label_values(&[&self.actor_id, self.side])
.inc_by(self.insert_cache_miss_count as u64);
self.metrics
.join_may_exist_true_count
.with_label_values(&[&self.actor_id, self.side])
.inc_by(self.may_exist_true_count as u64);
self.total_lookup_count = 0;
self.lookup_miss_count = 0;
self.insert_cache_miss_count = 0;
self.may_exist_true_count = 0;
}
}

Expand Down Expand Up @@ -417,11 +424,22 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
// Update cache
entry.insert(pk, value.encode());
} else if self.pk_contained_in_jk {
// Refill cache when the join key exist in neither cache or storage.
// Refill cache when the join key contains primary key.
self.metrics.insert_cache_miss_count += 1;
let mut state = JoinEntryState::default();
state.insert(pk, value.encode());
self.update_state(key, state.into());
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exist in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, value.encode());
self.update_state(key, state.into());
} else {
self.metrics.may_exist_true_count += 1;
}
}

// Update the flush buffer.
Expand All @@ -433,7 +451,6 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {

/// Insert a row.
/// Used when the side does not need to update degree.
#[allow(clippy::unused_async)]
pub async fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> {
let join_row = JoinRow::new(&value, 0);
let pk = (&value)
Expand All @@ -443,11 +460,22 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
// Update cache
entry.insert(pk, join_row.encode());
} else if self.pk_contained_in_jk {
// Refill cache when the join key exist in neither cache or storage.
// Refill cache when the join key contains primary key.
self.metrics.insert_cache_miss_count += 1;
let mut state = JoinEntryState::default();
state.insert(pk, join_row.encode());
self.update_state(key, state.into());
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exist in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, join_row.encode());
self.update_state(key, state.into());
} else {
self.metrics.may_exist_true_count += 1;
}
}

// Update the flush buffer.
Expand Down
16 changes: 13 additions & 3 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct StreamingMetrics {
pub join_lookup_miss_count: GenericCounterVec<AtomicU64>,
pub join_total_lookup_count: GenericCounterVec<AtomicU64>,
pub join_insert_cache_miss_count: GenericCounterVec<AtomicU64>,
pub join_may_exist_true_count: GenericCounterVec<AtomicU64>,
pub join_actor_input_waiting_duration_ns: GenericCounterVec<AtomicU64>,
pub join_match_duration_ns: GenericCounterVec<AtomicU64>,
pub join_barrier_align_duration: HistogramVec,
Expand Down Expand Up @@ -263,23 +264,31 @@ impl StreamingMetrics {

let join_lookup_miss_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_miss_count",
"Join executor lookup miss duration",
"Join executor lookup miss count",
&["actor_id", "side"],
registry
)
.unwrap();

let join_total_lookup_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_total_count",
"Join executor lookup total operation",
"Join executor lookup total count",
&["actor_id", "side"],
registry
)
.unwrap();

let join_insert_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_join_insert_cache_miss_count",
"Join executor cache miss when insert operation",
"Count of cache miss when insert rows in join executor",
&["actor_id", "side"],
registry
)
.unwrap();

let join_may_exist_true_count = register_int_counter_vec_with_registry!(
"stream_join_may_exist_true_count",
"Count of may_exist's true returns of when insert rows in join executor",
&["actor_id", "side"],
registry
)
Expand Down Expand Up @@ -457,6 +466,7 @@ impl StreamingMetrics {
join_lookup_miss_count,
join_total_lookup_count,
join_insert_cache_miss_count,
join_may_exist_true_count,
join_actor_input_waiting_duration_ns,
join_match_duration_ns,
join_barrier_align_duration,
Expand Down

0 comments on commit 03c26b0

Please sign in to comment.