feat(frontend): support mask failed serving worker temporarily #10328

Merged · 5 commits · Jun 16, 2023
29 changes: 24 additions & 5 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -18,6 +18,7 @@ use std::collections::HashMap;
use std::mem;
use std::rc::Rc;
use std::sync::Arc;
+use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
@@ -816,21 +817,20 @@ impl StageRunner {
plan_fragment: PlanFragment,
worker: Option<WorkerNode>,
) -> SchedulerResult<Fuse<Streaming<TaskInfoResponse>>> {
-let worker_node_addr = worker
-.unwrap_or(self.worker_node_manager.next_random_worker()?)
-.host
-.unwrap();
-
+let mut worker = worker.unwrap_or(self.worker_node_manager.next_random_worker()?);
+let worker_node_addr = worker.host.take().unwrap();
let compute_client = self
.compute_client_pool
.get_by_addr((&worker_node_addr).into())
.await
+.inspect_err(|_| self.mask_failed_serving_worker(&worker))
.map_err(|e| anyhow!(e))?;

let t_id = task_id.task_id;
let stream_status = compute_client
.create_task(task_id, plan_fragment, self.epoch.clone())
.await
+.inspect_err(|_| self.mask_failed_serving_worker(&worker))
.map_err(|e| anyhow!(e))?
.fuse();

@@ -964,4 +964,23 @@ impl StageRunner {
fn is_root_stage(&self) -> bool {
self.stage.id == 0
}

+fn mask_failed_serving_worker(&self, worker: &WorkerNode) {
+if !worker.property.as_ref().map_or(false, |p| p.is_serving) {
+return;
+}
+let duration = std::cmp::max(
+Duration::from_secs(
+self.ctx
+.session
+.env()
+.meta_config()
+.max_heartbeat_interval_secs as _,
+) / 10,
+Duration::from_secs(1),
+);
+self.worker_node_manager
+.manager
+.mask_worker_node(worker.id, duration);
+}
}
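For reviewers skimming the hunk above: `mask_failed_serving_worker` only masks workers whose property marks them as serving, and the mask window is one tenth of the meta `max_heartbeat_interval_secs`, floored at one second. A minimal sketch of that arithmetic; the helper name `mask_duration` and the interval values are illustrative, not part of the PR:

```rust
use std::time::Duration;

/// Mask window used above: a tenth of the meta heartbeat interval,
/// but never shorter than one second.
fn mask_duration(max_heartbeat_interval_secs: u64) -> Duration {
    std::cmp::max(
        Duration::from_secs(max_heartbeat_interval_secs) / 10,
        Duration::from_secs(1),
    )
}

fn main() {
    // Made-up example intervals, not real cluster configuration.
    assert_eq!(mask_duration(60), Duration::from_secs(6));
    assert_eq!(mask_duration(5), Duration::from_secs(1)); // clamped to the 1s floor
}
```

Presumably the point of deriving the window from the heartbeat interval rather than a fixed constant is to keep the mask short-lived relative to how quickly the meta service would notice a genuinely dead worker.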
62 changes: 55 additions & 7 deletions src/frontend/src/scheduler/worker_node_manager.rs
@@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
+use std::collections::{HashMap, HashSet};
+use std::sync::{Arc, RwLock, RwLockReadGuard};
+use std::time::Duration;

use rand::seq::SliceRandom;
use risingwave_common::bail;
use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping};
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
+use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
use risingwave_pb::common::{WorkerNode, WorkerType};

use crate::catalog::FragmentId;
@@ -27,6 +29,8 @@ use crate::scheduler::{SchedulerError, SchedulerResult};
/// `WorkerNodeManager` manages live worker nodes and table vnode mapping information.
pub struct WorkerNodeManager {
inner: RwLock<WorkerNodeManagerInner>,
+/// Temporarily make worker invisible from serving cluster.
+worker_node_mask: Arc<RwLock<HashSet<u32>>>,
}

struct WorkerNodeManagerInner {
@@ -53,6 +57,7 @@ impl WorkerNodeManager {
streaming_fragment_vnode_mapping: Default::default(),
serving_fragment_vnode_mapping: Default::default(),
}),
+worker_node_mask: Arc::new(Default::default()),
}
}

@@ -63,7 +68,10 @@ impl WorkerNodeManager {
streaming_fragment_vnode_mapping: HashMap::new(),
serving_fragment_vnode_mapping: HashMap::new(),
});
-Self { inner }
+Self {
+inner,
+worker_node_mask: Arc::new(Default::default()),
+}
}

pub fn list_worker_nodes(&self) -> Vec<WorkerNode> {
@@ -248,6 +256,26 @@ impl WorkerNodeManager {
guard.serving_fragment_vnode_mapping.remove(fragment_id);
}
}

+fn worker_node_mask(&self) -> RwLockReadGuard<'_, HashSet<u32>> {
+self.worker_node_mask.read().unwrap()
+}
+
+pub fn mask_worker_node(&self, worker_node_id: u32, duration: Duration) {
+let mut worker_node_mask = self.worker_node_mask.write().unwrap();
+if worker_node_mask.contains(&worker_node_id) {
+return;
+}
+worker_node_mask.insert(worker_node_id);
+let worker_node_mask_ref = self.worker_node_mask.clone();
+tokio::spawn(async move {
+tokio::time::sleep(duration).await;
+worker_node_mask_ref
+.write()
+.unwrap()
+.remove(&worker_node_id);
+});
+}
}

impl WorkerNodeManagerInner {
@@ -277,15 +305,16 @@ impl WorkerNodeSelector {
if self.enable_barrier_read {
self.manager.list_streaming_worker_nodes().len()
} else {
-self.manager.list_serving_worker_nodes().len()
+self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
+.len()
}
}

pub fn schedule_unit_count(&self) -> usize {
let worker_nodes = if self.enable_barrier_read {
self.manager.list_streaming_worker_nodes()
} else {
-self.manager.list_serving_worker_nodes()
+self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
};
worker_nodes
.iter()
@@ -300,21 +329,40 @@
if self.enable_barrier_read {
self.manager.get_streaming_fragment_mapping(&fragment_id)
} else {
-self.manager.serving_fragment_mapping(fragment_id)
+let origin = self.manager.serving_fragment_mapping(fragment_id)?;
+if self.manager.worker_node_mask().is_empty() {
+return Ok(origin);
+}
+let new_workers = self.apply_worker_node_mask(self.manager.list_serving_worker_nodes());
+let masked_mapping =
+place_vnode(Some(&origin), &new_workers, origin.iter_unique().count());
+masked_mapping.ok_or_else(|| SchedulerError::EmptyWorkerNodes)
}
}

pub fn next_random_worker(&self) -> SchedulerResult<WorkerNode> {
let worker_nodes = if self.enable_barrier_read {
self.manager.list_streaming_worker_nodes()
} else {
-self.manager.list_serving_worker_nodes()
+self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
};
worker_nodes
.choose(&mut rand::thread_rng())
.ok_or_else(|| SchedulerError::EmptyWorkerNodes)
.map(|w| (*w).clone())
}

+fn apply_worker_node_mask(&self, origin: Vec<WorkerNode>) -> Vec<WorkerNode> {
+if origin.len() <= 1 {
+// If there is at most one worker, don't apply mask.
+return origin;
+}
+let mask = self.manager.worker_node_mask();
+origin
+.into_iter()
+.filter(|w| !mask.contains(&w.id))
+.collect()
+}
}

#[cfg(test)]
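Taken together, the new `mask_worker_node` and `apply_worker_node_mask` implement a simple pattern: record the worker id in a shared set, spawn a task that clears it after the given duration, and filter masked ids out of the serving candidates unless only one candidate is left. A self-contained sketch of that pattern, assuming a tokio runtime (`tokio` with the `rt`, `macros`, and `time` features); the `WorkerMask` type, its method names, the worker ids, and the 2-second window are illustrative stand-ins, not the PR's API:

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use std::time::Duration;

#[derive(Default)]
struct WorkerMask {
    masked: Arc<RwLock<HashSet<u32>>>,
}

impl WorkerMask {
    /// Hide a worker id for `duration`; a spawned task unmasks it afterwards.
    fn mask(&self, worker_id: u32, duration: Duration) {
        {
            let mut guard = self.masked.write().unwrap();
            if !guard.insert(worker_id) {
                // Already masked: keep the existing expiry rather than extending it.
                return;
            }
        }
        let masked = Arc::clone(&self.masked);
        tokio::spawn(async move {
            tokio::time::sleep(duration).await;
            masked.write().unwrap().remove(&worker_id);
        });
    }

    /// Drop masked workers from a candidate list; with at most one candidate,
    /// skip filtering so scheduling never ends up with zero workers.
    fn filter(&self, workers: Vec<u32>) -> Vec<u32> {
        if workers.len() <= 1 {
            return workers;
        }
        let masked = self.masked.read().unwrap();
        workers.into_iter().filter(|w| !masked.contains(w)).collect()
    }
}

#[tokio::main]
async fn main() {
    let mask = WorkerMask::default();
    mask.mask(2, Duration::from_secs(2));
    // Worker 2 is skipped while masked...
    assert_eq!(mask.filter(vec![1, 2, 3]), vec![1, 3]);
    // ...and becomes schedulable again once the timer clears it.
    tokio::time::sleep(Duration::from_secs(3)).await;
    assert_eq!(mask.filter(vec![1, 2, 3]), vec![1, 2, 3]);
}
```

As in the PR, re-masking an already-masked worker is a no-op, so the original expiry timer is never extended.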
12 changes: 10 additions & 2 deletions src/frontend/src/session.rs
@@ -31,7 +31,7 @@ use risingwave_common::catalog::DEFAULT_SCHEMA_NAME;
use risingwave_common::catalog::{
DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
};
-use risingwave_common::config::{load_config, BatchConfig};
+use risingwave_common::config::{load_config, BatchConfig, MetaConfig};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::monitor::process_linux::monitor_process;
use risingwave_common::session_config::{ConfigMap, VisibilityMode};
@@ -111,6 +111,7 @@ pub struct FrontendEnv {
source_metrics: Arc<SourceMetrics>,

batch_config: BatchConfig,
+meta_config: MetaConfig,

/// Track creating streaming jobs, used to cancel creating streaming job when cancel request
/// received.
@@ -157,6 +158,7 @@ impl FrontendEnv {
sessions_map: Arc::new(Mutex::new(HashMap::new())),
frontend_metrics: Arc::new(FrontendMetrics::for_test()),
batch_config: BatchConfig::default(),
+meta_config: MetaConfig::default(),
source_metrics: Arc::new(SourceMetrics::default()),
creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
}
@@ -173,6 +175,7 @@
info!("> version: {} ({})", RW_VERSION, GIT_SHA);

let batch_config = config.batch;
+let meta_config = config.meta;

let frontend_address: HostAddr = opts
.advertise_addr
@@ -191,7 +194,7 @@
WorkerType::Frontend,
&frontend_address,
Default::default(),
-&config.meta,
+&meta_config,
)
.await?;

@@ -321,6 +324,7 @@ impl FrontendEnv {
frontend_metrics,
sessions_map: Arc::new(Mutex::new(HashMap::new())),
batch_config,
+meta_config,
source_metrics,
creating_streaming_job_tracker,
},
@@ -385,6 +389,10 @@ impl FrontendEnv {
&self.batch_config
}

+pub fn meta_config(&self) -> &MetaConfig {
+&self.meta_config
+}
+
pub fn source_metrics(&self) -> Arc<SourceMetrics> {
self.source_metrics.clone()
}
6 changes: 5 additions & 1 deletion src/tests/simulation/src/cluster.rs
@@ -41,7 +41,7 @@ pub enum ConfigPath {
}

impl ConfigPath {
-fn as_str(&self) -> &str {
+pub fn as_str(&self) -> &str {
match self {
ConfigPath::Regular(s) => s,
ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
@@ -526,6 +526,10 @@ impl Cluster {
self.config.clone()
}

+pub fn handle(&self) -> &Handle {
+&self.handle
+}
+
/// Graceful shutdown all RisingWave nodes.
pub async fn graceful_shutdown(&self) {
let mut nodes = vec![];