Skip to content

Commit e74d32b

Browse files
committed
Add a gRPC service to the scheduler so that clients can explicitly trigger cleanup of a job's shuffle data
1 parent 926605e commit e74d32b

File tree

4 files changed

+43
-6
lines changed

4 files changed

+43
-6
lines changed

ballista/core/proto/ballista.proto

+9
Original file line numberDiff line numberDiff line change
@@ -948,6 +948,13 @@ message CancelJobResult {
948948
bool cancelled = 1;
949949
}
950950

951+
message CleanJobDataParams {
952+
string job_id = 1;
953+
}
954+
955+
message CleanJobDataResult {
956+
}
957+
951958
message LaunchTaskParams {
952959
// Allow to launch a task set to an executor at once
953960
repeated TaskDefinition tasks = 1;
@@ -1014,6 +1021,8 @@ service SchedulerGrpc {
10141021
rpc ExecutorStopped (ExecutorStoppedParams) returns (ExecutorStoppedResult) {}
10151022

10161023
rpc CancelJob (CancelJobParams) returns (CancelJobResult) {}
1024+
1025+
rpc CleanJobData (CleanJobDataParams) returns (CleanJobDataResult) {}
10171026
}
10181027

10191028
service ExecutorGrpc {

ballista/scheduler/src/scheduler_server/event.rs

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub enum QueryStageSchedulerEvent {
4040
JobRunningFailed(String, String),
4141
JobUpdated(String),
4242
JobCancel(String),
43+
JobDataClean(String),
4344
TaskUpdating(String, Vec<TaskStatus>),
4445
ReservationOffering(Vec<ExecutorReservation>),
4546
ExecutorLost(String, Option<String>),

ballista/scheduler/src/scheduler_server/grpc.rs

+30-6
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ use std::convert::TryInto;
2222
use ballista_core::serde::protobuf::executor_registration::OptionalHost;
2323
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
2424
use ballista_core::serde::protobuf::{
25-
executor_status, CancelJobParams, CancelJobResult, ExecuteQueryParams,
26-
ExecuteQueryResult, ExecutorHeartbeat, ExecutorStatus, ExecutorStoppedParams,
27-
ExecutorStoppedResult, GetFileMetadataParams, GetFileMetadataResult,
28-
GetJobStatusParams, GetJobStatusResult, HeartBeatParams, HeartBeatResult,
29-
PollWorkParams, PollWorkResult, RegisterExecutorParams, RegisterExecutorResult,
30-
UpdateTaskStatusParams, UpdateTaskStatusResult,
25+
executor_status, CancelJobParams, CancelJobResult, CleanJobDataParams,
26+
CleanJobDataResult, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat,
27+
ExecutorStatus, ExecutorStoppedParams, ExecutorStoppedResult, GetFileMetadataParams,
28+
GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams,
29+
HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
30+
RegisterExecutorResult, UpdateTaskStatusParams, UpdateTaskStatusResult,
3131
};
3232
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
3333
use ballista_core::serde::AsExecutionPlan;
@@ -543,6 +543,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
543543
})?;
544544
Ok(Response::new(CancelJobResult { cancelled: true }))
545545
}
546+
547+
async fn clean_job_data(
548+
&self,
549+
request: Request<CleanJobDataParams>,
550+
) -> Result<Response<CleanJobDataResult>, Status> {
551+
let job_id = request.into_inner().job_id;
552+
info!("Received clean data request for job {}", job_id);
553+
554+
self.query_stage_event_loop
555+
.get_sender()
556+
.map_err(|e| {
557+
let msg = format!("Get query stage event loop error due to {:?}", e);
558+
error!("{}", msg);
559+
Status::internal(msg)
560+
})?
561+
.post_event(QueryStageSchedulerEvent::JobDataClean(job_id))
562+
.await
563+
.map_err(|e| {
564+
let msg = format!("Post to query stage event loop error due to {:?}", e);
565+
error!("{}", msg);
566+
Status::internal(msg)
567+
})?;
568+
Ok(Response::new(CleanJobDataResult {}))
569+
}
546570
}
547571

548572
#[cfg(all(test, feature = "sled"))]

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

+3
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
219219
.cancel_running_tasks(tasks)
220220
.await?
221221
}
222+
QueryStageSchedulerEvent::JobDataClean(job_id) => {
223+
self.state.executor_manager.clean_up_job_data(job_id);
224+
}
222225
}
223226

224227
Ok(())

0 commit comments

Comments
 (0)