From 4a5780e85642d992d09f30b0c1754e9c4d47ac9d Mon Sep 17 00:00:00 2001 From: Leah Antkiewicz Date: Thu, 28 Oct 2021 10:51:34 -0400 Subject: [PATCH] Perf improvement to graph processing --- core/dbt/graph/graph.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/dbt/graph/graph.py b/core/dbt/graph/graph.py index a2feba24f3c..e18f2983a87 100644 --- a/core/dbt/graph/graph.py +++ b/core/dbt/graph/graph.py @@ -1,6 +1,7 @@ from typing import ( Set, Iterable, Iterator, Optional, NewType ) +from itertools import product import networkx as nx # type: ignore from dbt.exceptions import InternalException @@ -77,17 +78,26 @@ def select_successors(self, selected: Set[UniqueId]) -> Set[UniqueId]: successors.update(self.graph.successors(node)) return successors - def get_subset_graph(self, selected: Iterable[UniqueId]) -> 'Graph': + def get_subset_graph(self, selected: Iterable[UniqueId]) -> "Graph": """Create and return a new graph that is a shallow copy of the graph, but with only the nodes in include_nodes. Transitive edges across removed nodes are preserved as explicit new edges. """ - new_graph = nx.algorithms.transitive_closure(self.graph) + new_graph = self.graph.copy() include_nodes = set(selected) for node in self: if node not in include_nodes: + source_nodes = [x for x, _ in new_graph.in_edges(node)] + target_nodes = [x for _, x in new_graph.out_edges(node)] + + new_edges = product(source_nodes, target_nodes) + new_edges = [ + (source, target) for source, target in new_edges if source != target + ] # removes cyclic refs + + new_graph.add_edges_from(new_edges) new_graph.remove_node(node) for node in include_nodes: @@ -96,6 +106,7 @@ def get_subset_graph(self, selected: Iterable[UniqueId]) -> 'Graph': "Couldn't find model '{}' -- does it exist or is " "it disabled?".format(node) ) + return Graph(new_graph) def subgraph(self, nodes: Iterable[UniqueId]) -> 'Graph':