Skip to content

Commit a55d770

Browse files
committed
Make the scheduler event loop buffer size configurable
1 parent a1473d0 commit a55d770

File tree

5 files changed

+34
-3
lines changed

5 files changed

+34
-3
lines changed

ballista/scheduler/scheduler_config_spec.toml

+6
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ type = "ballista_core::config::TaskSchedulingPolicy"
7272
doc = "The scheduing policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged"
7373
default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
7474

75+
[[param]]
76+
name = "event_loop_buffer_size"
77+
type = "u32"
78+
default = "10000"
79+
doc = "Event loop buffer size. Default: 10000"
80+
7581
[[param]]
7682
name = "executor_slots_policy"
7783
type = "ballista_scheduler::config::SlotsPolicy"

ballista/scheduler/src/main.rs

+5
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ async fn start_server(
7474
addr: SocketAddr,
7575
scheduling_policy: TaskSchedulingPolicy,
7676
slots_policy: SlotsPolicy,
77+
event_loop_buffer_size: usize,
7778
) -> Result<()> {
7879
info!(
7980
"Ballista v{} Scheduler listening on {:?}",
@@ -93,11 +94,13 @@ async fn start_server(
9394
slots_policy,
9495
BallistaCodec::default(),
9596
default_session_builder,
97+
event_loop_buffer_size,
9698
),
9799
_ => SchedulerServer::new(
98100
scheduler_name,
99101
config_backend.clone(),
100102
BallistaCodec::default(),
103+
event_loop_buffer_size,
101104
),
102105
};
103106

@@ -244,12 +247,14 @@ async fn main() -> Result<()> {
244247

245248
let scheduling_policy: TaskSchedulingPolicy = opt.scheduler_policy;
246249
let slots_policy: SlotsPolicy = opt.executor_slots_policy;
250+
let event_loop_buffer_size = opt.event_loop_buffer_size as usize;
247251
start_server(
248252
scheduler_name,
249253
client,
250254
addr,
251255
scheduling_policy,
252256
slots_policy,
257+
event_loop_buffer_size,
253258
)
254259
.await?;
255260
Ok(())

ballista/scheduler/src/scheduler_server/grpc.rs

+3
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,7 @@ mod test {
582582
"localhost:50050".to_owned(),
583583
state_storage.clone(),
584584
BallistaCodec::default(),
585+
10000,
585586
);
586587
scheduler.init().await?;
587588
let exec_meta = ExecutorRegistration {
@@ -667,6 +668,7 @@ mod test {
667668
"localhost:50050".to_owned(),
668669
state_storage.clone(),
669670
BallistaCodec::default(),
671+
10000,
670672
);
671673
scheduler.init().await?;
672674

@@ -746,6 +748,7 @@ mod test {
746748
"localhost:50050".to_owned(),
747749
state_storage.clone(),
748750
BallistaCodec::default(),
751+
10000,
749752
);
750753
scheduler.init().await?;
751754

ballista/scheduler/src/scheduler_server/mod.rs

+19-3
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
6868
scheduler_name: String,
6969
config: Arc<dyn StateBackendClient>,
7070
codec: BallistaCodec<T, U>,
71+
event_loop_buffer_size: usize,
7172
) -> Self {
7273
SchedulerServer::new_with_policy(
7374
scheduler_name,
@@ -76,6 +77,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
7677
SlotsPolicy::Bias,
7778
codec,
7879
default_session_builder,
80+
event_loop_buffer_size,
7981
)
8082
}
8183

@@ -84,6 +86,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
8486
config: Arc<dyn StateBackendClient>,
8587
codec: BallistaCodec<T, U>,
8688
session_builder: SessionBuilder,
89+
event_loop_buffer_size: usize,
8790
) -> Self {
8891
SchedulerServer::new_with_policy(
8992
scheduler_name,
@@ -92,6 +95,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
9295
SlotsPolicy::Bias,
9396
codec,
9497
session_builder,
98+
event_loop_buffer_size,
9599
)
96100
}
97101

@@ -102,6 +106,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
102106
slots_policy: SlotsPolicy,
103107
codec: BallistaCodec<T, U>,
104108
session_builder: SessionBuilder,
109+
event_loop_buffer_size: usize,
105110
) -> Self {
106111
let state = Arc::new(SchedulerState::new(
107112
config,
@@ -111,18 +116,27 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
111116
slots_policy,
112117
));
113118

114-
SchedulerServer::new_with_state(scheduler_name, scheduling_policy, state)
119+
SchedulerServer::new_with_state(
120+
scheduler_name,
121+
scheduling_policy,
122+
state,
123+
event_loop_buffer_size,
124+
)
115125
}
116126

117127
pub(crate) fn new_with_state(
118128
scheduler_name: String,
119129
policy: TaskSchedulingPolicy,
120130
state: Arc<SchedulerState<T, U>>,
131+
event_loop_buffer_size: usize,
121132
) -> Self {
122133
let query_stage_scheduler =
123134
Arc::new(QueryStageScheduler::new(state.clone(), policy));
124-
let query_stage_event_loop =
125-
EventLoop::new("query_stage".to_owned(), 10000, query_stage_scheduler);
135+
let query_stage_event_loop = EventLoop::new(
136+
"query_stage".to_owned(),
137+
event_loop_buffer_size,
138+
query_stage_scheduler,
139+
);
126140
Self {
127141
scheduler_name,
128142
state,
@@ -770,6 +784,7 @@ mod test {
770784
SlotsPolicy::Bias,
771785
BallistaCodec::default(),
772786
default_session_builder,
787+
10000,
773788
);
774789
scheduler.init().await?;
775790

@@ -789,6 +804,7 @@ mod test {
789804
"localhost:50050".to_owned(),
790805
TaskSchedulingPolicy::PushStaged,
791806
state,
807+
10000,
792808
);
793809
scheduler.init().await?;
794810

ballista/scheduler/src/standalone.rs

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
3838
"localhost:50050".to_owned(),
3939
Arc::new(client),
4040
BallistaCodec::default(),
41+
100000,
4142
);
4243
scheduler_server.init().await?;
4344
let server = SchedulerGrpcServer::new(scheduler_server.clone());

0 commit comments

Comments
 (0)