From 0cd845b3abebb26dd8bafae36a86d64044b751c8 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sun, 12 Feb 2023 07:25:56 -0500 Subject: [PATCH 1/3] Add executor terminating status for graceful shutdown --- ballista/core/proto/ballista.proto | 1 + ballista/core/src/serde/generated/ballista.rs | 4 +- ballista/executor/src/executor.rs | 21 +++ ballista/executor/src/executor_process.rs | 60 ++++++-- ballista/executor/src/executor_server.rs | 13 +- ballista/scheduler/scheduler_config_spec.toml | 6 + ballista/scheduler/src/bin/main.rs | 1 + ballista/scheduler/src/config.rs | 9 ++ ballista/scheduler/src/main.rs | 0 .../scheduler/src/scheduler_server/grpc.rs | 34 +++-- .../scheduler/src/scheduler_server/mod.rs | 130 +++++++++++------- .../scheduler/src/state/executor_manager.rs | 117 ++++++++++++++-- 12 files changed, 316 insertions(+), 80 deletions(-) create mode 100644 ballista/scheduler/src/main.rs diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto index c169791d0..722baaa9e 100644 --- a/ballista/core/proto/ballista.proto +++ b/ballista/core/proto/ballista.proto @@ -323,6 +323,7 @@ message ExecutorStatus { string active = 1; string dead = 2; string unknown = 3; + string terminating = 4; } } diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs index 28236ad04..f511f2a2d 100644 --- a/ballista/core/src/serde/generated/ballista.rs +++ b/ballista/core/src/serde/generated/ballista.rs @@ -540,7 +540,7 @@ pub mod executor_metric { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutorStatus { - #[prost(oneof = "executor_status::Status", tags = "1, 2, 3")] + #[prost(oneof = "executor_status::Status", tags = "1, 2, 3, 4")] pub status: ::core::option::Option, } /// Nested message and enum types in `ExecutorStatus`. @@ -554,6 +554,8 @@ pub mod executor_status { Dead(::prost::alloc::string::String), #[prost(string, tag = "3")] Unknown(::prost::alloc::string::String), + #[prost(string, tag = "4")] + Terminating(::prost::alloc::string::String), } } #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs index d903db69e..867b3ba8b 100644 --- a/ballista/executor/src/executor.rs +++ b/ballista/executor/src/executor.rs @@ -19,7 +19,10 @@ use dashmap::DashMap; use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use crate::metrics::ExecutorMetricsCollector; use ballista_core::error::BallistaError; @@ -37,6 +40,20 @@ use futures::future::AbortHandle; use ballista_core::serde::scheduler::PartitionId; +pub struct TasksDrainedFuture(pub Arc); + +impl Future for TasksDrainedFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + if self.0.abort_handles.len() > 0 { + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + type AbortHandles = Arc>; /// Ballista executor @@ -175,6 +192,10 @@ impl Executor { pub fn work_dir(&self) -> &str { &self.work_dir } + + pub fn active_task_count(&self) -> usize { + self.abort_handles.len() + } } #[cfg(test)] diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index 8ec76038a..49e8c6a31 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -18,6 +18,7 @@ //! 
Ballista Executor Process use std::net::SocketAddr; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::{env, io}; @@ -41,18 +42,21 @@ use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode}; use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy}; use ballista_core::error::BallistaError; +use ballista_core::serde::protobuf::executor_resource::Resource; +use ballista_core::serde::protobuf::executor_status::Status; use ballista_core::serde::protobuf::{ executor_registration, scheduler_grpc_client::SchedulerGrpcClient, - ExecutorRegistration, ExecutorStoppedParams, + ExecutorRegistration, ExecutorResource, ExecutorSpecification, ExecutorStatus, + ExecutorStoppedParams, HeartBeatParams, }; -use ballista_core::serde::scheduler::ExecutorSpecification; use ballista_core::serde::BallistaCodec; use ballista_core::utils::{ create_grpc_client_connection, create_grpc_server, with_object_store_provider, }; use ballista_core::BALLISTA_VERSION; -use crate::executor::Executor; +use crate::executor::{Executor, TasksDrainedFuture}; +use crate::executor_server::TERMINATING; use crate::flight_service::BallistaFlightService; use crate::metrics::LoggingMetricsCollector; use crate::shutdown::Shutdown; @@ -155,12 +159,11 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> { .map(executor_registration::OptionalHost::Host), port: opt.port as u32, grpc_port: opt.grpc_port as u32, - specification: Some( - ExecutorSpecification { - task_slots: concurrent_tasks as u32, - } - .into(), - ), + specification: Some(ExecutorSpecification { + resources: vec![ExecutorResource { + resource: Some(Resource::TaskSlots(concurrent_tasks as u32)), + }], + }), }; let config = with_object_store_provider( @@ -295,6 +298,8 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> { shutdown_noti.subscribe_for_shutdown(), ))); + let tasks_drained = TasksDrainedFuture(executor); + // Concurrently run the service checking and listen for the `shutdown` signal and wait for the stop request coming. // The check_services runs until an error is encountered, so under normal circumstances, this `select!` statement runs // until the `shutdown` signal is received or a stop request is coming. @@ -319,7 +324,41 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> { }, }; + // Set status to fenced + info!("setting executor to TERMINATING status"); + TERMINATING.store(true, Ordering::Release); + if notify_scheduler { + // Send a heartbeat to update status of executor to `Fenced`. This should signal to the + // scheduler to no longer scheduler tasks on this executor + if let Err(error) = scheduler + .heart_beat_from_executor(HeartBeatParams { + executor_id: executor_id.clone(), + metrics: vec![], + status: Some(ExecutorStatus { + status: Some(Status::Terminating(String::default())), + }), + metadata: Some(ExecutorRegistration { + id: executor_id.clone(), + optional_host: opt + .external_host + .clone() + .map(executor_registration::OptionalHost::Host), + port: opt.port as u32, + grpc_port: opt.grpc_port as u32, + specification: Some(ExecutorSpecification { + resources: vec![ExecutorResource { + resource: Some(Resource::TaskSlots(concurrent_tasks as u32)), + }], + }), + }), + }) + .await + { + error!("error sending heartbeat with fenced status: {:?}", error); + } + + // TODO we probably don't need a separate rpc call for this.... 
if let Err(error) = scheduler .executor_stopped(ExecutorStoppedParams { executor_id, @@ -329,6 +368,9 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> { { error!("ExecutorStopped grpc failed: {:?}", error); } + + // Wait for tasks to drain + tasks_drained.await; } // Extract the `shutdown_complete` receiver and transmitter diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs index 2372b6ca0..89f2eef59 100644 --- a/ballista/executor/src/executor_server.rs +++ b/ballista/executor/src/executor_server.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::convert::TryInto; use std::ops::Deref; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; @@ -195,6 +196,10 @@ struct ExecutorEnv { unsafe impl Sync for ExecutorEnv {} +/// Global flag indicating whether the executor is terminating. This should be +/// set to `true` when the executor receives a shutdown signal +pub static TERMINATING: AtomicBool = AtomicBool::new(false); + impl ExecutorServer { fn new( scheduler_to_register: SchedulerGrpcClient, @@ -240,11 +245,17 @@ impl ExecutorServer Result<()> { cluster_storage: ClusterStorageConfig::Memory, job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0) .then_some(opt.job_resubmit_interval_ms), + executor_termination_grace_period: opt.executor_termination_grace_period, }; let cluster = BallistaCluster::new_from_config(&config).await?; diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs index ddd05b786..087e38674 100644 --- a/ballista/scheduler/src/config.rs +++ b/ballista/scheduler/src/config.rs @@ -49,6 +49,9 @@ pub struct SchedulerConfig { pub job_resubmit_interval_ms: Option, /// Configuration for ballista cluster storage pub cluster_storage: ClusterStorageConfig, + /// Time in seconds to allow executor for graceful shutdown. 
Once an executor signals it has entered Terminating status + /// the scheduler should only consider the executor dead after this time interval has elapsed + pub executor_termination_grace_period: u64, } impl Default for SchedulerConfig { @@ -65,6 +68,7 @@ impl Default for SchedulerConfig { advertise_flight_sql_endpoint: None, cluster_storage: ClusterStorageConfig::Memory, job_resubmit_interval_ms: None, + executor_termination_grace_period: 0, } } } @@ -141,6 +145,11 @@ impl SchedulerConfig { self.job_resubmit_interval_ms = Some(interval_ms); self } + + pub fn with_remove_executor_wait_secs(mut self, value: u64) -> Self { + self.executor_termination_grace_period = value; + self + } } #[derive(Clone, Debug)] diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs new file mode 100644 index 000000000..e69de29bb diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index bff078d01..de07a06d5 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -258,6 +258,7 @@ impl SchedulerGrpc metrics, status, }; + self.state .executor_manager .save_executor_heartbeat(executor_heartbeat) @@ -521,13 +522,14 @@ impl SchedulerGrpc error!("{}", msg); Status::internal(msg) })?; - Self::remove_executor(executor_manager, event_sender, &executor_id, Some(reason)) - .await - .map_err(|e| { - let msg = format!("Error to remove executor in Scheduler due to {e:?}"); - error!("{}", msg); - Status::internal(msg) - })?; + + Self::remove_executor( + executor_manager, + event_sender, + &executor_id, + Some(reason), + self.executor_termination_grace_period, + ); Ok(Response::new(ExecutorStoppedResult {})) } @@ -603,6 +605,7 @@ mod test { use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS; use crate::state::SchedulerState; + use crate::test_utils::await_condition; use crate::test_utils::test_cluster_context; use super::{SchedulerGrpc, SchedulerServer}; @@ -702,7 +705,7 @@ mod test { "localhost:50050".to_owned(), cluster.clone(), BallistaCodec::default(), - SchedulerConfig::default(), + SchedulerConfig::default().with_remove_executor_wait_secs(0), default_metrics_collector().unwrap(), ); scheduler.init().await?; @@ -760,15 +763,22 @@ mod test { .await .expect("getting executor"); + let is_stopped = await_condition(Duration::from_millis(10), 5, || { + futures::future::ready(Ok(state.executor_manager.is_dead_executor("abc"))) + }) + .await?; + // executor should be marked to dead - assert!(state.executor_manager.is_dead_executor("abc")); + assert!(is_stopped, "Executor not marked dead after 50ms"); let active_executors = state .executor_manager .get_alive_executors_within_one_minute(); assert!(active_executors.is_empty()); - let expired_executors = state.executor_manager.get_expired_executors(); + let expired_executors = state + .executor_manager + .get_expired_executors(scheduler.executor_termination_grace_period); assert!(expired_executors.is_empty()); Ok(()) @@ -895,7 +905,9 @@ mod test { .get_alive_executors_within_one_minute(); assert_eq!(active_executors.len(), 1); - let expired_executors = state.executor_manager.get_expired_executors(); + let expired_executors = state + .executor_manager + .get_expired_executors(scheduler.executor_termination_grace_period); assert!(expired_executors.is_empty()); // simulate the heartbeat timeout diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index de37365b9..fc40b0178 
100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -40,6 +40,7 @@ use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler; use crate::state::executor_manager::{ ExecutorManager, ExecutorReservation, DEFAULT_EXECUTOR_TIMEOUT_SECONDS, + EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS, }; use crate::state::task_manager::TaskLauncher; @@ -65,6 +66,7 @@ pub struct SchedulerServer>, pub(crate) query_stage_event_loop: EventLoop, query_stage_scheduler: Arc>, + executor_termination_grace_period: u64, } impl SchedulerServer { @@ -98,6 +100,7 @@ impl SchedulerServer SchedulerServer SchedulerServer Result<()> { let state = self.state.clone(); let event_sender = self.query_stage_event_loop.get_sender()?; + let termination_grace_period = self.executor_termination_grace_period; tokio::task::spawn(async move { loop { - let expired_executors = state.executor_manager.get_expired_executors(); + let expired_executors = state + .executor_manager + .get_expired_executors(termination_grace_period); for expired in expired_executors { let executor_id = expired.executor_id.clone(); let executor_manager = state.executor_manager.clone(); - let stop_reason = format!( - "Executor {} heartbeat timed out after {}s", - executor_id.clone(), - DEFAULT_EXECUTOR_TIMEOUT_SECONDS - ); - warn!("{}", stop_reason.clone()); + let sender_clone = event_sender.clone(); + + let terminating = matches!( + expired + .status + .as_ref() + .and_then(|status| status.status.as_ref()), + Some(ballista_core::serde::protobuf::executor_status::Status::Terminating(_)) + ); + + let stop_reason = if terminating { + format!( + "TERMINATING executor {executor_id} heartbeat timed out after {termination_grace_period}s" + ) + } else { + format!( + "ACTIVE executor {executor_id} heartbeat timed out after {DEFAULT_EXECUTOR_TIMEOUT_SECONDS}s", + ) + }; + + warn!("{stop_reason}"); + + // If executor is expired, remove it immediately Self::remove_executor( executor_manager, sender_clone, &executor_id, Some(stop_reason.clone()), - ) - .await - .unwrap_or_else(|e| { - let msg = - format!("Error to remove Executor in Scheduler due to {e:?}"); - error!("{}", msg); - }); + 0, + ); - match state.executor_manager.get_client(&executor_id).await { - Ok(mut client) => { - tokio::task::spawn(async move { - match client - .stop_executor(StopExecutorParams { - executor_id, - reason: stop_reason, - force: true, - }) - .await - { - Err(error) => { - warn!( + // If executor is not already terminating then stop it. If it is terminating then it should already be shutting + // down and we do not need to do anything here. 
+ if !terminating { + match state.executor_manager.get_client(&executor_id).await { + Ok(mut client) => { + tokio::task::spawn(async move { + match client + .stop_executor(StopExecutorParams { + executor_id, + reason: stop_reason, + force: true, + }) + .await + { + Err(error) => { + warn!( "Failed to send stop_executor rpc due to, {}", error ); + } + Ok(_value) => {} } - Ok(_value) => {} - } - }); - } - Err(_) => { - warn!("Executor is already dead, failed to connect to Executor {}", executor_id); + }); + } + Err(_) => { + warn!("Executor is already dead, failed to connect to Executor {}", executor_id); + } } } } - tokio::time::sleep(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS)) - .await; + tokio::time::sleep(Duration::from_secs( + EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS, + )) + .await; } }); Ok(()) } - pub(crate) async fn remove_executor( + pub(crate) fn remove_executor( executor_manager: ExecutorManager, event_sender: EventSender, executor_id: &str, reason: Option, - ) -> Result<()> { - // Update the executor manager immediately here - executor_manager - .remove_executor(executor_id, reason.clone()) - .await?; + wait_secs: u64, + ) { + let executor_id = executor_id.to_owned(); + tokio::spawn(async move { + // Wait for `wait_secs` before removing executor + tokio::time::sleep(Duration::from_secs(wait_secs)).await; + + // Update the executor manager immediately here + if let Err(e) = executor_manager + .remove_executor(&executor_id, reason.clone()) + .await + { + error!("error removing executor {executor_id}: {e:?}"); + } - event_sender - .post_event(QueryStageSchedulerEvent::ExecutorLost( - executor_id.to_owned(), - reason, - )) - .await?; - Ok(()) + if let Err(e) = event_sender + .post_event(QueryStageSchedulerEvent::ExecutorLost(executor_id, reason)) + .await + { + error!("error sending ExecutorLost event: {e:?}"); + } + }); } async fn do_register_executor(&self, metadata: ExecutorMetadata) -> Result<()> { diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs index 9a51aa77e..bec97c3b7 100644 --- a/ballista/scheduler/src/state/executor_manager.rs +++ b/ballista/scheduler/src/state/executor_manager.rs @@ -85,6 +85,10 @@ impl ExecutorReservation { /// to be dead. 
pub const DEFAULT_EXECUTOR_TIMEOUT_SECONDS: u64 = 180; +// TODO move to configuration file +/// Interval check for expired or dead executors +pub const EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS: u64 = 15; + #[derive(Clone)] pub(crate) struct ExecutorManager { // executor slot policy @@ -140,14 +144,19 @@ impl ExecutorManager { tokio::task::spawn(async move { while let Some(heartbeat) = heartbeat_stream.next().await { let executor_id = heartbeat.executor_id.clone(); - if let Some(ExecutorStatus { - status: Some(executor_status::Status::Dead(_)), - }) = heartbeat.status + + match heartbeat + .status + .as_ref() + .and_then(|status| status.status.as_ref()) { - heartbeats.remove(&executor_id); - dead_executors.insert(executor_id); - } else { - heartbeats.insert(executor_id, heartbeat); + Some(executor_status::Status::Dead(_)) => { + heartbeats.remove(&executor_id); + dead_executors.insert(executor_id); + } + _ => { + heartbeats.insert(executor_id, heartbeat); + } } } }); @@ -614,27 +623,62 @@ impl ExecutorManager { .iter() .filter_map(|pair| { let (exec, heartbeat) = pair.pair(); - (heartbeat.timestamp > last_seen_ts_threshold).then(|| exec.clone()) + + let active = matches!( + heartbeat + .status + .as_ref() + .and_then(|status| status.status.as_ref()), + Some(executor_status::Status::Active(_)) + ); + let live = heartbeat.timestamp > last_seen_ts_threshold; + + (active && live).then(|| exec.clone()) }) .collect() } /// Return a list of expired executors - pub(crate) fn get_expired_executors(&self) -> Vec { + pub(crate) fn get_expired_executors( + &self, + termination_grace_period: u64, + ) -> Vec { let now_epoch_ts = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backwards"); + // Threshold for last heartbeat from Active executor before marking dead let last_seen_threshold = now_epoch_ts .checked_sub(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS)) .unwrap_or_else(|| Duration::from_secs(0)) .as_secs(); + // Threshold for last heartbeat for Fenced executor before marking dead + let termination_wait_threshold = now_epoch_ts + .checked_sub(Duration::from_secs(termination_grace_period)) + .unwrap_or_else(|| Duration::from_secs(0)) + .as_secs(); + let expired_executors = self .executors_heartbeat .iter() .filter_map(|pair| { let (_exec, heartbeat) = pair.pair(); - (heartbeat.timestamp <= last_seen_threshold).then(|| heartbeat.clone()) + + let terminating = matches!( + heartbeat + .status + .as_ref() + .and_then(|status| status.status.as_ref()), + Some(executor_status::Status::Terminating(_)) + ); + + let grace_period_expired = + heartbeat.timestamp <= termination_wait_threshold; + + let expired = heartbeat.timestamp <= last_seen_threshold; + + ((terminating && grace_period_expired) || expired) + .then(|| heartbeat.clone()) }) .collect::>(); expired_executors @@ -656,9 +700,12 @@ mod test { use crate::config::SlotsPolicy; + use crate::scheduler_server::timestamp_secs; use crate::state::executor_manager::{ExecutorManager, ExecutorReservation}; use crate::test_utils::test_cluster_context; use ballista_core::error::Result; + use ballista_core::serde::protobuf::executor_status::Status; + use ballista_core::serde::protobuf::{ExecutorHeartbeat, ExecutorStatus}; use ballista_core::serde::scheduler::{ ExecutorData, ExecutorMetadata, ExecutorSpecification, }; @@ -844,6 +891,56 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_ignore_fenced_executors() -> Result<()> { + test_ignore_fenced_executors_inner(SlotsPolicy::Bias).await?; + 
test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobin).await?; + test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobinLocal).await?; + + Ok(()) + } + + async fn test_ignore_fenced_executors_inner(slots_policy: SlotsPolicy) -> Result<()> { + let cluster = test_cluster_context(); + + let executor_manager = + ExecutorManager::new(cluster.cluster_state(), slots_policy); + + // Setup two executors initially + let executors = test_executors(2, 4); + + for (executor_metadata, executor_data) in executors { + let _ = executor_manager + .register_executor(executor_metadata, executor_data, false) + .await?; + } + + // Fence one of the executors + executor_manager + .save_executor_heartbeat(ExecutorHeartbeat { + executor_id: "executor-0".to_string(), + timestamp: timestamp_secs(), + metrics: vec![], + status: Some(ExecutorStatus { + status: Some(Status::Terminating(String::default())), + }), + }) + .await?; + + let reservations = executor_manager.reserve_slots(8).await?; + + assert_eq!(reservations.len(), 4, "Expected only four reservations"); + + assert!( + reservations + .iter() + .all(|res| res.executor_id == "executor-1"), + "Expected all reservations from non-fenced executor", + ); + + Ok(()) + } + fn test_executors( total_executors: usize, slots_per_executor: u32, From 39c73c67254420d68adb66f91d504ff0d1850181 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sun, 12 Feb 2023 07:46:32 -0500 Subject: [PATCH 2/3] Remove empty file --- ballista/scheduler/src/main.rs | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 ballista/scheduler/src/main.rs diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs deleted file mode 100644 index e69de29bb..000000000 From 569d5066d7658135d8ca256d652df80d916c3c3f Mon Sep 17 00:00:00 2001 From: Dan Harris <1327726+thinkharderdev@users.noreply.github.com> Date: Sat, 25 Feb 2023 06:40:10 -0500 Subject: [PATCH 3/3] Update ballista/executor/src/executor_process.rs Co-authored-by: Brent Gardner --- ballista/executor/src/executor_process.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index 49e8c6a31..6db3de069 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -330,7 +330,7 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> { if notify_scheduler { // Send a heartbeat to update status of executor to `Fenced`. This should signal to the - // scheduler to no longer scheduler tasks on this executor + // scheduler to no longer schedule tasks on this executor if let Err(error) = scheduler .heart_beat_from_executor(HeartBeatParams { executor_id: executor_id.clone(),
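
Reviewer note (not part of the patch series): the scheduler-side expiry rule this change introduces in `get_expired_executors` boils down to — an executor in Terminating status is considered dead once `executor_termination_grace_period` seconds have passed since its last heartbeat, and any executor is considered dead once `DEFAULT_EXECUTOR_TIMEOUT_SECONDS` have passed. The sketch below restates that rule with simplified stand-in types (`Status`, `Heartbeat`, `is_expired` are illustrative names, not the actual Ballista structs) so the behavior can be checked in isolation; it is a minimal sketch under those assumptions, not the patch's implementation.

    // Illustrative, self-contained sketch of the expiry rule added by this patch.
    // Types and helper names here are hypothetical; the real code works on
    // ExecutorHeartbeat / executor_status::Status from the Ballista protobufs.

    /// Heartbeat timeout for non-terminating executors, mirroring
    /// DEFAULT_EXECUTOR_TIMEOUT_SECONDS in the patch.
    const DEFAULT_EXECUTOR_TIMEOUT_SECS: u64 = 180;

    /// Simplified stand-in for the status carried in a heartbeat.
    enum Status {
        Active,
        Terminating,
    }

    /// Simplified heartbeat: last-seen time (epoch seconds) plus status.
    struct Heartbeat {
        timestamp: u64,
        status: Status,
    }

    /// Returns true when the executor should be treated as dead at `now`.
    /// A Terminating executor expires once the grace period has elapsed;
    /// any executor expires once the default heartbeat timeout has elapsed.
    fn is_expired(hb: &Heartbeat, now: u64, termination_grace_period: u64) -> bool {
        let grace_expired =
            hb.timestamp <= now.saturating_sub(termination_grace_period);
        let heartbeat_expired =
            hb.timestamp <= now.saturating_sub(DEFAULT_EXECUTOR_TIMEOUT_SECS);

        (matches!(hb.status, Status::Terminating) && grace_expired) || heartbeat_expired
    }

    fn main() {
        let now = 1_000_000;
        let terminating = Heartbeat { timestamp: now - 31, status: Status::Terminating };
        let active = Heartbeat { timestamp: now - 31, status: Status::Active };

        // With a 30s grace period the terminating executor is already expired,
        // while the active executor still has plenty of heartbeat budget left.
        assert!(is_expired(&terminating, now, 30));
        assert!(!is_expired(&active, now, 30));
    }

With `executor_termination_grace_period = 0` (the default added in `SchedulerConfig`), a terminating executor is reaped on the next expiry sweep, which is why the gRPC test above sets the wait to zero and polls with `await_condition`.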