Skip to content

Commit

Permalink
Create WaitExecution for ExecutationServer.
Browse files Browse the repository at this point in the history
Goma requires WaitExecution to function correctly.  Implement this and
store recently completed actions.
  • Loading branch information
aaronmondal authored and chrisstaite-menlo committed Jul 15, 2023
1 parent f211ef2 commit 04b492a
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 78 deletions.
68 changes: 52 additions & 16 deletions cas/grpc_service/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use futures::{Stream, StreamExt};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
use rand::{thread_rng, Rng};
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tonic::{Request, Response, Status};

use ac_utils::get_and_decode_digest;
use action_messages::{ActionInfo, ActionInfoHashKey, DEFAULT_EXECUTION_PRIORITY};
use action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use common::{log, DigestInfo};
use config::cas_server::{ExecutionConfig, InstanceName};
use error::{make_input_err, Error, ResultExt};
Expand Down Expand Up @@ -163,6 +164,14 @@ impl ExecutionServer {
Server::new(self)
}

fn to_execute_stream(receiver: watch::Receiver<Arc<ActionState>>) -> Response<ExecuteStream> {
let receiver_stream = Box::pin(WatchStream::new(receiver).map(|action_update| {
log::info!("\x1b[0;31mexecute Resp Stream\x1b[0m: {:?}", action_update);
Ok(action_update.as_ref().clone().into())
}));
tonic::Response::new(receiver_stream)
}

async fn inner_execute(&self, request: Request<ExecuteRequest>) -> Result<Response<ExecuteStream>, Error> {
let execute_req = request.into_inner();
let instance_name = execute_req.instance_name;
Expand Down Expand Up @@ -197,11 +206,41 @@ impl ExecutionServer {
.await
.err_tip(|| "Failed to schedule task")?;

let receiver_stream = Box::pin(WatchStream::new(rx).map(|action_update| {
log::info!("\x1b[0;31mexecute Resp Stream\x1b[0m: {:?}", action_update);
Ok(action_update.as_ref().clone().into())
}));
Ok(tonic::Response::new(receiver_stream))
Ok(Self::to_execute_stream(rx))
}

async fn filter_actions<'a, Fut, F, G, R, S>(&'a self, operation: F, mut filter: G) -> Result<S, Status>
where
F: FnMut(&'a InstanceInfo) -> Fut,
G: FnMut(R) -> Option<S>,
Fut: Future<Output = R>,
{
// Very innefficient having to search through all instances, however
// the protobuf spec doesn't state that the instance_name can be
// retrieved from the path.
let mut result_stream = self
.instance_infos
.values()
.map(operation)
.collect::<FuturesUnordered<_>>();
while let Some(result) = result_stream.next().await {
if let Some(result) = filter(result) {
return Ok(result);
}
}
Err(Status::not_found("Failed to find existing task"))
}

async fn inner_wait_execution(
&self,
request: Request<WaitExecutionRequest>,
) -> Result<Response<ExecuteStream>, Status> {
let name = request.into_inner().name;
self.filter_actions(
|instance_info| instance_info.scheduler.find_existing_action(&name),
|result| result.map(Self::to_execute_stream),
)
.await
}
}

Expand All @@ -227,14 +266,11 @@ impl Execution for ExecutionServer {
resp
}

type WaitExecutionStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + Sync + 'static>>;
async fn wait_execution(
&self,
request: Request<WaitExecutionRequest>,
) -> Result<Response<Self::WaitExecutionStream>, Status> {
use stdext::function_name;
let output = format!("{} not yet implemented", function_name!());
println!("{:?}", request);
Err(Status::unimplemented(output))
type WaitExecutionStream = ExecuteStream;
async fn wait_execution(&self, request: Request<WaitExecutionRequest>) -> Result<Response<ExecuteStream>, Status> {
self.inner_wait_execution(request)
.await
.err_tip(|| "Failed on wait_execution() command")
.map_err(|e| e.into())
}
}
1 change: 1 addition & 0 deletions cas/scheduler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ rust_library(
"//util:common",
"//util:error",
"@crate_index//:futures",
"@crate_index//:parking_lot",
"@crate_index//:tonic",
"@crate_index//:tokio",
"@crate_index//:tokio-stream",
Expand Down
48 changes: 40 additions & 8 deletions cas/scheduler/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;

Expand All @@ -26,13 +27,19 @@ use action_messages::{ActionInfo, ActionResult, ActionStage, ActionState};
use common::DigestInfo;
use error::Error;
use grpc_store::GrpcStore;
use parking_lot::Mutex;
use platform_property_manager::PlatformPropertyManager;
use proto::build::bazel::remote::execution::v2::{
ActionResult as ProtoActionResult, FindMissingBlobsRequest, GetActionResultRequest,
};
use scheduler::ActionScheduler;
use store::Store;

/// Actions that are having their cache checked or failed cache lookup and are
/// being forwarded upstream. Missing the skip_cache_check actions which are
/// forwarded directly.
type CheckActions = HashMap<String, Arc<watch::Sender<Arc<ActionState>>>>;

pub struct CacheLookupScheduler {
/// A reference to the CAS which is used to validate all the outputs of a
/// cached ActionResult still exist.
Expand All @@ -42,6 +49,8 @@ pub struct CacheLookupScheduler {
/// The "real" scheduler to use to perform actions if they were not found
/// in the action cache.
action_scheduler: Arc<dyn ActionScheduler>,
/// Actions that are currently performing a CacheCheck.
cache_check_actions: Arc<Mutex<CheckActions>>,
}

async fn get_action_from_store(
Expand Down Expand Up @@ -122,6 +131,7 @@ impl CacheLookupScheduler {
cas_store,
ac_store,
action_scheduler,
cache_check_actions: Default::default(),
})
}
}
Expand All @@ -148,9 +158,12 @@ impl ActionScheduler for CacheLookupScheduler {
action_digest: *action_digest,
});
let (tx, rx) = watch::channel(current_state.clone());
let tx = Arc::new(tx);
self.cache_check_actions.lock().insert(name.clone(), tx.clone());
let ac_store = self.ac_store.clone();
let cas_store = self.cas_store.clone();
let action_scheduler = self.action_scheduler.clone();
let cache_check_actions = self.cache_check_actions.clone();
tokio::spawn(async move {
let instance_name = action_info.instance_name.to_string();
if let Some(proto_action_result) =
Expand All @@ -160,24 +173,43 @@ impl ActionScheduler for CacheLookupScheduler {
// Found in the cache, return the result immediately.
Arc::make_mut(&mut current_state).stage = ActionStage::CompletedFromCache(proto_action_result);
let _ = tx.send(current_state);
cache_check_actions.lock().remove(&name);
return;
}
}
// Not in cache, forward to upstream and proxy state.
let mut watch_stream = match action_scheduler.add_action(name, action_info).await {
Ok(rx) => WatchStream::new(rx),
match action_scheduler.add_action(name.clone(), action_info).await {
Ok(rx) => {
let mut watch_stream = WatchStream::new(rx);
while let Some(action_state) = watch_stream.next().await {
if tx.send(action_state).is_err() {
break;
}
}
}
Err(err) => {
Arc::make_mut(&mut current_state).stage = ActionStage::Error((err, ActionResult::default()));
let _ = tx.send(current_state);
return;
}
};
while let Some(action_state) = watch_stream.next().await {
if tx.send(action_state).is_err() {
break;
}
}
cache_check_actions.lock().remove(&name);
});
Ok(rx)
}

async fn find_existing_action(&self, name: &str) -> Option<watch::Receiver<Arc<ActionState>>> {
{
let cache_check_actions = self.cache_check_actions.lock();
if let Some(tx) = cache_check_actions.get(name) {
let current_value = tx.borrow();
// Subscribe marks the current value as seen, so we have to
// re-send it to all receivers.
let rx = tx.subscribe();
let _ = tx.send(current_value.clone());
return Some(rx);
}
}
// Cache skipped may be in the upstream scheduler.
self.action_scheduler.find_existing_action(name).await
}
}
88 changes: 55 additions & 33 deletions cas/scheduler/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,63 @@ use std::sync::Arc;
use async_trait::async_trait;
use parking_lot::Mutex;
use tokio::sync::watch;
use tonic::{transport, Request};
use tonic::{transport, Request, Streaming};

use action_messages::{ActionInfo, ActionState, DEFAULT_EXECUTION_PRIORITY};
use common::log;
use error::{make_err, Code, Error, ResultExt};
use platform_property_manager::PlatformPropertyManager;
use proto::build::bazel::remote::execution::v2::{
capabilities_client::CapabilitiesClient, execution_client::ExecutionClient, ExecuteRequest, ExecutionPolicy,
GetCapabilitiesRequest,
GetCapabilitiesRequest, WaitExecutionRequest,
};
use proto::google::longrunning::Operation;
use scheduler::ActionScheduler;

pub struct GrpcScheduler {
endpoint: transport::Channel,
capabilities_client: CapabilitiesClient<transport::Channel>,
execution_client: ExecutionClient<transport::Channel>,
platform_property_managers: Mutex<HashMap<String, Arc<PlatformPropertyManager>>>,
}

impl GrpcScheduler {
pub async fn new(config: &config::schedulers::GrpcScheduler) -> Result<Self, Error> {
let endpoint = transport::Endpoint::new(config.endpoint.clone())
.err_tip(|| format!("Could not parse {} in GrpcScheduler", config.endpoint))?
.connect()
.await
.err_tip(|| format!("Could not connect to {} in GrpcScheduler", config.endpoint))?;
let endpoint = transport::Channel::balance_list(std::iter::once(
transport::Endpoint::new(config.endpoint.clone())
.err_tip(|| format!("Could not parse {} in GrpcScheduler", config.endpoint))?,
));

Ok(Self {
endpoint,
capabilities_client: CapabilitiesClient::new(endpoint.clone()),
execution_client: ExecutionClient::new(endpoint),
platform_property_managers: Mutex::new(HashMap::new()),
})
}

async fn stream_state(mut result_stream: Streaming<Operation>) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
if let Some(initial_response) = result_stream
.message()
.await
.err_tip(|| "Recieving response from upstream scheduler")?
{
let (tx, rx) = watch::channel(Arc::new(initial_response.try_into()?));
tokio::spawn(async move {
while let Ok(Some(response)) = result_stream.message().await {
match response.try_into() {
Ok(response) => {
if let Err(err) = tx.send(Arc::new(response)) {
log::info!("Client disconnected in GrpcScheduler: {}", err);
return;
}
}
Err(err) => log::error!("Error converting response to watch in GrpcScheduler: {}", err),
}
}
});
return Ok(rx);
}
Err(make_err!(Code::Internal, "Upstream scheduler didn't accept action."))
}
}

#[async_trait]
Expand All @@ -58,8 +85,9 @@ impl ActionScheduler for GrpcScheduler {
}

// Not in the cache, lookup the capabilities with the upstream.
let mut capabilities_client = CapabilitiesClient::new(self.endpoint.clone());
let capabilities = capabilities_client
let capabilities = self
.capabilities_client
.clone()
.get_capabilities(GetCapabilitiesRequest {
instance_name: instance_name.to_string(),
})
Expand Down Expand Up @@ -101,32 +129,26 @@ impl ActionScheduler for GrpcScheduler {
// TODO: Get me from the original request, not very important as we ignore it.
results_cache_policy: None,
};
let mut result_stream = ExecutionClient::new(self.endpoint.clone())
let result_stream = self
.execution_client
.clone()
.execute(Request::new(request))
.await
.err_tip(|| "Sending action to upstream scheduler")?
.into_inner();
if let Some(initial_response) = result_stream
.message()
.await
.err_tip(|| "Recieving response from upstream scheduler")?
{
let (tx, rx) = watch::channel(Arc::new(initial_response.try_into()?));
tokio::spawn(async move {
while let Ok(Some(response)) = result_stream.message().await {
match response.try_into() {
Ok(response) => {
if let Err(err) = tx.send(Arc::new(response)) {
log::info!("Client disconnected in GrpcScheduler: {}", err);
return;
}
}
Err(err) => log::error!("Error converting response to watch in GrpcScheduler: {}", err),
}
}
});
return Ok(rx);
Self::stream_state(result_stream).await
}

async fn find_existing_action(&self, name: &str) -> Option<watch::Receiver<Arc<ActionState>>> {
let request = WaitExecutionRequest { name: name.to_string() };
let result_stream = self
.execution_client
.clone()
.wait_execution(Request::new(request))
.await;
if result_stream.is_err() {
return None;
}
Err(make_err!(Code::Internal, "Upstream scheduler didn't accept action."))
Self::stream_state(result_stream.unwrap().into_inner()).await.ok()
}
}
3 changes: 3 additions & 0 deletions cas/scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub trait ActionScheduler: Sync + Send + Unpin {
name: String,
action_info: ActionInfo,
) -> Result<watch::Receiver<Arc<ActionState>>, Error>;

/// Find an existing action by its name.
async fn find_existing_action(&self, name: &str) -> Option<watch::Receiver<Arc<ActionState>>>;
}

/// WorkerScheduler interface is responsible for interactions between the scheduler
Expand Down
Loading

0 comments on commit 04b492a

Please sign in to comment.