[draft] accounts: parallel load across txs via core-pinned threadpool #17774
@@ -0,0 +1,74 @@
use rayon::ThreadPool;
use rayon_core::{ThreadBuilder, ThreadPoolBuilder};
use std::{io, thread};

const NUM_THREADS_PER_CORE: usize = 2;
const MAX_NUM_OF_THREADS: usize = 128;

#[derive(Debug, Default)]
pub struct PinnedSpawn {
    cores: Vec<core_affinity::CoreId>,
    len: usize,
    core_id_pointer: usize,
}

impl PinnedSpawn {
    // pub fn new(num_cores: usize) -> Self {
    //     let core_ids = core_affinity::get_core_ids().unwrap();
    //     if num_cores > core_ids.len() {
    //         panic!("More cores requested than available");
    //     }
    //     Self {
    //         cores: core_ids.into_iter().rev().take(num_cores).collect(),
    //         len: num_cores,
    //         core_id_pointer: 0,
    //     }
    // }

    // Pins as many threads as the ceiling of the fraction times the total number of cores.
    // This ensures that at least 1 core is pinned.
    pub fn new_frac_of_cores(num: usize, denom: usize) -> Self {
        if num > denom {
            panic!("fraction must be <= 1");
        }
        let core_ids = core_affinity::get_core_ids().unwrap();
        let num_cores = (num * core_ids.len() - 1) / denom + 1;
        Self {
            cores: core_ids.into_iter().rev().take(num_cores).collect(),
            len: num_cores,
            core_id_pointer: 0,
        }
    }

    // Spawn threads pinned to cores in a round-robin fashion
    pub fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        let mut b = thread::Builder::new();
        if let Some(name) = thread.name() {
            b = b.name(name.to_owned());
        }
        if let Some(stack_size) = thread.stack_size() {
            b = b.stack_size(stack_size);
        }
        let id_for_spawn = self.cores[self.core_id_pointer];
        b.spawn(move || {
            core_affinity::set_for_current(id_for_spawn);

[Review thread on the core_affinity::set_for_current line]

carllin: Could you expand in a comment on what problem we're trying to solve by pinning these threads, and what the benefit of pinning them to particular cores would be (cache locality?). Also perhaps a new bench in …

carllin: Might also be a good idea to try running this on a full validator to see if there are any unexpected drawdowns in other components of the validator. For instance, I think the PoH thread is pinned to …

Author: Hey @carllin, my apologies that the thoughts are a little jumbled and still evolving on this issue; I will eventually summarise the investigations in a report of sorts. There is a larger question of avoiding mmap, as most modern SSD-oriented DBs do, and it ought to at least be spelled out what the potential benefits are. The biggest benefit may be reducing memory requirements while improving the worst-case critical path (this is related to some security, performance, and pricing issues, such as the tx cost model). This is important if we are considering a persistent index. I think the persistent index should be written using io_uring, falling back to polling disk-read syscalls handled via async/await.

Pinning the threads in particular was just an idea to reduce resource contention, so that the loading threads don't eat up the resources of the entire CPU. However, I've found by measuring the time to perform …

Note that in the replay stage a different scenario arises: one can no longer "cheat" by parallelising across threads, because the default behaviour is to already saturate the threads with …

The fundamental problem with spinning up threads is that one doesn't know in advance when one is going to trigger a page fault. Spinning up a thread when the mmap'd page hits in cache is wasteful, but not spinning up a thread when there is a fault increases latency. Leaving this to a scheduler like Rayon is imperfect at best: as seen from the results below, there isn't a setting that works in all cases, and the worst-case critical path is also not fully mitigated. Ideally, the application layer would know when data is cached and when it has to hit disk, and could thus spin up a thread as needed, or use an async runtime that doesn't even require spawning more OS threads, rather than relying on the OS as a black box that can arbitrarily stall a thread's execution. In other words, having the I/O operation surface at the application/kernel boundary is much better than having it occur entirely inside the kernel.

Results - fully in RAM (approximate from eyeballing): …

As you can see, while parallelism helps the banking stage, utilising the same threadpool leads to contention for the replay stage and results in worse performance.

Author: As it is difficult to manually trigger page faults, I decided to simulate a single 250 µs page fault on every account load:

    pub fn get_account<'a>(&'a self, offset: usize) -> Option<(StoredAccountMeta<'a>, usize)> {
        std::thread::sleep(std::time::Duration::from_micros(250));
        // ...
    }

Here are my results (32 IO threads on an 8C16T machine): …

The first result is very reasonably explained by the following fact (and by latency spikes): … The second result is not really explained; there's a factor of 5 discrepancy. Replay: this PR …

[End of review thread; diff continues]

            thread.run()
        })?;
        self.core_id_pointer += 1;
        self.core_id_pointer %= self.len;
        Ok(())
    }
}

pub fn pinned_spawn_handler_frac(num: usize, denom: usize) -> ThreadPool {
    let mut spawner = PinnedSpawn::new_frac_of_cores(num, denom);
    ThreadPoolBuilder::new()
        .thread_name(|i| format!("pinned-thread-for-parallel-load-{}", i))
        .num_threads(std::cmp::min(
            spawner.len * NUM_THREADS_PER_CORE,
            MAX_NUM_OF_THREADS,
        ))
        .spawn_handler(|thread| spawner.spawn(thread))
        .build()
        .unwrap()
}
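
Not part of the diff: a minimal sketch of how the pinned pool above might be driven from a loading call site. The Tx and LoadedAccounts types, the load_transaction_accounts helper, and the 1/2 core fraction are invented for illustration; the actual call sites in the accounts code differ.

```rust
use rayon::prelude::*;

// Hypothetical stand-ins for the real types used at the call site.
struct Tx;
struct LoadedAccounts;

fn load_transaction_accounts(_tx: &Tx) -> LoadedAccounts {
    // In the real code this would hit AccountsDb (and possibly fault on mmap'd storage).
    LoadedAccounts
}

fn load_batch(txs: &[Tx]) -> Vec<LoadedAccounts> {
    // Pin the pool to half of the available cores; PinnedSpawn::spawn assigns
    // the worker threads to those cores round-robin.
    let pool = pinned_spawn_handler_frac(1, 2);
    // Run the per-tx loads on the pinned pool rather than the global rayon pool,
    // so that loading threads do not contend with the rest of the validator.
    pool.install(|| txs.par_iter().map(load_transaction_accounts).collect())
}
```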
Reviewer: Hmmm, this is not a bad idea, but I'm wondering if we can get even better perf if we scheduled the entire load + execute pathway in parallel. Might be worth exploring 😃
Author: Hmm, the reason why I think that's not necessarily a good idea is that the execute portion is CPU-bound.
The purpose of making the load portion parallel or async was to reduce the tail-end critical path of sequentially loading 128 txs with up to 20-256 accounts each.
As I've mentioned elsewhere, that's a maximum critical path of 640 ms to 7+ s on a storage device with 250 µs read latency. This is extremely bad for the costing model that @taozhu-chicago is working on, and for security.
In general, tx execution parallelism can already be handled by spawning more banking or replay threads.
I'm not familiar with how predictable the execution critical path is, but I do know that the compute unit limit is about 200K now, which, if that corresponds to CPU cycles, at 60 µs per tx corresponds to a critical path of 7.68 ms.
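(For reference, these figures appear to come from 128 txs × 20 accounts × 250 µs = 640 ms and 128 × 256 × 250 µs ≈ 8.2 s for the loading path, consistent with the quoted 640 ms to 7+ s range, and 128 txs × 60 µs = 7.68 ms for execution, assuming the stated per-read and per-tx latencies.)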
Ideally, the DB would not use mmap, so that one could use something like io_uring, which can achieve ~1M direct-IO (i.e. zero-copy) IOPS on a single thread.
Then one would not have to manage thread scheduling, contention, and switching, and could spawn async work at load granularity. The loading APIs should then be rewritten in terms of batch APIs that do async under the hood.
Batching further helps io_uring by reducing system call overhead.
StoredAccountMeta deserialization could also occur in the kernel via eBPF, so that one does not need to ask the application layer to do it. Deserializing into a buffer of the right size may be a challenge here; however, AccountInfo already contains the data size, so it shouldn't be a problem. This necessitates a better application-layer cache, as io_uring would be used in zero-copy/unbuffered mode.
Example API:
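A minimal sketch of the kind of batch API being discussed, assuming solana_sdk types; the trait and method names here are hypothetical illustrations, not the actual proposal.

```rust
use solana_sdk::{account::AccountSharedData, pubkey::Pubkey};

/// Hypothetical batch entry point: the caller hands over the account keys for a
/// whole batch of transactions in one call, so the backend (a rayon pool today,
/// io_uring/async in the future) can overlap the individual reads instead of
/// performing them sequentially per transaction.
pub trait BatchAccountLoader {
    /// For each transaction, return its requested accounts in order
    /// (None if an account does not exist). Returning AccountSharedData
    /// directly, rather than copying into a LoadedTransaction, corresponds
    /// to the "no extra copy" variant mentioned below.
    fn load_accounts_batch(
        &self,
        account_keys_per_tx: &[Vec<Pubkey>],
    ) -> Vec<Vec<Option<AccountSharedData>>>;
}
```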
Notice that batching forces copying (into a LoadedTransaction), however. One can get rid of the copy by returning Vec<Vec<AccountSharedData>> instead.