From e1a836415af9b88eb703c0bbeb6a4163756e97db Mon Sep 17 00:00:00 2001 From: Micah Victoria Date: Sun, 11 Aug 2024 22:28:55 -0400 Subject: [PATCH 1/8] update GraphSelector and SelectorConfig updated GraphSelector to take into account path and tag dbt selector methods update SelectorConfig to use regex to parse selection statement to handle graph and path statements --- cosmos/dbt/selector.py | 67 ++++++++++++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 18 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 257d60721..9015b256d 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -84,6 +84,8 @@ def parse(text: str) -> GraphSelector | None: regex_match = re.search(GRAPH_SELECTOR_REGEX, text) if regex_match: precursors, node_name, descendants = regex_match.groups() + if "/" in node_name and not node_name.startswith(PATH_SELECTOR): + node_name = f"{PATH_SELECTOR}{node_name}" return GraphSelector(node_name, precursors, descendants) return None @@ -148,22 +150,43 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: :return: set of node ids that matches current graph selector """ selected_nodes: set[str] = set() + root_nodes: set[str] = set() # Index nodes by name, we can improve performance by doing this once # for multiple GraphSelectors - node_by_name = {} - for node_id, node in nodes.items(): - node_by_name[node.name] = node_id + if PATH_SELECTOR in self.node_name: + path_selection = self.node_name[len(PATH_SELECTOR):] + + for node_id, node in nodes.items(): + if path_selection in str(node.file_path): + root_nodes.add(node_id) + + elif TAG_SELECTOR in self.node_name: + tag_selection = self.node_name[len(TAG_SELECTOR):] + + for node_id, node in nodes.items(): + if tag_selection in node.tags: + root_nodes.add(node_id) + + elif CONFIG_SELECTOR in self.node_name: ... - if self.node_name in node_by_name: - root_id = node_by_name[self.node_name] else: - logger.warn(f"Selector {self.node_name} not found.") - return selected_nodes + node_by_name = {} + for node_id, node in nodes.items(): + node_by_name[node.name] = node_id + + if self.node_name in node_by_name: + root_id = node_by_name[self.node_name] + root_nodes.add(root_id) + else: + logger.warn(f"Selector {self.node_name} not found.") + return selected_nodes - selected_nodes.add(root_id) - self.select_node_precursors(nodes, root_id, selected_nodes) - self.select_node_descendants(nodes, root_id, selected_nodes) + selected_nodes.update(root_nodes) + + for root_id in root_nodes: + self.select_node_precursors(nodes, root_id, selected_nodes) + self.select_node_descendants(nodes, root_id, selected_nodes) return selected_nodes @@ -210,14 +233,22 @@ def load_from_statement(self, statement: str) -> None: items = statement.split(",") for item in items: - if item.startswith(PATH_SELECTOR): - self._parse_path_selector(item) - elif item.startswith(TAG_SELECTOR): - self._parse_tag_selector(item) - elif item.startswith(CONFIG_SELECTOR): - self._parse_config_selector(item) - else: - self._parse_unknown_selector(item) + regex_match = re.search(GRAPH_SELECTOR_REGEX, item) + if regex_match: + precursors, node_name, descendants = regex_match.groups() + + if precursors or descendants: + self._parse_unknown_selector(item) + elif node_name.startswith(PATH_SELECTOR): + self._parse_path_selector(item) + elif "/" in node_name: + self._parse_path_selector(f"{PATH_SELECTOR}{node_name}") + elif node_name.startswith(TAG_SELECTOR): + self._parse_tag_selector(item) + elif node_name.startswith(CONFIG_SELECTOR): + self._parse_config_selector(item) + else: + self._parse_unknown_selector(item) def _parse_unknown_selector(self, item: str) -> None: if item: From c9f7e423ad45fb245e4476c5792536b10fcfe0ec Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 12 Aug 2024 02:51:00 +0000 Subject: [PATCH 2/8] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/dbt/selector.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 9015b256d..46ccb5862 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -155,20 +155,21 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: # Index nodes by name, we can improve performance by doing this once # for multiple GraphSelectors if PATH_SELECTOR in self.node_name: - path_selection = self.node_name[len(PATH_SELECTOR):] + path_selection = self.node_name[len(PATH_SELECTOR) :] for node_id, node in nodes.items(): if path_selection in str(node.file_path): root_nodes.add(node_id) elif TAG_SELECTOR in self.node_name: - tag_selection = self.node_name[len(TAG_SELECTOR):] + tag_selection = self.node_name[len(TAG_SELECTOR) :] for node_id, node in nodes.items(): if tag_selection in node.tags: root_nodes.add(node_id) - elif CONFIG_SELECTOR in self.node_name: ... + elif CONFIG_SELECTOR in self.node_name: + ... else: node_by_name = {} From 80b11ed340e568742ce1fbf7ff0f63dc7ca003c4 Mon Sep 17 00:00:00 2001 From: Micah Victoria Date: Mon, 12 Aug 2024 07:01:36 -0400 Subject: [PATCH 3/8] short circuit if node_name is None --- cosmos/dbt/selector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 46ccb5862..d89849d2e 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -237,8 +237,8 @@ def load_from_statement(self, statement: str) -> None: regex_match = re.search(GRAPH_SELECTOR_REGEX, item) if regex_match: precursors, node_name, descendants = regex_match.groups() - - if precursors or descendants: + if node_name is None: ... + elif precursors or descendants: self._parse_unknown_selector(item) elif node_name.startswith(PATH_SELECTOR): self._parse_path_selector(item) From 5245eee9c732e174c1875fda7ec443b97e4b2bc1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 12 Aug 2024 11:01:50 +0000 Subject: [PATCH 4/8] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/dbt/selector.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index d89849d2e..24a87641a 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -237,7 +237,8 @@ def load_from_statement(self, statement: str) -> None: regex_match = re.search(GRAPH_SELECTOR_REGEX, item) if regex_match: precursors, node_name, descendants = regex_match.groups() - if node_name is None: ... + if node_name is None: + ... elif precursors or descendants: self._parse_unknown_selector(item) elif node_name.startswith(PATH_SELECTOR): From 614e22f41520462748d54c72cb567903b5c59e4d Mon Sep 17 00:00:00 2001 From: Micah Victoria Date: Mon, 12 Aug 2024 08:27:49 -0400 Subject: [PATCH 5/8] add tests for selecting using graph and path and tag methods --- tests/dbt/test_selector.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index ece32ac95..d1f1d9d08 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -432,3 +432,34 @@ def test_should_include_node_without_depends_on(selector_config): selector = NodeSelector({}, selector_config) selector.visited_nodes = set() selector._should_include_node(node.unique_id, node) + + +def test_select_upstream_nodes_with_select_path(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+path:gen2/models"]) + expected = [ + "model.dbt-proj.another_grandparent_node", + "model.dbt-proj.grandparent", + "model.dbt-proj.parent", + ] + assert sorted(selected.keys()) == expected + + +def test_select_downstream_nodes_with_select_path(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models+"]) + expected = [ + "model.dbt-proj.child", + "model.dbt-proj.parent", + "model.dbt-proj.sibling1", + "model.dbt-proj.sibling2", + ] + assert sorted(selected.keys()) == expected + + +def test_select_upstream_nodes_with_select_tag(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["1+tag:deprecated"]) + expected = [ + "model.dbt-proj.parent", + "model.dbt-proj.sibling1", + "model.dbt-proj.sibling2", + ] + assert sorted(selected.keys()) == expected From 40cbe79575eb40fe0d1d1817a680a2ec5b171229 Mon Sep 17 00:00:00 2001 From: Micah Victoria Date: Mon, 12 Aug 2024 13:30:41 -0400 Subject: [PATCH 6/8] reduce depth of to comply with ruff combine added tests into single parameterized test --- cosmos/dbt/selector.py | 10 +---- tests/dbt/test_selector.py | 77 ++++++++++++++++++++++++-------------- 2 files changed, 51 insertions(+), 36 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 24a87641a..750fe2872 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -156,17 +156,11 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: # for multiple GraphSelectors if PATH_SELECTOR in self.node_name: path_selection = self.node_name[len(PATH_SELECTOR) :] - - for node_id, node in nodes.items(): - if path_selection in str(node.file_path): - root_nodes.add(node_id) + root_nodes.update({node_id for node_id, node in nodes.items() if path_selection in str(node.file_path)}) elif TAG_SELECTOR in self.node_name: tag_selection = self.node_name[len(TAG_SELECTOR) :] - - for node_id, node in nodes.items(): - if tag_selection in node.tags: - root_nodes.add(node_id) + root_nodes.update({node_id for node_id, node in nodes.items() if tag_selection in node.tags}) elif CONFIG_SELECTOR in self.node_name: ... diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index d1f1d9d08..9c8bc34a4 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -434,32 +434,53 @@ def test_should_include_node_without_depends_on(selector_config): selector._should_include_node(node.unique_id, node) -def test_select_upstream_nodes_with_select_path(): - selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+path:gen2/models"]) - expected = [ - "model.dbt-proj.another_grandparent_node", - "model.dbt-proj.grandparent", - "model.dbt-proj.parent", - ] - assert sorted(selected.keys()) == expected - - -def test_select_downstream_nodes_with_select_path(): - selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models+"]) - expected = [ - "model.dbt-proj.child", - "model.dbt-proj.parent", - "model.dbt-proj.sibling1", - "model.dbt-proj.sibling2", - ] - assert sorted(selected.keys()) == expected - - -def test_select_upstream_nodes_with_select_tag(): - selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["1+tag:deprecated"]) - expected = [ - "model.dbt-proj.parent", - "model.dbt-proj.sibling1", - "model.dbt-proj.sibling2", - ] +@pytest.mark.parametrize( + "select_statement, expected", + [ + ( + ["+path:gen2/models"], + [ + "model.dbt-proj.another_grandparent_node", + "model.dbt-proj.grandparent", + "model.dbt-proj.parent", + ], + ), + ( + ["path:gen2/models+"], + [ + "model.dbt-proj.child", + "model.dbt-proj.parent", + "model.dbt-proj.sibling1", + "model.dbt-proj.sibling2", + ], + ), + ( + ["gen2/models+"], + [ + "model.dbt-proj.child", + "model.dbt-proj.parent", + "model.dbt-proj.sibling1", + "model.dbt-proj.sibling2", + ], + ), + ( + ["+gen2/models"], + [ + "model.dbt-proj.another_grandparent_node", + "model.dbt-proj.grandparent", + "model.dbt-proj.parent", + ], + ), + ( + ["1+tag:deprecated"], + [ + "model.dbt-proj.parent", + "model.dbt-proj.sibling1", + "model.dbt-proj.sibling2", + ], + ), + ], +) +def test_select_using_graph_operators(select_statement, expected): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=select_statement) assert sorted(selected.keys()) == expected From afde3d314e39d2737ef8d6214d7be522a123f94c Mon Sep 17 00:00:00 2001 From: Micah Victoria Date: Wed, 14 Aug 2024 10:10:08 -0400 Subject: [PATCH 7/8] added config selection method added tests covering config --- cosmos/dbt/selector.py | 27 ++++++++++++++++++++++++++- tests/dbt/test_selector.py | 16 ++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 750fe2872..89a55afdb 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -163,7 +163,32 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: root_nodes.update({node_id for node_id, node in nodes.items() if tag_selection in node.tags}) elif CONFIG_SELECTOR in self.node_name: - ... + config_selection_key, config_selection_value = self.node_name[len(CONFIG_SELECTOR) :].split(":") + if config_selection_key not in SUPPORTED_CONFIG: + logger.warning("Unsupported config key selector: %s", config_selection_key) + + # currently tags, materialized, and schema are the only supported config keys + # logic is separated into two conditions because the config 'tags' contains a + # list of tags, but the config 'materialized', and 'schema' contain strings + elif config_selection_key == "tags": + root_nodes.update( + { + node_id + for node_id, node in nodes.items() + if config_selection_value in node.config.get(config_selection_key, []) + } + ) + elif config_selection_key in ( + "materialized", + "schema", + ): + root_nodes.update( + { + node_id + for node_id, node in nodes.items() + if config_selection_value == node.config.get(config_selection_key, "") + } + ) else: node_by_name = {} diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index 9c8bc34a4..a41a2f13e 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -479,6 +479,22 @@ def test_should_include_node_without_depends_on(selector_config): "model.dbt-proj.sibling2", ], ), + ( + ["1+config.tags:deprecated"], + [ + "model.dbt-proj.parent", + "model.dbt-proj.sibling1", + "model.dbt-proj.sibling2", + ], + ), + ( + ["config.materialized:table+"], + [ + "model.dbt-proj.child", + "model.dbt-proj.sibling1", + "model.dbt-proj.sibling2", + ], + ), ], ) def test_select_using_graph_operators(select_statement, expected): From 67cb56ff9c4d7ab20dcd3da926a83aa83bea80b6 Mon Sep 17 00:00:00 2001 From: Micah Victoria Date: Wed, 14 Aug 2024 11:00:34 -0400 Subject: [PATCH 8/8] add test for paths without path selector update GraphSelector doc string to include selectors --- cosmos/dbt/selector.py | 4 ++++ tests/dbt/test_selector.py | 8 ++++++++ 2 files changed, 12 insertions(+) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 89a55afdb..1e7b42667 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -35,6 +35,10 @@ class GraphSelector: +model_d+ 2+model_e model_f+3 + +/path/to/model_g+ + path:/path/to/model_h+ + +tag:nightly + +config.materialized:view https://docs.getdbt.com/reference/node-selection/graph-operators """ diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index a41a2f13e..56f65dad0 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -191,6 +191,14 @@ def test_select_nodes_by_select_path(): assert selected == expected +def test_select_nodes_with_slash_but_no_path_selector(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["gen2/models"]) + expected = { + parent_node.unique_id: parent_node, + } + assert selected == expected + + def test_select_nodes_by_select_union(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["tag:has_child", "tag:nightly"]) expected = {