From 437b3696f5e084c0ae0e5350bef7797a4bc2b023 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 4 Jun 2022 00:20:57 -0700 Subject: [PATCH 01/14] Start systems while prepare_systems is running --- .../src/schedule/executor_parallel.rs | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 149d8d02bc2df..bb2e0c0815246 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -175,8 +175,9 @@ impl ParallelExecutor { for (index, (system_data, system)) in self.system_metadata.iter_mut().zip(systems).enumerate() { + let should_run = system.should_run(); // Spawn the system task. - if system.should_run() { + if should_run { self.should_run.set(index, true); let start_receiver = system_data.start_receiver.clone(); let finish_sender = self.finish_sender.clone(); @@ -212,7 +213,24 @@ impl ParallelExecutor { } // Queue the system if it has no dependencies, otherwise reset its dependency counter. if system_data.dependencies_total == 0 { - self.queued.insert(index); + // Non-send systems are considered conflicting with each other. + if should_run + && system_data.is_send + && system_data + .archetype_component_access + .is_compatible(&self.active_archetype_component_access) + { + system_data + .start_sender + .try_send(()) + .unwrap_or_else(|error| unreachable!("{}", error)); + self.running.set(index, true); + // Add this system's access information to the active access information. + self.active_archetype_component_access + .extend(&system_data.archetype_component_access); + } else { + self.queued.insert(index); + } } else { system_data.dependencies_now = system_data.dependencies_total; } From c522b7cbf54b9ce810f10203b6b972b10cdcd514 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 4 Jun 2022 00:52:30 -0700 Subject: [PATCH 02/14] Fix tests and enable non-send systems --- crates/bevy_ecs/src/schedule/executor_parallel.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index bb2e0c0815246..7427ec58ae487 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -169,6 +169,8 @@ impl ParallelExecutor { systems: &'scope mut [ParallelSystemContainer], world: &'scope World, ) { + #[cfg(test)] + let mut started_systems = 0; #[cfg(feature = "trace")] let _span = bevy_utils::tracing::info_span!("prepare_systems").entered(); self.should_run.clear(); @@ -215,16 +217,23 @@ impl ParallelExecutor { if system_data.dependencies_total == 0 { // Non-send systems are considered conflicting with each other. if should_run - && system_data.is_send + && (!self.non_send_running || system_data.is_send) && system_data .archetype_component_access .is_compatible(&self.active_archetype_component_access) { + #[cfg(test)] + { + started_systems += 1; + } system_data .start_sender .try_send(()) .unwrap_or_else(|error| unreachable!("{}", error)); self.running.set(index, true); + if !system_data.is_send { + self.non_send_running = true; + } // Add this system's access information to the active access information. self.active_archetype_component_access .extend(&system_data.archetype_component_access); @@ -235,6 +244,10 @@ impl ParallelExecutor { system_data.dependencies_now = system_data.dependencies_total; } } + #[cfg(test)] + if started_systems != 0 { + self.emit_event(StartedSystems(started_systems)); + } } /// Determines if the system with given index has no conflicts with already running systems. From 9265c1aa3fa4eac24d9c3e6948d2d9d407c4283b Mon Sep 17 00:00:00 2001 From: James Liu Date: Sat, 4 Jun 2022 15:11:33 -0700 Subject: [PATCH 03/14] Update comment about non-send systems. Co-authored-by: Alice Cecile --- crates/bevy_ecs/src/schedule/executor_parallel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 7427ec58ae487..e2b7b17234009 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -215,7 +215,7 @@ impl ParallelExecutor { } // Queue the system if it has no dependencies, otherwise reset its dependency counter. if system_data.dependencies_total == 0 { - // Non-send systems are considered conflicting with each other. + // Non-send systems are incompatible, as they must all be executed on the main thread. if should_run && (!self.non_send_running || system_data.is_send) && system_data From a15585e50237040a5e40fbd49b910ffb110e8947 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 10 Jun 2022 17:04:52 -0700 Subject: [PATCH 04/14] Omit start channel entirely if possible --- .../src/schedule/executor_parallel.rs | 115 ++++++++++-------- 1 file changed, 67 insertions(+), 48 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index dac15f685687e..7887936a35a55 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -125,26 +125,28 @@ impl ParallelSystemExecutor for ParallelExecutor { ComputeTaskPool::init(TaskPool::default).scope(|scope| { self.prepare_systems(scope, systems, world); + if 0 == self.queued.count_ones(..) { + return; + } let parallel_executor = async { // All systems have been ran if there are no queued or running systems. while 0 != self.queued.count_ones(..) + self.running.count_ones(..) { - self.process_queued_systems().await; - // Avoid deadlocking if no systems were actually started. - if self.running.count_ones(..) != 0 { - // Wait until at least one system has finished. - let index = self - .finish_receiver - .recv() - .await - .unwrap_or_else(|error| unreachable!("{}", error)); + self.process_queued_systems(); + // Should never deadlock. + debug_assert!(self.running.count_ones(..) != 0); + // Wait until at least one system has finished. + let index = self + .finish_receiver + .recv() + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + self.process_finished_system(index); + // Gather other systems than may have finished. + while let Ok(index) = self.finish_receiver.try_recv() { self.process_finished_system(index); - // Gather other systems than may have finished. - while let Ok(index) = self.finish_receiver.try_recv() { - self.process_finished_system(index); - } - // At least one system has finished, so active access is outdated. - self.rebuild_active_access(); } + // At least one system has finished, so active access is outdated. + self.rebuild_active_access(); self.update_counters_and_queue_systems(); } }; @@ -178,7 +180,6 @@ impl ParallelExecutor { // Spawn the system task. if should_run { self.should_run.set(index, true); - let start_receiver = system_data.start_receiver.clone(); let finish_sender = self.finish_sender.clone(); let system = system.system_mut(); #[cfg(feature = "trace")] // NB: outside the task to get the TLS current span @@ -186,47 +187,37 @@ impl ParallelExecutor { #[cfg(feature = "trace")] let overhead_span = bevy_utils::tracing::info_span!("system overhead", name = &*system.name()); - let task = async move { - start_receiver - .recv() - .await - .unwrap_or_else(|error| unreachable!("{}", error)); + + let run = move || { #[cfg(feature = "trace")] let system_guard = system_span.enter(); unsafe { system.run_unsafe((), world) }; - #[cfg(feature = "trace")] - drop(system_guard); - finish_sender - .send(index) - .await - .unwrap_or_else(|error| unreachable!("{}", error)); }; - #[cfg(feature = "trace")] - let task = task.instrument(overhead_span); - if system_data.is_send { - scope.spawn(task); - } else { - scope.spawn_local(task); - } - } - // Queue the system if it has no dependencies, otherwise reset its dependency counter. - if system_data.dependencies_total == 0 { - // Non-send systems are incompatible, as they must all be executed on the main thread. - if should_run - && (!self.non_send_running || system_data.is_send) + if (!self.non_send_running || system_data.is_send) && system_data .archetype_component_access - .is_compatible(&self.active_archetype_component_access) - { + .is_compatible(&self.active_archetype_component_access) { + let task = async move { + run(); + finish_sender + .try_send(index) + .unwrap_or_else(|error| unreachable!("{}", error)); + }; + + #[cfg(feature = "trace")] + let task = task.instrument(overhead_span); + if system_data.is_send { + scope.spawn(task); + } else { + scope.spawn_local(task); + } + #[cfg(test)] { started_systems += 1; } - system_data - .start_sender - .try_send(()) - .unwrap_or_else(|error| unreachable!("{}", error)); + self.running.set(index, true); if !system_data.is_send { self.non_send_running = true; @@ -235,6 +226,35 @@ impl ParallelExecutor { self.active_archetype_component_access .extend(&system_data.archetype_component_access); } else { + let start_receiver = system_data.start_receiver.clone(); + let task = async move { + start_receiver + .recv() + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + run(); + #[cfg(feature = "trace")] + let system_guard = system_span.enter(); + unsafe { system.run_unsafe((), world) }; + #[cfg(feature = "trace")] + drop(system_guard); + finish_sender + .try_send(index) + .unwrap_or_else(|error| unreachable!("{}", error)); + }; + + #[cfg(feature = "trace")] + let task = task.instrument(overhead_span); + if system_data.is_send { + scope.spawn(task); + } else { + scope.spawn_local(task); + } + } + } + // Queue the system if it has no dependencies, otherwise reset its dependency counter. + if system_data.dependencies_total == 0 { + if !should_run { self.queued.insert(index); } } else { @@ -276,8 +296,7 @@ impl ParallelExecutor { } system_metadata .start_sender - .send(()) - .await + .try_send(()) .unwrap_or_else(|error| unreachable!("{}", error)); self.running.set(index, true); if !system_metadata.is_send { From 816ead5f7fd62f9b92f92c71dcd891ffc68b5508 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 10 Jun 2022 18:31:33 -0700 Subject: [PATCH 05/14] Review comments and formatting. --- .../src/schedule/executor_parallel.rs | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 7887936a35a55..b8908549c4e37 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -188,16 +188,17 @@ impl ParallelExecutor { let overhead_span = bevy_utils::tracing::info_span!("system overhead", name = &*system.name()); - let run = move || { + let mut run = move || { #[cfg(feature = "trace")] let system_guard = system_span.enter(); unsafe { system.run_unsafe((), world) }; }; - if (!self.non_send_running || system_data.is_send) - && system_data - .archetype_component_access - .is_compatible(&self.active_archetype_component_access) { + if Self::can_start_now( + self.non_send_running, + system_data, + &self.active_archetype_component_access, + ) { let task = async move { run(); finish_sender @@ -233,11 +234,6 @@ impl ParallelExecutor { .await .unwrap_or_else(|error| unreachable!("{}", error)); run(); - #[cfg(feature = "trace")] - let system_guard = system_span.enter(); - unsafe { system.run_unsafe((), world) }; - #[cfg(feature = "trace")] - drop(system_guard); finish_sender .try_send(index) .unwrap_or_else(|error| unreachable!("{}", error)); @@ -268,19 +264,23 @@ impl ParallelExecutor { } /// Determines if the system with given index has no conflicts with already running systems. - fn can_start_now(&self, index: usize) -> bool { - let system_data = &self.system_metadata[index]; + #[inline] + fn can_start_now( + non_send_running: bool, + system_data: &SystemSchedulingMetadata, + active_archetype_component_access: &Access, + ) -> bool { // Non-send systems are considered conflicting with each other. - (!self.non_send_running || system_data.is_send) + (!non_send_running || system_data.is_send) && system_data .archetype_component_access - .is_compatible(&self.active_archetype_component_access) + .is_compatible(active_archetype_component_access) } /// Starts all non-conflicting queued systems, moves them from `queued` to `running`, /// adds their access information to active access information; /// processes queued systems that shouldn't run this iteration as completed immediately. - async fn process_queued_systems(&mut self) { + fn process_queued_systems(&mut self) { #[cfg(test)] let mut started_systems = 0; for index in self.queued.ones() { @@ -289,7 +289,11 @@ impl ParallelExecutor { let system_metadata = &self.system_metadata[index]; if !self.should_run[index] { self.dependants_scratch.extend(&system_metadata.dependants); - } else if self.can_start_now(index) { + } else if Self::can_start_now( + self.non_send_running, + system_metadata, + &self.active_archetype_component_access, + ) { #[cfg(test)] { started_systems += 1; From 425c157c73567cb1c2c61d677ad703842075ce67 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 10 Jun 2022 18:34:26 -0700 Subject: [PATCH 06/14] Test comment --- crates/bevy_ecs/src/schedule/executor_parallel.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index b8908549c4e37..9445e44d95c25 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -168,6 +168,7 @@ impl ParallelExecutor { systems: &'scope mut [ParallelSystemContainer], world: &'scope World, ) { + // These are used as a part of a unit test. #[cfg(test)] let mut started_systems = 0; #[cfg(feature = "trace")] @@ -281,6 +282,8 @@ impl ParallelExecutor { /// adds their access information to active access information; /// processes queued systems that shouldn't run this iteration as completed immediately. fn process_queued_systems(&mut self) { + // These are used as a part of a unit test as seen in `process_queued_systems`. + // Removing them will cause the test to fail. #[cfg(test)] let mut started_systems = 0; for index in self.queued.ones() { From b0275d703ba6ff9704255dbe60eb40d3ab7880b3 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 10 Jun 2022 19:20:09 -0700 Subject: [PATCH 07/14] Fix tests --- .../src/schedule/executor_parallel.rs | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 9445e44d95c25..4dc883199f7a0 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -56,7 +56,7 @@ pub struct ParallelExecutor { impl Default for ParallelExecutor { fn default() -> Self { - let (finish_sender, finish_receiver) = async_channel::unbounded(); + let (finish_sender, finish_receiver) = async_channel::bounded(128); Self { system_metadata: Default::default(), finish_sender, @@ -125,28 +125,29 @@ impl ParallelSystemExecutor for ParallelExecutor { ComputeTaskPool::init(TaskPool::default).scope(|scope| { self.prepare_systems(scope, systems, world); - if 0 == self.queued.count_ones(..) { + if 0 == self.queued.count_ones(..) + self.running.count_ones(..) { return; } let parallel_executor = async { // All systems have been ran if there are no queued or running systems. while 0 != self.queued.count_ones(..) + self.running.count_ones(..) { self.process_queued_systems(); - // Should never deadlock. - debug_assert!(self.running.count_ones(..) != 0); - // Wait until at least one system has finished. - let index = self - .finish_receiver - .recv() - .await - .unwrap_or_else(|error| unreachable!("{}", error)); - self.process_finished_system(index); - // Gather other systems than may have finished. - while let Ok(index) = self.finish_receiver.try_recv() { + // Avoid deadlocking if no systems were actually started. + if self.running.count_ones(..) != 0 { + // Wait until at least one system has finished. + let index = self + .finish_receiver + .recv() + .await + .unwrap_or_else(|error| unreachable!("{}", error)); self.process_finished_system(index); + // Gather other systems than may have finished. + while let Ok(index) = self.finish_receiver.try_recv() { + self.process_finished_system(index); + } + // At least one system has finished, so active access is outdated. + self.rebuild_active_access(); } - // At least one system has finished, so active access is outdated. - self.rebuild_active_access(); self.update_counters_and_queue_systems(); } }; @@ -178,6 +179,13 @@ impl ParallelExecutor { self.system_metadata.iter_mut().zip(systems).enumerate() { let should_run = system.should_run(); + let can_start = should_run + && system_data.dependencies_total == 0 + && Self::can_start_now( + self.non_send_running, + system_data, + &self.active_archetype_component_access, + ); // Spawn the system task. if should_run { self.should_run.set(index, true); @@ -195,15 +203,12 @@ impl ParallelExecutor { unsafe { system.run_unsafe((), world) }; }; - if Self::can_start_now( - self.non_send_running, - system_data, - &self.active_archetype_component_access, - ) { + if can_start { let task = async move { run(); finish_sender - .try_send(index) + .send(index) + .await .unwrap_or_else(|error| unreachable!("{}", error)); }; @@ -236,7 +241,8 @@ impl ParallelExecutor { .unwrap_or_else(|error| unreachable!("{}", error)); run(); finish_sender - .try_send(index) + .send(index) + .await .unwrap_or_else(|error| unreachable!("{}", error)); }; @@ -251,7 +257,7 @@ impl ParallelExecutor { } // Queue the system if it has no dependencies, otherwise reset its dependency counter. if system_data.dependencies_total == 0 { - if !should_run { + if !can_start { self.queued.insert(index); } } else { From 26ed9959e6f253a9fdac7d83d992e80908d58d07 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 11 Jun 2022 00:24:17 -0700 Subject: [PATCH 08/14] Silence Clippy --- crates/bevy_ecs/src/schedule/executor_parallel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 4dc883199f7a0..928029c1e4555 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -199,7 +199,7 @@ impl ParallelExecutor { let mut run = move || { #[cfg(feature = "trace")] - let system_guard = system_span.enter(); + let _system_guard = system_span.enter(); unsafe { system.run_unsafe((), world) }; }; From 5e44ab5c3a8893155d546c086a82a2e9e5fb539a Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 11 Jun 2022 05:29:50 -0700 Subject: [PATCH 09/14] Simpler checks and a small cleanup --- crates/bevy_ecs/src/schedule/executor_parallel.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 928029c1e4555..ce9fcbdd9388c 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -125,7 +125,7 @@ impl ParallelSystemExecutor for ParallelExecutor { ComputeTaskPool::init(TaskPool::default).scope(|scope| { self.prepare_systems(scope, systems, world); - if 0 == self.queued.count_ones(..) + self.running.count_ones(..) { + if 0 == self.should_run.count_ones(..) { return; } let parallel_executor = async { @@ -188,7 +188,7 @@ impl ParallelExecutor { ); // Spawn the system task. if should_run { - self.should_run.set(index, true); + self.should_run.insert(index); let finish_sender = self.finish_sender.clone(); let system = system.system_mut(); #[cfg(feature = "trace")] // NB: outside the task to get the TLS current span @@ -225,7 +225,7 @@ impl ParallelExecutor { started_systems += 1; } - self.running.set(index, true); + self.running.insert(index); if !system_data.is_send { self.non_send_running = true; } From 8d05834eacf936f7337a73953c67345ea4a29aec Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 11 Jun 2022 05:37:29 -0700 Subject: [PATCH 10/14] Use event_listener instead of a async_channel<()> --- crates/bevy_ecs/Cargo.toml | 1 + .../src/schedule/executor_parallel.rs | 23 ++++++------------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index 277ecc480ffc9..537afbd1dc506 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -21,6 +21,7 @@ bevy_utils = { path = "../bevy_utils", version = "0.8.0-dev" } bevy_ecs_macros = { path = "macros", version = "0.8.0-dev" } async-channel = "1.4" +event-listener = "2.5" thread_local = "1.1.4" fixedbitset = "0.4" fxhash = "0.2" diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index ce9fcbdd9388c..8b26cf55637f3 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -4,6 +4,7 @@ use crate::{ schedule::{ParallelSystemContainer, ParallelSystemExecutor}, world::World, }; +use event_listener::Event; use async_channel::{Receiver, Sender}; use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; #[cfg(feature = "trace")] @@ -15,9 +16,7 @@ use SchedulingEvent::*; struct SystemSchedulingMetadata { /// Used to signal the system's task to start the system. - start_sender: Sender<()>, - /// Receives the signal to start the system. - start_receiver: Receiver<()>, + start: Event, /// Indices of systems that depend on this one, used to decrement their /// dependency counters when this system finishes. dependants: Vec, @@ -84,10 +83,8 @@ impl ParallelSystemExecutor for ParallelExecutor { for container in systems.iter() { let dependencies_total = container.dependencies().len(); let system = container.system(); - let (start_sender, start_receiver) = async_channel::bounded(1); self.system_metadata.push(SystemSchedulingMetadata { - start_sender, - start_receiver, + start: Event::new(), dependants: vec![], dependencies_total, dependencies_now: 0, @@ -233,12 +230,9 @@ impl ParallelExecutor { self.active_archetype_component_access .extend(&system_data.archetype_component_access); } else { - let start_receiver = system_data.start_receiver.clone(); + let start_listener = system_data.start.listen(); let task = async move { - start_receiver - .recv() - .await - .unwrap_or_else(|error| unreachable!("{}", error)); + start_listener.await; run(); finish_sender .send(index) @@ -307,11 +301,8 @@ impl ParallelExecutor { { started_systems += 1; } - system_metadata - .start_sender - .try_send(()) - .unwrap_or_else(|error| unreachable!("{}", error)); - self.running.set(index, true); + system_metadata.start.notify(1); + self.running.insert(index); if !system_metadata.is_send { self.non_send_running = true; } From 40e5c7305135b0ec90c02d9f5b17d527aa284e86 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 11 Jun 2022 14:01:58 -0700 Subject: [PATCH 11/14] Use a lower-overhead notify --- crates/bevy_ecs/src/schedule/executor_parallel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 8b26cf55637f3..ff14f2d2476e6 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -301,7 +301,7 @@ impl ParallelExecutor { { started_systems += 1; } - system_metadata.start.notify(1); + system_metadata.start.notify_additional_relaxed(1); self.running.insert(index); if !system_metadata.is_send { self.non_send_running = true; From 2c85b477168ffe628b03cdd4237fbd7ee1b5e168 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 13 Jun 2022 06:06:21 -0700 Subject: [PATCH 12/14] Formatting --- crates/bevy_ecs/src/schedule/executor_parallel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index ff14f2d2476e6..cfd31b418c0c7 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -4,11 +4,11 @@ use crate::{ schedule::{ParallelSystemContainer, ParallelSystemExecutor}, world::World, }; -use event_listener::Event; use async_channel::{Receiver, Sender}; use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; #[cfg(feature = "trace")] use bevy_utils::tracing::Instrument; +use event_listener::Event; use fixedbitset::FixedBitSet; #[cfg(test)] From 88911652e2d551f82e88f3b2d15474e43bdb3645 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 17 Jun 2022 14:19:24 -0700 Subject: [PATCH 13/14] Review comments --- .../src/schedule/executor_parallel.rs | 146 ++++++++++-------- 1 file changed, 81 insertions(+), 65 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index cfd31b418c0c7..7fbb01c4abdcd 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -55,6 +55,10 @@ pub struct ParallelExecutor { impl Default for ParallelExecutor { fn default() -> Self { + // Using a bounded channel here as it avoids allocations when signaling + // and generally remains hotter in memory. It'll take 128 systems completing + // before the parallel executor runs before this overflows. If it overflows + // all systems will just suspend until the parallel executor runs. let (finish_sender, finish_receiver) = async_channel::bounded(128); Self { system_metadata: Default::default(), @@ -122,7 +126,7 @@ impl ParallelSystemExecutor for ParallelExecutor { ComputeTaskPool::init(TaskPool::default).scope(|scope| { self.prepare_systems(scope, systems, world); - if 0 == self.should_run.count_ones(..) { + if self.should_run.count_ones(..) == 0 { return; } let parallel_executor = async { @@ -183,79 +187,91 @@ impl ParallelExecutor { system_data, &self.active_archetype_component_access, ); + + // Queue the system if it has no dependencies, otherwise reset its dependency counter. + if system_data.dependencies_total == 0 { + if !can_start { + self.queued.insert(index); + } + } else { + system_data.dependencies_now = system_data.dependencies_total; + } + + if !should_run { + continue; + } + // Spawn the system task. - if should_run { - self.should_run.insert(index); - let finish_sender = self.finish_sender.clone(); - let system = system.system_mut(); - #[cfg(feature = "trace")] // NB: outside the task to get the TLS current span - let system_span = bevy_utils::tracing::info_span!("system", name = &*system.name()); + self.should_run.insert(index); + let finish_sender = self.finish_sender.clone(); + let system = system.system_mut(); + #[cfg(feature = "trace")] // NB: outside the task to get the TLS current span + let system_span = bevy_utils::tracing::info_span!("system", name = &*system.name()); + #[cfg(feature = "trace")] + let overhead_span = + bevy_utils::tracing::info_span!("system overhead", name = &*system.name()); + + let mut run = move || { #[cfg(feature = "trace")] - let overhead_span = - bevy_utils::tracing::info_span!("system overhead", name = &*system.name()); + let _system_guard = system_span.enter(); + unsafe { system.run_unsafe((), world) }; + }; - let mut run = move || { - #[cfg(feature = "trace")] - let _system_guard = system_span.enter(); - unsafe { system.run_unsafe((), world) }; + if can_start { + let task = async move { + run(); + // This will never panic: + // - The channel is never closed or dropped. + // - Overflowing the bounded size will just suspend until + // there is capacity. + finish_sender + .send(index) + .await + .unwrap_or_else(|error| unreachable!("{}", error)); }; - if can_start { - let task = async move { - run(); - finish_sender - .send(index) - .await - .unwrap_or_else(|error| unreachable!("{}", error)); - }; - - #[cfg(feature = "trace")] - let task = task.instrument(overhead_span); - if system_data.is_send { - scope.spawn(task); - } else { - scope.spawn_local(task); - } - - #[cfg(test)] - { - started_systems += 1; - } - - self.running.insert(index); - if !system_data.is_send { - self.non_send_running = true; - } - // Add this system's access information to the active access information. - self.active_archetype_component_access - .extend(&system_data.archetype_component_access); + #[cfg(feature = "trace")] + let task = task.instrument(overhead_span); + if system_data.is_send { + scope.spawn(task); } else { - let start_listener = system_data.start.listen(); - let task = async move { - start_listener.await; - run(); - finish_sender - .send(index) - .await - .unwrap_or_else(|error| unreachable!("{}", error)); - }; - - #[cfg(feature = "trace")] - let task = task.instrument(overhead_span); - if system_data.is_send { - scope.spawn(task); - } else { - scope.spawn_local(task); - } + scope.spawn_local(task); } - } - // Queue the system if it has no dependencies, otherwise reset its dependency counter. - if system_data.dependencies_total == 0 { - if !can_start { - self.queued.insert(index); + + #[cfg(test)] + { + started_systems += 1; } + + self.running.insert(index); + if !system_data.is_send { + self.non_send_running = true; + } + // Add this system's access information to the active access information. + self.active_archetype_component_access + .extend(&system_data.archetype_component_access); } else { - system_data.dependencies_now = system_data.dependencies_total; + let start_listener = system_data.start.listen(); + let task = async move { + start_listener.await; + run(); + // This will never panic: + // - The channel is never closed or dropped. + // - Overflowing the bounded size will just suspend until + // there is capacity. + finish_sender + .send(index) + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + }; + + #[cfg(feature = "trace")] + let task = task.instrument(overhead_span); + if system_data.is_send { + scope.spawn(task); + } else { + scope.spawn_local(task); + } } } #[cfg(test)] From 07571398c4ce77049956ec8c079e83233610aaff Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 13 Sep 2022 14:54:46 -0400 Subject: [PATCH 14/14] Fix bad merge --- crates/bevy_ecs/src/schedule/executor_parallel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index bb4bc299d6fe6..96b272d03f2c8 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -277,7 +277,7 @@ impl ParallelExecutor { } #[cfg(test)] if started_systems != 0 { - self.emit_event(StartedSystems(started_systems)); + self.emit_event(SchedulingEvent::StartedSystems(started_systems)); } }