From f96f7da41be6d17097dca53931a273fc94dbcc12 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Wed, 14 Jun 2023 12:40:48 +0800 Subject: [PATCH 1/4] feat(frontend): support mask failed serving worker temporarily --- .../src/scheduler/distributed/stage.rs | 24 ++- .../src/scheduler/worker_node_manager.rs | 58 +++++- src/frontend/src/session.rs | 12 +- src/tests/simulation/src/cluster.rs | 6 +- .../tests/integration_tests/batch/mod.rs | 171 ++++++++++++++++++ .../tests/integration_tests/main.rs | 1 + 6 files changed, 258 insertions(+), 14 deletions(-) create mode 100644 src/tests/simulation/tests/integration_tests/batch/mod.rs diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 84cd4b28c1457..3ad8cf9d724b6 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::mem; use std::rc::Rc; use std::sync::Arc; +use std::time::Duration; use anyhow::anyhow; use arc_swap::ArcSwap; @@ -816,21 +817,36 @@ impl StageRunner { plan_fragment: PlanFragment, worker: Option, ) -> SchedulerResult>> { - let worker_node_addr = worker - .unwrap_or(self.worker_node_manager.next_random_worker()?) - .host - .unwrap(); + let worker = worker.unwrap_or(self.worker_node_manager.next_random_worker()?); + let worker_node_addr = worker.host.unwrap(); + let mask_failed_worker = || { + let duration = std::cmp::max( + Duration::from_secs( + self.ctx + .session + .env() + .meta_config() + .max_heartbeat_interval_secs as _, + ) / 10, + Duration::from_secs(1), + ); + self.worker_node_manager + .manager + .mask_worker_node(worker.id, duration); + }; let compute_client = self .compute_client_pool .get_by_addr((&worker_node_addr).into()) .await + .inspect_err(|_| mask_failed_worker()) .map_err(|e| anyhow!(e))?; let t_id = task_id.task_id; let stream_status = compute_client .create_task(task_id, plan_fragment, self.epoch.clone()) .await + .inspect_err(|_| mask_failed_worker()) .map_err(|e| anyhow!(e))? .fuse(); diff --git a/src/frontend/src/scheduler/worker_node_manager.rs b/src/frontend/src/scheduler/worker_node_manager.rs index 5289dca23642a..09a647abfeb73 100644 --- a/src/frontend/src/scheduler/worker_node_manager.rs +++ b/src/frontend/src/scheduler/worker_node_manager.rs @@ -12,13 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, RwLock, RwLockReadGuard}; +use std::time::Duration; use rand::seq::SliceRandom; use risingwave_common::bail; use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping}; use risingwave_common::util::worker_util::get_pu_to_worker_mapping; +use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; use crate::catalog::FragmentId; @@ -27,6 +29,8 @@ use crate::scheduler::{SchedulerError, SchedulerResult}; /// `WorkerNodeManager` manages live worker nodes and table vnode mapping information. pub struct WorkerNodeManager { inner: RwLock, + /// Temporarily make worker invisible from serving cluster. + worker_node_mask: Arc>>, } struct WorkerNodeManagerInner { @@ -53,6 +57,7 @@ impl WorkerNodeManager { streaming_fragment_vnode_mapping: Default::default(), serving_fragment_vnode_mapping: Default::default(), }), + worker_node_mask: Arc::new(Default::default()), } } @@ -63,7 +68,10 @@ impl WorkerNodeManager { streaming_fragment_vnode_mapping: HashMap::new(), serving_fragment_vnode_mapping: HashMap::new(), }); - Self { inner } + Self { + inner, + worker_node_mask: Arc::new(Default::default()), + } } pub fn list_worker_nodes(&self) -> Vec { @@ -248,6 +256,26 @@ impl WorkerNodeManager { guard.serving_fragment_vnode_mapping.remove(fragment_id); } } + + fn worker_node_mask(&self) -> RwLockReadGuard<'_, HashSet> { + self.worker_node_mask.read().unwrap() + } + + pub fn mask_worker_node(&self, worker_node_id: u32, duration: Duration) { + let mut worker_node_mask = self.worker_node_mask.write().unwrap(); + if worker_node_mask.contains(&worker_node_id) { + return; + } + worker_node_mask.insert(worker_node_id); + let worker_node_mask_ref = self.worker_node_mask.clone(); + tokio::spawn(async move { + tokio::time::sleep(duration).await; + worker_node_mask_ref + .write() + .unwrap() + .remove(&worker_node_id); + }); + } } impl WorkerNodeManagerInner { @@ -277,7 +305,8 @@ impl WorkerNodeSelector { if self.enable_barrier_read { self.manager.list_streaming_worker_nodes().len() } else { - self.manager.list_serving_worker_nodes().len() + self.apply_worker_node_mask(self.manager.list_serving_worker_nodes()) + .len() } } @@ -285,7 +314,7 @@ impl WorkerNodeSelector { let worker_nodes = if self.enable_barrier_read { self.manager.list_streaming_worker_nodes() } else { - self.manager.list_serving_worker_nodes() + self.apply_worker_node_mask(self.manager.list_serving_worker_nodes()) }; worker_nodes .iter() @@ -300,7 +329,14 @@ impl WorkerNodeSelector { if self.enable_barrier_read { self.manager.get_streaming_fragment_mapping(&fragment_id) } else { - self.manager.serving_fragment_mapping(fragment_id) + let origin = self.manager.serving_fragment_mapping(fragment_id)?; + if self.manager.worker_node_mask().is_empty() { + return Ok(origin); + } + let new_workers = self.apply_worker_node_mask(self.manager.list_serving_worker_nodes()); + let masked_mapping = + place_vnode(Some(&origin), &new_workers, origin.iter_unique().count()); + masked_mapping.ok_or_else(|| SchedulerError::EmptyWorkerNodes) } } @@ -308,13 +344,21 @@ impl WorkerNodeSelector { let worker_nodes = if self.enable_barrier_read { self.manager.list_streaming_worker_nodes() } else { - self.manager.list_serving_worker_nodes() + self.apply_worker_node_mask(self.manager.list_serving_worker_nodes()) }; worker_nodes .choose(&mut rand::thread_rng()) .ok_or_else(|| SchedulerError::EmptyWorkerNodes) .map(|w| (*w).clone()) } + + fn apply_worker_node_mask(&self, origin: Vec) -> Vec { + let mask = self.manager.worker_node_mask(); + origin + .into_iter() + .filter(|w| !mask.contains(&w.id)) + .collect() + } } #[cfg(test)] diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index a0223ff572689..8a3ce14e24eeb 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -31,7 +31,7 @@ use risingwave_common::catalog::DEFAULT_SCHEMA_NAME; use risingwave_common::catalog::{ DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID, }; -use risingwave_common::config::{load_config, BatchConfig}; +use risingwave_common::config::{load_config, BatchConfig, MetaConfig}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::monitor::process_linux::monitor_process; use risingwave_common::session_config::{ConfigMap, VisibilityMode}; @@ -111,6 +111,7 @@ pub struct FrontendEnv { source_metrics: Arc, batch_config: BatchConfig, + meta_config: MetaConfig, /// Track creating streaming jobs, used to cancel creating streaming job when cancel request /// received. @@ -157,6 +158,7 @@ impl FrontendEnv { sessions_map: Arc::new(Mutex::new(HashMap::new())), frontend_metrics: Arc::new(FrontendMetrics::for_test()), batch_config: BatchConfig::default(), + meta_config: MetaConfig::default(), source_metrics: Arc::new(SourceMetrics::default()), creating_streaming_job_tracker: Arc::new(creating_streaming_tracker), } @@ -173,6 +175,7 @@ impl FrontendEnv { info!("> version: {} ({})", RW_VERSION, GIT_SHA); let batch_config = config.batch; + let meta_config = config.meta; let frontend_address: HostAddr = opts .advertise_addr @@ -191,7 +194,7 @@ impl FrontendEnv { WorkerType::Frontend, &frontend_address, Default::default(), - &config.meta, + &meta_config, ) .await?; @@ -321,6 +324,7 @@ impl FrontendEnv { frontend_metrics, sessions_map: Arc::new(Mutex::new(HashMap::new())), batch_config, + meta_config, source_metrics, creating_streaming_job_tracker, }, @@ -385,6 +389,10 @@ impl FrontendEnv { &self.batch_config } + pub fn meta_config(&self) -> &MetaConfig { + &self.meta_config + } + pub fn source_metrics(&self) -> Arc { self.source_metrics.clone() } diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 0ee5e905c3d64..b79d324d9eb59 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -41,7 +41,7 @@ pub enum ConfigPath { } impl ConfigPath { - fn as_str(&self) -> &str { + pub fn as_str(&self) -> &str { match self { ConfigPath::Regular(s) => s, ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(), @@ -526,6 +526,10 @@ impl Cluster { self.config.clone() } + pub fn handle(&self) -> &Handle { + &self.handle + } + /// Graceful shutdown all RisingWave nodes. pub async fn graceful_shutdown(&self) { let mut nodes = vec![]; diff --git a/src/tests/simulation/tests/integration_tests/batch/mod.rs b/src/tests/simulation/tests/integration_tests/batch/mod.rs new file mode 100644 index 0000000000000..66a02d404a4f4 --- /dev/null +++ b/src/tests/simulation/tests/integration_tests/batch/mod.rs @@ -0,0 +1,171 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Write; + +use anyhow::Result; +use clap::Parser; +use itertools::Itertools; +use madsim::runtime::Handle; +use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration, Session}; +use tokio::time::Duration; + +fn create_compute_node(cluster: &Cluster, idx: usize, role: &str) { + let config = cluster.config(); + let opts = risingwave_compute::ComputeNodeOpts::parse_from([ + "compute-node", + "--config-path", + config.config_path.as_str(), + "--listen-addr", + "0.0.0.0:5688", + "--advertise-addr", + &format!("192.168.3.{idx}:5688"), + "--total-memory-bytes", + "6979321856", + "--parallelism", + &config.compute_node_cores.to_string(), + "--role", + role, + ]); + cluster + .handle() + .create_node() + .name(format!("compute-{idx}")) + .ip([192, 168, 3, idx as u8].into()) + .cores(config.compute_node_cores) + .init(move || risingwave_compute::start(opts.clone(), prometheus::Registry::new())) + .build(); +} + +fn cluster_config_no_compute_nodes() -> Configuration { + let config_path = { + let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file"); + file.write_all( + "\ +[meta] +max_heartbeat_interval_secs = 300 + +[system] +barrier_interval_ms = 1000 +checkpoint_frequency = 1 + +[server] +telemetry_enabled = false + " + .as_bytes(), + ) + .expect("failed to write config file"); + file.into_temp_path() + }; + + Configuration { + config_path: ConfigPath::Temp(config_path.into()), + frontend_nodes: 2, + compute_nodes: 0, + meta_nodes: 3, + compactor_nodes: 2, + compute_node_cores: 2, + etcd_timeout_rate: 0.0, + etcd_data_path: None, + } +} + +#[madsim::test] +async fn test_serving_cluster_availability() { + let config = cluster_config_no_compute_nodes(); + let mut cluster = Cluster::start(config).await.unwrap(); + let num_streaming = 3; + let num_serving = 2; + for idx in 1..=num_streaming { + create_compute_node(&cluster, idx, "streaming"); + } + for idx in num_streaming + 1..=num_streaming + num_serving { + create_compute_node(&cluster, idx, "serving"); + } + // wait for the service to be ready + tokio::time::sleep(Duration::from_secs(15)).await; + + let mut session = cluster.start_session(); + session + .run("create table t1 (c int primary key);") + .await + .unwrap(); + for i in 0..1000 { + session + .run(format!("insert into t1 values ({})", i)) + .await + .unwrap(); + } + session.run("flush;").await.unwrap(); + session + .run("set visibility_mode to checkpoint;") + .await + .unwrap(); + session.run("set query_mode to distributed;").await.unwrap(); + + let select = "select * from t1 order by c;"; + let query_and_assert = |mut session: Session| async move { + let result = session.run(select).await.unwrap(); + let rows: Vec = result + .split('\n') + .into_iter() + .map(|r| r.parse::().unwrap()) + .collect_vec(); + assert_eq!(rows, (0..1000).collect_vec()); + }; + query_and_assert(session.clone()).await; + // Leave one serving node, kill all other serving nodes. + for idx in num_streaming + 1..num_streaming + num_serving { + cluster.handle().kill(format!("compute-{}", idx)); + } + // Fail and mask node. + session.run(select).await.unwrap_err(); + // Succeed as failed node has been masked. + query_and_assert(session.clone()).await; + tokio::time::sleep(Duration::from_secs(60)).await; + // Fail because mask has expired. + session.run(select).await.unwrap_err(); + // Succeed as failed node has been masked. + query_and_assert(session.clone()).await; + + // Killed serving node is removed by meta permanently. + tokio::time::sleep(Duration::from_secs(300)).await; + query_and_assert(session.clone()).await; + + // kill the remaining serving nodes. + cluster + .handle() + .kill(format!("compute-{}", num_streaming + num_serving)); + // no serving nodes + session.run(select).await.unwrap_err(); + session.run(select).await.unwrap_err(); + session.run("set visibility_mode to all;").await.unwrap(); + query_and_assert(session.clone()).await; + session + .run("set visibility_mode to checkpoint;") + .await + .unwrap(); + session.run(select).await.unwrap_err(); + + create_compute_node(&cluster, num_streaming + num_serving + 1, "serving"); + // wait for the service to be ready + tokio::time::sleep(Duration::from_secs(15)).await; + query_and_assert(session.clone()).await; + // wait for mask expire + tokio::time::sleep(Duration::from_secs(30)).await; + session.run(select).await.unwrap_err(); + // wait for previous nodes expire + tokio::time::sleep(Duration::from_secs(300)).await; + query_and_assert(session.clone()).await; +} diff --git a/src/tests/simulation/tests/integration_tests/main.rs b/src/tests/simulation/tests/integration_tests/main.rs index 3e37baa2b99b6..6d20f0c317817 100644 --- a/src/tests/simulation/tests/integration_tests/main.rs +++ b/src/tests/simulation/tests/integration_tests/main.rs @@ -21,5 +21,6 @@ #![cfg(madsim)] #![feature(lazy_cell)] +mod batch; mod recovery; mod scale; From ba5c6c606513bc286e5b91bdf6954dbcf6329126 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Wed, 14 Jun 2023 13:29:49 +0800 Subject: [PATCH 2/4] mask serving node only --- src/frontend/src/scheduler/distributed/stage.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 3ad8cf9d724b6..e9eb261bf2b1f 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -819,7 +819,10 @@ impl StageRunner { ) -> SchedulerResult>> { let worker = worker.unwrap_or(self.worker_node_manager.next_random_worker()?); let worker_node_addr = worker.host.unwrap(); - let mask_failed_worker = || { + let mask_failed_serving_worker = || { + if !worker.property.as_ref().map_or(false, |p| p.is_serving) { + return; + } let duration = std::cmp::max( Duration::from_secs( self.ctx @@ -839,14 +842,14 @@ impl StageRunner { .compute_client_pool .get_by_addr((&worker_node_addr).into()) .await - .inspect_err(|_| mask_failed_worker()) + .inspect_err(|_| mask_failed_serving_worker()) .map_err(|e| anyhow!(e))?; let t_id = task_id.task_id; let stream_status = compute_client .create_task(task_id, plan_fragment, self.epoch.clone()) .await - .inspect_err(|_| mask_failed_worker()) + .inspect_err(|_| mask_failed_serving_worker()) .map_err(|e| anyhow!(e))? .fuse(); From d5599f11e3e77ce5235332f6fbae80b831cc3098 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Wed, 14 Jun 2023 13:37:19 +0800 Subject: [PATCH 3/4] minor refactor --- .../src/scheduler/distributed/stage.rs | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index e9eb261bf2b1f..797986ed69baa 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -817,39 +817,20 @@ impl StageRunner { plan_fragment: PlanFragment, worker: Option, ) -> SchedulerResult>> { - let worker = worker.unwrap_or(self.worker_node_manager.next_random_worker()?); - let worker_node_addr = worker.host.unwrap(); - let mask_failed_serving_worker = || { - if !worker.property.as_ref().map_or(false, |p| p.is_serving) { - return; - } - let duration = std::cmp::max( - Duration::from_secs( - self.ctx - .session - .env() - .meta_config() - .max_heartbeat_interval_secs as _, - ) / 10, - Duration::from_secs(1), - ); - self.worker_node_manager - .manager - .mask_worker_node(worker.id, duration); - }; - + let mut worker = worker.unwrap_or(self.worker_node_manager.next_random_worker()?); + let worker_node_addr = worker.host.take().unwrap(); let compute_client = self .compute_client_pool .get_by_addr((&worker_node_addr).into()) .await - .inspect_err(|_| mask_failed_serving_worker()) + .inspect_err(|_| self.mask_failed_serving_worker(&worker)) .map_err(|e| anyhow!(e))?; let t_id = task_id.task_id; let stream_status = compute_client .create_task(task_id, plan_fragment, self.epoch.clone()) .await - .inspect_err(|_| mask_failed_serving_worker()) + .inspect_err(|_| self.mask_failed_serving_worker(&worker)) .map_err(|e| anyhow!(e))? .fuse(); @@ -983,4 +964,23 @@ impl StageRunner { fn is_root_stage(&self) -> bool { self.stage.id == 0 } + + fn mask_failed_serving_worker(&self, worker: &WorkerNode) { + if !worker.property.as_ref().map_or(false, |p| p.is_serving) { + return; + } + let duration = std::cmp::max( + Duration::from_secs( + self.ctx + .session + .env() + .meta_config() + .max_heartbeat_interval_secs as _, + ) / 10, + Duration::from_secs(1), + ); + self.worker_node_manager + .manager + .mask_worker_node(worker.id, duration); + } } From 5e73150103efa191e1948f9520937553bbd5c24d Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Fri, 16 Jun 2023 11:44:16 +0800 Subject: [PATCH 4/4] minor --- src/frontend/src/scheduler/worker_node_manager.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/frontend/src/scheduler/worker_node_manager.rs b/src/frontend/src/scheduler/worker_node_manager.rs index 09a647abfeb73..4f55afc7c8d6e 100644 --- a/src/frontend/src/scheduler/worker_node_manager.rs +++ b/src/frontend/src/scheduler/worker_node_manager.rs @@ -353,6 +353,10 @@ impl WorkerNodeSelector { } fn apply_worker_node_mask(&self, origin: Vec) -> Vec { + if origin.len() <= 1 { + // If there is at most one worker, don't apply mask. + return origin; + } let mask = self.manager.worker_node_mask(); origin .into_iter()