From cdaa401f5157b3688698449657a226ac1edcc65f Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Thu, 23 Feb 2023 11:44:21 +0800 Subject: [PATCH 1/5] for stream --- src/common/src/config.rs | 8 ++++++ src/stream/src/executor/actor.rs | 38 +++++++++++++-------------- src/stream/src/task/stream_manager.rs | 1 + 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 8afb1dc8f0822..90123abc0974a 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -252,6 +252,10 @@ pub struct StreamingConfig { #[serde(default)] pub developer: DeveloperConfig, + + /// Max unique user stream errors per actor + #[serde(default = "default::streaming::unique_user_stream_errors")] + pub unique_user_stream_errors: usize, } impl Default for StreamingConfig { @@ -636,6 +640,10 @@ mod default { pub fn async_stack_trace() -> AsyncStackTraceOption { AsyncStackTraceOption::On } + + pub fn unique_user_stream_errors() -> usize { + 10 + } } pub mod file_cache { diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index 6c4a941dfa35f..5772f11251821 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -23,6 +22,7 @@ use hytra::TrAdder; use minitrace::prelude::*; use parking_lot::Mutex; use risingwave_common::util::epoch::EpochPair; +use risingwave_common::error::ErrorSuppressor; use risingwave_expr::ExprError; use tokio_stream::StreamExt; @@ -37,13 +37,11 @@ pub struct ActorContext { pub id: ActorId, pub fragment_id: u32, - // TODO: report errors and prompt the user. - pub errors: Mutex>>, - last_mem_val: Arc, cur_mem_val: Arc, total_mem_val: Arc>, streaming_metrics: Arc, + pub error_suppressor: Arc> } pub type ActorContextRef = Arc; @@ -53,11 +51,11 @@ impl ActorContext { Arc::new(Self { id, fragment_id: 0, - errors: Default::default(), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val: Arc::new(TrAdder::new()), streaming_metrics: Arc::new(StreamingMetrics::unused()), + error_suppressor: Arc::new(Mutex::new(ErrorSuppressor::new(10))), }) } @@ -66,35 +64,35 @@ impl ActorContext { fragment_id: u32, total_mem_val: Arc>, streaming_metrics: Arc, + unique_user_errors: usize, ) -> ActorContextRef { Arc::new(Self { id, fragment_id, - errors: Default::default(), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val, streaming_metrics, + error_suppressor: Arc::new(Mutex::new(ErrorSuppressor::new(unique_user_errors))) }) } pub fn on_compute_error(&self, err: ExprError, identity: &str) { tracing::error!("Compute error: {}, executor: {identity}", err); let executor_name = identity.split(' ').next().unwrap_or("name_not_found"); - self.streaming_metrics - .user_compute_error_count - .with_label_values(&[ - "ExprError", - &err.to_string(), - executor_name, - &self.fragment_id.to_string(), - ]) - .inc(); - self.errors - .lock() - .entry(identity.to_owned()) - .or_default() - .push(err); + let err_str = err.to_string(); + if !self.error_suppressor.lock().suppress_error(&err_str) { + self.streaming_metrics + .user_compute_error_count + .with_label_values(&[ + "ExprError", + &err_str, + executor_name, + &self.fragment_id.to_string(), + ]) + .inc(); + + } } pub fn store_mem_usage(&self, val: usize) { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index d3e5ffd11e0b2..130d508a68dec 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -607,6 +607,7 @@ impl LocalStreamManagerCore { actor.fragment_id, self.total_mem_val.clone(), self.streaming_metrics.clone(), + self.config.unique_user_stream_errors ); let vnode_bitmap = actor .vnode_bitmap From 9bd1198f6285943d1a9ea36557ee2e1a05c5659d Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Thu, 23 Feb 2023 11:48:00 +0800 Subject: [PATCH 2/5] fix --- src/stream/src/executor/actor.rs | 7 +++---- src/stream/src/task/stream_manager.rs | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index 5772f11251821..d11459369bd0f 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -21,8 +21,8 @@ use futures::pin_mut; use hytra::TrAdder; use minitrace::prelude::*; use parking_lot::Mutex; -use risingwave_common::util::epoch::EpochPair; use risingwave_common::error::ErrorSuppressor; +use risingwave_common::util::epoch::EpochPair; use risingwave_expr::ExprError; use tokio_stream::StreamExt; @@ -41,7 +41,7 @@ pub struct ActorContext { cur_mem_val: Arc, total_mem_val: Arc>, streaming_metrics: Arc, - pub error_suppressor: Arc> + pub error_suppressor: Arc>, } pub type ActorContextRef = Arc; @@ -73,7 +73,7 @@ impl ActorContext { last_mem_val: Arc::new(0.into()), total_mem_val, streaming_metrics, - error_suppressor: Arc::new(Mutex::new(ErrorSuppressor::new(unique_user_errors))) + error_suppressor: Arc::new(Mutex::new(ErrorSuppressor::new(unique_user_errors))), }) } @@ -91,7 +91,6 @@ impl ActorContext { &self.fragment_id.to_string(), ]) .inc(); - } } diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 130d508a68dec..54e94f832c421 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -607,7 +607,7 @@ impl LocalStreamManagerCore { actor.fragment_id, self.total_mem_val.clone(), self.streaming_metrics.clone(), - self.config.unique_user_stream_errors + self.config.unique_user_stream_errors, ); let vnode_bitmap = actor .vnode_bitmap From 4be9bfdfcd6558e8e870971bfea151acfa56f202 Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Thu, 23 Feb 2023 11:49:59 +0800 Subject: [PATCH 3/5] minor --- src/common/src/error.rs | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/src/common/src/error.rs b/src/common/src/error.rs index afb768e7ed25d..8bd8a35eb6616 100644 --- a/src/common/src/error.rs +++ b/src/common/src/error.rs @@ -13,9 +13,11 @@ // limitations under the License. use std::backtrace::Backtrace; +use std::collections::HashSet; use std::convert::Infallible; use std::fmt::{Debug, Display, Formatter}; use std::io::Error as IoError; +use std::time::{Duration, SystemTime}; use memcomparable::Error as MemComparableError; use risingwave_pb::ProstFieldNotFound; @@ -422,6 +424,43 @@ macro_rules! bail { }; } +const ERROR_SUPRESSOR_RESET_TIME_DURATION: Duration = Duration::from_millis(60 * 60 * 1000); // 1h +#[derive(Debug)] +pub struct ErrorSuppressor { + max_unique: usize, + unique: HashSet, + last_reset_time: SystemTime, +} + +impl ErrorSuppressor { + pub fn new(max_unique: usize) -> Self { + Self { + max_unique, + last_reset_time: SystemTime::now(), + unique: Default::default(), + } + } + + pub fn suppress_error(&mut self, error: &str) -> bool { + self.try_reset(); + if self.unique.contains(error) { + false + } else if self.unique.len() < self.max_unique { + self.unique.insert(error.to_string()); + false + } else { + // We have exceeded the capacity. + true + } + } + + fn try_reset(&mut self) { + if self.last_reset_time.elapsed().unwrap() >= ERROR_SUPRESSOR_RESET_TIME_DURATION { + *self = Self::new(self.max_unique) + } + } +} + #[cfg(test)] mod tests { use std::convert::Into; From faa41a5780f8388e2c6ae9f7e858ea445406d817 Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Thu, 23 Feb 2023 11:51:59 +0800 Subject: [PATCH 4/5] minor --- src/common/src/error.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/src/error.rs b/src/common/src/error.rs index 8bd8a35eb6616..b9445dff39c13 100644 --- a/src/common/src/error.rs +++ b/src/common/src/error.rs @@ -31,6 +31,8 @@ use crate::util::value_encoding::error::ValueEncodingError; /// Header used to store serialized [`RwError`] in grpc status. pub const RW_ERROR_GRPC_HEADER: &str = "risingwave-error-bin"; +const ERROR_SUPRESSOR_RESET_TIME_DURATION: Duration = Duration::from_millis(60 * 60 * 1000); // 1h + pub trait Error = std::error::Error + Send + Sync + 'static; pub type BoxedError = Box; @@ -424,7 +426,6 @@ macro_rules! bail { }; } -const ERROR_SUPRESSOR_RESET_TIME_DURATION: Duration = Duration::from_millis(60 * 60 * 1000); // 1h #[derive(Debug)] pub struct ErrorSuppressor { max_unique: usize, From 00f4853ee69612ebbbed0f18a4786a97f4310d76 Mon Sep 17 00:00:00 2001 From: jon-chuang Date: Thu, 23 Feb 2023 11:53:38 +0800 Subject: [PATCH 5/5] rename --- src/common/src/error.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/common/src/error.rs b/src/common/src/error.rs index b9445dff39c13..df25081431370 100644 --- a/src/common/src/error.rs +++ b/src/common/src/error.rs @@ -31,7 +31,7 @@ use crate::util::value_encoding::error::ValueEncodingError; /// Header used to store serialized [`RwError`] in grpc status. pub const RW_ERROR_GRPC_HEADER: &str = "risingwave-error-bin"; -const ERROR_SUPRESSOR_RESET_TIME_DURATION: Duration = Duration::from_millis(60 * 60 * 1000); // 1h +const ERROR_SUPPRESSOR_RESET_DURATION: Duration = Duration::from_millis(60 * 60 * 1000); // 1h pub trait Error = std::error::Error + Send + Sync + 'static; pub type BoxedError = Box; @@ -456,7 +456,7 @@ impl ErrorSuppressor { } fn try_reset(&mut self) { - if self.last_reset_time.elapsed().unwrap() >= ERROR_SUPRESSOR_RESET_TIME_DURATION { + if self.last_reset_time.elapsed().unwrap() >= ERROR_SUPPRESSOR_RESET_DURATION { *self = Self::new(self.max_unique) } }