From 04b492ac19ce21344a9e8c516fd6735a264af6a4 Mon Sep 17 00:00:00 2001
From: Aaron Siddhartha Mondal
Date: Fri, 14 Jul 2023 22:54:42 +0200
Subject: [PATCH] Create WaitExecution for ExecutionServer.

Goma requires WaitExecution to function correctly. Implement this and store
recently completed actions.
---
 cas/grpc_service/execution_server.rs        |  68 ++++++++---
 cas/scheduler/BUILD                         |   1 +
 cas/scheduler/cache_lookup_scheduler.rs     |  48 ++++++--
 cas/scheduler/grpc_scheduler.rs             |  88 ++++++++------
 cas/scheduler/scheduler.rs                  |   3 +
 cas/scheduler/simple_scheduler.rs           | 108 ++++++++++++++----
 .../tests/cache_lookup_scheduler_test.rs    |  13 +++
 cas/scheduler/tests/utils/mock_scheduler.rs |  28 +++++
 8 files changed, 279 insertions(+), 78 deletions(-)

diff --git a/cas/grpc_service/execution_server.rs b/cas/grpc_service/execution_server.rs
index c1425cf47e..c07e4584f6 100644
--- a/cas/grpc_service/execution_server.rs
+++ b/cas/grpc_service/execution_server.rs
@@ -17,13 +17,14 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
-use futures::{Stream, StreamExt};
+use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
 use rand::{thread_rng, Rng};
+use tokio::sync::watch;
 use tokio_stream::wrappers::WatchStream;
 use tonic::{Request, Response, Status};
 
 use ac_utils::get_and_decode_digest;
-use action_messages::{ActionInfo, ActionInfoHashKey, DEFAULT_EXECUTION_PRIORITY};
+use action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
 use common::{log, DigestInfo};
 use config::cas_server::{ExecutionConfig, InstanceName};
 use error::{make_input_err, Error, ResultExt};
@@ -163,6 +164,14 @@ impl ExecutionServer {
         Server::new(self)
     }
 
+    fn to_execute_stream(receiver: watch::Receiver<Arc<ActionState>>) -> Response<ExecuteStream> {
+        let receiver_stream = Box::pin(WatchStream::new(receiver).map(|action_update| {
+            log::info!("\x1b[0;31mexecute Resp Stream\x1b[0m: {:?}", action_update);
+            Ok(action_update.as_ref().clone().into())
+        }));
+        tonic::Response::new(receiver_stream)
+    }
+
     async fn inner_execute(&self, request: Request<ExecuteRequest>) -> Result<Response<ExecuteStream>, Error> {
         let execute_req = request.into_inner();
         let instance_name = execute_req.instance_name;
@@ -197,11 +206,41 @@ impl ExecutionServer {
             .await
             .err_tip(|| "Failed to schedule task")?;
 
-        let receiver_stream = Box::pin(WatchStream::new(rx).map(|action_update| {
-            log::info!("\x1b[0;31mexecute Resp Stream\x1b[0m: {:?}", action_update);
-            Ok(action_update.as_ref().clone().into())
-        }));
-        Ok(tonic::Response::new(receiver_stream))
+        Ok(Self::to_execute_stream(rx))
+    }
+
+    async fn filter_actions<'a, Fut, F, G, R, S>(&'a self, operation: F, mut filter: G) -> Result<S, Status>
+    where
+        F: FnMut(&'a InstanceInfo) -> Fut,
+        G: FnMut(R) -> Option<S>,
+        Fut: Future<Output = R>,
+    {
+        // Very inefficient having to search through all instances; however,
+        // the protobuf spec doesn't state that the instance_name can be
+        // retrieved from the path.
+        let mut result_stream = self
+            .instance_infos
+            .values()
+            .map(operation)
+            .collect::<FuturesUnordered<_>>();
+        while let Some(result) = result_stream.next().await {
+            if let Some(result) = filter(result) {
+                return Ok(result);
+            }
+        }
+        Err(Status::not_found("Failed to find existing task"))
+    }
+
+    async fn inner_wait_execution(
+        &self,
+        request: Request<WaitExecutionRequest>,
+    ) -> Result<Response<ExecuteStream>, Status> {
+        let name = request.into_inner().name;
+        self.filter_actions(
+            |instance_info| instance_info.scheduler.find_existing_action(&name),
+            |result| result.map(Self::to_execute_stream),
+        )
+        .await
+    }
 }
 
@@ -227,14 +266,11 @@ impl Execution for ExecutionServer {
         resp
     }
 
-    type WaitExecutionStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + Sync + 'static>>;
-    async fn wait_execution(
-        &self,
-        request: Request<WaitExecutionRequest>,
-    ) -> Result<Response<Self::WaitExecutionStream>, Status> {
-        use stdext::function_name;
-        let output = format!("{} not yet implemented", function_name!());
-        println!("{:?}", request);
-        Err(Status::unimplemented(output))
+    type WaitExecutionStream = ExecuteStream;
+    async fn wait_execution(&self, request: Request<WaitExecutionRequest>) -> Result<Response<ExecuteStream>, Status> {
+        self.inner_wait_execution(request)
+            .await
+            .err_tip(|| "Failed on wait_execution() command")
+            .map_err(|e| e.into())
     }
 }
diff --git a/cas/scheduler/BUILD b/cas/scheduler/BUILD
index fbee22706b..f35a39e7c1 100644
--- a/cas/scheduler/BUILD
+++ b/cas/scheduler/BUILD
@@ -109,6 +109,7 @@ rust_library(
         "//util:common",
         "//util:error",
         "@crate_index//:futures",
+        "@crate_index//:parking_lot",
        "@crate_index//:tonic",
        "@crate_index//:tokio",
        "@crate_index//:tokio-stream",
diff --git a/cas/scheduler/cache_lookup_scheduler.rs b/cas/scheduler/cache_lookup_scheduler.rs
index d9be27e1e6..c854684a51 100644
--- a/cas/scheduler/cache_lookup_scheduler.rs
+++ b/cas/scheduler/cache_lookup_scheduler.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
 
@@ -26,6 +27,7 @@ use action_messages::{ActionInfo, ActionResult, ActionStage, ActionState};
 use common::DigestInfo;
 use error::Error;
 use grpc_store::GrpcStore;
+use parking_lot::Mutex;
 use platform_property_manager::PlatformPropertyManager;
 use proto::build::bazel::remote::execution::v2::{
     ActionResult as ProtoActionResult, FindMissingBlobsRequest, GetActionResultRequest,
@@ -33,6 +35,11 @@ use proto::build::bazel::remote::execution::v2::{
 use scheduler::ActionScheduler;
 use store::Store;
 
+/// Actions that are either having their cache checked or have failed the
+/// cache lookup and are being forwarded upstream. Does not include the
+/// skip_cache_check actions, which are forwarded directly.
+type CheckActions = HashMap<String, Arc<watch::Sender<Arc<ActionState>>>>;
+
 pub struct CacheLookupScheduler {
     /// A reference to the CAS which is used to validate all the outputs of a
     /// cached ActionResult still exist.
@@ -42,6 +49,8 @@ pub struct CacheLookupScheduler {
     /// The "real" scheduler to use to perform actions if they were not found
     /// in the action cache.
     action_scheduler: Arc<dyn ActionScheduler>,
+    /// Actions that are currently performing a CacheCheck.
+ cache_check_actions: Arc>, } async fn get_action_from_store( @@ -122,6 +131,7 @@ impl CacheLookupScheduler { cas_store, ac_store, action_scheduler, + cache_check_actions: Default::default(), }) } } @@ -148,9 +158,12 @@ impl ActionScheduler for CacheLookupScheduler { action_digest: *action_digest, }); let (tx, rx) = watch::channel(current_state.clone()); + let tx = Arc::new(tx); + self.cache_check_actions.lock().insert(name.clone(), tx.clone()); let ac_store = self.ac_store.clone(); let cas_store = self.cas_store.clone(); let action_scheduler = self.action_scheduler.clone(); + let cache_check_actions = self.cache_check_actions.clone(); tokio::spawn(async move { let instance_name = action_info.instance_name.to_string(); if let Some(proto_action_result) = @@ -160,24 +173,43 @@ impl ActionScheduler for CacheLookupScheduler { // Found in the cache, return the result immediately. Arc::make_mut(&mut current_state).stage = ActionStage::CompletedFromCache(proto_action_result); let _ = tx.send(current_state); + cache_check_actions.lock().remove(&name); return; } } // Not in cache, forward to upstream and proxy state. - let mut watch_stream = match action_scheduler.add_action(name, action_info).await { - Ok(rx) => WatchStream::new(rx), + match action_scheduler.add_action(name.clone(), action_info).await { + Ok(rx) => { + let mut watch_stream = WatchStream::new(rx); + while let Some(action_state) = watch_stream.next().await { + if tx.send(action_state).is_err() { + break; + } + } + } Err(err) => { Arc::make_mut(&mut current_state).stage = ActionStage::Error((err, ActionResult::default())); let _ = tx.send(current_state); - return; - } - }; - while let Some(action_state) = watch_stream.next().await { - if tx.send(action_state).is_err() { - break; } } + cache_check_actions.lock().remove(&name); }); Ok(rx) } + + async fn find_existing_action(&self, name: &str) -> Option>> { + { + let cache_check_actions = self.cache_check_actions.lock(); + if let Some(tx) = cache_check_actions.get(name) { + let current_value = tx.borrow(); + // Subscribe marks the current value as seen, so we have to + // re-send it to all receivers. + let rx = tx.subscribe(); + let _ = tx.send(current_value.clone()); + return Some(rx); + } + } + // Cache skipped may be in the upstream scheduler. 
+ self.action_scheduler.find_existing_action(name).await + } } diff --git a/cas/scheduler/grpc_scheduler.rs b/cas/scheduler/grpc_scheduler.rs index d9278dec50..d8d7085cef 100644 --- a/cas/scheduler/grpc_scheduler.rs +++ b/cas/scheduler/grpc_scheduler.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; use parking_lot::Mutex; use tokio::sync::watch; -use tonic::{transport, Request}; +use tonic::{transport, Request, Streaming}; use action_messages::{ActionInfo, ActionState, DEFAULT_EXECUTION_PRIORITY}; use common::log; @@ -26,28 +26,55 @@ use error::{make_err, Code, Error, ResultExt}; use platform_property_manager::PlatformPropertyManager; use proto::build::bazel::remote::execution::v2::{ capabilities_client::CapabilitiesClient, execution_client::ExecutionClient, ExecuteRequest, ExecutionPolicy, - GetCapabilitiesRequest, + GetCapabilitiesRequest, WaitExecutionRequest, }; +use proto::google::longrunning::Operation; use scheduler::ActionScheduler; pub struct GrpcScheduler { - endpoint: transport::Channel, + capabilities_client: CapabilitiesClient, + execution_client: ExecutionClient, platform_property_managers: Mutex>>, } impl GrpcScheduler { pub async fn new(config: &config::schedulers::GrpcScheduler) -> Result { - let endpoint = transport::Endpoint::new(config.endpoint.clone()) - .err_tip(|| format!("Could not parse {} in GrpcScheduler", config.endpoint))? - .connect() - .await - .err_tip(|| format!("Could not connect to {} in GrpcScheduler", config.endpoint))?; + let endpoint = transport::Channel::balance_list(std::iter::once( + transport::Endpoint::new(config.endpoint.clone()) + .err_tip(|| format!("Could not parse {} in GrpcScheduler", config.endpoint))?, + )); Ok(Self { - endpoint, + capabilities_client: CapabilitiesClient::new(endpoint.clone()), + execution_client: ExecutionClient::new(endpoint), platform_property_managers: Mutex::new(HashMap::new()), }) } + + async fn stream_state(mut result_stream: Streaming) -> Result>, Error> { + if let Some(initial_response) = result_stream + .message() + .await + .err_tip(|| "Recieving response from upstream scheduler")? + { + let (tx, rx) = watch::channel(Arc::new(initial_response.try_into()?)); + tokio::spawn(async move { + while let Ok(Some(response)) = result_stream.message().await { + match response.try_into() { + Ok(response) => { + if let Err(err) = tx.send(Arc::new(response)) { + log::info!("Client disconnected in GrpcScheduler: {}", err); + return; + } + } + Err(err) => log::error!("Error converting response to watch in GrpcScheduler: {}", err), + } + } + }); + return Ok(rx); + } + Err(make_err!(Code::Internal, "Upstream scheduler didn't accept action.")) + } } #[async_trait] @@ -58,8 +85,9 @@ impl ActionScheduler for GrpcScheduler { } // Not in the cache, lookup the capabilities with the upstream. - let mut capabilities_client = CapabilitiesClient::new(self.endpoint.clone()); - let capabilities = capabilities_client + let capabilities = self + .capabilities_client + .clone() .get_capabilities(GetCapabilitiesRequest { instance_name: instance_name.to_string(), }) @@ -101,32 +129,26 @@ impl ActionScheduler for GrpcScheduler { // TODO: Get me from the original request, not very important as we ignore it. results_cache_policy: None, }; - let mut result_stream = ExecutionClient::new(self.endpoint.clone()) + let result_stream = self + .execution_client + .clone() .execute(Request::new(request)) .await .err_tip(|| "Sending action to upstream scheduler")? 
.into_inner(); - if let Some(initial_response) = result_stream - .message() - .await - .err_tip(|| "Recieving response from upstream scheduler")? - { - let (tx, rx) = watch::channel(Arc::new(initial_response.try_into()?)); - tokio::spawn(async move { - while let Ok(Some(response)) = result_stream.message().await { - match response.try_into() { - Ok(response) => { - if let Err(err) = tx.send(Arc::new(response)) { - log::info!("Client disconnected in GrpcScheduler: {}", err); - return; - } - } - Err(err) => log::error!("Error converting response to watch in GrpcScheduler: {}", err), - } - } - }); - return Ok(rx); + Self::stream_state(result_stream).await + } + + async fn find_existing_action(&self, name: &str) -> Option>> { + let request = WaitExecutionRequest { name: name.to_string() }; + let result_stream = self + .execution_client + .clone() + .wait_execution(Request::new(request)) + .await; + if result_stream.is_err() { + return None; } - Err(make_err!(Code::Internal, "Upstream scheduler didn't accept action.")) + Self::stream_state(result_stream.unwrap().into_inner()).await.ok() } } diff --git a/cas/scheduler/scheduler.rs b/cas/scheduler/scheduler.rs index 2fcc3f237c..d6a30d7dc8 100644 --- a/cas/scheduler/scheduler.rs +++ b/cas/scheduler/scheduler.rs @@ -35,6 +35,9 @@ pub trait ActionScheduler: Sync + Send + Unpin { name: String, action_info: ActionInfo, ) -> Result>, Error>; + + /// Find an existing action by its name. + async fn find_existing_action(&self, name: &str) -> Option>>; } /// WorkerScheduler interface is responsible for interactions between the scheduler diff --git a/cas/scheduler/simple_scheduler.rs b/cas/scheduler/simple_scheduler.rs index 2f811071c1..c7c796d6f5 100644 --- a/cas/scheduler/simple_scheduler.rs +++ b/cas/scheduler/simple_scheduler.rs @@ -15,6 +15,7 @@ use std::cmp; use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; +use std::time::SystemTime; use async_trait::async_trait; use futures::Future; @@ -159,6 +160,10 @@ struct SimpleSchedulerImpl { queued_actions: BTreeMap, AwaitedAction>, workers: Workers, active_actions: HashMap, RunningAction>, + // These actions completed recently but had no listener, they might have + // completed while the caller was thinking about calling wait_execution, so + // keep their completion state around for a while to send back. + recently_completed_actions: Vec>, /// Timeout of how long to evict workers if no response in this given amount of time in seconds. worker_timeout_s: u64, /// Default times a job can retry before failing. @@ -168,6 +173,15 @@ struct SimpleSchedulerImpl { } impl SimpleSchedulerImpl { + fn subscribe_to_channel(awaited_action: &AwaitedAction) -> watch::Receiver> { + let rx = awaited_action.notify_channel.subscribe(); + awaited_action + .notify_channel + .send(awaited_action.current_state.clone()) + .unwrap(); + rx + } + /// Attempts to find a worker to execute an action and begins executing it. /// If an action is already running that is cacheable it may merge this action /// with the results and state changes of the already running action. @@ -181,13 +195,7 @@ impl SimpleSchedulerImpl { ) -> Result>, Error> { // Check to see if the action is running, if it is and cacheable, merge the actions. 
if let Some(running_action) = self.active_actions.get_mut(&action_info) { - let rx = running_action.action.notify_channel.subscribe(); - running_action - .action - .notify_channel - .send(running_action.action.current_state.clone()) - .unwrap(); - return Ok(rx); + return Ok(Self::subscribe_to_channel(&running_action.action)); } // Check to see if the action is queued, if it is and cacheable, merge the actions. @@ -205,10 +213,7 @@ impl SimpleSchedulerImpl { Arc::make_mut(&mut arc_action_info).priority = new_priority; let rx = queued_action.notify_channel.subscribe(); - queued_action - .notify_channel - .send(queued_action.current_state.clone()) - .unwrap(); + let _ = queued_action.notify_channel.send(queued_action.current_state.clone()); // Even if we fail to send our action to the client, we need to add this action back to the // queue because it was remove earlier. @@ -219,7 +224,6 @@ impl SimpleSchedulerImpl { // Action needs to be added to queue or is not cacheable. let action_info = Arc::new(action_info); - let action_digest = *action_info.digest(); // TODO(allada) This name field needs to be indexable. The client might perform operations // based on the name later. It cannot be the same index used as the workers though, because @@ -228,7 +232,7 @@ impl SimpleSchedulerImpl { let current_state = Arc::new(ActionState { name, stage: ActionStage::Queued, - action_digest, + action_digest: *action_info.digest(), }); let (tx, rx) = watch::channel(current_state.clone()); @@ -248,6 +252,56 @@ impl SimpleSchedulerImpl { Ok(rx) } + fn clean_recently_completed_actions(&mut self) { + // Keep for a minute maximum. + let expiry_time = SystemTime::now().checked_sub(Duration::new(60, 0)).unwrap(); + self.recently_completed_actions.retain(|action| match &action.stage { + ActionStage::Completed(state) => { + if state.execution_metadata.worker_completed_timestamp > expiry_time { + true + } else { + log::warn!( + "Action {} has no more listeners during update_action()", + action.action_digest.str() + ); + false + } + } + _ => false, + }); + } + + fn find_recently_completed_action(&mut self, name: &str) -> Option>> { + let rx = self + .recently_completed_actions + .iter() + .position(|state| state.name == name) + .map(|index| watch::channel(self.recently_completed_actions.swap_remove(index)).1); + + // Clean up the actions list. + self.clean_recently_completed_actions(); + + rx + } + + // Gets an existing action by its name, very noddy implementation that just searches for it everywhere. 
+ fn find_existing_action(&self, name: &str) -> Option>> { + self.queued_actions + .values() + .chain( + self.active_actions + .values() + .map(|running_action| &running_action.action), + ) + .find_map(|awaited_action| { + if awaited_action.current_state.name == name { + Some(Self::subscribe_to_channel(awaited_action)) + } else { + None + } + }) + } + fn retry_action(&mut self, action_info: &Arc, worker_id: &WorkerId) { match self.active_actions.remove(action_info) { Some(running_action) => { @@ -455,18 +509,24 @@ impl SimpleSchedulerImpl { .action .notify_channel .send(running_action.action.current_state.clone()); - if send_result.is_err() { - log::warn!( - "Action {} has no more listeners during update_action()", - action_info.digest().str() - ); - } if !running_action.action.current_state.stage.is_finished() { + if send_result.is_err() { + log::warn!( + "Action {} has no more listeners during update_action()", + action_info.digest().str() + ); + } // If the operation is not finished it means the worker is still working on it, so put it // back or else we will loose track of the task. self.active_actions.insert(action_info, running_action); return Ok(()); + } else if send_result.is_err() { + // Keep in case this is asked for soon. + self.recently_completed_actions + .push(running_action.action.current_state); + // Clean up any old actions. + self.clean_recently_completed_actions(); } let worker = self @@ -477,8 +537,6 @@ impl SimpleSchedulerImpl { worker.complete_action(&action_info); self.tasks_or_workers_change_notify.notify_one(); - // TODO(allada) We should probably hold a small queue of recent actions for debugging. - // Right now it will drop the action which also disconnects listeners here. Ok(()) } } @@ -533,6 +591,7 @@ impl SimpleScheduler { queued_actions: BTreeMap::new(), workers: Workers::new(scheduler_cfg.allocation_strategy), active_actions: HashMap::new(), + recently_completed_actions: Vec::new(), worker_timeout_s, max_job_retries, tasks_or_workers_change_notify: tasks_or_workers_change_notify.clone(), @@ -598,6 +657,13 @@ impl ActionScheduler for SimpleScheduler { let mut inner = self.inner.lock(); inner.add_action(name, action_info) } + + async fn find_existing_action(&self, name: &str) -> Option>> { + let mut inner = self.inner.lock(); + inner + .find_existing_action(name) + .or_else(|| inner.find_recently_completed_action(name)) + } } #[async_trait] diff --git a/cas/scheduler/tests/cache_lookup_scheduler_test.rs b/cas/scheduler/tests/cache_lookup_scheduler_test.rs index d15edb272e..e2b739c30c 100644 --- a/cas/scheduler/tests/cache_lookup_scheduler_test.rs +++ b/cas/scheduler/tests/cache_lookup_scheduler_test.rs @@ -161,4 +161,17 @@ mod cache_lookup_scheduler_tests { ); Ok(()) } + + #[tokio::test] + async fn find_existing_action_call_passed() -> Result<(), Error> { + let context = make_cache_scheduler()?; + let action_name = "action"; + let (actual_result, actual_action_name) = join!( + context.cache_scheduler.find_existing_action(action_name), + context.mock_scheduler.expect_find_existing_action(None), + ); + assert_eq!(true, actual_result.is_none()); + assert_eq!(action_name, actual_action_name); + Ok(()) + } } diff --git a/cas/scheduler/tests/utils/mock_scheduler.rs b/cas/scheduler/tests/utils/mock_scheduler.rs index 4ea740d704..00acffd833 100644 --- a/cas/scheduler/tests/utils/mock_scheduler.rs +++ b/cas/scheduler/tests/utils/mock_scheduler.rs @@ -26,11 +26,13 @@ use scheduler::ActionScheduler; enum ActionSchedulerCalls { GetPlatformPropertyManager(String), 
AddAction((String, ActionInfo)), + FindExistingAction(String), } enum ActionSchedulerReturns { GetPlatformPropertyManager(Result, Error>), AddAction(Result>, Error>), + FindExistingAction(Option>>), } pub struct MockActionScheduler { @@ -94,6 +96,21 @@ impl MockActionScheduler { .unwrap(); req } + + pub async fn expect_find_existing_action(&self, result: Option>>) -> String { + let mut rx_call_lock = self.rx_call.lock().await; + let ActionSchedulerCalls::FindExistingAction(req) = rx_call_lock + .recv() + .await + .expect("Could not receive msg in mpsc") else { + panic!("Got incorrect call waiting for find_existing_action") + }; + self.tx_resp + .send(ActionSchedulerReturns::FindExistingAction(result)) + .map_err(|_| make_input_err!("Could not send request to mpsc")) + .unwrap(); + req + } } #[async_trait] @@ -125,4 +142,15 @@ impl ActionScheduler for MockActionScheduler { _ => panic!("Expected add_action return value"), } } + + async fn find_existing_action(&self, name: &str) -> Option>> { + self.tx_call + .send(ActionSchedulerCalls::FindExistingAction(name.to_string())) + .expect("Could not send request to mpsc"); + let mut rx_resp_lock = self.rx_resp.lock().await; + match rx_resp_lock.recv().await.expect("Could not receive msg in mpsc") { + ActionSchedulerReturns::FindExistingAction(result) => result, + _ => panic!("Expected find_existing_action return value"), + } + } }
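
The end-to-end flow this patch enables: a client calls Execute, remembers the operation name from the first streamed Operation, and can later re-attach to the same action with WaitExecution, which the server resolves through ActionScheduler::find_existing_action (or recently_completed_actions if the action just finished). Below is a rough client-side sketch, assuming the same generated REAPI bindings this patch already uses (ExecutionClient, WaitExecutionRequest, Operation); the helper name and channel setup are illustrative only, not part of the patch.

use proto::build::bazel::remote::execution::v2::{
    execution_client::ExecutionClient, WaitExecutionRequest,
};
use tonic::Request;

// Hypothetical helper: re-attach to an action that a previous Execute call
// started, using the operation `name` taken from its first streamed Operation.
async fn wait_for_existing_action(
    channel: tonic::transport::Channel,
    name: String,
) -> Result<(), tonic::Status> {
    let mut client = ExecutionClient::new(channel);
    let mut stream = client
        .wait_execution(Request::new(WaitExecutionRequest { name }))
        .await?
        .into_inner();
    // Each message is a google.longrunning.Operation wrapping the current
    // ActionState; the stream ends once the action completes (or was served
    // from recently_completed_actions and is already final).
    while let Some(operation) = stream.message().await? {
        println!("operation update: done={}", operation.done);
    }
    Ok(())
}

If no scheduler still knows the operation name (for example it has already aged out of recently_completed_actions), the server answers with NOT_FOUND ("Failed to find existing task"), which surfaces here as a tonic::Status error.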