From e74d32bad909b574cce4aba0719db5c57ca737f5 Mon Sep 17 00:00:00 2001
From: yangzhong <yangzhong@ebay.com>
Date: Mon, 31 Oct 2022 11:46:40 +0800
Subject: [PATCH] Add grpc service for the scheduler to make it able for the
 job shuffle data be cleaned up to be triggered by client explicitly

---
 ballista/core/proto/ballista.proto            |  9 +++++
 .../scheduler/src/scheduler_server/event.rs   |  1 +
 .../scheduler/src/scheduler_server/grpc.rs    | 36 +++++++++++++++----
 .../scheduler_server/query_stage_scheduler.rs |  3 ++
 4 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index b5992f030..a38bf0ad4 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -948,6 +948,13 @@ message CancelJobResult {
   bool cancelled = 1;
 }
 
+message CleanJobDataParams {
+  string job_id = 1;
+}
+
+message CleanJobDataResult {
+}
+
 message LaunchTaskParams {
   // Allow to launch a task set to an executor at once
   repeated TaskDefinition tasks = 1;
@@ -1014,6 +1021,8 @@ service SchedulerGrpc {
   rpc ExecutorStopped (ExecutorStoppedParams) returns (ExecutorStoppedResult) {}
 
   rpc CancelJob (CancelJobParams) returns (CancelJobResult) {}
+
+  rpc CleanJobData (CleanJobDataParams) returns (CleanJobDataResult) {}
 }
 
 service ExecutorGrpc {
diff --git a/ballista/scheduler/src/scheduler_server/event.rs b/ballista/scheduler/src/scheduler_server/event.rs
index 544976cdb..cb1dd8e93 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -40,6 +40,7 @@ pub enum QueryStageSchedulerEvent {
     JobRunningFailed(String, String),
     JobUpdated(String),
     JobCancel(String),
+    JobDataClean(String),
     TaskUpdating(String, Vec<TaskStatus>),
     ReservationOffering(Vec<ExecutorReservation>),
     ExecutorLost(String, Option<String>),
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 73a1c64f4..42805672d 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -22,12 +22,12 @@ use std::convert::TryInto;
 use ballista_core::serde::protobuf::executor_registration::OptionalHost;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
 use ballista_core::serde::protobuf::{
-    executor_status, CancelJobParams, CancelJobResult, ExecuteQueryParams,
-    ExecuteQueryResult, ExecutorHeartbeat, ExecutorStatus, ExecutorStoppedParams,
-    ExecutorStoppedResult, GetFileMetadataParams, GetFileMetadataResult,
-    GetJobStatusParams, GetJobStatusResult, HeartBeatParams, HeartBeatResult,
-    PollWorkParams, PollWorkResult, RegisterExecutorParams, RegisterExecutorResult,
-    UpdateTaskStatusParams, UpdateTaskStatusResult,
+    executor_status, CancelJobParams, CancelJobResult, CleanJobDataParams,
+    CleanJobDataResult, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat,
+    ExecutorStatus, ExecutorStoppedParams, ExecutorStoppedResult, GetFileMetadataParams,
+    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams,
+    HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
+    RegisterExecutorResult, UpdateTaskStatusParams, UpdateTaskStatusResult,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
 use ballista_core::serde::AsExecutionPlan;
@@ -543,6 +543,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             })?;
         Ok(Response::new(CancelJobResult { cancelled: true }))
     }
+
+    async fn clean_job_data(
+        &self,
+        request: Request<CleanJobDataParams>,
+    ) -> Result<Response<CleanJobDataResult>, Status> {
+        let job_id = request.into_inner().job_id;
+        info!("Received clean data request for job {}", job_id);
+
+        self.query_stage_event_loop
+            .get_sender()
+            .map_err(|e| {
+                let msg = format!("Get query stage event loop error due to {:?}", e);
+                error!("{}", msg);
+                Status::internal(msg)
+            })?
+            .post_event(QueryStageSchedulerEvent::JobDataClean(job_id))
+            .await
+            .map_err(|e| {
+                let msg = format!("Post to query stage event loop error due to {:?}", e);
+                error!("{}", msg);
+                Status::internal(msg)
+            })?;
+        Ok(Response::new(CleanJobDataResult {}))
+    }
 }
 
 #[cfg(all(test, feature = "sled"))]
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 5c31bdaa3..f49cae467 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -219,6 +219,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                     .cancel_running_tasks(tasks)
                     .await?
             }
+            QueryStageSchedulerEvent::JobDataClean(job_id) => {
+                self.state.executor_manager.clean_up_job_data(job_id);
+            }
         }
 
         Ok(())