Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stuck issue for the load testing of Push-based task scheduling #2006

Merged
merged 4 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ballista/rust/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ pub struct ExecutorData {
pub available_task_slots: u32,
}

/// A signed delta to apply to an executor's available task slots.
///
/// Unlike `ExecutorData`, which carries the absolute slot count as a `u32`,
/// this struct intentionally uses an `i32` so a single type can express both
/// slot consumption (negative, when tasks are launched) and slot release
/// (positive, when tasks complete).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutorDataChange {
    /// Identifier of the executor whose slot count should change.
    pub executor_id: String,
    /// Signed change in available task slots (negative = slots consumed).
    pub task_slots: i32,
}

struct ExecutorResourcePair {
total: protobuf::executor_resource::Resource,
available: protobuf::executor_resource::Resource,
Expand Down
33 changes: 26 additions & 7 deletions ballista/rust/scheduler/src/scheduler_server/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use log::{debug, warn};
use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::EventAction;
use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition};
use ballista_core::serde::scheduler::ExecutorData;
use ballista_core::serde::scheduler::ExecutorDataChange;
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};

use crate::scheduler_server::ExecutorsClient;
Expand Down Expand Up @@ -70,12 +70,28 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
return Ok(Some(SchedulerServerEvent::JobSubmitted(job_id)));
}

let mut executors_data_change: Vec<ExecutorDataChange> = available_executors
.iter()
.map(|executor_data| ExecutorDataChange {
executor_id: executor_data.executor_id.clone(),
task_slots: executor_data.available_task_slots as i32,
})
.collect();

let (tasks_assigment, num_tasks) = self
.state
.fetch_tasks(&mut available_executors, &job_id)
.await?;
for (data_change, data) in executors_data_change
.iter_mut()
.zip(available_executors.iter())
{
data_change.task_slots =
data.available_task_slots as i32 - data_change.task_slots;
}

if num_tasks > 0 {
self.launch_tasks(&available_executors, tasks_assigment)
self.launch_tasks(&executors_data_change, tasks_assigment)
.await?;
}

Expand All @@ -84,12 +100,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

async fn launch_tasks(
&self,
executors: &[ExecutorData],
executors: &[ExecutorDataChange],
tasks_assigment: Vec<Vec<TaskDefinition>>,
) -> Result<()> {
for (idx_executor, tasks) in tasks_assigment.into_iter().enumerate() {
if !tasks.is_empty() {
let executor_data = &executors[idx_executor];
let executor_data_change = &executors[idx_executor];
debug!(
"Start to launch tasks {:?} to executor {:?}",
tasks
Expand All @@ -107,14 +123,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}
})
.collect::<Vec<String>>(),
executor_data.executor_id
executor_data_change.executor_id
);
let mut client = {
let clients = self.executors_client.read().await;
clients.get(&executor_data.executor_id).unwrap().clone()
clients
.get(&executor_data_change.executor_id)
.unwrap()
.clone()
};
// Update the resources first
self.state.save_executor_data(executor_data.clone());
self.state.update_executor_data(executor_data_change);
// TODO check whether launching task is successful or not
client.launch_task(LaunchTaskParams { task: tasks }).await?;
} else {
Expand Down
13 changes: 9 additions & 4 deletions ballista/rust/scheduler/src/scheduler_server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use ballista_core::serde::protobuf::{
TaskDefinition, UpdateTaskStatusParams, UpdateTaskStatusResult,
};
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use ballista_core::serde::scheduler::{
ExecutorData, ExecutorDataChange, ExecutorMetadata,
};
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
Expand Down Expand Up @@ -290,9 +292,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
jobs.insert(task_id.job_id.clone());
}
}
if let Some(mut executor_data) = self.state.get_executor_data(&executor_id) {
executor_data.available_task_slots += num_tasks as u32;
self.state.save_executor_data(executor_data);

if let Some(executor_data) = self.state.get_executor_data(&executor_id) {
self.state.update_executor_data(&ExecutorDataChange {
executor_id: executor_data.executor_id,
task_slots: num_tasks as i32,
});
} else {
error!("Fail to get executor data for {:?}", &executor_id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use log::{debug, error, info};
use log::{debug, error, info, warn};
use tokio::sync::RwLock;

use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::{EventAction, EventSender};
use ballista_core::serde::protobuf::{PartitionId, TaskStatus};
use ballista_core::serde::protobuf::{
job_status, JobStatus, PartitionId, RunningJob, TaskStatus,
};
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
Expand Down Expand Up @@ -160,7 +162,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
) -> Result<Option<QueryStageSchedulerEvent>> {
match event {
QueryStageSchedulerEvent::JobSubmitted(job_id, plan) => {
info!("Job {} submitted", job_id);
let plan = self.create_physical_plan(plan).await?;
if let Err(e) = self
.state
.save_job_metadata(
&job_id,
&JobStatus {
status: Some(job_status::Status::Running(RunningJob {})),
},
)
.await
{
warn!("Could not update job {} status to running: {}", job_id, e);
}
self.generate_stages(&job_id, plan).await?;

if let Some(event_sender) = self.event_sender.as_ref() {
Expand Down
33 changes: 31 additions & 2 deletions ballista/rust/scheduler/src/state/in_memory_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// under the License.

use ballista_core::serde::protobuf::{ExecutorHeartbeat, TaskStatus};
use ballista_core::serde::scheduler::ExecutorData;
use ballista_core::serde::scheduler::{ExecutorData, ExecutorDataChange};
use log::{error, info, warn};
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
Expand Down Expand Up @@ -85,6 +86,33 @@ impl InMemorySchedulerState {
executors_data.insert(executor_data.executor_id.clone(), executor_data);
}

pub(crate) fn update_executor_data(&self, executor_data_change: &ExecutorDataChange) {
let mut executors_data = self.executors_data.write();
if let Some(executor_data) =
executors_data.get_mut(&executor_data_change.executor_id)
{
let available_task_slots = executor_data.available_task_slots as i32
+ executor_data_change.task_slots;
if available_task_slots < 0 {
error!(
"Available task slots {} for executor {} is less than 0",
available_task_slots, executor_data.executor_id
);
} else {
info!(
"available_task_slots for executor {} becomes {}",
executor_data.executor_id, available_task_slots
);
executor_data.available_task_slots = available_task_slots as u32;
}
} else {
warn!(
"Could not find executor data for {}",
executor_data_change.executor_id
);
}
}

pub(crate) fn get_executor_data(&self, executor_id: &str) -> Option<ExecutorData> {
let executors_data = self.executors_data.read();
executors_data.get(executor_id).cloned()
Expand All @@ -100,7 +128,8 @@ impl InMemorySchedulerState {
executors_data
.iter()
.filter_map(|(exec, data)| {
alive_executors.contains(exec).then(|| data.clone())
(data.available_task_slots > 0 && alive_executors.contains(exec))
.then(|| data.clone())
})
.collect::<Vec<ExecutorData>>()
};
Expand Down
7 changes: 6 additions & 1 deletion ballista/rust/scheduler/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use ballista_core::serde::protobuf::{
FailedTask, JobStatus, RunningJob, RunningTask, TaskStatus,
};
use ballista_core::serde::scheduler::{
ExecutorData, ExecutorMetadata, PartitionId, PartitionStats,
ExecutorData, ExecutorDataChange, ExecutorMetadata, PartitionId, PartitionStats,
};
use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec};
use datafusion::prelude::ExecutionContext;
Expand Down Expand Up @@ -227,6 +227,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
self.in_memory_state.save_executor_data(executor_data);
}

    /// Applies a signed task-slot delta for one executor by delegating to the
    /// in-memory scheduler state. Unknown executor ids are handled (logged)
    /// by the in-memory state rather than surfaced as an error here.
    pub fn update_executor_data(&self, executor_data_change: &ExecutorDataChange) {
        self.in_memory_state
            .update_executor_data(executor_data_change);
    }

    /// Returns a snapshot of executors currently available for task
    /// assignment, delegating the filtering to the in-memory state.
    pub fn get_available_executors_data(&self) -> Vec<ExecutorData> {
        self.in_memory_state.get_available_executors_data()
    }
Expand Down