diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index 5edf435e7190..adcc6b7b1b95 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -29,12 +29,11 @@ pub enum PrepareError { Prevalidation(String), /// Compilation failed for the given PVF. Preparation(String), - /// An unexpected panic has occured in the preparation worker. + /// An unexpected panic has occurred in the preparation worker. Panic(String), /// Failed to prepare the PVF due to the time limit. TimedOut, - /// An IO error occurred while receiving the result from the worker process. This state is reported by the - /// validation host (not by the worker). + /// An IO error occurred. This state is reported by either the validation host or by the worker. IoErr(String), /// The temporary file for the artifact could not be created at the given cache path. This state is reported by the /// validation host (not by the worker). diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index 04357d8704bc..6627337c21f1 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -261,6 +261,13 @@ impl Response { Self::InvalidCandidate(format!("{}: {}", ctx, msg)) } } + fn format_internal(ctx: &'static str, msg: &str) -> Self { + if msg.is_empty() { + Self::InternalError(ctx.to_string()) + } else { + Self::InternalError(format!("{}: {}", ctx, msg)) + } + } } /// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies @@ -268,19 +275,8 @@ impl Response { /// is checked against the worker version. A mismatch results in immediate worker termination. /// `None` is used for tests and in other situations when version check is not necessary. pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { - worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move { + worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move { let worker_pid = std::process::id(); - if let Some(version) = node_version { - if version != env!("SUBSTRATE_CLI_IMPL_VERSION") { - gum::error!( - target: LOG_TARGET, - %worker_pid, - "Node and worker version mismatch, node needs restarting, forcing shutdown", - ); - crate::kill_parent_node_in_emergency(); - return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch")) - } - } let handshake = recv_handshake(&mut stream).await?; let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| { @@ -301,7 +297,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { let cpu_time_start = ProcessTime::now(); // Spawn a new thread that runs the CPU time monitor. - let thread_fut = rt_handle + let cpu_time_monitor_fut = rt_handle .spawn_blocking(move || { cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx) }) @@ -313,14 +309,14 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { }) .fuse(); - pin_mut!(thread_fut); + pin_mut!(cpu_time_monitor_fut); pin_mut!(execute_fut); let response = select_biased! { // If this future is not selected, the join handle is dropped and the thread will // finish in the background. - join_res = thread_fut => { - match join_res { + cpu_time_monitor_res = cpu_time_monitor_fut => { + match cpu_time_monitor_res { Ok(Some(cpu_time_elapsed)) => { // Log if we exceed the timeout and the other thread hasn't finished. gum::warn!( @@ -333,12 +329,12 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { Response::TimedOut }, Ok(None) => Response::InternalError("error communicating over finished channel".into()), - Err(e) => Response::InternalError(format!("{}", e)), + Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()), } }, execute_res = execute_fut => { let _ = finished_tx.send(()); - execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e))) + execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string())) }, }; @@ -367,7 +363,7 @@ fn validate_using_artifact( let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) { Err(err) => - return Response::InvalidCandidate(format!("validation result decoding failed: {}", err)), + return Response::format_invalid("validation result decoding failed", &err.to_string()), Ok(r) => r, }; diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs index 88134529bc4b..8c40bbb8b939 100644 --- a/node/core/pvf/src/lib.rs +++ b/node/core/pvf/src/lib.rs @@ -114,7 +114,6 @@ pub use pvf::PvfPrepData; pub use host::{start, Config, ValidationHost}; pub use metrics::Metrics; -pub(crate) use worker_common::kill_parent_node_in_emergency; pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR; pub use execute::worker_entrypoint as execute_worker_entrypoint; diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 1ccba603c1fb..43926e6b64ab 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -351,19 +351,8 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result) { - worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { + worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move { let worker_pid = std::process::id(); - if let Some(version) = node_version { - if version != env!("SUBSTRATE_CLI_IMPL_VERSION") { - gum::error!( - target: LOG_TARGET, - %worker_pid, - "Node and worker version mismatch, node needs restarting, forcing shutdown", - ); - crate::kill_parent_node_in_emergency(); - return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch")) - } - } loop { let (pvf, dest) = recv_request(&mut stream).await?; diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 3ed2994a2f94..f7abfe0b29b3 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -171,18 +171,35 @@ pub async fn tmpfile(prefix: &str) -> io::Result { tmpfile_in(prefix, &temp_dir).await } -pub fn worker_event_loop(debug_id: &'static str, socket_path: &str, mut event_loop: F) -where +pub fn worker_event_loop( + debug_id: &'static str, + socket_path: &str, + node_version: Option<&str>, + mut event_loop: F, +) where F: FnMut(Handle, UnixStream) -> Fut, Fut: futures::Future>, { - gum::debug!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "starting pvf worker ({})", - debug_id, - ); + let worker_pid = std::process::id(); + gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id); + + // Check for a mismatch between the node and worker versions. + if let Some(version) = node_version { + if version != env!("SUBSTRATE_CLI_IMPL_VERSION") { + gum::error!( + target: LOG_TARGET, + %worker_pid, + "Node and worker version mismatch, node needs restarting, forcing shutdown", + ); + kill_parent_node_in_emergency(); + let err: io::Result = + Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch")); + gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker({}): {:?}", debug_id, err); + return + } + } + // Run the main worker loop. let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); let handle = rt.handle(); let err = rt @@ -197,13 +214,7 @@ where // It's never `Ok` because it's `Ok(Never)`. .unwrap_err(); - gum::debug!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "quitting pvf worker ({}): {:?}", - debug_id, - err, - ); + gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err); // We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast // as possible and not wait for stalled validation to finish. This isn't strictly necessary now, @@ -422,7 +433,7 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result /// get closed by the OS and other workers receive error on socket read and also exit. Preparation /// jobs are written to the temporary files that are renamed to real artifacts on the node side, so /// no leftover artifacts are possible. -pub(crate) fn kill_parent_node_in_emergency() { +fn kill_parent_node_in_emergency() { unsafe { // SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in // some corner cases, which is checked. `kill()` never fails.