Skip to content

Commit

Permalink
Introduce StageManager for managing tasks stage by stage (#1983)
Browse files Browse the repository at this point in the history
* Introduce StageManager

* Add unit tests for both event-pull-based and event-push-based task scheduling

* Fix clippy

* Fix for PR review

* Fix for PR review

* Fix clippy caused by #[cfg(test)] and #[cfg(not(test))]

Co-authored-by: yangzhong <[email protected]>
  • Loading branch information
yahoNanJing and kyotoYaho authored Mar 17, 2022
1 parent 8b249ae commit b88f103
Show file tree
Hide file tree
Showing 13 changed files with 1,844 additions and 1,190 deletions.
1 change: 1 addition & 0 deletions ballista/rust/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ tower = { version = "0.4" }
warp = "0.3"
parking_lot = "0.12"
async-trait = "0.1.41"
async-recursion = "1.0.0"

[dev-dependencies]
ballista-core = { path = "../core", version = "0.6.0" }
Expand Down
20 changes: 20 additions & 0 deletions ballista/rust/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,26 @@ impl DistributedPlanner {
}
}

/// Returns the unresolved shuffles in the execution plan
pub fn find_unresolved_shuffles(
plan: &Arc<dyn ExecutionPlan>,
) -> Result<Vec<UnresolvedShuffleExec>> {
if let Some(unresolved_shuffle) =
plan.as_any().downcast_ref::<UnresolvedShuffleExec>()
{
Ok(vec![unresolved_shuffle.clone()])
} else {
Ok(plan
.children()
.iter()
.map(find_unresolved_shuffles)
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect())
}
}

pub fn remove_unresolved_shuffles(
stage: &dyn ExecutionPlan,
partition_locations: &HashMap<usize, HashMap<usize, Vec<PartitionLocation>>>,
Expand Down
33 changes: 33 additions & 0 deletions ballista/rust/scheduler/src/scheduler_server/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

/// Events carried by the scheduler server's event loop.
#[derive(Clone)]
pub(crate) enum SchedulerServerEvent {
/// Request another attempt at offering executor resources to pending tasks.
/// The payload is the number of offer rounds.
ReviveOffers(u32),
}

/// Events for query-stage-level scheduling.
///
/// NOTE(review): the payload meanings documented below are inferred from the
/// variant names and field types only; confirm against the event handler.
#[derive(Clone)]
pub enum QueryStageSchedulerEvent {
/// A job was submitted; presumably `(job_id, physical plan of the job)`.
JobSubmitted(String, Arc<dyn ExecutionPlan>),
/// A stage finished; presumably `(job_id, stage_id)`.
StageFinished(String, u32),
/// A job finished; presumably `(job_id)`.
JobFinished(String),
/// A job failed; presumably `(job_id, stage_id, failure message)` — TODO confirm.
JobFailed(String, u32, String),
}
30 changes: 13 additions & 17 deletions ballista/rust/scheduler/src/scheduler_server/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::time::Duration;
use async_trait::async_trait;
use log::{debug, warn};

use crate::scheduler_server::event::SchedulerServerEvent;
use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::EventAction;
use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition};
Expand All @@ -31,11 +32,6 @@ use crate::scheduler_server::ExecutorsClient;
use crate::state::task_scheduler::TaskScheduler;
use crate::state::SchedulerState;

#[derive(Clone)]
pub(crate) enum SchedulerServerEvent {
JobSubmitted(String),
}

pub(crate) struct SchedulerServerEventAction<
T: 'static + AsLogicalPlan,
U: 'static + AsExecutionPlan,
Expand All @@ -57,17 +53,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}
}

async fn offer_resources(
&self,
job_id: String,
) -> Result<Option<SchedulerServerEvent>> {
let mut available_executors = self.state.get_available_executors_data();
#[allow(unused_variables)]
async fn offer_resources(&self, n: u32) -> Result<Option<SchedulerServerEvent>> {
let mut available_executors =
self.state.executor_manager.get_available_executors_data();
// If there are not enough resources, reschedule the tasks of the job
if available_executors.is_empty() {
// TODO Maybe it's better to use an exclusive runtime for this kind of task scheduling
warn!("Not enough available executors for task running");
tokio::time::sleep(Duration::from_millis(100)).await;
return Ok(Some(SchedulerServerEvent::JobSubmitted(job_id)));
return Ok(Some(SchedulerServerEvent::ReviveOffers(1)));
}

let mut executors_data_change: Vec<ExecutorDataChange> = available_executors
Expand All @@ -80,7 +75,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

let (tasks_assigment, num_tasks) = self
.state
.fetch_tasks(&mut available_executors, &job_id)
.fetch_schedulable_tasks(&mut available_executors, n)
.await?;
for (data_change, data) in executors_data_change
.iter_mut()
Expand All @@ -90,6 +85,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
data.available_task_slots as i32 - data_change.task_slots;
}

#[cfg(not(test))]
if num_tasks > 0 {
self.launch_tasks(&executors_data_change, tasks_assigment)
.await?;
Expand All @@ -98,6 +94,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
Ok(None)
}

#[allow(dead_code)]
async fn launch_tasks(
&self,
executors: &[ExecutorDataChange],
Expand Down Expand Up @@ -132,10 +129,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.unwrap()
.clone()
};
// Update the resources first
self.state.update_executor_data(executor_data_change);
// TODO check whether launching task is successful or not
client.launch_task(LaunchTaskParams { task: tasks }).await?;
self.state
.executor_manager
.update_executor_data(executor_data_change);
} else {
// Since the task assignment policy is round robin,
// if no tasks are found for one executor, just break early
Expand All @@ -162,9 +160,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
event: SchedulerServerEvent,
) -> Result<Option<SchedulerServerEvent>> {
match event {
SchedulerServerEvent::JobSubmitted(job_id) => {
self.offer_resources(job_id).await
}
SchedulerServerEvent::ReviveOffers(n) => self.offer_resources(n).await,
}
}

Expand Down
10 changes: 1 addition & 9 deletions ballista/rust/scheduler/src/scheduler_server/external_scaler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::scheduler_server::externalscaler::{
GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef,
};
use crate::scheduler_server::SchedulerServer;
use ballista_core::serde::protobuf::task_status;
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
use log::debug;
use tonic::{Request, Response};
Expand All @@ -35,14 +34,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
&self,
_request: Request<ScaledObjectRef>,
) -> Result<Response<IsActiveResponse>, tonic::Status> {
let tasks = self.state.get_all_tasks();
let result = tasks.iter().any(|task| {
!matches!(
task.status,
Some(task_status::Status::Completed(_))
| Some(task_status::Status::Failed(_))
)
});
let result = self.state.stage_manager.has_running_tasks();
debug!("Are there active tasks? {}", result);
Ok(Response::new(IsActiveResponse { result }))
}
Expand Down
Loading

0 comments on commit b88f103

Please sign in to comment.