
Parallel queries #292

Merged (22 commits) on Sep 8, 2020
10 changes: 10 additions & 0 deletions Cargo.toml
@@ -78,6 +78,7 @@ bevy_winit = { path = "crates/bevy_winit", optional = true, version = "0.1" }
[dev-dependencies]
rand = "0.7.2"
serde = { version = "1", features = ["derive"]}
criterion = "0.3"

[[example]]
name = "hello_world"
@@ -179,6 +180,10 @@ path = "examples/ecs/startup_system.rs"
name = "ecs_guide"
path = "examples/ecs/ecs_guide.rs"

[[example]]
name = "parallel_query"
path = "examples/ecs/parallel_query.rs"

[[example]]
name = "breakout"
path = "examples/game/breakout.rs"
@@ -242,3 +247,8 @@ path = "examples/window/multiple_windows.rs"
[[example]]
name = "window_settings"
path = "examples/window/window_settings.rs"

[[bench]]
name = "iter"
path = "crates/bevy_tasks/benches/iter.rs"
harness = false
82 changes: 82 additions & 0 deletions crates/bevy_ecs/src/system/query.rs
@@ -3,6 +3,7 @@ use bevy_hecs::{
Archetype, Component, ComponentError, Entity, Fetch, Query as HecsQuery, QueryOne, Ref, RefMut,
World,
};
use bevy_tasks::ParallelIterator;
use std::marker::PhantomData;

/// Provides scoped access to a World according to a given [HecsQuery]
@@ -148,6 +149,29 @@ impl<'w, Q: HecsQuery> QueryBorrow<'w, Q> {
iter: None,
}
}

/// Like `iter`, but returns child iterators of at most `batch_size`
/// elements
///
/// Useful for distributing work over a threadpool using the
/// ParallelIterator interface.
///
/// Batch size needs to be chosen based on the task being done in
/// parallel. The elements in each batch are computed serially, while
/// the batches themselves are computed in parallel.
///
/// If the batch size is too small, scheduling overhead dominates: dispatching
/// a batch can take longer than running it. If it is too large, one batch may
/// still be running long after the rest have finished.
pub fn par_iter<'q>(&'q mut self, batch_size: u32) -> ParIter<'q, 'w, Q> {
Contributor: `#[inline]` would make this more consistent with some of the other functions in this file.

Member: It's worth pointing out that inline sometimes regresses performance; iterators are especially weird in that respect. It's worth testing perf for every inline/non-inline decision made.

Contributor (author): I haven't profiled inline vs. not inline here, so I'd prefer to let the compiler make the decision for now.

ParIter {
borrow: self,
archetype_index: 0,
batch_size,
batch: 0,
}
}
}

unsafe impl<'w, Q: HecsQuery> Send for QueryBorrow<'w, Q> {}
@@ -257,3 +281,61 @@ impl<Q: HecsQuery> ChunkIter<Q> {
}
}
}

/// Batched version of `QueryIter`
pub struct ParIter<'q, 'w, Q: HecsQuery> {
borrow: &'q mut QueryBorrow<'w, Q>,
archetype_index: u32,
batch_size: u32,
batch: u32,
}

impl<'q, 'w, Q: HecsQuery> ParallelIterator<Batch<'q, Q>> for ParIter<'q, 'w, Q> {
type Item = <Q::Fetch as Fetch<'q>>::Item;

fn next_batch(&mut self) -> Option<Batch<'q, Q>> {
loop {
let archetype = self.borrow.archetypes.get(self.archetype_index as usize)?;
let offset = self.batch_size * self.batch;
if offset >= archetype.len() {
self.archetype_index += 1;
self.batch = 0;
continue;
}
if let Some(fetch) = unsafe { Q::Fetch::get(archetype, offset as usize) } {
self.batch += 1;
return Some(Batch {
_marker: PhantomData,
state: ChunkIter {
fetch,
len: self.batch_size.min(archetype.len() - offset),
},
});
} else {
self.archetype_index += 1;
debug_assert_eq!(
self.batch, 0,
"query fetch should always reject at the first batch or not at all"
);
continue;
}
}
}
}

/// A sequence of entities yielded by `ParIter`
pub struct Batch<'q, Q: HecsQuery> {
_marker: PhantomData<&'q ()>,
state: ChunkIter<Q>,
}

impl<'q, 'w, Q: HecsQuery> Iterator for Batch<'q, Q> {
type Item = <Q::Fetch as Fetch<'q>>::Item;

fn next(&mut self) -> Option<Self::Item> {
let components = unsafe { self.state.next()? };
Some(components)
}
}

unsafe impl<'q, Q: HecsQuery> Send for Batch<'q, Q> {}
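
Taken together, `par_iter` and `ParallelIterator` let a system split a query into batches and distribute them over a task pool. The sketch below is illustrative only: it assumes the `Query` system parameter exposes this `QueryBorrow` through `iter()`, the import paths are approximate, and `Velocity`, `Position`, and `move_system` are made-up names. A real app would also reuse a shared pool resource instead of building one per call.

```rust
use bevy_ecs::prelude::*;
use bevy_tasks::{ParallelIterator, TaskPoolBuilder};

// Illustrative components; any 'static + Send + Sync type works as a component here.
struct Velocity(f32);
struct Position(f32);

fn move_system(mut query: Query<(&Velocity, &mut Position)>) {
    // Built locally to keep the sketch self-contained; normally this would
    // be a shared pool resource.
    let pool = TaskPoolBuilder::new().build();
    // Batches of 32 entities: each batch is iterated serially, while the
    // batches themselves run in parallel on the pool's threads.
    query
        .iter()
        .par_iter(32)
        .for_each(&pool, |(velocity, mut position)| {
            position.0 += velocity.0;
        });
}
```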
148 changes: 148 additions & 0 deletions crates/bevy_tasks/benches/iter.rs
@@ -0,0 +1,148 @@
use bevy_tasks::{ParallelIterator, TaskPoolBuilder};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};

struct ParChunks<'a, T>(std::slice::Chunks<'a, T>);
impl<'a, T> ParallelIterator<std::slice::Iter<'a, T>> for ParChunks<'a, T>
where
T: 'a + Send + Sync,
{
type Item = &'a T;

fn next_batch(&mut self) -> Option<std::slice::Iter<'a, T>> {
self.0.next().map(|s| s.iter())
}
}

struct ParChunksMut<'a, T>(std::slice::ChunksMut<'a, T>);
impl<'a, T> ParallelIterator<std::slice::IterMut<'a, T>> for ParChunksMut<'a, T>
where
T: 'a + Send + Sync,
{
type Item = &'a mut T;

fn next_batch(&mut self) -> Option<std::slice::IterMut<'a, T>> {
self.0.next().map(|s| s.iter_mut())
}
}

fn bench_overhead(c: &mut Criterion) {
fn noop(_: &mut usize) {}

let mut v = (0..10000).collect::<Vec<usize>>();
c.bench_function("overhead_iter", |b| {
b.iter(|| {
v.iter_mut().for_each(noop);
})
});

let mut v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("overhead_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
|b, _| {
b.iter(|| {
ParChunksMut(v.chunks_mut(100)).for_each(&pool, noop);
})
},
);
}
group.finish();
}

fn bench_for_each(c: &mut Criterion) {
fn busy_work(n: usize) {
let mut i = n;
while i > 0 {
i = black_box(i - 1);
}
}

let mut v = (0..10000).collect::<Vec<usize>>();
c.bench_function("for_each_iter", |b| {
b.iter(|| {
v.iter_mut().for_each(|x| {
busy_work(10000);
*x *= *x;
});
})
});

let mut v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("for_each_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
|b, _| {
b.iter(|| {
ParChunksMut(v.chunks_mut(100)).for_each(&pool, |x| {
busy_work(10000);
*x *= *x;
});
})
},
);
}
group.finish();
}

fn bench_many_maps(c: &mut Criterion) {
fn busy_doubles(mut x: usize, n: usize) -> usize {
for _ in 0..n {
x = black_box(x.wrapping_mul(2));
}
x
}

let v = (0..10000).collect::<Vec<usize>>();
c.bench_function("many_maps_iter", |b| {
b.iter(|| {
v.iter()
.map(|x| busy_doubles(*x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.for_each(drop);
})
});

let v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("many_maps_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
|b, _| {
b.iter(|| {
ParChunks(v.chunks(100))
.map(|x| busy_doubles(*x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.for_each(&pool, drop);
})
},
);
}
group.finish();
}

criterion_group!(benches, bench_overhead, bench_for_each, bench_many_maps);
criterion_main!(benches);
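
The benchmarks implement `ParallelIterator` for plain slice chunks. The trait definition itself lives in `bevy_tasks` and is not shown in this diff; the rough shape below is inferred from the call sites above (`next_batch`, `for_each(&pool, f)`, the `map` chain), so treat it as an approximation rather than the actual API.

```rust
// Approximate shape of bevy_tasks::ParallelIterator, inferred from how it is
// implemented and used in this PR; the real trait provides more
// Iterator-style adapters than are listed here.
pub trait ParallelIterator<B>
where
    B: Iterator<Item = Self::Item> + Send,
{
    type Item;

    /// Return the next batch to iterate serially, or `None` when the
    /// underlying data is exhausted.
    fn next_batch(&mut self) -> Option<B>;

    // Provided methods (signatures approximate):
    //   fn for_each<F>(self, pool: &TaskPool, f: F)
    //       where F: Fn(Self::Item) + Send + Sync + Clone;
    //   fn map<F, R>(self, f: F) -> /* a ParallelIterator yielding R */;
}
```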