Skip to content

Commit

Permalink
Dirty the dependents of uncacheable nodes, and use a session id to en…
Browse files Browse the repository at this point in the history
…sure that an uncacheable node runs (no more than once) per session.
  • Loading branch information
stuhood committed Jan 27, 2020
1 parent ebac9f4 commit 550b650
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 104 deletions.
84 changes: 50 additions & 34 deletions src/rust/engine/graph/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,29 +59,38 @@ impl Generation {
/// same values as they did when this Node was last run; if so, the value can be re-used
/// (and should be marked "Clean").
///
/// If the value is Uncacheable it may only be consumed in the Session that produced it, and should
/// be recomputed in a new Session.
///
/// If the value is Clean, the consumer can simply use the value as-is.
///
#[derive(Clone, Debug)]
pub(crate) enum EntryResult<N: Node> {
/// A value that a consumer can use as-is.
Clean(Result<N::Item, N::Error>),
/// A value that is not currently usable as-is: `is_clean` reports false for it, and
/// `clean` promotes it back to Clean.
Dirty(Result<N::Item, N::Error>),
/// A value paired with the SessionId of the Session that produced it. `is_clean` treats it
/// as clean only for a context whose SessionId equals the stored one.
Uncacheable(
Result<N::Item, N::Error>,
<<N as Node>::Context as NodeContext>::SessionId,
),
}

impl<N: Node> EntryResult<N> {
fn is_dirty(&self) -> bool {
if let EntryResult::Dirty(..) = self {
true
} else {
false
/// Reports whether this value may be consumed as-is by a caller holding `context`.
///
/// Clean values always qualify and Dirty values never do. An Uncacheable value qualifies
/// only when the SessionId stored with it equals the SessionId of `context`.
fn is_clean(&self, context: &N::Context) -> bool {
  match *self {
    EntryResult::Dirty(..) => false,
    EntryResult::Clean(..) => true,
    EntryResult::Uncacheable(_, ref produced_in) => context.session_id() == produced_in,
  }
}

/// Downgrades a Clean value to Dirty, keeping the wrapped result.
///
/// Values that are already Dirty, and Uncacheable values, are left exactly as they are.
fn dirty(&mut self) {
  let dirtied = match self {
    EntryResult::Clean(value) => EntryResult::Dirty(value.clone()),
    EntryResult::Dirty(..) | EntryResult::Uncacheable(..) => return,
  };
  *self = dirtied;
}

/// Iff the value is Dirty, mark it Clean.
fn clean(&mut self) {
if let EntryResult::Dirty(value) = self {
*self = EntryResult::Clean(value.clone())
Expand All @@ -94,6 +103,7 @@ impl<N: Node> AsRef<Result<N::Item, N::Error>> for EntryResult<N> {
match self {
EntryResult::Clean(v) => v,
EntryResult::Dirty(v) => v,
EntryResult::Uncacheable(v, _) => v,
}
}
}
Expand Down Expand Up @@ -293,7 +303,6 @@ impl<N: Node> Entry<N> {
} => {
let (send, recv) = oneshot::channel();
waiters.push(send);
trace!("Adding waiter on {:?}", self.node);
return recv
.map_err(|_| N::Error::invalidated())
.flatten()
Expand All @@ -303,7 +312,7 @@ impl<N: Node> Entry<N> {
ref result,
generation,
..
} if self.node.cacheable() && !result.is_dirty() => {
} if result.is_clean(context) => {
return future::result(result.as_ref().clone())
.map(move |res| (res, generation))
.to_boxed();
Expand Down Expand Up @@ -331,43 +340,37 @@ impl<N: Node> Entry<N> {
EntryState::Completed {
run_token,
generation,
mut result,
result,
dep_generations,
} => {
trace!(
"Re-starting node {:?}. It was: previous_result={:?}, cacheable={}",
"Re-starting node {:?}. It was: previous_result={:?}",
self.node,
result,
self.node.cacheable()
);
assert!(
result.is_dirty() || !self.node.cacheable(),
!result.is_clean(context),
"A clean Node should not reach this point: {:?}",
result
);
result.dirty();
// The Node has already completed but is now marked dirty. This indicates that we are the
// first caller to request it since it was marked dirty. We attempt to clean it (which will
// cause it to re-run if the dep_generations mismatch).
// Note that if the node is uncacheable, we avoid storing a previous result, which will
// transitively invalidate every node that depends on us. This works because, in practice,
// the only uncacheable nodes are Select nodes and @goal_rule Task nodes. See #6146 and #6598
// The Node has already completed but needs to re-run. If the Node is dirty, we are the
// first caller to request it since it was marked dirty. We attempt to clean it (which
// will cause it to re-run if the dep_generations mismatch).
//
// On the other hand, if the Node is uncacheable, we store the previous result as
// Uncacheable, which allows its value to be used only within the current Session.
Self::run(
context,
&self.node,
entry_id,
run_token,
generation,
if self.node.cacheable() {
if self.node.cacheable(context) {
Some(dep_generations)
} else {
None
},
if self.node.cacheable() {
Some(result)
} else {
None
},
Some(result),
)
}
EntryState::Running { .. } => {
Expand Down Expand Up @@ -401,6 +404,7 @@ impl<N: Node> Entry<N> {
result_run_token: RunToken,
dep_generations: Vec<Generation>,
result: Option<Result<N::Item, N::Error>>,
complete_as_dirty: bool,
_graph: &mut super::InnerGraph<N>,
) {
let mut state = self.state.lock();
Expand Down Expand Up @@ -464,27 +468,39 @@ impl<N: Node> Entry<N> {
} else {
// If the new result does not match the previous result, the generation increments.
let (generation, next_result) = if let Some(result) = result {
if Some(&result) == previous_result.as_ref().map(EntryResult::as_ref) {
let next_result = if !self.node.cacheable(context) {
EntryResult::Uncacheable(result, context.session_id().clone())
} else if complete_as_dirty {
EntryResult::Dirty(result)
} else {
EntryResult::Clean(result)
};
if Some(next_result.as_ref()) == previous_result.as_ref().map(EntryResult::as_ref) {
// Node was re-executed, but had the same result value.
(generation, EntryResult::Clean(result))
(generation, next_result)
} else {
(generation.next(), EntryResult::Clean(result))
(generation.next(), next_result)
}
} else {
// Node was marked clean.
// NB: The `expect` here avoids a clone and a comparison: see the method docs.
let mut result =
previous_result.expect("A Node cannot be marked clean without a previous result.");
result.clean();
if complete_as_dirty {
result.dirty();
} else {
result.clean();
}
(generation, result)
};
// Notify all waiters (ignoring any that have gone away), and then store the value.
// A waiter will go away whenever they drop the `Future` `Receiver` of the value, perhaps
// due to failure of another Future in a `join` or `join_all`, or due to a timeout at the
// root of a request.
trace!(
"Completing node {:?} with {} waiters.",
"Completing node {:?} (generation {:?}) with {} waiters.",
self.node,
generation,
waiters.len()
);
for waiter in waiters {
Expand Down Expand Up @@ -620,7 +636,7 @@ impl<N: Node> Entry<N> {
}
}

pub fn may_have_dirty_edges(&self) -> bool {
pub fn is_clean(&self, context: &N::Context) -> bool {
match *self.state.lock() {
EntryState::NotStarted {
ref previous_result,
Expand All @@ -630,13 +646,13 @@ impl<N: Node> Entry<N> {
ref previous_result,
..
} => {
if let Some(EntryResult::Dirty(..)) = previous_result {
true
if let Some(result) = previous_result {
result.is_clean(context)
} else {
false
true
}
}
EntryState::Completed { ref result, .. } => result.is_dirty(),
EntryState::Completed { ref result, .. } => result.is_clean(context),
}
}

Expand Down
23 changes: 18 additions & 5 deletions src/rust/engine/graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,9 @@ impl<N: Node> Graph<N> {
// TODO: doing cycle detection under the lock... unfortunate, but probably unavoidable
// without a much more complicated algorithm.
let potential_dst_id = inner.ensure_entry(dst_node);
if let Some(cycle_path) = Self::report_cycle(src_id, potential_dst_id, &mut inner) {
if let Some(cycle_path) =
Self::report_cycle(src_id, potential_dst_id, &mut inner, context)
{
// Cyclic dependency: render an error.
let path_strs = cycle_path
.into_iter()
Expand Down Expand Up @@ -699,6 +701,7 @@ impl<N: Node> Graph<N> {
src_id: EntryId,
potential_dst_id: EntryId,
inner: &mut InnerGraph<N>,
context: &N::Context,
) -> Option<Vec<Entry<N>>> {
let mut counter = 0;
loop {
Expand All @@ -708,7 +711,7 @@ impl<N: Node> Graph<N> {
// them, and then check if there are still any cycles in the graph.
let dirty_nodes: HashSet<_> = cycle_path
.iter()
.filter(|n| n.may_have_dirty_edges())
.filter(|n| !n.is_clean(context))
.map(|n| n.node().clone())
.collect();
if dirty_nodes.is_empty() {
Expand Down Expand Up @@ -875,19 +878,28 @@ impl<N: Node> Graph<N> {
run_token: RunToken,
result: Option<Result<N::Item, N::Error>>,
) {
let (entry, entry_id, dep_generations) = {
let (entry, complete_as_dirty, dep_generations) = {
let inner = self.inner.lock();
let mut complete_as_dirty = false;
// Get the Generations of all dependencies of the Node. We can trust that these have not changed
// since we began executing, as long as we are not currently marked dirty (see the method doc).
let dep_generations = inner
.pg
.neighbors_directed(entry_id, Direction::Outgoing)
.filter_map(|dep_id| inner.entry_for_id(dep_id))
.map(Entry::generation)
.map(|entry| {
// If a dependency is uncacheable or currently dirty, this Node should complete as dirty,
// independent of matching Generation values. This is to allow for the behaviour that an
// uncacheable Node should always have dirty dependents, transitively.
if !entry.node().cacheable(context) || !entry.is_clean(context) {
complete_as_dirty = true;
}
entry.generation()
})
.collect();
(
inner.entry_for_id(entry_id).cloned(),
entry_id,
complete_as_dirty,
dep_generations,
)
};
Expand All @@ -899,6 +911,7 @@ impl<N: Node> Graph<N> {
run_token,
dep_generations,
result,
complete_as_dirty,
&mut inner,
);
}
Expand Down
15 changes: 14 additions & 1 deletion src/rust/engine/graph/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub trait Node: Clone + Debug + Display + Eq + Hash + Send + 'static {
///
/// If the node result is cacheable, return true.
///
fn cacheable(&self) -> bool;
fn cacheable(&self, context: &Self::Context) -> bool;

/// Nodes optionally have a user-facing name (distinct from their Debug and Display
/// implementations). This user-facing name is intended to provide high-level information
Expand Down Expand Up @@ -106,13 +106,26 @@ pub trait NodeContext: Clone + Send + 'static {
///
type Node: Node;

///
/// The Session ID type for this Context. Some Node behaviours (in particular: Node::cacheable)
/// have Session-specific semantics. More than one context object might be associated with a
/// single caller "session".
///
type SessionId: Clone + Debug + Eq;

///
/// Creates a clone of this NodeContext to be used for a different Node.
///
/// To clone a Context for use for the same Node, `Clone` is used directly.
///
fn clone_for(&self, entry_id: EntryId) -> <Self::Node as Node>::Context;

///
/// Returns the SessionId for this Context, which should uniquely identify a caller's run for the
/// purposes of "once per Session" behaviour.
///
fn session_id(&self) -> &Self::SessionId;

///
/// Returns a reference to the Graph for this Context.
///
Expand Down
Loading

0 comments on commit 550b650

Please sign in to comment.