This repository has been archived by the owner on Nov 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
PVF: Remove rayon
and some uses of tokio
#7153
Merged
+397
−260
Merged
Changes from 8 commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
866a690
PVF: Remove `rayon` and some uses of `tokio`
mrcnski c7e44d1
Fix compile error
mrcnski dc0e9c9
Fix compile errors
mrcnski 67994f5
Fix compile error
mrcnski df22727
Address comments + couple other changes (see message)
mrcnski 5223995
Implement proper thread synchronization
mrcnski d4eb740
Catch panics in threads so we always notify condvar
mrcnski 850d2c0
Use `WaitOutcome` enum instead of bool condition variable
mrcnski f4c0b4b
Address review comments
mrcnski 883952c
Make the API for condvars in workers nicer
mrcnski cc89ab8
Add a doc
mrcnski 1967ddb
Use condvar for memory stats thread
mrcnski 6e7a13c
Small refactor
mrcnski d037d31
Address review comments
mrcnski 88d787c
Minor simplification
mrcnski f7a5b31
Address review comments
mrcnski File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,20 +15,23 @@ | |
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
use crate::{ | ||
common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop}, | ||
executor_intf::Executor, | ||
common::{ | ||
bytes_to_path, cond_notify_on_done, cond_wait_while, cpu_time_monitor_loop, | ||
stringify_panic_payload, worker_event_loop, WaitOutcome, | ||
}, | ||
executor_intf::{Executor, EXECUTE_THREAD_STACK_SIZE}, | ||
LOG_TARGET, | ||
}; | ||
use cpu_time::ProcessTime; | ||
use futures::{pin_mut, select_biased, FutureExt}; | ||
use parity_scale_codec::{Decode, Encode}; | ||
use polkadot_node_core_pvf::{ | ||
framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response, | ||
}; | ||
use polkadot_parachain::primitives::ValidationResult; | ||
use std::{ | ||
path::{Path, PathBuf}, | ||
sync::{mpsc::channel, Arc}, | ||
sync::{mpsc::channel, Arc, Condvar, Mutex}, | ||
thread, | ||
time::Duration, | ||
}; | ||
use tokio::{io, net::UnixStream}; | ||
|
@@ -67,18 +70,22 @@ async fn send_response(stream: &mut UnixStream, response: Response) -> io::Resul | |
framed_send(stream, &response.encode()).await | ||
} | ||
|
||
/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies | ||
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`, | ||
/// is checked against the worker version. A mismatch results in immediate worker termination. | ||
/// `None` is used for tests and in other situations when version check is not necessary. | ||
/// The entrypoint that the spawned execute worker should start with. | ||
/// | ||
/// # Parameters | ||
/// | ||
/// The `socket_path` specifies the path to the socket used to communicate with the host. The | ||
/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in | ||
/// immediate worker termination. `None` is used for tests and in other situations when version | ||
/// check is not necessary. | ||
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { | ||
worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move { | ||
worker_event_loop("execute", socket_path, node_version, |mut stream| async move { | ||
let worker_pid = std::process::id(); | ||
|
||
let handshake = recv_handshake(&mut stream).await?; | ||
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| { | ||
let executor = Executor::new(handshake.executor_params).map_err(|e| { | ||
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) | ||
})?); | ||
})?; | ||
|
||
loop { | ||
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; | ||
|
@@ -89,31 +96,63 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { | |
artifact_path.display(), | ||
); | ||
|
||
// Used to signal to the cpu time monitor thread that it can finish. | ||
let (finished_tx, finished_rx) = channel::<()>(); | ||
// Conditional variable to notify us when a thread is done. | ||
let cond_main = Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new())); | ||
let cond_cpu = Arc::clone(&cond_main); | ||
let cond_job = Arc::clone(&cond_main); | ||
|
||
let cpu_time_start = ProcessTime::now(); | ||
|
||
// Spawn a new thread that runs the CPU time monitor. | ||
let cpu_time_monitor_fut = rt_handle | ||
.spawn_blocking(move || { | ||
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx) | ||
}) | ||
.fuse(); | ||
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); | ||
let cpu_time_monitor_thread = thread::spawn(move || { | ||
cond_notify_on_done( | ||
|| { | ||
cpu_time_monitor_loop( | ||
cpu_time_start, | ||
execution_timeout, | ||
cpu_time_monitor_rx, | ||
) | ||
}, | ||
cond_cpu, | ||
WaitOutcome::CpuTimedOut, | ||
) | ||
}); | ||
let executor_2 = executor.clone(); | ||
let execute_fut = rt_handle | ||
.spawn_blocking(move || { | ||
validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start) | ||
}) | ||
.fuse(); | ||
|
||
pin_mut!(cpu_time_monitor_fut); | ||
pin_mut!(execute_fut); | ||
|
||
let response = select_biased! { | ||
// If this future is not selected, the join handle is dropped and the thread will | ||
// finish in the background. | ||
cpu_time_monitor_res = cpu_time_monitor_fut => { | ||
match cpu_time_monitor_res { | ||
let execute_thread = | ||
thread::Builder::new().stack_size(EXECUTE_THREAD_STACK_SIZE).spawn(move || { | ||
cond_notify_on_done( | ||
|| { | ||
validate_using_artifact( | ||
&artifact_path, | ||
&params, | ||
executor_2, | ||
cpu_time_start, | ||
) | ||
}, | ||
cond_job, | ||
WaitOutcome::JobFinished, | ||
) | ||
})?; | ||
|
||
// Wait for one of the threads to finish. | ||
let outcome = cond_wait_while(cond_main); | ||
|
||
let response = match outcome { | ||
WaitOutcome::JobFinished => { | ||
let _ = cpu_time_monitor_tx.send(()); | ||
execute_thread.join().unwrap_or_else(|e| { | ||
// TODO: Use `Panic` error once that is implemented. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe have an issue for this, rather than a TODO in the code? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I already addressed it here, it's approved so I'll merge it right after this PR. (I had planned to do these changes one right after another, so I left the TODO as a marker for myself, did the change, and set 7155's merge target to this branch. Will do issues instead in the future. 👍) |
||
Response::format_internal( | ||
"execute thread error", | ||
&stringify_panic_payload(e), | ||
) | ||
}) | ||
}, | ||
// If this thread is not selected, we signal it to end, the join handle is dropped | ||
mrcnski marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// and the thread will finish in the background. | ||
WaitOutcome::CpuTimedOut => { | ||
match cpu_time_monitor_thread.join() { | ||
Ok(Some(cpu_time_elapsed)) => { | ||
// Log if we exceed the timeout and the other thread hasn't finished. | ||
gum::warn!( | ||
|
@@ -125,14 +164,21 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { | |
); | ||
Response::TimedOut | ||
}, | ||
Ok(None) => Response::InternalError("error communicating over finished channel".into()), | ||
Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()), | ||
Ok(None) => Response::format_internal( | ||
"cpu time monitor thread error", | ||
"error communicating over finished channel".into(), | ||
), | ||
// We can use an internal error here because errors in this thread are | ||
// independent of the candidate. | ||
Err(e) => Response::format_internal( | ||
"cpu time monitor thread error", | ||
&stringify_panic_payload(e), | ||
), | ||
} | ||
}, | ||
execute_res = execute_fut => { | ||
let _ = finished_tx.send(()); | ||
execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string())) | ||
}, | ||
WaitOutcome::Pending => Response::InternalError( | ||
"we run wait_while until the outcome is no longer pending; qed".into(), | ||
mrcnski marked this conversation as resolved.
Show resolved
Hide resolved
|
||
), | ||
}; | ||
|
||
send_response(&mut stream, response).await?; | ||
|
@@ -143,7 +189,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { | |
fn validate_using_artifact( | ||
artifact_path: &Path, | ||
params: &[u8], | ||
executor: Arc<Executor>, | ||
executor: Executor, | ||
cpu_time_start: ProcessTime, | ||
) -> Response { | ||
// Check here if the file exists, because the error from Substrate is not match-able. | ||
|
@@ -163,13 +209,15 @@ fn validate_using_artifact( | |
Ok(d) => d, | ||
}; | ||
|
||
let duration = cpu_time_start.elapsed(); | ||
|
||
let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) { | ||
Err(err) => | ||
return Response::format_invalid("validation result decoding failed", &err.to_string()), | ||
Ok(r) => r, | ||
}; | ||
|
||
// Include the decoding in the measured time, to prevent any potential attacks exploiting some | ||
// bug in decoding. | ||
let duration = cpu_time_start.elapsed(); | ||
|
||
Response::Ok { result_descriptor, duration } | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need tokio and futures if everything async-related is already purged? Filesystem interactions are sync in nature, and for the worker reading from the socket is blocking too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right! I just didn't remove the rest of async to keep this PR focused. But I can do it here if you want.
Note that we still need to remove the dependencies on `polkadot-node-core-pvf` and `tracing-gum` to fully remove the dependency on `tokio`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How's tracing-gum related to tokio?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just ran `cargo tree -e normal` in the crate and saw `tokio` several times in the output, e.g. under the `sc-network` and `libp2p` crates. I have no idea how `tracing-gum` works though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's jaeger, even though gum only uses hashing from it.
Better to polish the rest of the code and properly synchronize threads and take care of removing tokio later (if it's possible)