Commit ee3a914

Rename SlotsPolicy to TaskDistribution
1 parent 37eb702 commit ee3a914

6 files changed (+59 -62 lines)

ballista/scheduler/scheduler_config_spec.toml (+4 -4)

@@ -96,10 +96,10 @@ default = "3600"
 doc = "Delayed interval for cleaning up finished job state. Default: 3600"

 [[param]]
-name = "executor_slots_policy"
-type = "ballista_scheduler::config::SlotsPolicy"
-doc = "The executor slots policy for the scheduler, possible values: bias, round-robin. Default: bias"
-default = "ballista_scheduler::config::SlotsPolicy::Bias"
+name = "task_distribution"
+type = "ballista_scheduler::config::TaskDistribution"
+doc = "The policy of distributing tasks to available executor slots, possible values: bias, round-robin. Default: bias"
+default = "ballista_scheduler::config::TaskDistribution::Bias"

 [[param]]
 name = "plugin_dir"

ballista/scheduler/src/bin/main.rs (+1 -1)

@@ -109,7 +109,7 @@ async fn main() -> Result<()> {
         bind_port: opt.bind_port,
         scheduling_policy: opt.scheduler_policy,
         event_loop_buffer_size: opt.event_loop_buffer_size,
-        executor_slots_policy: opt.executor_slots_policy,
+        task_distribution: opt.task_distribution,
         finished_job_data_clean_up_interval_seconds: opt
             .finished_job_data_clean_up_interval_seconds,
         finished_job_state_clean_up_interval_seconds: opt

ballista/scheduler/src/cluster/mod.rs (+1 -12)

@@ -29,7 +29,7 @@ use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
 use crate::cluster::storage::etcd::EtcdClient;
 use crate::cluster::storage::sled::SledClient;
 use crate::cluster::storage::KeyValueStore;
-use crate::config::{ClusterStorageConfig, SchedulerConfig};
+use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistribution};
 use crate::scheduler_server::SessionBuilder;
 use crate::state::execution_graph::ExecutionGraph;
 use crate::state::executor_manager::ExecutorReservation;
@@ -195,17 +195,6 @@ impl BallistaCluster {
 /// by any schedulers with a shared `ClusterState`
 pub type ExecutorHeartbeatStream = Pin<Box<dyn Stream<Item = ExecutorHeartbeat> + Send>>;

-/// Method of distributing tasks to available executor slots
-#[derive(Debug, Clone, Copy)]
-pub enum TaskDistribution {
-    /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
-    /// as are currently available
-    Bias,
-    /// Distributed tasks evenely across executors. This will try and iterate through available executors
-    /// and assign one task to each executor until all tasks are assigned.
-    RoundRobin,
-}
-
 /// A trait that contains the necessary method to maintain a globally consistent view of cluster resources
 #[tonic::async_trait]
 pub trait ClusterState: Send + Sync + 'static {

ballista/scheduler/src/config.rs (+16 -11)

@@ -26,7 +26,7 @@ use std::fmt;
 #[derive(Debug, Clone)]
 pub struct SchedulerConfig {
     /// Namespace of this scheduler. Schedulers using the same cluster storage and namespace
-    /// will share gloabl cluster state.
+    /// will share global cluster state.
     pub namespace: String,
     /// The external hostname of the scheduler
     pub external_host: String,
@@ -36,8 +36,8 @@ pub struct SchedulerConfig {
     pub scheduling_policy: TaskSchedulingPolicy,
     /// The event loop buffer size. for a system of high throughput, a larger value like 1000000 is recommended
     pub event_loop_buffer_size: u32,
-    /// The executor slots policy for the scheduler. For a cluster with single scheduler, round-robin-local is recommended
-    pub executor_slots_policy: SlotsPolicy,
+    /// Policy of distributing tasks to available executor slots. For a cluster with single scheduler, round-robin is recommended
+    pub task_distribution: TaskDistribution,
     /// The delayed interval for cleaning up finished job data, mainly the shuffle data, 0 means the cleaning up is disabled
     pub finished_job_data_clean_up_interval_seconds: u64,
     /// The delayed interval for cleaning up finished job state stored in the backend, 0 means the cleaning up is disabled.
@@ -62,7 +62,7 @@ impl Default for SchedulerConfig {
             bind_port: 50050,
             scheduling_policy: TaskSchedulingPolicy::PullStaged,
             event_loop_buffer_size: 10000,
-            executor_slots_policy: SlotsPolicy::Bias,
+            task_distribution: TaskDistribution::Bias,
             finished_job_data_clean_up_interval_seconds: 300,
             finished_job_state_clean_up_interval_seconds: 3600,
             advertise_flight_sql_endpoint: None,
@@ -131,8 +131,8 @@ impl SchedulerConfig {
         self
     }

-    pub fn with_executor_slots_policy(mut self, policy: SlotsPolicy) -> Self {
-        self.executor_slots_policy = policy;
+    pub fn with_task_distribution(mut self, policy: TaskDistribution) -> Self {
+        self.task_distribution = policy;
         self
     }

@@ -161,23 +161,28 @@ pub enum ClusterStorageConfig {
     Sled(Option<String>),
 }

-// an enum used to configure the executor slots policy
-// needs to be visible to code generated by configure_me
+/// Policy of distributing tasks to available executor slots
+///
+/// It needs to be visible to code generated by configure_me
 #[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
-pub enum SlotsPolicy {
+pub enum TaskDistribution {
+    /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
+    /// as are currently available
     Bias,
+    /// Distributed tasks evenly across executors. This will try and iterate through available executors
+    /// and assign one task to each executor until all tasks are assigned.
     RoundRobin,
 }

-impl std::str::FromStr for SlotsPolicy {
+impl std::str::FromStr for TaskDistribution {
     type Err = String;

     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
         ArgEnum::from_str(s, true)
     }
 }

-impl parse_arg::ParseArgFromStr for SlotsPolicy {
+impl parse_arg::ParseArgFromStr for TaskDistribution {
     fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
         write!(writer, "The executor slots policy for the scheduler")
     }
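
For programmatic configuration, the renamed builder replaces `with_executor_slots_policy`. A minimal sketch (not part of this commit), assuming the scheduler crate is used as a library dependency:

```rust
use ballista_scheduler::config::{SchedulerConfig, TaskDistribution};

fn main() {
    // The Default impl above uses TaskDistribution::Bias; the renamed
    // builder switches the scheduler to round-robin task distribution.
    let config = SchedulerConfig::default()
        .with_task_distribution(TaskDistribution::RoundRobin);
    println!("task distribution: {:?}", config.task_distribution);
}
```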

ballista/scheduler/src/state/executor_manager.rs (+35 -32)

@@ -17,15 +17,13 @@

 use std::time::{Duration, SystemTime, UNIX_EPOCH};

-use crate::cluster::TaskDistribution;
-
 #[cfg(not(test))]
 use ballista_core::error::BallistaError;
 use ballista_core::error::Result;
 use ballista_core::serde::protobuf;

 use crate::cluster::ClusterState;
-use crate::config::SlotsPolicy;
+use crate::config::TaskDistribution;

 use crate::state::execution_graph::RunningTaskInfo;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
@@ -101,13 +99,8 @@ pub struct ExecutorManager {
 impl ExecutorManager {
     pub(crate) fn new(
         cluster_state: Arc<dyn ClusterState>,
-        slots_policy: SlotsPolicy,
+        task_distribution: TaskDistribution,
     ) -> Self {
-        let task_distribution = match slots_policy {
-            SlotsPolicy::Bias => TaskDistribution::Bias,
-            SlotsPolicy::RoundRobin => TaskDistribution::RoundRobin,
-        };
-
         Self {
             task_distribution,
             cluster_state,
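
With the rename, `ExecutorManager::new` takes `TaskDistribution` directly and the old `SlotsPolicy` to `TaskDistribution` mapping disappears. The reservation logic itself is untouched by this diff; the standalone sketch below only illustrates the behavioral difference described by the enum's doc comments, using a local copy of the enum and hypothetical slot counts. It is not the `ExecutorManager` implementation.

```rust
/// Illustrative only: how the two policies could assign `tasks` to
/// per-executor free slot counts. Not the actual ExecutorManager logic.
#[derive(Debug, Clone, Copy)]
enum TaskDistribution {
    Bias,
    RoundRobin,
}

fn assign(slots: &mut [usize], mut tasks: usize, policy: TaskDistribution) -> Vec<usize> {
    // assigned[i] counts tasks placed on executor i.
    let mut assigned = vec![0; slots.len()];
    match policy {
        // Bias: consume each executor's free slots before moving to the next.
        TaskDistribution::Bias => {
            for (i, free) in slots.iter_mut().enumerate() {
                let take = tasks.min(*free);
                assigned[i] += take;
                *free -= take;
                tasks -= take;
                if tasks == 0 {
                    break;
                }
            }
        }
        // RoundRobin: hand out one task per executor per pass until done.
        TaskDistribution::RoundRobin => {
            while tasks > 0 && slots.iter().any(|free| *free > 0) {
                for (i, free) in slots.iter_mut().enumerate() {
                    if tasks == 0 {
                        break;
                    }
                    if *free > 0 {
                        assigned[i] += 1;
                        *free -= 1;
                        tasks -= 1;
                    }
                }
            }
        }
    }
    assigned
}

fn main() {
    // Three executors, each advertising 4 free slots; 6 tasks to place.
    println!("{:?}", assign(&mut [4, 4, 4], 6, TaskDistribution::Bias)); // [4, 2, 0]
    println!("{:?}", assign(&mut [4, 4, 4], 6, TaskDistribution::RoundRobin)); // [2, 2, 2]
}
```
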
@@ -511,7 +504,7 @@ impl ExecutorManager {
 #[cfg(test)]
 mod test {

-    use crate::config::SlotsPolicy;
+    use crate::config::TaskDistribution;

     use crate::scheduler_server::timestamp_secs;
     use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
@@ -525,17 +518,19 @@ mod test {

     #[tokio::test]
     async fn test_reserve_and_cancel() -> Result<()> {
-        test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?;
-        test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?;
+        test_reserve_and_cancel_inner(TaskDistribution::Bias).await?;
+        test_reserve_and_cancel_inner(TaskDistribution::RoundRobin).await?;

         Ok(())
     }

-    async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) -> Result<()> {
+    async fn test_reserve_and_cancel_inner(
+        task_distribution: TaskDistribution,
+    ) -> Result<()> {
         let cluster = test_cluster_context();

         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), slots_policy);
+            ExecutorManager::new(cluster.cluster_state(), task_distribution);

         let executors = test_executors(10, 4);

@@ -551,7 +546,7 @@
         assert_eq!(
             reservations.len(),
             40,
-            "Expected 40 reservations for policy {slots_policy:?}"
+            "Expected 40 reservations for policy {task_distribution:?}"
         );

         // Now cancel them
@@ -563,25 +558,27 @@
         assert_eq!(
             reservations.len(),
             40,
-            "Expected 40 reservations for policy {slots_policy:?}"
+            "Expected 40 reservations for policy {task_distribution:?}"
         );

         Ok(())
     }

     #[tokio::test]
     async fn test_reserve_partial() -> Result<()> {
-        test_reserve_partial_inner(SlotsPolicy::Bias).await?;
-        test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?;
+        test_reserve_partial_inner(TaskDistribution::Bias).await?;
+        test_reserve_partial_inner(TaskDistribution::RoundRobin).await?;

         Ok(())
     }

-    async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) -> Result<()> {
+    async fn test_reserve_partial_inner(
+        task_distribution: TaskDistribution,
+    ) -> Result<()> {
         let cluster = test_cluster_context();

         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), slots_policy);
+            ExecutorManager::new(cluster.cluster_state(), task_distribution);

         let executors = test_executors(10, 4);

@@ -621,21 +618,23 @@

     #[tokio::test]
     async fn test_reserve_concurrent() -> Result<()> {
-        test_reserve_concurrent_inner(SlotsPolicy::Bias).await?;
-        test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?;
+        test_reserve_concurrent_inner(TaskDistribution::Bias).await?;
+        test_reserve_concurrent_inner(TaskDistribution::RoundRobin).await?;

         Ok(())
     }

-    async fn test_reserve_concurrent_inner(slots_policy: SlotsPolicy) -> Result<()> {
+    async fn test_reserve_concurrent_inner(
+        task_distribution: TaskDistribution,
+    ) -> Result<()> {
         let (sender, mut receiver) =
             tokio::sync::mpsc::channel::<Result<Vec<ExecutorReservation>>>(1000);

         let executors = test_executors(10, 4);

         let cluster = test_cluster_context();
         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), slots_policy);
+            ExecutorManager::new(cluster.cluster_state(), task_distribution);

         for (executor_metadata, executor_data) in executors {
             executor_manager
@@ -670,17 +669,19 @@ mod test {

     #[tokio::test]
     async fn test_register_reserve() -> Result<()> {
-        test_register_reserve_inner(SlotsPolicy::Bias).await?;
-        test_register_reserve_inner(SlotsPolicy::RoundRobin).await?;
+        test_register_reserve_inner(TaskDistribution::Bias).await?;
+        test_register_reserve_inner(TaskDistribution::RoundRobin).await?;

         Ok(())
     }

-    async fn test_register_reserve_inner(slots_policy: SlotsPolicy) -> Result<()> {
+    async fn test_register_reserve_inner(
+        task_distribution: TaskDistribution,
+    ) -> Result<()> {
         let cluster = test_cluster_context();

         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), slots_policy);
+            ExecutorManager::new(cluster.cluster_state(), task_distribution);

         let executors = test_executors(10, 4);

@@ -702,17 +703,19 @@

     #[tokio::test]
     async fn test_ignore_fenced_executors() -> Result<()> {
-        test_ignore_fenced_executors_inner(SlotsPolicy::Bias).await?;
-        test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobin).await?;
+        test_ignore_fenced_executors_inner(TaskDistribution::Bias).await?;
+        test_ignore_fenced_executors_inner(TaskDistribution::RoundRobin).await?;

         Ok(())
     }

-    async fn test_ignore_fenced_executors_inner(slots_policy: SlotsPolicy) -> Result<()> {
+    async fn test_ignore_fenced_executors_inner(
+        task_distribution: TaskDistribution,
+    ) -> Result<()> {
         let cluster = test_cluster_context();

         let executor_manager =
-            ExecutorManager::new(cluster.cluster_state(), slots_policy);
+            ExecutorManager::new(cluster.cluster_state(), task_distribution);

         // Setup two executors initially
         let executors = test_executors(2, 4);

ballista/scheduler/src/state/mod.rs (+2 -2)

@@ -116,7 +116,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         Self {
             executor_manager: ExecutorManager::new(
                 cluster.cluster_state(),
-                config.executor_slots_policy,
+                config.task_distribution,
             ),
             task_manager: TaskManager::new(
                 cluster.job_state(),
@@ -140,7 +140,7 @@
         Self {
             executor_manager: ExecutorManager::new(
                 cluster.cluster_state(),
-                config.executor_slots_policy,
+                config.task_distribution,
             ),
             task_manager: TaskManager::with_launcher(
                 cluster.job_state(),
