Remove redundant fields in ExecutorManager #728

Merged 6 commits on Mar 30, 2023
Changes from 3 commits
8 changes: 4 additions & 4 deletions ballista/scheduler/scheduler_config_spec.toml
@@ -96,10 +96,10 @@ default = "3600"
doc = "Delayed interval for cleaning up finished job state. Default: 3600"

[[param]]
name = "executor_slots_policy"
type = "ballista_scheduler::config::SlotsPolicy"
doc = "The executor slots policy for the scheduler, possible values: bias, round-robin, round-robin-local. Default: bias"
default = "ballista_scheduler::config::SlotsPolicy::Bias"
name = "task_distribution"
type = "ballista_scheduler::config::TaskDistribution"
doc = "The policy of distributing tasks to available executor slots, possible values: bias, round-robin. Default: bias"
default = "ballista_scheduler::config::TaskDistribution::Bias"

[[param]]
name = "plugin_dir"
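For reference, the string values documented above (`bias`, `round-robin`) are parsed into the renamed enum by the `FromStr` implementation shown later in this diff in `config.rs`, which delegates to clap's `ArgEnum`. A minimal sketch, assuming the `ballista_scheduler::config::TaskDistribution` path used in the spec above and clap's usual kebab-case value names:

```rust
use ballista_scheduler::config::TaskDistribution;

fn main() {
    // "bias" and "round-robin" are the documented values; parsing is
    // case-insensitive because FromStr delegates to ArgEnum::from_str(s, true).
    let bias: TaskDistribution = "bias".parse().expect("known policy name");
    let round_robin: TaskDistribution = "round-robin".parse().expect("known policy name");
    println!("{bias:?} {round_robin:?}");
}
```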
2 changes: 1 addition & 1 deletion ballista/scheduler/src/bin/main.rs
@@ -109,7 +109,7 @@ async fn main() -> Result<()> {
bind_port: opt.bind_port,
scheduling_policy: opt.scheduler_policy,
event_loop_buffer_size: opt.event_loop_buffer_size,
executor_slots_policy: opt.executor_slots_policy,
task_distribution: opt.task_distribution,
finished_job_data_clean_up_interval_seconds: opt
.finished_job_data_clean_up_interval_seconds,
finished_job_state_clean_up_interval_seconds: opt
25 changes: 6 additions & 19 deletions ballista/scheduler/src/cluster/kv.rs
@@ -20,7 +20,7 @@ use crate::cluster::{
reserve_slots_bias, reserve_slots_round_robin, ClusterState, ExecutorHeartbeatStream,
JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution,
};
use crate::scheduler_server::SessionBuilder;
use crate::scheduler_server::{timestamp_secs, SessionBuilder};
use crate::state::execution_graph::ExecutionGraph;
use crate::state::executor_manager::ExecutorReservation;
use crate::state::session_manager::create_datafusion_context;
@@ -47,7 +47,6 @@ use prost::Message;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

/// State implementation based on underlying `KeyValueStore`
pub struct KeyValueState<
@@ -240,22 +239,17 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
) -> Result<Vec<ExecutorReservation>> {
let executor_id = metadata.id.clone();

let current_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| {
BallistaError::Internal(format!("Error getting current timestamp: {e:?}"))
})?
.as_secs();

//TODO this should be in a transaction
// Now that we know we can connect, save the metadata and slots
self.save_executor_metadata(metadata).await?;
self.save_executor_heartbeat(ExecutorHeartbeat {
executor_id: executor_id.clone(),
timestamp: current_ts,
timestamp: timestamp_secs(),
metrics: vec![],
status: Some(protobuf::ExecutorStatus {
status: Some(protobuf::executor_status::Status::Active("".to_string())),
status: Some(
protobuf::executor_status::Status::Active(String::default()),
),
}),
})
.await?;
@@ -364,16 +358,9 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}

async fn remove_executor(&self, executor_id: &str) -> Result<()> {
let current_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| {
BallistaError::Internal(format!("Error getting current timestamp: {e:?}"))
})?
.as_secs();

let value = ExecutorHeartbeat {
executor_id: executor_id.to_owned(),
timestamp: current_ts,
timestamp: timestamp_secs(),
metrics: vec![],
status: Some(protobuf::ExecutorStatus {
status: Some(protobuf::executor_status::Status::Dead("".to_string())),
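Both hunks above replace the inline `SystemTime` arithmetic with a `timestamp_secs()` helper imported from `scheduler_server`. The helper's definition is not part of this diff; the sketch below shows roughly what it has to do, and the error handling (falling back to 0 when the clock is before the Unix epoch) is an assumption, since the call sites now expect a plain `u64` rather than a `Result`.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Sketch only, not the PR's actual implementation: current Unix time in whole
/// seconds, replacing the duplicated SystemTime/UNIX_EPOCH code removed above.
pub fn timestamp_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}
```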
27 changes: 11 additions & 16 deletions ballista/scheduler/src/cluster/memory.rs
@@ -163,53 +163,48 @@ impl ClusterState for InMemoryClusterState {
mut spec: ExecutorData,
reserve: bool,
) -> Result<Vec<ExecutorReservation>> {
let heartbeat = ExecutorHeartbeat {
executor_id: metadata.id.clone(),
let executor_id = metadata.id.clone();

self.save_executor_metadata(metadata).await?;
self.save_executor_heartbeat(ExecutorHeartbeat {
executor_id: executor_id.clone(),
timestamp: timestamp_secs(),
metrics: vec![],
status: Some(ExecutorStatus {
status: Some(executor_status::Status::Active(String::default())),
}),
};
})
.await?;

let mut guard = self.task_slots.lock();

// Check to see if we already have task slots for executor. If so, remove them.
if let Some((idx, _)) = guard
.task_slots
.iter()
.find_position(|slots| slots.executor_id == metadata.id)
.find_position(|slots| slots.executor_id == executor_id)
{
guard.task_slots.swap_remove(idx);
}

if reserve {
let slots = std::mem::take(&mut spec.available_task_slots) as usize;

let reservations = (0..slots)
.map(|_| ExecutorReservation::new_free(metadata.id.clone()))
.map(|_| ExecutorReservation::new_free(executor_id.clone()))
.collect();

self.executors.insert(metadata.id.clone(), metadata.clone());

guard.task_slots.push(AvailableTaskSlots {
executor_id: metadata.id,
executor_id,
slots: 0,
});

self.heartbeat_sender.send(&heartbeat);

Ok(reservations)
} else {
self.executors.insert(metadata.id.clone(), metadata.clone());

guard.task_slots.push(AvailableTaskSlots {
executor_id: metadata.id,
executor_id,
slots: spec.available_task_slots,
});

self.heartbeat_sender.send(&heartbeat);

Ok(vec![])
}
}
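In the rewrite above, the heartbeat is no longer built and broadcast inline; it is routed through `save_executor_heartbeat` like the metadata. The `reserve` branch is also easy to misread, so here is a simplified, self-contained sketch of the slot accounting it implements (the types are stand-ins, not Ballista's): with `reserve = true` every available slot is handed out as an up-front reservation and the executor is recorded with zero free slots, while `reserve = false` keeps the slots available for later scheduling.

```rust
#[derive(Debug)]
struct Reservation {
    executor_id: String,
}

// Stand-in for the register_executor accounting shown above: returns the
// reservations produced plus the slot count that gets recorded for the executor.
fn register(executor_id: &str, available_task_slots: u32, reserve: bool) -> (Vec<Reservation>, u32) {
    if reserve {
        let reservations = (0..available_task_slots)
            .map(|_| Reservation {
                executor_id: executor_id.to_string(),
            })
            .collect();
        (reservations, 0) // all slots handed out as reservations
    } else {
        (Vec::new(), available_task_slots)
    }
}

fn main() {
    let (reservations, recorded_slots) = register("executor-1", 4, true);
    assert_eq!(reservations.len(), 4);
    assert_eq!(recorded_slots, 0);
}
```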
13 changes: 1 addition & 12 deletions ballista/scheduler/src/cluster/mod.rs
@@ -29,7 +29,7 @@ use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
use crate::cluster::storage::etcd::EtcdClient;
use crate::cluster::storage::sled::SledClient;
use crate::cluster::storage::KeyValueStore;
use crate::config::{ClusterStorageConfig, SchedulerConfig};
use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistribution};
use crate::scheduler_server::SessionBuilder;
use crate::state::execution_graph::ExecutionGraph;
use crate::state::executor_manager::ExecutorReservation;
@@ -195,17 +195,6 @@ impl BallistaCluster {
/// by any schedulers with a shared `ClusterState`
pub type ExecutorHeartbeatStream = Pin<Box<dyn Stream<Item = ExecutorHeartbeat> + Send>>;

/// Method of distributing tasks to available executor slots
#[derive(Debug, Clone, Copy)]
pub enum TaskDistribution {
/// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
/// as are currently available
Bias,
/// Distributed tasks evenely across executors. This will try and iterate through available executors
/// and assign one task to each executor until all tasks are assigned.
RoundRobin,
}

/// A trait that contains the necessary method to maintain a globally consistent view of cluster resources
#[tonic::async_trait]
pub trait ClusterState: Send + Sync + 'static {
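The enum removed here moves into `config.rs` (see the next file), so the behaviour it documents is unchanged. Since the doc comments above are the only description of the two strategies, here is a self-contained illustration of the difference, using stand-in types rather than Ballista's reservation code:

```rust
// Bias: fill one executor's slots before touching the next.
fn bias(available: &mut [(String, u32)], mut n: u32) -> Vec<String> {
    let mut out = Vec::new();
    for (executor, slots) in available.iter_mut() {
        while *slots > 0 && n > 0 {
            *slots -= 1;
            n -= 1;
            out.push(executor.clone());
        }
    }
    out
}

// RoundRobin: assign one task per executor per pass until tasks or slots run out.
fn round_robin(available: &mut [(String, u32)], mut n: u32) -> Vec<String> {
    let mut out = Vec::new();
    loop {
        let mut progressed = false;
        for (executor, slots) in available.iter_mut() {
            if *slots > 0 && n > 0 {
                *slots -= 1;
                n -= 1;
                out.push(executor.clone());
                progressed = true;
            }
        }
        if !progressed || n == 0 {
            break;
        }
    }
    out
}

fn main() {
    let mut a = vec![("e1".to_string(), 3), ("e2".to_string(), 3)];
    let mut b = a.clone();
    assert_eq!(bias(&mut a, 4), vec!["e1", "e1", "e1", "e2"]);
    assert_eq!(round_robin(&mut b, 4), vec!["e1", "e2", "e1", "e2"]);
}
```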
34 changes: 16 additions & 18 deletions ballista/scheduler/src/config.rs
@@ -26,7 +26,7 @@ use std::fmt;
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
/// Namespace of this scheduler. Schedulers using the same cluster storage and namespace
/// will share gloabl cluster state.
/// will share global cluster state.
pub namespace: String,
/// The external hostname of the scheduler
pub external_host: String,
@@ -36,8 +36,8 @@ pub struct SchedulerConfig {
pub scheduling_policy: TaskSchedulingPolicy,
/// The event loop buffer size. for a system of high throughput, a larger value like 1000000 is recommended
pub event_loop_buffer_size: u32,
/// The executor slots policy for the scheduler. For a cluster with single scheduler, round-robin-local is recommended
pub executor_slots_policy: SlotsPolicy,
/// Policy of distributing tasks to available executor slots. For a cluster with single scheduler, round-robin is recommended
pub task_distribution: TaskDistribution,
/// The delayed interval for cleaning up finished job data, mainly the shuffle data, 0 means the cleaning up is disabled
pub finished_job_data_clean_up_interval_seconds: u64,
/// The delayed interval for cleaning up finished job state stored in the backend, 0 means the cleaning up is disabled.
@@ -62,7 +62,7 @@ impl Default for SchedulerConfig {
bind_port: 50050,
scheduling_policy: TaskSchedulingPolicy::PullStaged,
event_loop_buffer_size: 10000,
executor_slots_policy: SlotsPolicy::Bias,
task_distribution: TaskDistribution::Bias,
finished_job_data_clean_up_interval_seconds: 300,
finished_job_state_clean_up_interval_seconds: 3600,
advertise_flight_sql_endpoint: None,
@@ -131,8 +131,8 @@ impl SchedulerConfig {
self
}

pub fn with_executor_slots_policy(mut self, policy: SlotsPolicy) -> Self {
self.executor_slots_policy = policy;
pub fn with_task_distribution(mut self, policy: TaskDistribution) -> Self {
self.task_distribution = policy;
self
}

@@ -161,30 +161,28 @@ pub enum ClusterStorageConfig {
Sled(Option<String>),
}

// an enum used to configure the executor slots policy
// needs to be visible to code generated by configure_me
/// Policy of distributing tasks to available executor slots
///
/// It needs to be visible to code generated by configure_me
#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
pub enum SlotsPolicy {
pub enum TaskDistribution {
/// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
/// as are currently available
Bias,
/// Distributed tasks evenly across executors. This will try and iterate through available executors
/// and assign one task to each executor until all tasks are assigned.
RoundRobin,
RoundRobinLocal,
}

impl SlotsPolicy {
pub fn is_local(&self) -> bool {
matches!(self, SlotsPolicy::RoundRobinLocal)
}
}

impl std::str::FromStr for SlotsPolicy {
impl std::str::FromStr for TaskDistribution {
type Err = String;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
ArgEnum::from_str(s, true)
}
}

impl parse_arg::ParseArgFromStr for SlotsPolicy {
impl parse_arg::ParseArgFromStr for TaskDistribution {
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
write!(writer, "The executor slots policy for the scheduler")
}
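A minimal usage sketch of the renamed API above (the `RoundRobin` choice is purely illustrative; `Bias` remains the default, as the `Default` impl shows):

```rust
use ballista_scheduler::config::{SchedulerConfig, TaskDistribution};

fn main() {
    let config = SchedulerConfig::default()
        .with_task_distribution(TaskDistribution::RoundRobin);
    assert!(matches!(config.task_distribution, TaskDistribution::RoundRobin));
}
```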
15 changes: 10 additions & 5 deletions ballista/scheduler/src/scheduler_server/grpc.rs
@@ -773,12 +773,14 @@ mod test {

let active_executors = state
.executor_manager
.get_alive_executors_within_one_minute();
.get_alive_executors_within_one_minute()
.await?;
assert!(active_executors.is_empty());

let expired_executors = state
.executor_manager
.get_expired_executors(scheduler.executor_termination_grace_period);
.get_expired_executors(scheduler.executor_termination_grace_period)
.await?;
assert!(expired_executors.is_empty());

Ok(())
@@ -902,12 +904,14 @@

let active_executors = state
.executor_manager
.get_alive_executors_within_one_minute();
.get_alive_executors_within_one_minute()
.await?;
assert_eq!(active_executors.len(), 1);

let expired_executors = state
.executor_manager
.get_expired_executors(scheduler.executor_termination_grace_period);
.get_expired_executors(scheduler.executor_termination_grace_period)
.await?;
assert!(expired_executors.is_empty());

// simulate the heartbeat timeout
@@ -919,7 +923,8 @@

let active_executors = state
.executor_manager
.get_alive_executors_within_one_minute();
.get_alive_executors_within_one_minute()
.await?;
assert!(active_executors.is_empty());
Ok(())
}
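The tests above now `.await` these lookups and propagate errors with `?`, which implies the corresponding `ExecutorManager` methods became async and fallible once the locally cached liveness fields were removed and the lookups started going through shared cluster state. The exact signatures are not shown in this diff; the sketch below is an inference from the call sites, with stand-in types so it compiles on its own:

```rust
use std::collections::HashSet;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// Stand-ins; the real types live in ballista_scheduler.
struct ExecutorHeartbeat {
    executor_id: String,
}

struct ExecutorManager;

impl ExecutorManager {
    // Assumed shape: consult shared cluster state instead of cached fields,
    // hence async and Result-returning.
    async fn get_alive_executors_within_one_minute(&self) -> Result<HashSet<String>> {
        Ok(HashSet::new())
    }

    async fn get_expired_executors(
        &self,
        _termination_grace_period: u64,
    ) -> Result<Vec<ExecutorHeartbeat>> {
        Ok(Vec::new())
    }
}
```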
11 changes: 9 additions & 2 deletions ballista/scheduler/src/scheduler_server/mod.rs
@@ -223,9 +223,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
let termination_grace_period = self.executor_termination_grace_period;
tokio::task::spawn(async move {
loop {
let expired_executors = state
let expired_executors = if let Ok(expired_executors) = state
.executor_manager
.get_expired_executors(termination_grace_period);
.get_expired_executors(termination_grace_period)
.await
{
expired_executors
} else {
warn!("Fail to get expired executors");
vec![]
};
for expired in expired_executors {
let executor_id = expired.executor_id.clone();
let executor_manager = state.executor_manager.clone();
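The `if let Ok(...)` fallback added above keeps the cleanup loop alive when the lookup fails. Purely as an illustration (not part of this PR), the same behaviour can be written more compactly with `unwrap_or_else`, against the same variables the loop already uses:

```rust
let expired_executors = state
    .executor_manager
    .get_expired_executors(termination_grace_period)
    .await
    .unwrap_or_else(|e| {
        // Same fallback as above: log and continue with an empty list.
        warn!("Fail to get expired executors: {e:?}");
        vec![]
    });
```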