From c7d963a9c242c915c4ed320fc79a0634d506ccd0 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Mon, 17 Aug 2020 16:14:27 -0700 Subject: [PATCH] Use crossbeam_deque::Injector instead of crossbeam_queue::SegQueue `Injector` and `SegQueue` are _almost_ identical, down to the very same comments in their implementations. One difference is that `Injector` allocates its first block as soon as it's created, but `SegQueue` waits until its first `push`, which complicates it to allow being `null`. `Injector` also has methods to steal batches into a deque `Worker`, which might be useful to us. At the very least, this lets us trim a dependency. --- ci/compat-Cargo.lock | 12 ------------ rayon-core/Cargo.toml | 1 - rayon-core/src/job.rs | 14 ++++++++++---- rayon-core/src/registry.rs | 24 ++++++++++++++---------- 4 files changed, 24 insertions(+), 27 deletions(-) diff --git a/ci/compat-Cargo.lock b/ci/compat-Cargo.lock index 2e98c07b0..aef1589f1 100644 --- a/ci/compat-Cargo.lock +++ b/ci/compat-Cargo.lock @@ -220,16 +220,6 @@ dependencies = [ "scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "crossbeam-queue" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -1051,7 +1041,6 @@ version = "1.7.1" dependencies = [ "crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-queue 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.74 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1457,7 +1446,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061" "checksum crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" "checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" -"checksum crossbeam-queue 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" "checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" "checksum derivative 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cb582b60359da160a9477ee80f15c8d784c477e69c217ef2cdd4169c24ea380f" "checksum dispatch 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bd0c93bb4b0c6d9b77f4435b0ae98c24d17f1c45b2ff844c6151a07256ca923b" diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index fd72d1263..97c0c2527 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -20,7 +20,6 @@ num_cpus = "1.2" lazy_static = "1" crossbeam-channel = "0.4.2" crossbeam-deque = "0.7.2" -crossbeam-queue = "0.2" crossbeam-utils = "0.7" [dev-dependencies] diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs index 27510cff2..a71f1b0e9 100644 --- a/rayon-core/src/job.rs +++ b/rayon-core/src/job.rs @@ -1,6 +1,6 @@ use crate::latch::Latch; use crate::unwind; -use crossbeam_queue::SegQueue; +use crossbeam_deque::{Injector, Steal}; use std::any::Any; use std::cell::UnsafeCell; use std::mem; @@ -184,13 +184,13 @@ impl JobResult { /// Indirect queue to provide FIFO job priority. pub(super) struct JobFifo { - inner: SegQueue, + inner: Injector, } impl JobFifo { pub(super) fn new() -> Self { JobFifo { - inner: SegQueue::new(), + inner: Injector::new(), } } @@ -206,6 +206,12 @@ impl JobFifo { impl Job for JobFifo { unsafe fn execute(this: *const Self) { // We "execute" a queue by executing its first job, FIFO. - (*this).inner.pop().expect("job in fifo queue").execute() + loop { + match (*this).inner.steal() { + Steal::Success(job_ref) => break job_ref.execute(), + Steal::Empty => panic!("FIFO is empty"), + Steal::Retry => {} + } + } } } diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 53032cca3..10781aba6 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -8,8 +8,7 @@ use crate::util::leak; use crate::{ ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, }; -use crossbeam_deque::{Steal, Stealer, Worker}; -use crossbeam_queue::SegQueue; +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use std::any::Any; use std::cell::Cell; use std::collections::hash_map::DefaultHasher; @@ -136,7 +135,7 @@ pub(super) struct Registry { logger: Logger, thread_infos: Vec, sleep: Sleep, - injected_jobs: SegQueue, + injected_jobs: Injector, panic_handler: Option>, start_handler: Option>, exit_handler: Option>, @@ -240,7 +239,7 @@ impl Registry { logger: logger.clone(), thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), sleep: Sleep::new(logger, n_threads), - injected_jobs: SegQueue::new(), + injected_jobs: Injector::new(), terminate_count: AtomicUsize::new(1), panic_handler: builder.take_panic_handler(), start_handler: builder.take_start_handler(), @@ -413,13 +412,18 @@ impl Registry { } fn pop_injected_job(&self, worker_index: usize) -> Option { - let job = self.injected_jobs.pop().ok(); - if job.is_some() { - self.log(|| JobUninjected { - worker: worker_index, - }); + loop { + match self.injected_jobs.steal() { + Steal::Success(job) => { + self.log(|| JobUninjected { + worker: worker_index, + }); + return Some(job); + } + Steal::Empty => return None, + Steal::Retry => {} + } } - job } /// If already in a worker-thread of this registry, just execute `op`.