Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source): throttling source based on storage stats #8049

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async fn test_table_materialize() -> StreamResult<()> {
None, // There is no external stream source.
Arc::new(StreamingMetrics::unused()),
barrier_rx,
u64::MAX,
vec![],
1,
);

Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/hummock/local_version/pinned_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ impl PinnedVersion {
/// Returns an owned copy of the pinned [`HummockVersion`].
///
/// The whole version is cloned on every call; callers that only need to read it can borrow
/// from `self` instead.
pub fn version(&self) -> HummockVersion {
    let pinned: &HummockVersion = self.version.deref();
    pinned.clone()
}

/// Borrows the pinned [`HummockVersion`] without cloning it.
pub fn version_ref(&self) -> &HummockVersion {
    let pinned: &HummockVersion = self.version.deref();
    pinned
}
}

pub(crate) async fn start_pinned_version_worker(
Expand Down
11 changes: 11 additions & 0 deletions src/storage/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,17 @@ impl HummockStorage {
/// Loads the currently stored pinned version and returns a clone of it.
pub fn get_pinned_version(&self) -> PinnedVersion {
    // Keep the loaded guard alive in a named local while we clone out of it.
    let guard = self.pinned_version.load();
    let current: &PinnedVersion = guard.deref().deref();
    current.clone()
}

pub fn need_write_throttling(&self) -> bool {
Copy link
Contributor

@Li0k Li0k Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to distinguish between different groups of L0? I think their write paths are independent.

// TODO #7997 make it configurable
const MAX_L0_SUB_LEVEL_NUMBER: usize = 1000;
self.pinned_version
.load()
.version_ref()
.levels
.values()
.any(|levels| levels.l0.as_ref().unwrap().sub_levels.len() > MAX_L0_SUB_LEVEL_NUMBER)
}
}

#[cfg(any(test, feature = "test"))]
Expand Down
25 changes: 10 additions & 15 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ use risingwave_connector::source::{
use risingwave_source::source_desc::{FsSourceDesc, SourceDescBuilder};
use risingwave_storage::StateStore;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use crate::error::StreamResult;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::source::reader::SourceReaderStream;
use crate::executor::throttler::SourceThrottlerImpl;
use crate::executor::*;

/// [`FsSourceExecutor`] is a streaming source for external file systems
/// such as s3.
pub struct FsSourceExecutor<S: StateStore> {
Expand All @@ -55,8 +56,7 @@ pub struct FsSourceExecutor<S: StateStore> {
/// Receiver of barrier channel.
barrier_receiver: Option<UnboundedReceiver<Barrier>>,

/// Expected barrier latency
expected_barrier_latency_ms: u64,
throttlers: Vec<SourceThrottlerImpl>,
}

impl<S: StateStore> FsSourceExecutor<S> {
Expand All @@ -68,7 +68,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
stream_source_core: StreamSourceCore<S>,
metrics: Arc<StreamingMetrics>,
barrier_receiver: UnboundedReceiver<Barrier>,
expected_barrier_latency_ms: u64,
throttlers: Vec<SourceThrottlerImpl>,
executor_id: u64,
) -> StreamResult<Self> {
Ok(Self {
Expand All @@ -79,7 +79,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
stream_source_core,
metrics,
barrier_receiver: Some(barrier_receiver),
expected_barrier_latency_ms,
throttlers,
})
}

Expand Down Expand Up @@ -336,18 +336,16 @@ impl<S: StateStore> FsSourceExecutor<S> {

yield Message::Barrier(barrier);

// We allow data to flow for 5 * `expected_barrier_latency_ms` milliseconds, considering
// some other latencies like network and cost in Meta.
let max_wait_barrier_time_ms = self.expected_barrier_latency_ms as u128 * 5;
let mut last_barrier_time = Instant::now();
let mut self_paused = false;
let mut metric_row_per_barrier: u64 = 0;
while let Some(msg) = stream.next().await {
match msg? {
// This branch will be preferred.
Either::Left(barrier) => {
last_barrier_time = Instant::now();
if self_paused {
for throttler in &mut self.throttlers {
throttler.on_barrier();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need multiple throttlers in the source executor?

if self_paused && self.throttlers.iter().all(|t| !t.should_pause()) {
stream.resume_source();
self_paused = false;
}
Expand Down Expand Up @@ -386,10 +384,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
chunk,
split_offset_mapping,
}) => {
if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
// Exceeds the max wait barrier time, the source will be paused. Currently
// we can guarantee the source is not paused since it received stream
// chunks.
if !self_paused && self.throttlers.iter().any(|t| t.should_pause()) {
self_paused = true;
stream.pause_source();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add warn! logging to inform us/users that streaming is paused due to accumulated L0 SST files.

}
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ pub mod source_executor;

mod reader;
pub mod state_table_handler;
pub mod throttler;

pub use state_table_handler::*;
34 changes: 12 additions & 22 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@ use risingwave_connector::source::{
use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder};
use risingwave_storage::StateStore;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::source::reader::SourceReaderStream;
use crate::executor::throttler::SourceThrottlerImpl;
use crate::executor::*;

/// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to
/// some latencies in network and cost in meta.
const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

pub struct SourceExecutor<S: StateStore> {
ctx: ActorContextRef,

Expand All @@ -53,8 +49,7 @@ pub struct SourceExecutor<S: StateStore> {
/// Receiver of barrier channel.
barrier_receiver: Option<UnboundedReceiver<Barrier>>,

/// Expected barrier latency.
expected_barrier_latency_ms: u64,
throttlers: Vec<SourceThrottlerImpl>,
}

impl<S: StateStore> SourceExecutor<S> {
Expand All @@ -66,7 +61,7 @@ impl<S: StateStore> SourceExecutor<S> {
stream_source_core: Option<StreamSourceCore<S>>,
metrics: Arc<StreamingMetrics>,
barrier_receiver: UnboundedReceiver<Barrier>,
expected_barrier_latency_ms: u64,
throttlers: Vec<SourceThrottlerImpl>,
executor_id: u64,
) -> Self {
Self {
Expand All @@ -77,7 +72,7 @@ impl<S: StateStore> SourceExecutor<S> {
stream_source_core,
metrics,
barrier_receiver: Some(barrier_receiver),
expected_barrier_latency_ms,
throttlers,
}
}

Expand Down Expand Up @@ -300,19 +295,17 @@ impl<S: StateStore> SourceExecutor<S> {

yield Message::Barrier(barrier);

// We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
// milliseconds, considering some other latencies like network and cost in Meta.
let max_wait_barrier_time_ms =
self.expected_barrier_latency_ms as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
let mut last_barrier_time = Instant::now();
// Whether the source is paused.
let mut self_paused = false;
let mut metric_row_per_barrier: u64 = 0;
while let Some(msg) = stream.next().await {
match msg? {
// This branch will be preferred.
Either::Left(barrier) => {
last_barrier_time = Instant::now();
if self_paused {
for throttler in &mut self.throttlers {
throttler.on_barrier();
}
if self_paused && self.throttlers.iter().all(|t| !t.should_pause()) {
stream.resume_source();
self_paused = false;
}
Expand Down Expand Up @@ -356,10 +349,7 @@ impl<S: StateStore> SourceExecutor<S> {
chunk,
split_offset_mapping,
}) => {
if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
// Exceeds the max wait barrier time, the source will be paused. Currently
// we can guarantee the source is not paused since it received stream
// chunks.
if !self_paused && self.throttlers.iter().any(|t| t.should_pause()) {
self_paused = true;
stream.pause_source();
}
Expand Down Expand Up @@ -551,7 +541,7 @@ mod tests {
Some(core),
Arc::new(StreamingMetrics::unused()),
barrier_rx,
u64::MAX,
vec![],
1,
);
let mut executor = Box::new(executor).execute();
Expand Down Expand Up @@ -643,7 +633,7 @@ mod tests {
Some(core),
Arc::new(StreamingMetrics::unused()),
barrier_rx,
u64::MAX,
vec![],
1,
);

Expand Down
70 changes: 70 additions & 0 deletions src/stream/src/executor/source/throttler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_storage::StateStoreImpl;
use tokio::time::Instant;

/// The throttling strategies a source executor can hold; each one is asked via `should_pause`
/// whether the source should currently be paused.
pub enum SourceThrottlerImpl {
/// Time-based throttling: pauses when too much time has elapsed since the last barrier.
/// See [`MaxWaitBarrierThrottler`].
MaxWaitBarrier(MaxWaitBarrierThrottler),
/// Storage-based throttling: when the wrapped state store is Hummock, pauses while it reports
/// that write throttling is needed; any other state store never asks for a pause.
StateStore(StateStoreImpl),
}

/// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to
/// some latencies in network and cost in meta.
const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

/// Throttler that asks the source to pause when no barrier has been seen for too long.
pub struct MaxWaitBarrierThrottler {
    /// Longest time, in milliseconds, that may elapse since `last_barrier_time` before
    /// `should_pause` starts returning `true`.
    max_wait_barrier_time_ms: u128,
    /// Reference instant for the elapsed-time check. Initialised at construction time and
    /// reset whenever a barrier is reported to the throttler.
    last_barrier_time: Instant,
}

impl MaxWaitBarrierThrottler {
    /// Creates a throttler whose threshold is
    /// `barrier_interval_ms * WAIT_BARRIER_MULTIPLE_TIMES` milliseconds, measured from now.
    ///
    /// The multiplication saturates at `u128::MAX`, so a very large interval (e.g. a
    /// "never throttle" sentinel) cannot overflow; it simply yields a threshold that is never
    /// exceeded.
    pub fn new(barrier_interval_ms: u128) -> Self {
        Self {
            max_wait_barrier_time_ms: barrier_interval_ms
                .saturating_mul(WAIT_BARRIER_MULTIPLE_TIMES),
            last_barrier_time: Instant::now(),
        }
    }

    /// Returns `true` when strictly more than `max_wait_barrier_time_ms` milliseconds have
    /// elapsed since `last_barrier_time`.
    fn should_pause(&self) -> bool {
        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` *
        // `expected_barrier_latency_ms` milliseconds, considering some
        // other latencies like network and cost in Meta.
        self.last_barrier_time.elapsed().as_millis() > self.max_wait_barrier_time_ms
    }
}

impl SourceThrottlerImpl {
pub fn should_pause(&self) -> bool {
match self {
SourceThrottlerImpl::MaxWaitBarrier(inner) => inner.should_pause(),
SourceThrottlerImpl::StateStore(inner) => {
if let Some(hummock) = inner.as_hummock() {
return hummock.need_write_throttling();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any hint on when hummock needs throttle?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any hint on when hummock needs throttle?

when ingest_batch finds that there are too many L0 SSTs

}
false
}
}
}

/// Notifies the throttler that a barrier has been received.
///
/// For the `MaxWaitBarrier` variant this resets `last_barrier_time` to the current instant.
/// The `StateStore` variant keeps no per-barrier state, so nothing happens for it.
pub fn on_barrier(&mut self) {
    // Kept as an exhaustive `match` (rather than `if let`) on purpose: with no `_` arm, adding
    // a new variant fails to compile until its barrier behaviour is decided here.
    #[allow(clippy::single_match)]
    match self {
        SourceThrottlerImpl::MaxWaitBarrier(inner) => {
            inner.last_barrier_time = Instant::now();
        }
        SourceThrottlerImpl::StateStore(_) => {}
    }
}
}
12 changes: 9 additions & 3 deletions src/stream/src/from_proto/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use super::*;
use crate::executor::source::StreamSourceCore;
use crate::executor::source_executor::SourceExecutor;
use crate::executor::state_table_handler::SourceStateTableHandler;
use crate::executor::throttler::{MaxWaitBarrierThrottler, SourceThrottlerImpl};
use crate::executor::FsSourceExecutor;

const FS_CONNECTORS: &[&str] = &["s3"];
Expand All @@ -44,6 +45,11 @@ impl ExecutorBuilder for SourceExecutorBuilder {
.lock_barrier_manager()
.register_sender(params.actor_context.id, sender);

let mut throttlers = vec![];
throttlers.push(SourceThrottlerImpl::MaxWaitBarrier(
MaxWaitBarrierThrottler::new(stream.config.barrier_interval_ms as u128),
));
throttlers.push(SourceThrottlerImpl::StateStore(stream.state_store.clone()));
if let Some(source) = &node.source_inner {
let source_id = TableId::new(source.source_id);
let source_name = source.source_name.clone();
Expand Down Expand Up @@ -108,7 +114,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
stream_source_core,
params.executor_stats,
barrier_receiver,
stream.config.barrier_interval_ms as u64,
throttlers,
params.executor_id,
)?))
} else {
Expand All @@ -119,7 +125,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
Some(stream_source_core),
params.executor_stats,
barrier_receiver,
stream.config.barrier_interval_ms as u64,
throttlers,
params.executor_id,
)))
}
Expand All @@ -133,7 +139,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
None,
params.executor_stats,
barrier_receiver,
stream.config.barrier_interval_ms as u64,
throttlers,
params.executor_id,
)))
}
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub struct LocalStreamManagerCore {
actor_monitor_tasks: HashMap<ActorId, ActorHandle>,

/// The state store implementation
state_store: StateStoreImpl,
pub(crate) state_store: StateStoreImpl,

/// Metrics of the stream manager
pub(crate) streaming_metrics: Arc<StreamingMetrics>,
Expand Down