Skip to content

Commit 5de9012

Browse files
authored
Two targets can swap positions with pantsd (pantsbuild#7583) (pantsbuild#7617)
Before this PR, nothing would remove the edges of a dirty node, so if two nodes swapped positions in the graph (e.g. if a dependency between two targets inverted), a cycle would be detected. With this PR, if we detect a cycle, but detect that there may be dirty edges in play, we fully clear that node (including removing its edges), which will cause it being re-triggered from scratch. This is specifically in place to handle the cycle scenario - the dirty bit, and dependency Generations are still the primary mechanism for handling re-use of old versions. There's an ugliness here that we still don't remove obsolete edges, so if Generation 2 of a node has differing dependencies from Generation 1, the dependency from Generation 1 will still dirty Generation 2. We _may_ want to consider solving that separately as/when it becomes a significant issue, or we may want to re-work this PR to do something like that... This PR happens to cover a part of that problem, but only where it causes definitive problems (a fake cycle) rather than also where it causes performance problems. There's probably a slightly more principled solution here along the lines of: * Rather than using () as an edge weight in the graph, use the Generation of the dependee Node as an edge weight. * When doing cycle detection, compare the edge weight against the generation of the node, and ignore obsolete edges. but I would want to think about that a lot more before doing it...
1 parent 3fadb2d commit 5de9012

File tree

3 files changed

+368
-85
lines changed

3 files changed

+368
-85
lines changed

src/rust/engine/graph/src/entry.rs

+121-37
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,52 @@ impl Generation {
5252
}
5353
}
5454

55+
///
56+
/// A result from running a Node.
57+
///
58+
/// If the value is Dirty, the consumer should check whether the dependencies of the Node have the
59+
/// same values as they did when this Node was last run; if so, the value can be re-used
60+
/// (and should be marked "Clean").
61+
///
62+
/// If the value is Clean, the consumer can simply use the value as-is.
63+
///
64+
#[derive(Clone, Debug)]
65+
pub(crate) enum EntryResult<N: Node> {
66+
Clean(Result<N::Item, N::Error>),
67+
Dirty(Result<N::Item, N::Error>),
68+
}
69+
70+
impl<N: Node> EntryResult<N> {
71+
fn is_dirty(&self) -> bool {
72+
if let EntryResult::Dirty(..) = self {
73+
true
74+
} else {
75+
false
76+
}
77+
}
78+
79+
fn dirty(&mut self) {
80+
if let EntryResult::Clean(value) = self {
81+
*self = EntryResult::Dirty(value.clone())
82+
}
83+
}
84+
85+
fn clean(&mut self) {
86+
if let EntryResult::Dirty(value) = self {
87+
*self = EntryResult::Clean(value.clone())
88+
}
89+
}
90+
}
91+
92+
impl<N: Node> AsRef<Result<N::Item, N::Error>> for EntryResult<N> {
93+
fn as_ref(&self) -> &Result<N::Item, N::Error> {
94+
match self {
95+
EntryResult::Clean(v) => v,
96+
EntryResult::Dirty(v) => v,
97+
}
98+
}
99+
}
100+
55101
#[allow(clippy::type_complexity)]
56102
#[derive(Debug)]
57103
pub(crate) enum EntryState<N: Node> {
@@ -65,7 +111,7 @@ pub(crate) enum EntryState<N: Node> {
65111
NotStarted {
66112
run_token: RunToken,
67113
generation: Generation,
68-
previous_result: Option<Result<N::Item, N::Error>>,
114+
previous_result: Option<EntryResult<N>>,
69115
},
70116
// A node that is running. A running node that has been marked dirty re-runs rather than
71117
// completing.
@@ -76,7 +122,7 @@ pub(crate) enum EntryState<N: Node> {
76122
generation: Generation,
77123
start_time: Instant,
78124
waiters: Vec<oneshot::Sender<Result<(N::Item, Generation), N::Error>>>,
79-
previous_result: Option<Result<N::Item, N::Error>>,
125+
previous_result: Option<EntryResult<N>>,
80126
dirty: bool,
81127
},
82128
// A node that has completed, and then possibly been marked dirty. Because marking a node
@@ -85,9 +131,8 @@ pub(crate) enum EntryState<N: Node> {
85131
Completed {
86132
run_token: RunToken,
87133
generation: Generation,
88-
result: Result<N::Item, N::Error>,
134+
result: EntryResult<N>,
89135
dep_generations: Vec<Generation>,
90-
dirty: bool,
91136
},
92137
}
93138

@@ -158,8 +203,9 @@ impl<N: Node> Entry<N> {
158203
let state = self.state.lock();
159204
match *state {
160205
EntryState::Completed {
161-
ref result, dirty, ..
162-
} if !dirty => Some(result.clone()),
206+
result: EntryResult::Clean(ref result),
207+
..
208+
} => Some(result.clone()),
163209
_ => None,
164210
}
165211
}
@@ -175,7 +221,7 @@ impl<N: Node> Entry<N> {
175221
run_token: RunToken,
176222
generation: Generation,
177223
previous_dep_generations: Option<Vec<Generation>>,
178-
previous_result: Option<Result<N::Item, N::Error>>,
224+
previous_result: Option<EntryResult<N>>,
179225
) -> EntryState<N>
180226
where
181227
C: NodeContext<Node = N>,
@@ -243,16 +289,15 @@ impl<N: Node> Entry<N> {
243289
start_time: Instant::now(),
244290
run_token,
245291
generation,
246-
previous_result: previous_result,
292+
previous_result,
247293
dirty: false,
248294
}
249295
}
250296
&EntryKey::Cyclic(_) => EntryState::Completed {
251-
result: Err(N::Error::cyclic()),
297+
result: EntryResult::Clean(Err(N::Error::cyclic())),
252298
dep_generations: Vec::new(),
253299
run_token,
254300
generation,
255-
dirty: false,
256301
},
257302
}
258303
}
@@ -293,10 +338,9 @@ impl<N: Node> Entry<N> {
293338
&mut EntryState::Completed {
294339
ref result,
295340
generation,
296-
dirty,
297341
..
298-
} if !dirty && self.node.content().cacheable() => {
299-
return future::result(result.clone())
342+
} if self.node.content().cacheable() && !result.is_dirty() => {
343+
return future::result(result.as_ref().clone())
300344
.map(move |res| (res, generation))
301345
.to_boxed();
302346
}
@@ -323,21 +367,21 @@ impl<N: Node> Entry<N> {
323367
EntryState::Completed {
324368
run_token,
325369
generation,
326-
result,
370+
mut result,
327371
dep_generations,
328-
dirty,
329372
} => {
330-
assert!(
331-
dirty || !self.node.content().cacheable(),
332-
"A clean Node should not reach this point: {:?}",
333-
result
334-
);
335373
trace!(
336-
"Re-starting node {:?}. It was: dirty={}, cacheable={}",
374+
"Re-starting node {:?}. It was: previous_result={:?}, cacheable={}",
337375
self.node,
338-
dirty,
376+
result,
339377
self.node.content().cacheable()
340378
);
379+
assert!(
380+
result.is_dirty() || !self.node.content().cacheable(),
381+
"A clean Node should not reach this point: {:?}",
382+
result
383+
);
384+
result.dirty();
341385
// The Node has already completed but is now marked dirty. This indicates that we are the
342386
// first caller to request it since it was marked dirty. We attempt to clean it (which will
343387
// cause it to re-run if the dep_generations mismatch).
@@ -416,7 +460,7 @@ impl<N: Node> Entry<N> {
416460
waiters,
417461
run_token,
418462
generation,
419-
previous_result,
463+
mut previous_result,
420464
dirty,
421465
..
422466
} => {
@@ -428,6 +472,9 @@ impl<N: Node> Entry<N> {
428472
"Not completing node {:?} because it was invalidated before completing.",
429473
self.node
430474
);
475+
if let Some(previous_result) = previous_result.as_mut() {
476+
previous_result.dirty();
477+
}
431478
EntryState::NotStarted {
432479
run_token: run_token.next(),
433480
generation,
@@ -440,6 +487,9 @@ impl<N: Node> Entry<N> {
440487
"Not completing node {:?} because it was dirtied before completing.",
441488
self.node
442489
);
490+
if let Some(previous_result) = previous_result.as_mut() {
491+
previous_result.dirty();
492+
}
443493
Self::run(
444494
context,
445495
&self.node,
@@ -452,19 +502,19 @@ impl<N: Node> Entry<N> {
452502
} else {
453503
// If the new result does not match the previous result, the generation increments.
454504
let (generation, next_result) = if let Some(result) = result {
455-
if Some(&result) == previous_result.as_ref() {
505+
if Some(&result) == previous_result.as_ref().map(EntryResult::as_ref) {
456506
// Node was re-executed, but had the same result value.
457-
(generation, result)
507+
(generation, EntryResult::Clean(result))
458508
} else {
459-
(generation.next(), result)
509+
(generation.next(), EntryResult::Clean(result))
460510
}
461511
} else {
462512
// Node was marked clean.
463513
// NB: The `expect` here avoids a clone and a comparison: see the method docs.
464-
(
465-
generation,
466-
previous_result.expect("A Node cannot be marked clean without a previous result."),
467-
)
514+
let mut result =
515+
previous_result.expect("A Node cannot be marked clean without a previous result.");
516+
result.clean();
517+
(generation, result)
468518
};
469519
// Notify all waiters (ignoring any that have gone away), and then store the value.
470520
// A waiter will go away whenever they drop the `Future` `Receiver` of the value, perhaps
@@ -476,14 +526,13 @@ impl<N: Node> Entry<N> {
476526
waiters.len()
477527
);
478528
for waiter in waiters {
479-
let _ = waiter.send(next_result.clone().map(|res| (res, generation)));
529+
let _ = waiter.send(next_result.as_ref().clone().map(|res| (res, generation)));
480530
}
481531
EntryState::Completed {
482532
result: next_result,
483533
dep_generations,
484534
run_token,
485535
generation,
486-
dirty: false,
487536
}
488537
}
489538
}
@@ -541,15 +590,23 @@ impl<N: Node> Entry<N> {
541590
///
542591
/// Clears the state of this Node, forcing it to be recomputed.
543592
///
544-
pub(crate) fn clear(&mut self) {
593+
/// # Arguments
594+
///
595+
/// * `graph_still_contains_edges` - If the caller has guaranteed that all edges from this Node
596+
/// have been removed from the graph, they should pass false here, else true. We may want to
597+
/// remove this parameter, and force this method to remove the edges, but that would require
598+
/// acquiring the graph lock here, which we currently don't do.
599+
///
600+
pub(crate) fn clear(&mut self, graph_still_contains_edges: bool) {
545601
let mut state = self.state.lock();
546602

547-
let (run_token, generation, previous_result) =
603+
let (run_token, generation, mut previous_result) =
548604
match mem::replace(&mut *state, EntryState::initial()) {
549605
EntryState::NotStarted {
550606
run_token,
551607
generation,
552608
previous_result,
609+
..
553610
}
554611
| EntryState::Running {
555612
run_token,
@@ -567,6 +624,12 @@ impl<N: Node> Entry<N> {
567624

568625
trace!("Clearing node {:?}", self.node);
569626

627+
if graph_still_contains_edges {
628+
if let Some(previous_result) = previous_result.as_mut() {
629+
previous_result.dirty();
630+
}
631+
}
632+
570633
// Swap in a state with a new RunToken value, which invalidates any outstanding work.
571634
*state = EntryState::NotStarted {
572635
run_token: run_token.next(),
@@ -585,15 +648,36 @@ impl<N: Node> Entry<N> {
585648
let state = &mut *self.state.lock();
586649
trace!("Dirtying node {:?}", self.node);
587650
match state {
588-
&mut EntryState::Running { ref mut dirty, .. }
589-
| &mut EntryState::Completed { ref mut dirty, .. } => {
590-
// Mark dirty.
651+
&mut EntryState::Running { ref mut dirty, .. } => {
591652
*dirty = true;
592653
}
654+
&mut EntryState::Completed { ref mut result, .. } => {
655+
result.dirty();
656+
}
593657
&mut EntryState::NotStarted { .. } => {}
594658
}
595659
}
596660

661+
pub fn may_have_dirty_edges(&self) -> bool {
662+
match *self.state.lock() {
663+
EntryState::NotStarted {
664+
ref previous_result,
665+
..
666+
}
667+
| EntryState::Running {
668+
ref previous_result,
669+
..
670+
} => {
671+
if let Some(EntryResult::Dirty(..)) = previous_result {
672+
true
673+
} else {
674+
false
675+
}
676+
}
677+
EntryState::Completed { ref result, .. } => result.is_dirty(),
678+
}
679+
}
680+
597681
pub(crate) fn format(&self) -> String {
598682
let state = match self.peek() {
599683
Some(Ok(ref nr)) => format!("{:?}", nr),

0 commit comments

Comments
 (0)