Skip to content

Commit

Permalink
feat(metric): support backfill throughput metrics (#8883)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Mar 30, 2023
1 parent f51404d commit 5d93d99
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 6 deletions.
20 changes: 20 additions & 0 deletions grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,26 @@ def section_streaming(panels):
)
]
),
panels.timeseries_rowsps(
"Backfill Snapshot Read Throughput(rows)",
"Total number of rows that have been read from the backfill snapshot",
[
panels.target(
f"rate({metric('stream_backfill_snapshot_read_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{instance}}"
),
],
),
panels.timeseries_rowsps(
"Backfill Upstream Throughput(rows)",
"Total number of rows that have been output from the backfill upstream",
[
panels.target(
f"rate({metric('stream_backfill_upstream_output_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{instance}}"
),
],
),
panels.timeseries_count(
"Barrier Number",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
.map(|&i| self.output_indices.iter().position(|&j| i == j))
.collect()
}

/// Returns the `table_id` stored on this `StorageTableInner`.
pub fn table_id(&self) -> TableId {
self.table_id
}
}

/// Point get
Expand Down
43 changes: 38 additions & 5 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::cmp::Ordering;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::InstrumentAwait;
use either::Either;
Expand All @@ -34,6 +35,7 @@ use risingwave_storage::StateStore;

use super::error::StreamExecutorError;
use super::{expect_first_barrier, BoxedExecutor, Executor, ExecutorInfo, Message, PkIndicesRef};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{PkIndices, Watermark};
use crate::task::{ActorId, CreateMviewProgress};

Expand Down Expand Up @@ -72,6 +74,8 @@ pub struct BackfillExecutor<S: StateStore> {
actor_id: ActorId,

info: ExecutorInfo,

metrics: Arc<StreamingMetrics>,
}

const CHUNK_SIZE: usize = 1024;
Expand All @@ -87,6 +91,7 @@ where
progress: CreateMviewProgress,
schema: Schema,
pk_indices: PkIndices,
metrics: Arc<StreamingMetrics>,
) -> Self {
Self {
info: ExecutorInfo {
Expand All @@ -99,6 +104,7 @@ where
output_indices,
actor_id: progress.actor_id(),
progress,
metrics,
}
}

Expand All @@ -108,6 +114,8 @@ where
let pk_in_output_indices = self.table.pk_in_output_indices().unwrap();
let pk_order = self.table.pk_serializer().get_order_types();

let table_id = self.table.table_id().table_id;

let mut upstream = self.upstream.execute();

// Poll the upstream to get the first barrier.
Expand Down Expand Up @@ -153,8 +161,8 @@ where
// `None` means it starts from the beginning.
let mut current_pos: Option<OwnedRow> = None;

// Keep track of rows from the upstream and snapshot.
let mut processed_rows: u64 = 0;
// Keep track of rows from the snapshot.
let mut total_snapshot_processed_rows: u64 = 0;

// Backfill Algorithm:
//
Expand Down Expand Up @@ -196,6 +204,9 @@ where
stream::PollNext::Left
});

let mut cur_barrier_snapshot_processed_rows: u64 = 0;
let mut cur_barrier_upstream_processed_rows: u64 = 0;

#[for_await]
for either in backfill_stream {
match either {
Expand All @@ -207,6 +218,8 @@ where

// Consume upstream buffer chunk
for chunk in upstream_chunk_buffer.drain(..) {
cur_barrier_upstream_processed_rows +=
chunk.cardinality() as u64;
if let Some(current_pos) = &current_pos {
yield Message::Chunk(Self::mapping_chunk(
Self::mark_chunk(
Expand All @@ -220,13 +233,29 @@ where
}
}

self.metrics
.backfill_snapshot_read_row_count
.with_label_values(&[
table_id.to_string().as_str(),
self.actor_id.to_string().as_str(),
])
.inc_by(cur_barrier_snapshot_processed_rows);

self.metrics
.backfill_upstream_output_row_count
.with_label_values(&[
table_id.to_string().as_str(),
self.actor_id.to_string().as_str(),
])
.inc_by(cur_barrier_upstream_processed_rows);

// Update snapshot read epoch.
snapshot_read_epoch = barrier.epoch.prev;

self.progress.update(
barrier.epoch.curr,
snapshot_read_epoch,
processed_rows,
total_snapshot_processed_rows,
);

yield Message::Barrier(barrier);
Expand All @@ -251,7 +280,9 @@ where
// in the buffer. Here we choose to never mark the chunk.
// Consume the remaining upstream buffer chunks without marking them.
for chunk in upstream_chunk_buffer.drain(..) {
processed_rows += chunk.cardinality() as u64;
let chunk_cardinality = chunk.cardinality() as u64;
cur_barrier_snapshot_processed_rows += chunk_cardinality;
total_snapshot_processed_rows += chunk_cardinality;
yield Message::Chunk(Self::mapping_chunk(
chunk,
&self.output_indices,
Expand All @@ -274,7 +305,9 @@ where
.project(&pk_in_output_indices)
.into_owned_row(),
);
processed_rows += chunk.cardinality() as u64;
let chunk_cardinality = chunk.cardinality() as u64;
cur_barrier_snapshot_processed_rows += chunk_cardinality;
total_snapshot_processed_rows += chunk_cardinality;
yield Message::Chunk(Self::mapping_chunk(
chunk,
&self.output_indices,
Expand Down
22 changes: 22 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub struct StreamingMetrics {
pub agg_chunk_lookup_miss_count: GenericCounterVec<AtomicU64>,
pub agg_chunk_total_lookup_count: GenericCounterVec<AtomicU64>,

// Backfill
pub backfill_snapshot_read_row_count: GenericCounterVec<AtomicU64>,
pub backfill_upstream_output_row_count: GenericCounterVec<AtomicU64>,

/// The duration from receipt of barrier to all actors collection.
/// And the max of all node `barrier_inflight_latency` is the latency for a barrier
/// to flow through the graph.
Expand Down Expand Up @@ -385,6 +389,22 @@ impl StreamingMetrics {
)
.unwrap();

let backfill_snapshot_read_row_count = register_int_counter_vec_with_registry!(
"stream_backfill_snapshot_read_row_count",
"Total number of rows that have been read from the backfill snapshot",
&["table_id", "actor_id"],
registry
)
.unwrap();

let backfill_upstream_output_row_count = register_int_counter_vec_with_registry!(
"stream_backfill_upstream_output_row_count",
"Total number of rows that have been output from the backfill upstream",
&["table_id", "actor_id"],
registry
)
.unwrap();

let opts = histogram_opts!(
"stream_barrier_inflight_duration_seconds",
"barrier_inflight_latency",
Expand Down Expand Up @@ -488,6 +508,8 @@ impl StreamingMetrics {
agg_cached_keys,
agg_chunk_lookup_miss_count,
agg_chunk_total_lookup_count,
backfill_snapshot_read_row_count,
backfill_upstream_output_row_count,
barrier_inflight_latency,
barrier_sync_latency,
sink_commit_duration,
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/from_proto/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ impl ExecutorBuilder for ChainExecutorBuilder {
progress,
schema,
params.pk_indices,
stream.streaming_metrics.clone(),
)
.boxed()
}
Expand Down

0 comments on commit 5d93d99

Please sign in to comment.