Skip to content

Commit 6f03e7c

Browse files
committed
Add a gRPC service to the scheduler so that clients can explicitly trigger cleanup of a job's shuffle data
1 parent 0ebca57 commit 6f03e7c

File tree

4 files changed

+46
-6
lines changed

4 files changed

+46
-6
lines changed

ballista/core/proto/ballista.proto

+9
Original file line numberDiff line numberDiff line change
@@ -948,6 +948,13 @@ message CancelJobResult {
948948
bool cancelled = 1;
949949
}
950950

951+
message CleanJobDataParams {
952+
string job_id = 1;
953+
}
954+
955+
message CleanJobDataResult {
956+
}
957+
951958
message LaunchTaskParams {
952959
// Allow to launch a task set to an executor at once
953960
repeated TaskDefinition tasks = 1;
@@ -1014,6 +1021,8 @@ service SchedulerGrpc {
10141021
rpc ExecutorStopped (ExecutorStoppedParams) returns (ExecutorStoppedResult) {}
10151022

10161023
rpc CancelJob (CancelJobParams) returns (CancelJobResult) {}
1024+
1025+
rpc CleanJobData (CleanJobDataParams) returns (CleanJobDataResult) {}
10171026
}
10181027

10191028
service ExecutorGrpc {

ballista/scheduler/src/scheduler_server/event.rs

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub enum QueryStageSchedulerEvent {
4040
JobRunningFailed(String, String),
4141
JobUpdated(String),
4242
JobCancel(String),
43+
JobDataClean(String),
4344
TaskUpdating(String, Vec<TaskStatus>),
4445
ReservationOffering(Vec<ExecutorReservation>),
4546
ExecutorLost(String, Option<String>),

ballista/scheduler/src/scheduler_server/grpc.rs

+30-6
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ use std::convert::TryInto;
2424
use ballista_core::serde::protobuf::executor_registration::OptionalHost;
2525
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
2626
use ballista_core::serde::protobuf::{
27-
executor_status, CancelJobParams, CancelJobResult, ExecuteQueryParams,
28-
ExecuteQueryResult, ExecutorHeartbeat, ExecutorStatus, ExecutorStoppedParams,
29-
ExecutorStoppedResult, GetFileMetadataParams, GetFileMetadataResult,
30-
GetJobStatusParams, GetJobStatusResult, HeartBeatParams, HeartBeatResult,
31-
PollWorkParams, PollWorkResult, RegisterExecutorParams, RegisterExecutorResult,
32-
UpdateTaskStatusParams, UpdateTaskStatusResult,
27+
executor_status, CancelJobParams, CancelJobResult, CleanJobDataParams,
28+
CleanJobDataResult, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat,
29+
ExecutorStatus, ExecutorStoppedParams, ExecutorStoppedResult, GetFileMetadataParams,
30+
GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams,
31+
HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
32+
RegisterExecutorResult, UpdateTaskStatusParams, UpdateTaskStatusResult,
3333
};
3434
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
3535
use ballista_core::serde::AsExecutionPlan;
@@ -545,6 +545,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
545545
})?;
546546
Ok(Response::new(CancelJobResult { cancelled: true }))
547547
}
548+
549+
async fn clean_job_data(
550+
&self,
551+
request: Request<CleanJobDataParams>,
552+
) -> Result<Response<CleanJobDataResult>, Status> {
553+
let job_id = request.into_inner().job_id;
554+
info!("Received clean data request for job {}", job_id);
555+
556+
self.query_stage_event_loop
557+
.get_sender()
558+
.map_err(|e| {
559+
let msg = format!("Get query stage event loop error due to {:?}", e);
560+
error!("{}", msg);
561+
Status::internal(msg)
562+
})?
563+
.post_event(QueryStageSchedulerEvent::JobDataClean(job_id))
564+
.await
565+
.map_err(|e| {
566+
let msg = format!("Post to query stage event loop error due to {:?}", e);
567+
error!("{}", msg);
568+
Status::internal(msg)
569+
})?;
570+
Ok(Response::new(CleanJobDataResult {}))
571+
}
548572
}
549573

550574
#[cfg(all(test, feature = "sled"))]

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

+6
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
224224
.cancel_running_tasks(tasks)
225225
.await?
226226
}
227+
QueryStageSchedulerEvent::JobDataClean(job_id) => {
228+
let executor_manager = self.state.executor_manager.clone();
229+
tokio::spawn(async move {
230+
executor_manager.clean_up_job_data(job_id).await;
231+
});
232+
}
227233
}
228234

229235
Ok(())

0 commit comments

Comments
 (0)