-
Notifications
You must be signed in to change notification settings - Fork 1.6k
PVF: Remove rayon
and some uses of tokio
#7153
Changes from 10 commits
866a690
c7e44d1
dc0e9c9
67994f5
df22727
5223995
d4eb740
850d2c0
f4c0b4b
883952c
cc89ab8
1967ddb
6e7a13c
d037d31
88d787c
f7a5b31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,15 +18,12 @@ use crate::LOG_TARGET; | |
use cpu_time::ProcessTime; | ||
use futures::never::Never; | ||
use std::{ | ||
any::Any, | ||
path::PathBuf, | ||
sync::mpsc::{Receiver, RecvTimeoutError}, | ||
time::Duration, | ||
}; | ||
use tokio::{ | ||
io, | ||
net::UnixStream, | ||
runtime::{Handle, Runtime}, | ||
}; | ||
use tokio::{io, net::UnixStream, runtime::Runtime}; | ||
|
||
/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the | ||
/// child process. | ||
|
@@ -44,7 +41,7 @@ pub fn worker_event_loop<F, Fut>( | |
node_version: Option<&str>, | ||
mut event_loop: F, | ||
) where | ||
F: FnMut(Handle, UnixStream) -> Fut, | ||
F: FnMut(UnixStream) -> Fut, | ||
Fut: futures::Future<Output = io::Result<Never>>, | ||
{ | ||
let worker_pid = std::process::id(); | ||
|
@@ -68,13 +65,12 @@ pub fn worker_event_loop<F, Fut>( | |
|
||
// Run the main worker loop. | ||
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); | ||
let handle = rt.handle(); | ||
let err = rt | ||
.block_on(async move { | ||
let stream = UnixStream::connect(socket_path).await?; | ||
let _ = tokio::fs::remove_file(socket_path).await; | ||
|
||
let result = event_loop(handle.clone(), stream).await; | ||
let result = event_loop(stream).await; | ||
|
||
result | ||
}) | ||
|
@@ -124,6 +120,20 @@ pub fn cpu_time_monitor_loop( | |
} | ||
} | ||
|
||
/// Attempt to convert an opaque panic payload to a string. | ||
/// | ||
/// This is a best effort, and is not guaranteed to provide the most accurate value. | ||
pub fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String { | ||
match payload.downcast::<&'static str>() { | ||
Ok(msg) => msg.to_string(), | ||
Err(payload) => match payload.downcast::<String>() { | ||
Ok(msg) => *msg, | ||
// At least we tried... | ||
Err(_) => "unknown panic payload".to_string(), | ||
}, | ||
} | ||
} | ||
|
||
/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM` | ||
/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node | ||
/// restart should be handled by the node owner. As node exits, unix sockets opened to workers | ||
|
@@ -140,3 +150,105 @@ fn kill_parent_node_in_emergency() { | |
} | ||
} | ||
} | ||
|
||
/// Functionality related to threads spawned by the workers. | ||
pub mod thread { | ||
use std::{ | ||
panic, | ||
sync::{Arc, Condvar, Mutex}, | ||
thread, | ||
}; | ||
|
||
/// Contains the outcome of waiting on threads, or `Pending` if none are ready. | ||
#[derive(Clone, Copy)] | ||
pub enum WaitOutcome { | ||
JobFinished, | ||
CpuTimedOut, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Wouldn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or remove the prefix completely: |
||
Pending, | ||
} | ||
|
||
impl WaitOutcome { | ||
pub fn is_pending(&self) -> bool { | ||
matches!(self, Self::Pending) | ||
} | ||
} | ||
|
||
/// Helper type. | ||
type Cond = Arc<(Mutex<WaitOutcome>, Condvar)>; | ||
|
||
/// Gets a condvar initialized to `Pending`. | ||
pub fn get_condvar() -> Cond { | ||
Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new())) | ||
} | ||
|
||
/// Runs a thread, afterwards notifying the thread waiting on the condvar. Catches panics and | ||
/// resumes them after triggering the condvar, so that the waiting thread is notified on panics. | ||
pub fn spawn_worker_thread<F, R>( | ||
name: &str, | ||
f: F, | ||
cond: Cond, | ||
outcome: WaitOutcome, | ||
) -> std::io::Result<thread::JoinHandle<R>> | ||
where | ||
F: FnOnce() -> R, | ||
F: Send + 'static + panic::UnwindSafe, | ||
R: Send + 'static, | ||
{ | ||
thread::Builder::new() | ||
.name(name.into()) | ||
.spawn(move || cond_notify_on_done(f, cond, outcome)) | ||
} | ||
|
||
/// Runs a worker thread with the given stack size. See [`spawn_worker_thread`]. | ||
pub fn spawn_worker_thread_with_stack_size<F, R>( | ||
name: &str, | ||
f: F, | ||
cond: Cond, | ||
outcome: WaitOutcome, | ||
stack_size: usize, | ||
) -> std::io::Result<thread::JoinHandle<R>> | ||
where | ||
F: FnOnce() -> R, | ||
F: Send + 'static + panic::UnwindSafe, | ||
R: Send + 'static, | ||
{ | ||
thread::Builder::new() | ||
.name(name.into()) | ||
.stack_size(stack_size) | ||
.spawn(move || cond_notify_on_done(f, cond, outcome)) | ||
} | ||
|
||
/// Runs a function, afterwards notifying the thread waiting on the condvar. Catches panics and | ||
/// resumes them after triggering the condvar, so that the waiting thread is notified on panics. | ||
fn cond_notify_on_done<F, R>(f: F, cond: Cond, outcome: WaitOutcome) -> R | ||
where | ||
F: FnOnce() -> R, | ||
F: panic::UnwindSafe, | ||
{ | ||
let result = panic::catch_unwind(|| f()); | ||
cond_notify_one(cond, outcome); | ||
match result { | ||
Ok(inner) => return inner, | ||
Err(err) => panic::resume_unwind(err), | ||
} | ||
} | ||
|
||
/// Helper function to notify the thread waiting on this condvar. | ||
fn cond_notify_one(cond: Cond, outcome: WaitOutcome) { | ||
let (lock, cvar) = &*cond; | ||
let mut flag = lock.lock().unwrap(); | ||
if !flag.is_pending() { | ||
// Someone else already triggered the condvar. | ||
return | ||
} | ||
*flag = outcome; | ||
cvar.notify_one(); | ||
} | ||
|
||
/// Block the thread while it waits on the condvar. | ||
pub fn wait_for_threads(cond: Cond) -> WaitOutcome { | ||
let (lock, cvar) = &*cond; | ||
let guard = cvar.wait_while(lock.lock().unwrap(), |flag| flag.is_pending()).unwrap(); | ||
*guard | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,12 +15,15 @@ | |
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
use crate::{ | ||
common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop}, | ||
executor_intf::Executor, | ||
common::{ | ||
bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload, | ||
thread::{self, WaitOutcome}, | ||
worker_event_loop, | ||
}, | ||
executor_intf::{Executor, EXECUTE_THREAD_STACK_SIZE}, | ||
LOG_TARGET, | ||
}; | ||
use cpu_time::ProcessTime; | ||
use futures::{pin_mut, select_biased, FutureExt}; | ||
use parity_scale_codec::{Decode, Encode}; | ||
use polkadot_node_core_pvf::{ | ||
framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response, | ||
|
@@ -67,18 +70,22 @@ async fn send_response(stream: &mut UnixStream, response: Response) -> io::Resul | |
framed_send(stream, &response.encode()).await | ||
} | ||
|
||
/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies | ||
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`, | ||
/// is checked against the worker version. A mismatch results in immediate worker termination. | ||
/// `None` is used for tests and in other situations when version check is not necessary. | ||
/// The entrypoint that the spawned execute worker should start with. | ||
/// | ||
/// # Parameters | ||
/// | ||
/// The `socket_path` specifies the path to the socket used to communicate with the host. The | ||
/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in | ||
/// immediate worker termination. `None` is used for tests and in other situations when version | ||
/// check is not necessary. | ||
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { | ||
worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move { | ||
worker_event_loop("execute", socket_path, node_version, |mut stream| async move { | ||
let worker_pid = std::process::id(); | ||
|
||
let handshake = recv_handshake(&mut stream).await?; | ||
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| { | ||
let executor = Executor::new(handshake.executor_params).map_err(|e| { | ||
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) | ||
})?); | ||
})?; | ||
|
||
loop { | ||
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; | ||
|
@@ -89,31 +96,49 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { | |
artifact_path.display(), | ||
); | ||
|
||
// Used to signal to the cpu time monitor thread that it can finish. | ||
let (finished_tx, finished_rx) = channel::<()>(); | ||
// Conditional variable to notify us when a thread is done. | ||
let condvar = thread::get_condvar(); | ||
|
||
let cpu_time_start = ProcessTime::now(); | ||
|
||
// Spawn a new thread that runs the CPU time monitor. | ||
let cpu_time_monitor_fut = rt_handle | ||
.spawn_blocking(move || { | ||
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx) | ||
}) | ||
.fuse(); | ||
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); | ||
let cpu_time_monitor_thread = thread::spawn_worker_thread( | ||
"cpu time monitor thread", | ||
move || { | ||
cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx) | ||
}, | ||
Arc::clone(&condvar), | ||
WaitOutcome::CpuTimedOut, | ||
)?; | ||
let executor_2 = executor.clone(); | ||
let execute_fut = rt_handle | ||
.spawn_blocking(move || { | ||
let execute_thread = thread::spawn_worker_thread_with_stack_size( | ||
"execute thread", | ||
move || { | ||
validate_using_artifact(&artifact_path, ¶ms, executor_2, cpu_time_start) | ||
}) | ||
.fuse(); | ||
|
||
pin_mut!(cpu_time_monitor_fut); | ||
pin_mut!(execute_fut); | ||
|
||
let response = select_biased! { | ||
// If this future is not selected, the join handle is dropped and the thread will | ||
// finish in the background. | ||
cpu_time_monitor_res = cpu_time_monitor_fut => { | ||
match cpu_time_monitor_res { | ||
}, | ||
Arc::clone(&condvar), | ||
WaitOutcome::JobFinished, | ||
EXECUTE_THREAD_STACK_SIZE, | ||
)?; | ||
|
||
let outcome = thread::wait_for_threads(condvar); | ||
|
||
let response = match outcome { | ||
WaitOutcome::JobFinished => { | ||
let _ = cpu_time_monitor_tx.send(()); | ||
execute_thread.join().unwrap_or_else(|e| { | ||
// TODO: Use `Panic` error once that is implemented. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe have an issue for this , rather than TODO in the code ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I already addressed it here, it's approved so I'll merge it right after this PR. (I had planned to do these changes right after another so I left the TODO as a marker for myself, did the change, and set 7155's merge target to this branch. Will do issues instead in the future. 👍) |
||
Response::format_internal( | ||
"execute thread error", | ||
&stringify_panic_payload(e), | ||
) | ||
}) | ||
}, | ||
// If the CPU thread is not selected, we signal it to end, the join handle is | ||
// dropped and the thread will finish in the background. | ||
WaitOutcome::CpuTimedOut => { | ||
match cpu_time_monitor_thread.join() { | ||
Ok(Some(cpu_time_elapsed)) => { | ||
// Log if we exceed the timeout and the other thread hasn't finished. | ||
gum::warn!( | ||
|
@@ -125,14 +150,20 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { | |
); | ||
Response::TimedOut | ||
}, | ||
Ok(None) => Response::InternalError("error communicating over finished channel".into()), | ||
Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()), | ||
Ok(None) => Response::format_internal( | ||
"cpu time monitor thread error", | ||
"error communicating over finished channel".into(), | ||
), | ||
// We can use an internal error here because errors in this thread are | ||
// independent of the candidate. | ||
Err(e) => Response::format_internal( | ||
"cpu time monitor thread error", | ||
&stringify_panic_payload(e), | ||
), | ||
} | ||
}, | ||
execute_res = execute_fut => { | ||
let _ = finished_tx.send(()); | ||
execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string())) | ||
}, | ||
WaitOutcome::Pending => | ||
unreachable!("we run wait_while until the outcome is no longer pending; qed"), | ||
}; | ||
|
||
send_response(&mut stream, response).await?; | ||
|
@@ -143,7 +174,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { | |
fn validate_using_artifact( | ||
artifact_path: &Path, | ||
params: &[u8], | ||
executor: Arc<Executor>, | ||
executor: Executor, | ||
cpu_time_start: ProcessTime, | ||
) -> Response { | ||
// Check here if the file exists, because the error from Substrate is not match-able. | ||
|
@@ -163,13 +194,15 @@ fn validate_using_artifact( | |
Ok(d) => d, | ||
}; | ||
|
||
let duration = cpu_time_start.elapsed(); | ||
|
||
let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) { | ||
Err(err) => | ||
return Response::format_invalid("validation result decoding failed", &err.to_string()), | ||
Ok(r) => r, | ||
}; | ||
|
||
// Include the decoding in the measured time, to prevent any potential attacks exploiting some | ||
// bug in decoding. | ||
let duration = cpu_time_start.elapsed(); | ||
|
||
Response::Ok { result_descriptor, duration } | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need tokio and futures if everything async-related is already purged? Filesystem interactions are sync in nature, and for the worker reading from the socket is blocking too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right! I just didn't remove the rest of async to keep this PR focused. But I can do it here if you want.
Note that we still need to remove the dependencies on
polkadot-node-core-pvf
andtracing-gum
to fully remove the dependency ontokio
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How's tracing-gum related to tokio?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just ran
cargo tree -e normal
in the crate and sawtokio
several times in the output, e.g. undersc-network
andlibp2p
crates. I have no idea howtracing-gum
works though.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's jaeger, even though gum only uses hashing from it.
Better to polish the rest of the code and properly synchronize threads and take care of removing tokio later (if it's possible)