Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

PVF: Minor refactor in workers code #7012

Merged
merged 2 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ pub enum PrepareError {
Prevalidation(String),
/// Compilation failed for the given PVF.
Preparation(String),
/// An unexpected panic has occured in the preparation worker.
/// An unexpected panic has occurred in the preparation worker.
Panic(String),
/// Failed to prepare the PVF due to the time limit.
TimedOut,
/// An IO error occurred while receiving the result from the worker process. This state is reported by the
/// validation host (not by the worker).
/// An IO error occurred. This state is reported by either the validation host or by the worker.
IoErr(String),
/// The temporary file for the artifact could not be created at the given cache path. This state is reported by the
/// validation host (not by the worker).
Expand Down
34 changes: 15 additions & 19 deletions node/core/pvf/src/execute/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,26 +261,22 @@ impl Response {
Self::InvalidCandidate(format!("{}: {}", ctx, msg))
}
}
fn format_internal(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
Self::InternalError(ctx.to_string())
} else {
Self::InternalError(format!("{}: {}", ctx, msg))
}
}
}

/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
/// is checked against the worker version. A mismatch results in immediate worker termination.
/// `None` is used for tests and in other situations when version check is not necessary.
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move {
let worker_pid = std::process::id();
if let Some(version) = node_version {
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
gum::error!(
target: LOG_TARGET,
%worker_pid,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
crate::kill_parent_node_in_emergency();
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
}
}

let handshake = recv_handshake(&mut stream).await?;
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
Expand All @@ -301,7 +297,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
let cpu_time_start = ProcessTime::now();

// Spawn a new thread that runs the CPU time monitor.
let thread_fut = rt_handle
let cpu_time_monitor_fut = rt_handle
.spawn_blocking(move || {
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
})
Expand All @@ -313,14 +309,14 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
})
.fuse();

pin_mut!(thread_fut);
pin_mut!(cpu_time_monitor_fut);
pin_mut!(execute_fut);

let response = select_biased! {
// If this future is not selected, the join handle is dropped and the thread will
// finish in the background.
join_res = thread_fut => {
match join_res {
cpu_time_monitor_res = cpu_time_monitor_fut => {
match cpu_time_monitor_res {
Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
Expand All @@ -333,12 +329,12 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
Response::TimedOut
},
Ok(None) => Response::InternalError("error communicating over finished channel".into()),
Err(e) => Response::InternalError(format!("{}", e)),
Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()),
}
},
execute_res = execute_fut => {
let _ = finished_tx.send(());
execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e)))
execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string()))
},
};

Expand Down Expand Up @@ -367,7 +363,7 @@ fn validate_using_artifact(

let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
Err(err) =>
return Response::InvalidCandidate(format!("validation result decoding failed: {}", err)),
return Response::format_invalid("validation result decoding failed", &err.to_string()),
Ok(r) => r,
};

Expand Down
1 change: 0 additions & 1 deletion node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ pub use pvf::PvfPrepData;

pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
pub(crate) use worker_common::kill_parent_node_in_emergency;
pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;

pub use execute::worker_entrypoint as execute_worker_entrypoint;
Expand Down
13 changes: 1 addition & 12 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,19 +351,8 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
/// send that in the `PrepareResult`.
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move {
let worker_pid = std::process::id();
if let Some(version) = node_version {
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
gum::error!(
target: LOG_TARGET,
%worker_pid,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
crate::kill_parent_node_in_emergency();
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
}
}

loop {
let (pvf, dest) = recv_request(&mut stream).await?;
Expand Down
43 changes: 27 additions & 16 deletions node/core/pvf/src/worker_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,35 @@ pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
tmpfile_in(prefix, &temp_dir).await
}

pub fn worker_event_loop<F, Fut>(debug_id: &'static str, socket_path: &str, mut event_loop: F)
where
pub fn worker_event_loop<F, Fut>(
debug_id: &'static str,
socket_path: &str,
node_version: Option<&str>,
mut event_loop: F,
) where
F: FnMut(Handle, UnixStream) -> Fut,
Fut: futures::Future<Output = io::Result<Never>>,
{
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"starting pvf worker ({})",
debug_id,
);
let worker_pid = std::process::id();
gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id);

// Check for a mismatch between the node and worker versions.
if let Some(version) = node_version {
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
gum::error!(
target: LOG_TARGET,
%worker_pid,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
kill_parent_node_in_emergency();
let err: io::Result<Never> =
Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"));
gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker({}): {:?}", debug_id, err);
return
}
}

// Run the main worker loop.
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
let handle = rt.handle();
let err = rt
Expand All @@ -197,13 +214,7 @@ where
// It's never `Ok` because it's `Ok(Never)`.
.unwrap_err();

gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"quitting pvf worker ({}): {:?}",
debug_id,
err,
);
gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err);

// We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast
// as possible and not wait for stalled validation to finish. This isn't strictly necessary now,
Expand Down Expand Up @@ -422,7 +433,7 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>
/// get closed by the OS and other workers receive error on socket read and also exit. Preparation
/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so
/// no leftover artifacts are possible.
pub(crate) fn kill_parent_node_in_emergency() {
fn kill_parent_node_in_emergency() {
unsafe {
// SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in
// some corner cases, which is checked. `kill()` never fails.
Expand Down