diff --git a/src/rust/engine/graph/src/entry.rs b/src/rust/engine/graph/src/entry.rs index 67c4fc7e557..bd254ce20be 100644 --- a/src/rust/engine/graph/src/entry.rs +++ b/src/rust/engine/graph/src/entry.rs @@ -52,52 +52,6 @@ impl Generation { } } -/// -/// A result from running a Node. -/// -/// If the value is Dirty, the consumer should check whether the dependencies of the Node have the -/// same values as they did when this Node was last run; if so, the value can be re-used -/// (and should be marked "Clean"). -/// -/// If the value is Clean, the consumer can simply use the value as-is. -/// -#[derive(Clone, Debug)] -pub(crate) enum EntryResult { - Clean(Result), - Dirty(Result), -} - -impl EntryResult { - fn is_dirty(&self) -> bool { - if let EntryResult::Dirty(..) = self { - true - } else { - false - } - } - - fn dirty(&mut self) { - if let EntryResult::Clean(value) = self { - *self = EntryResult::Dirty(value.clone()) - } - } - - fn clean(&mut self) { - if let EntryResult::Dirty(value) = self { - *self = EntryResult::Clean(value.clone()) - } - } -} - -impl AsRef> for EntryResult { - fn as_ref(&self) -> &Result { - match self { - EntryResult::Clean(v) => v, - EntryResult::Dirty(v) => v, - } - } -} - #[allow(clippy::type_complexity)] #[derive(Debug)] pub(crate) enum EntryState { @@ -111,7 +65,7 @@ pub(crate) enum EntryState { NotStarted { run_token: RunToken, generation: Generation, - previous_result: Option>, + previous_result: Option>, }, // A node that is running. A running node that has been marked dirty re-runs rather than // completing. @@ -122,7 +76,7 @@ pub(crate) enum EntryState { generation: Generation, start_time: Instant, waiters: Vec>>, - previous_result: Option>, + previous_result: Option>, dirty: bool, }, // A node that has completed, and then possibly been marked dirty. Because marking a node @@ -131,8 +85,9 @@ pub(crate) enum EntryState { Completed { run_token: RunToken, generation: Generation, - result: EntryResult, + result: Result, dep_generations: Vec, + dirty: bool, }, } @@ -203,9 +158,8 @@ impl Entry { let state = self.state.lock(); match *state { EntryState::Completed { - result: EntryResult::Clean(ref result), - .. - } => Some(result.clone()), + ref result, dirty, .. + } if !dirty => Some(result.clone()), _ => None, } } @@ -221,7 +175,7 @@ impl Entry { run_token: RunToken, generation: Generation, previous_dep_generations: Option>, - previous_result: Option>, + previous_result: Option>, ) -> EntryState where C: NodeContext, @@ -289,15 +243,16 @@ impl Entry { start_time: Instant::now(), run_token, generation, - previous_result, + previous_result: previous_result, dirty: false, } } &EntryKey::Cyclic(_) => EntryState::Completed { - result: EntryResult::Clean(Err(N::Error::cyclic())), + result: Err(N::Error::cyclic()), dep_generations: Vec::new(), run_token, generation, + dirty: false, }, } } @@ -338,9 +293,10 @@ impl Entry { &mut EntryState::Completed { ref result, generation, + dirty, .. - } if self.node.content().cacheable() && !result.is_dirty() => { - return future::result(result.as_ref().clone()) + } if !dirty && self.node.content().cacheable() => { + return future::result(result.clone()) .map(move |res| (res, generation)) .to_boxed(); } @@ -367,21 +323,21 @@ impl Entry { EntryState::Completed { run_token, generation, - mut result, + result, dep_generations, + dirty, } => { - trace!( - "Re-starting node {:?}. It was: previous_result={:?}, cacheable={}", - self.node, - result, - self.node.content().cacheable() - ); assert!( - result.is_dirty() || !self.node.content().cacheable(), + dirty || !self.node.content().cacheable(), "A clean Node should not reach this point: {:?}", result ); - result.dirty(); + trace!( + "Re-starting node {:?}. It was: dirty={}, cacheable={}", + self.node, + dirty, + self.node.content().cacheable() + ); // The Node has already completed but is now marked dirty. This indicates that we are the // first caller to request it since it was marked dirty. We attempt to clean it (which will // cause it to re-run if the dep_generations mismatch). @@ -460,7 +416,7 @@ impl Entry { waiters, run_token, generation, - mut previous_result, + previous_result, dirty, .. } => { @@ -472,9 +428,6 @@ impl Entry { "Not completing node {:?} because it was invalidated before completing.", self.node ); - if let Some(previous_result) = previous_result.as_mut() { - previous_result.dirty(); - } EntryState::NotStarted { run_token: run_token.next(), generation, @@ -487,9 +440,6 @@ impl Entry { "Not completing node {:?} because it was dirtied before completing.", self.node ); - if let Some(previous_result) = previous_result.as_mut() { - previous_result.dirty(); - } Self::run( context, &self.node, @@ -502,19 +452,19 @@ impl Entry { } else { // If the new result does not match the previous result, the generation increments. let (generation, next_result) = if let Some(result) = result { - if Some(&result) == previous_result.as_ref().map(EntryResult::as_ref) { + if Some(&result) == previous_result.as_ref() { // Node was re-executed, but had the same result value. - (generation, EntryResult::Clean(result)) + (generation, result) } else { - (generation.next(), EntryResult::Clean(result)) + (generation.next(), result) } } else { // Node was marked clean. // NB: The `expect` here avoids a clone and a comparison: see the method docs. - let mut result = - previous_result.expect("A Node cannot be marked clean without a previous result."); - result.clean(); - (generation, result) + ( + generation, + previous_result.expect("A Node cannot be marked clean without a previous result."), + ) }; // Notify all waiters (ignoring any that have gone away), and then store the value. // A waiter will go away whenever they drop the `Future` `Receiver` of the value, perhaps @@ -526,13 +476,14 @@ impl Entry { waiters.len() ); for waiter in waiters { - let _ = waiter.send(next_result.as_ref().clone().map(|res| (res, generation))); + let _ = waiter.send(next_result.clone().map(|res| (res, generation))); } EntryState::Completed { result: next_result, dep_generations, run_token, generation, + dirty: false, } } } @@ -590,23 +541,15 @@ impl Entry { /// /// Clears the state of this Node, forcing it to be recomputed. /// - /// # Arguments - /// - /// * `graph_still_contains_edges` - If the caller has guaranteed that all edges from this Node - /// have been removed from the graph, they should pass false here, else true. We may want to - /// remove this parameter, and force this method to remove the edges, but that would require - /// acquiring the graph lock here, which we currently don't do. - /// - pub(crate) fn clear(&mut self, graph_still_contains_edges: bool) { + pub(crate) fn clear(&mut self) { let mut state = self.state.lock(); - let (run_token, generation, mut previous_result) = + let (run_token, generation, previous_result) = match mem::replace(&mut *state, EntryState::initial()) { EntryState::NotStarted { run_token, generation, previous_result, - .. } | EntryState::Running { run_token, @@ -624,12 +567,6 @@ impl Entry { trace!("Clearing node {:?}", self.node); - if graph_still_contains_edges { - if let Some(previous_result) = previous_result.as_mut() { - previous_result.dirty(); - } - } - // Swap in a state with a new RunToken value, which invalidates any outstanding work. *state = EntryState::NotStarted { run_token: run_token.next(), @@ -648,36 +585,15 @@ impl Entry { let state = &mut *self.state.lock(); trace!("Dirtying node {:?}", self.node); match state { - &mut EntryState::Running { ref mut dirty, .. } => { + &mut EntryState::Running { ref mut dirty, .. } + | &mut EntryState::Completed { ref mut dirty, .. } => { + // Mark dirty. *dirty = true; } - &mut EntryState::Completed { ref mut result, .. } => { - result.dirty(); - } &mut EntryState::NotStarted { .. } => {} } } - pub fn may_have_dirty_edges(&self) -> bool { - match *self.state.lock() { - EntryState::NotStarted { - ref previous_result, - .. - } - | EntryState::Running { - ref previous_result, - .. - } => { - if let Some(EntryResult::Dirty(..)) = previous_result { - true - } else { - false - } - } - EntryState::Completed { ref result, .. } => result.is_dirty(), - } - } - pub(crate) fn format(&self) -> String { let state = match self.peek() { Some(Ok(ref nr)) => format!("{:?}", nr), diff --git a/src/rust/engine/graph/src/lib.rs b/src/rust/engine/graph/src/lib.rs index 5d64a16426f..356e217851c 100644 --- a/src/rust/engine/graph/src/lib.rs +++ b/src/rust/engine/graph/src/lib.rs @@ -48,7 +48,7 @@ use fnv::FnvHasher; use futures::future::{self, Future}; use indexmap::IndexSet; -use log::{info, trace, warn}; +use log::trace; use parking_lot::Mutex; use petgraph::graph::DiGraph; use petgraph::visit::EdgeRef; @@ -59,7 +59,7 @@ use boxfuture::{BoxFuture, Boxable}; type FNV = BuildHasherDefault; -type PGraph = DiGraph, f32, u32>; +type PGraph = DiGraph, (), u32>; #[derive(Debug, Eq, PartialEq)] pub struct InvalidationResult { @@ -117,39 +117,27 @@ impl InnerGraph { /// /// Detect whether adding an edge from src to dst would create a cycle. /// - /// Returns a path which would cause the cycle if an edge were added from src to dst, or None if - /// no cycle would be created. + /// Returns true if a cycle would be created by adding an edge from src->dst. /// - fn detect_cycle(&self, src_id: EntryId, dst_id: EntryId) -> Option>> { - if src_id == dst_id { - return Some(vec![self.entry_for_id(src_id).unwrap().clone()]); - } - Self::shortest_path(&self.pg, dst_id, src_id).map(|path| { - path - .into_iter() - .map(|index| self.entry_for_id(index).unwrap().clone()) - .collect() - }) - } - - /// - /// Compute and return one shortest path from `src` to `dst`. - /// - fn shortest_path(graph: &PGraph, src: EntryId, dst: EntryId) -> Option> { - let (_path_weights, paths) = petgraph::algo::bellman_ford(graph, src) - .expect("There should not be any negative edge weights"); - - let mut next = dst; - let mut path = Vec::new(); - path.push(next); - while let Some(current) = paths[next.index()] { - path.push(current); - if current == src { - return Some(path); + fn detect_cycle(&self, src_id: EntryId, dst_id: EntryId) -> bool { + // Search either forward from the dst, or backward from the src. + let (root, needle, direction) = { + let out_from_dst = self.pg.neighbors(dst_id).count(); + let in_to_src = self + .pg + .neighbors_directed(src_id, Direction::Incoming) + .count(); + if out_from_dst < in_to_src { + (dst_id, src_id, Direction::Outgoing) + } else { + (src_id, dst_id, Direction::Incoming) } - next = current; - } - None + }; + + // Search for an existing path from dst to src. + let mut roots = VecDeque::new(); + roots.push_back(root); + self.walk(roots, direction).any(|eid| eid == needle) } /// @@ -167,7 +155,7 @@ impl InnerGraph { fn clear(&mut self) { for eid in self.nodes.values() { if let Some(entry) = self.pg.node_weight_mut(*eid) { - entry.clear(true); + entry.clear(); } } } @@ -205,7 +193,7 @@ impl InnerGraph { // Clear roots and remove their outbound edges. for id in &root_ids { if let Some(entry) = self.pg.node_weight_mut(*id) { - entry.clear(false); + entry.clear(); } } self.pg.retain_edges(|pg, edge| { @@ -526,26 +514,20 @@ impl Graph { // TODO: doing cycle detection under the lock... unfortunate, but probably unavoidable // without a much more complicated algorithm. let potential_dst_id = inner.ensure_entry(EntryKey::Valid(dst_node.clone())); - match Self::detect_cycle(src_id, potential_dst_id, &mut inner) { - Ok(true) => { - // Cyclic dependency: declare a dependency on a copy of the Node that is marked Cyclic. - inner.ensure_entry(EntryKey::Cyclic(dst_node)) - } - Ok(false) => { - // Valid dependency. - trace!( - "Adding dependency from {:?} to {:?}", - inner.entry_for_id(src_id).unwrap().node(), - inner.entry_for_id(potential_dst_id).unwrap().node() - ); - potential_dst_id - } - Err(err) => return futures::future::err(err).to_boxed(), + if inner.detect_cycle(src_id, potential_dst_id) { + // Cyclic dependency: declare a dependency on a copy of the Node that is marked Cyclic. + inner.ensure_entry(EntryKey::Cyclic(dst_node)) + } else { + // Valid dependency. + trace!( + "Adding dependency from {:?} to {:?}", + inner.entry_for_id(src_id).unwrap().node(), + inner.entry_for_id(potential_dst_id).unwrap().node() + ); + potential_dst_id } }; - // All edges get a weight of 1.0 so that we can Bellman-Ford over the graph, treating each - // edge as having equal weight. - inner.pg.add_edge(src_id, dst_id, 1.0); + inner.pg.add_edge(src_id, dst_id, ()); inner .entry_for_id(dst_id) .cloned() @@ -561,60 +543,6 @@ impl Graph { } } - fn detect_cycle( - src_id: EntryId, - potential_dst_id: EntryId, - inner: &mut InnerGraph, - ) -> Result { - let mut counter = 0; - loop { - // Find one cycle if any cycles exist. - if let Some(cycle_path) = inner.detect_cycle(src_id, potential_dst_id) { - // See if the cycle contains any dirty nodes. If there are dirty nodes, we can try clearing - // them, and then check if there are still any cycles in the graph. - let dirty_nodes: HashSet<_> = cycle_path - .iter() - .filter(|n| n.may_have_dirty_edges()) - .map(|n| n.node().clone()) - .collect(); - if dirty_nodes.is_empty() { - // We detected a cycle with no dirty nodes - there's a cycle and there's nothing we can do - // to remove it. - info!( - "Detected cycle considering adding edge from {:?} to {:?}; existing path: {:?}", - inner.entry_for_id(src_id).unwrap(), - inner.entry_for_id(potential_dst_id).unwrap(), - cycle_path - ); - return Ok(true); - } - counter += 1; - // Obsolete edges from a dirty node may cause fake cycles to be detected if there was a - // dirty dep from A to B, and we're trying to add a dep from B to A. - // If we detect a cycle that contains dirty nodes (and so potentially obsolete edges), - // we repeatedly cycle-detect, clearing (and re-running) and dirty nodes (and their edges) - // that we encounter. - // - // We do this repeatedly, because there may be multiple paths which would cause cycles, - // which contain dirty nodes. If we've cleared 10 separate paths which contain dirty nodes, - // and are still detecting cycle-causing paths containing dirty nodes, give up. 10 is a very - // arbitrary number, which we can increase if we find real graphs in the wild which hit this - // limit. - if counter > 10 { - warn!( - "Couldn't remove cycle containing dirty nodes after {} attempts; nodes in cycle: {:?}", - counter, cycle_path - ); - return Err(N::Error::cyclic()); - } - // Clear the dirty nodes, removing the edges from them, and try again. - inner.invalidate_from_roots(|node| dirty_nodes.contains(node)); - } else { - return Ok(false); - } - } - } - /// /// Create the given Node if it does not already exist. /// @@ -711,24 +639,6 @@ impl Graph { /// reliably the case because Entry happens to require a &mut InnerGraph reference; it would be /// great not to violate that in the future. /// - /// TODO: We don't track which generation actually added which edges, so over time nodes will end - /// up with spurious dependencies. This is mostly sound, but may lead to over-invalidation and - /// doing more work than is necessary. - /// As an example, if generation 0 or X depends on A and B, and generation 1 of X depends on C, - /// nothing will prune the dependencies from X onto A and B, so generation 1 of X will have - /// dependencies on A, B, and C in the graph, even though running it only depends on C. - /// At some point we should address this, but we must be careful with how we do so; anything which - /// ties together the generation of a node with specifics of edges would require careful - /// consideration of locking (probably it would require merging the EntryState locks and Graph - /// locks, or working out something clever). - /// - /// It would also require careful consideration of nodes in the Running EntryState - these may - /// have previous RunToken edges and next RunToken edges which collapse into the same Generation - /// edges; when working out whether a dirty node is really clean, care must be taken to avoid - /// spurious cycles. Currently we handle this as a special case by, if we detect a cycle that - /// contains dirty nodes, clearing those nodes (removing any edges from them). This is a little - /// hacky, but will tide us over until we fully solve this problem. - /// fn complete( &self, context: &C, @@ -984,11 +894,7 @@ mod tests { ); // Request with a new context that truncates execution at the middle Node. - let context = TContext::new_with_dependencies( - 0, - vec![(TNode(1), None)].into_iter().collect(), - graph.clone(), - ); + let context = TContext::new_with_stop_at(0, TNode(1), graph.clone()); assert_eq!( graph.create(TNode(2), &context).wait(), Ok(vec![T(1, 0), T(2, 0)]) @@ -1115,56 +1021,6 @@ mod tests { ); } - #[test] - fn cyclic_failure() { - // Confirms that an attempt to create a cycle fails. - let graph = Arc::new(Graph::new()); - let top = TNode(2); - let context = TContext::new_with_dependencies( - 0, - // Request creation of a cycle by sending the bottom most node to the top. - vec![(TNode(0), Some(top))].into_iter().collect(), - graph.clone(), - ); - - assert_eq!(graph.create(TNode(2), &context).wait(), Err(TError::Cyclic)); - } - - #[test] - fn cyclic_dirtying() { - // Confirms that a dirtied path between two nodes is able to reverse direction while being - // cleaned. - let graph = Arc::new(Graph::new()); - let initial_top = TNode(2); - let initial_bot = TNode(0); - - // Request with a context that creates a path downward. - let context_down = TContext::new(0, graph.clone()); - assert_eq!( - graph.create(initial_top.clone(), &context_down).wait(), - Ok(vec![T(0, 0), T(1, 0), T(2, 0)]) - ); - - // Clear the bottom node, and then clean it with a context that causes the path to reverse. - graph.invalidate_from_roots(|n| n == &initial_bot); - let context_up = TContext::new_with_dependencies( - 1, - // Reverse the path from bottom to top. - vec![(TNode(1), None), (TNode(0), Some(TNode(1)))] - .into_iter() - .collect(), - graph.clone(), - ); - - let res = graph.create(initial_bot, &context_up).wait(); - - assert_eq!(res, Ok(vec![T(1, 1), T(0, 1)])); - - let res = graph.create(initial_top, &context_up).wait(); - - assert_eq!(res, Ok(vec![T(1, 1), T(2, 1)])); - } - /// /// A token containing the id of a Node and the id of a Context, respectively. Has a short name /// to minimize the verbosity of tests. @@ -1185,11 +1041,12 @@ mod tests { fn run(self, context: TContext) -> BoxFuture, TError> { context.ran(self.clone()); - let token = T(self.0, context.id()); - if let Some(dep) = context.dependency_of(&self) { + let depth = self.0; + let token = T(depth, context.id()); + if depth > 0 && !context.stop_at(&self) { context.maybe_delay(&self); context - .get(dep) + .get(TNode(depth - 1)) .map(move |mut v| { v.push(token); v @@ -1268,11 +1125,7 @@ mod tests { #[derive(Clone)] struct TContext { id: usize, - // A mapping from source to optional destination that drives what values each TNode depends on. - // If there is no entry in this map for a node, then TNode::run will default to requesting - // the next smallest node. Finally, if a None entry is present, a node will have no - // dependencies. - edges: Arc>>, + stop_at: Option, delays: HashMap, graph: Arc>, runs: Arc>>, @@ -1283,7 +1136,7 @@ mod tests { fn clone_for(&self, entry_id: EntryId) -> TContext { TContext { id: self.id, - edges: self.edges.clone(), + stop_at: self.stop_at.clone(), delays: self.delays.clone(), graph: self.graph.clone(), runs: self.runs.clone(), @@ -1310,7 +1163,7 @@ mod tests { fn new(id: usize, graph: Arc>) -> TContext { TContext { id, - edges: Arc::default(), + stop_at: None, delays: HashMap::default(), graph, runs: Arc::new(Mutex::new(Vec::new())), @@ -1318,14 +1171,10 @@ mod tests { } } - fn new_with_dependencies( - id: usize, - edges: HashMap>, - graph: Arc>, - ) -> TContext { + fn new_with_stop_at(id: usize, stop_at: TNode, graph: Arc>) -> TContext { TContext { id, - edges: Arc::new(edges), + stop_at: Some(stop_at), delays: HashMap::default(), graph, runs: Arc::new(Mutex::new(Vec::new())), @@ -1340,7 +1189,7 @@ mod tests { ) -> TContext { TContext { id, - edges: Arc::default(), + stop_at: None, delays, graph, runs: Arc::new(Mutex::new(Vec::new())), @@ -1367,16 +1216,8 @@ mod tests { } } - /// - /// If the given TNode should declare a dependency on another TNode, returns that dependency. - /// - fn dependency_of(&self, node: &TNode) -> Option { - match self.edges.get(node) { - Some(Some(ref dep)) => Some(dep.clone()), - Some(None) => None, - None if node.0 > 0 => Some(TNode(node.0 - 1)), - None => None, - } + fn stop_at(&self, node: &TNode) -> bool { + Some(node) == self.stop_at.as_ref() } fn runs(&self) -> Vec { diff --git a/tests/python/pants_test/pantsd/test_pantsd_integration.py b/tests/python/pants_test/pantsd/test_pantsd_integration.py index 79a739c7ba5..cadea9704a2 100644 --- a/tests/python/pants_test/pantsd/test_pantsd_integration.py +++ b/tests/python/pants_test/pantsd/test_pantsd_integration.py @@ -13,7 +13,6 @@ import time import unittest from builtins import open, range, zip -from textwrap import dedent from pants.util.contextutil import environment_as, temporary_dir, temporary_file from pants.util.dirutil import rm_rf, safe_file_dump, safe_mkdir, safe_open, touch @@ -640,42 +639,3 @@ def test_daemon_auto_shutdown_after_first_run(self): for process in pantsd_runner_processes: self.assertFalse(process.is_running()) - - # This is a regression test for a bug where we would incorrectly detect a cycle if two targets swapped their - # dependency relationship (#7404). - def test_dependencies_swap(self): - template = dedent(""" - python_library( - name = 'A', - source = 'A.py', - {a_deps} - ) - - python_library( - name = 'B', - source = 'B.py', - {b_deps} - ) - """) - with self.pantsd_successful_run_context() as (pantsd_run, checker, _, _): - with temporary_dir('.') as directory: - safe_file_dump(os.path.join(directory, 'A.py'), mode='w') - safe_file_dump(os.path.join(directory, 'B.py'), mode='w') - - if directory.startswith('./'): - directory = directory[2:] - - def list_and_verify(): - result = pantsd_run(['list', '{}:'.format(directory)]) - checker.assert_started() - self.assert_success(result) - expected_targets = {'{}:{}'.format(directory, target) for target in ('A', 'B')} - self.assertEqual(expected_targets, set(result.stdout_data.strip().split('\n'))) - - with open(os.path.join(directory, 'BUILD'), 'w') as f: - f.write(template.format(a_deps='dependencies = [":B"],', b_deps='')) - list_and_verify() - - with open(os.path.join(directory, 'BUILD'), 'w') as f: - f.write(template.format(a_deps='', b_deps='dependencies = [":A"],')) - list_and_verify()