Add executor terminating status for graceful shutdown #667

Merged: 3 commits, Feb 25, 2023
Changes from 2 commits
1 change: 1 addition & 0 deletions ballista/core/proto/ballista.proto
@@ -323,6 +323,7 @@ message ExecutorStatus {
string active = 1;
string dead = 2;
string unknown = 3;
string terminating = 4;
}
}

4 changes: 3 additions & 1 deletion ballista/core/src/serde/generated/ballista.rs
@@ -540,7 +540,7 @@ pub mod executor_metric {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorStatus {
#[prost(oneof = "executor_status::Status", tags = "1, 2, 3")]
#[prost(oneof = "executor_status::Status", tags = "1, 2, 3, 4")]
pub status: ::core::option::Option<executor_status::Status>,
}
/// Nested message and enum types in `ExecutorStatus`.
@@ -554,6 +554,8 @@ pub mod executor_status {
Dead(::prost::alloc::string::String),
#[prost(string, tag = "3")]
Unknown(::prost::alloc::string::String),
#[prost(string, tag = "4")]
Terminating(::prost::alloc::string::String),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
21 changes: 21 additions & 0 deletions ballista/executor/src/executor.rs
@@ -19,7 +19,10 @@

use dashmap::DashMap;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::metrics::ExecutorMetricsCollector;
use ballista_core::error::BallistaError;
@@ -37,6 +40,20 @@ use futures::future::AbortHandle;

use ballista_core::serde::scheduler::PartitionId;

pub struct TasksDrainedFuture(pub Arc<Executor>);

impl Future for TasksDrainedFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.0.abort_handles.len() > 0 {
Poll::Pending
} else {
Poll::Ready(())
}
}
}

Comment on lines +43 to +56 (Contributor):
In the StopExecutorParams struct, there is a force flag to tell the Executor whether it should quit immediately or wait for all the running tasks to drain. I'm not sure whether it is correctly implemented. Maybe you can take a look.

pub struct StopExecutorParams {
    #[prost(string, tag = "1")]
    pub executor_id: ::prost::alloc::string::String,
    /// stop reason
    #[prost(string, tag = "2")]
    pub reason: ::prost::alloc::string::String,
    /// force to stop the executor immediately
    #[prost(bool, tag = "3")]
    pub force: bool,
}
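
For reference on that question, here is a minimal sketch of how the force flag could be honored on the executor side, assuming a handler inside the executor crate receives StopExecutorParams (handle_stop, the 100 ms polling interval, and the surrounding wiring are illustrative only; the PR itself only adds the terminating status, the TERMINATING flag, and the drain wait):

use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use crate::executor::Executor;
use crate::executor_server::TERMINATING;

// Hypothetical handler for StopExecutorParams (not part of this PR).
async fn handle_stop(executor: Arc<Executor>, force: bool) {
    if force {
        // Force stop: skip draining and let the caller shut down immediately.
        return;
    }
    // Graceful stop: refuse new tasks, then poll until running tasks drain.
    TERMINATING.store(true, Ordering::Release);
    while executor.active_task_count() > 0 {
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}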

type AbortHandles = Arc<DashMap<(usize, PartitionId), AbortHandle>>;

/// Ballista executor
@@ -175,6 +192,10 @@ impl Executor {
pub fn work_dir(&self) -> &str {
&self.work_dir
}

pub fn active_task_count(&self) -> usize {
self.abort_handles.len()
}
}

#[cfg(test)]
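One detail worth noting about TasksDrainedFuture: its poll returns Pending without registering the waker from Context, so on its own it only completes if something re-polls the task. Below is a sketch of an interval-based alternative that also bounds the wait; the function name, the 100 ms tick, and the timeout handling are illustrative and not part of this PR:

use std::sync::Arc;
use std::time::Duration;

use tokio::time::{interval, timeout};

use crate::executor::Executor;

// Illustrative drain wait: poll the active task count on a timer and give up
// once the grace period elapses. Returns false if tasks were still running.
async fn wait_for_drain(executor: Arc<Executor>, grace: Duration) -> bool {
    let drained = async {
        let mut ticker = interval(Duration::from_millis(100));
        loop {
            ticker.tick().await;
            if executor.active_task_count() == 0 {
                break;
            }
        }
    };
    timeout(grace, drained).await.is_ok()
}

A caller could pair this with the scheduler's executor_termination_grace_period, for example wait_for_drain(executor.clone(), Duration::from_secs(30)).await.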
60 changes: 51 additions & 9 deletions ballista/executor/src/executor_process.rs
@@ -18,6 +18,7 @@
//! Ballista Executor Process

use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::{env, io};
@@ -41,18 +42,21 @@ use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};

use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy};
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::executor_resource::Resource;
use ballista_core::serde::protobuf::executor_status::Status;
use ballista_core::serde::protobuf::{
executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
ExecutorRegistration, ExecutorStoppedParams,
ExecutorRegistration, ExecutorResource, ExecutorSpecification, ExecutorStatus,
ExecutorStoppedParams, HeartBeatParams,
};
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::{
create_grpc_client_connection, create_grpc_server, with_object_store_provider,
};
use ballista_core::BALLISTA_VERSION;

use crate::executor::Executor;
use crate::executor::{Executor, TasksDrainedFuture};
use crate::executor_server::TERMINATING;
use crate::flight_service::BallistaFlightService;
use crate::metrics::LoggingMetricsCollector;
use crate::shutdown::Shutdown;
@@ -155,12 +159,11 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
.map(executor_registration::OptionalHost::Host),
port: opt.port as u32,
grpc_port: opt.grpc_port as u32,
specification: Some(
ExecutorSpecification {
task_slots: concurrent_tasks as u32,
}
.into(),
),
specification: Some(ExecutorSpecification {
resources: vec![ExecutorResource {
resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
}],
}),
};

let config = with_object_store_provider(
@@ -295,6 +298,8 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
shutdown_noti.subscribe_for_shutdown(),
)));

let tasks_drained = TasksDrainedFuture(executor);

// Concurrently run the service checks and listen for the `shutdown` signal, waiting for a stop request to arrive.
// check_services runs until an error is encountered, so under normal circumstances this `select!` statement runs
// until the `shutdown` signal is received or a stop request arrives.
@@ -319,7 +324,41 @@
},
};

// Set status to terminating
info!("setting executor to TERMINATING status");
TERMINATING.store(true, Ordering::Release);

if notify_scheduler {
// Send a heartbeat to update the executor's status to `Terminating`. This should signal to the
// scheduler to no longer schedule tasks on this executor
if let Err(error) = scheduler
.heart_beat_from_executor(HeartBeatParams {
executor_id: executor_id.clone(),
metrics: vec![],
status: Some(ExecutorStatus {
status: Some(Status::Terminating(String::default())),
}),
metadata: Some(ExecutorRegistration {
id: executor_id.clone(),
optional_host: opt
.external_host
.clone()
.map(executor_registration::OptionalHost::Host),
port: opt.port as u32,
grpc_port: opt.grpc_port as u32,
specification: Some(ExecutorSpecification {
resources: vec![ExecutorResource {
resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
}],
}),
}),
})
.await
{
error!("error sending heartbeat with fenced status: {:?}", error);
}

// TODO we probably don't need a separate rpc call for this....
if let Err(error) = scheduler
.executor_stopped(ExecutorStoppedParams {
executor_id,
Comment on lines +361 to 364 (Contributor):
I think you can remove this RPC call (in a follow-up PR), since this RPC is only used here.

@@ -329,6 +368,9 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
{
error!("ExecutorStopped grpc failed: {:?}", error);
}

// Wait for tasks to drain
tasks_drained.await;
}

// Extract the `shutdown_complete` receiver and transmitter
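On the scheduler side, the intent of the Terminating heartbeat is to stop new task assignment without immediately declaring the executor dead. A minimal sketch of that check, with a hypothetical helper name (the real executor_manager API may differ):

use ballista_core::serde::protobuf::executor_status::Status;
use ballista_core::serde::protobuf::ExecutorStatus;

// Hypothetical scheduler-side check: only executors whose last reported
// status is Active receive new tasks; Terminating, Dead and Unknown
// executors are skipped while the grace period runs.
fn is_schedulable(status: &ExecutorStatus) -> bool {
    matches!(status.status, Some(Status::Active(_)))
}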
13 changes: 12 additions & 1 deletion ballista/executor/src/executor_server.rs
@@ -20,6 +20,7 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
@@ -195,6 +196,10 @@ struct ExecutorEnv {

unsafe impl Sync for ExecutorEnv {}

/// Global flag indicating whether the executor is terminating. This should be
/// set to `true` when the executor receives a shutdown signal
pub static TERMINATING: AtomicBool = AtomicBool::new(false);

impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T, U> {
fn new(
scheduler_to_register: SchedulerGrpcClient<Channel>,
@@ -240,11 +245,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
/// 1. First heartbeat to its registration scheduler; if successful then return, else go to the next step.
/// 2. Heartbeat to the schedulers which have launched tasks on this executor until one succeeds
async fn heartbeat(&self) {
let status = if TERMINATING.load(Ordering::Acquire) {
executor_status::Status::Terminating(String::default())
} else {
executor_status::Status::Active(String::default())
};

let heartbeat_params = HeartBeatParams {
executor_id: self.executor.metadata.id.clone(),
metrics: self.get_executor_metrics(),
status: Some(ExecutorStatus {
status: Some(executor_status::Status::Active("".to_string())),
status: Some(status),
}),
metadata: Some(self.executor.metadata.clone()),
};
6 changes: 6 additions & 0 deletions ballista/scheduler/scheduler_config_spec.toml
@@ -141,3 +141,9 @@ name = "job_resubmit_interval_ms"
type = "u64"
default = "0"
doc = "If job is not able to be scheduled on submission, wait for this interval and resubmit. Default value of 0 indicates that job shuuld not be resubmitted"

[[param]]
name = "executor_termination_grace_period"
type = "u64"
default = "30"
doc = "Time in seconds an executor should be considered lost after it enters terminating status"
1 change: 1 addition & 0 deletions ballista/scheduler/src/bin/main.rs
@@ -118,6 +118,7 @@ async fn main() -> Result<()> {
cluster_storage: ClusterStorageConfig::Memory,
job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
.then_some(opt.job_resubmit_interval_ms),
executor_termination_grace_period: opt.executor_termination_grace_period,
};

let cluster = BallistaCluster::new_from_config(&config).await?;
9 changes: 9 additions & 0 deletions ballista/scheduler/src/config.rs
@@ -49,6 +49,9 @@ pub struct SchedulerConfig {
pub job_resubmit_interval_ms: Option<u64>,
/// Configuration for ballista cluster storage
pub cluster_storage: ClusterStorageConfig,
/// Time in seconds allowed for an executor's graceful shutdown. Once an executor signals that it has entered the Terminating status,
/// the scheduler should only consider the executor dead after this interval has elapsed
pub executor_termination_grace_period: u64,
}

impl Default for SchedulerConfig {
@@ -65,6 +68,7 @@ impl Default for SchedulerConfig {
advertise_flight_sql_endpoint: None,
cluster_storage: ClusterStorageConfig::Memory,
job_resubmit_interval_ms: None,
executor_termination_grace_period: 0,
}
}
}
@@ -141,6 +145,11 @@ impl SchedulerConfig {
self.job_resubmit_interval_ms = Some(interval_ms);
self
}

pub fn with_remove_executor_wait_secs(mut self, value: u64) -> Self {
self.executor_termination_grace_period = value;
self
}
}

#[derive(Clone, Debug)]
34 changes: 23 additions & 11 deletions ballista/scheduler/src/scheduler_server/grpc.rs
@@ -258,6 +258,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
metrics,
status,
};

self.state
.executor_manager
.save_executor_heartbeat(executor_heartbeat)
@@ -521,13 +522,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
error!("{}", msg);
Status::internal(msg)
})?;
Self::remove_executor(executor_manager, event_sender, &executor_id, Some(reason))
.await
.map_err(|e| {
let msg = format!("Error to remove executor in Scheduler due to {e:?}");
error!("{}", msg);
Status::internal(msg)
})?;

Self::remove_executor(
executor_manager,
event_sender,
&executor_id,
Some(reason),
self.executor_termination_grace_period,
);

Ok(Response::new(ExecutorStoppedResult {}))
Comment (Contributor):
Maybe we should rename the method to remove_executor_in_secs()

}
@@ -603,6 +605,7 @@ mod test {

use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS;
use crate::state::SchedulerState;
use crate::test_utils::await_condition;
use crate::test_utils::test_cluster_context;

use super::{SchedulerGrpc, SchedulerServer};
@@ -702,7 +705,7 @@
"localhost:50050".to_owned(),
cluster.clone(),
BallistaCodec::default(),
SchedulerConfig::default(),
SchedulerConfig::default().with_remove_executor_wait_secs(0),
default_metrics_collector().unwrap(),
);
scheduler.init().await?;
@@ -760,15 +763,22 @@
.await
.expect("getting executor");

let is_stopped = await_condition(Duration::from_millis(10), 5, || {
futures::future::ready(Ok(state.executor_manager.is_dead_executor("abc")))
})
.await?;

// executor should be marked as dead
assert!(state.executor_manager.is_dead_executor("abc"));
assert!(is_stopped, "Executor not marked dead after 50ms");
Comment (Contributor):
Nice test


let active_executors = state
.executor_manager
.get_alive_executors_within_one_minute();
assert!(active_executors.is_empty());

let expired_executors = state.executor_manager.get_expired_executors();
let expired_executors = state
.executor_manager
.get_expired_executors(scheduler.executor_termination_grace_period);
assert!(expired_executors.is_empty());

Ok(())
@@ -895,7 +905,9 @@
.get_alive_executors_within_one_minute();
assert_eq!(active_executors.len(), 1);

let expired_executors = state.executor_manager.get_expired_executors();
let expired_executors = state
.executor_manager
.get_expired_executors(scheduler.executor_termination_grace_period);
assert!(expired_executors.is_empty());

// simulate the heartbeat timeout
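The test above that waits for the executor to be marked dead relies on test_utils::await_condition; a sketch of what such a polling helper plausibly looks like, matching the call shape used in the test (an assumption, not the actual implementation):

use std::future::Future;
use std::time::Duration;

// Sketch of a polling test helper: evaluate `cond` up to `attempts` times,
// sleeping `interval` between tries, and report whether it ever held.
pub async fn await_condition<F, Fut, E>(
    interval: Duration,
    attempts: usize,
    mut cond: F,
) -> Result<bool, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<bool, E>>,
{
    for _ in 0..attempts {
        if cond().await? {
            return Ok(true);
        }
        tokio::time::sleep(interval).await;
    }
    Ok(false)
}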