Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose the dbt graph in the DbtToAirflowConverter class #886

Merged
merged 9 commits into from
Mar 12, 2024
6 changes: 3 additions & 3 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,14 @@ def __init__(
# To keep this logic working, if converter is given no ProfileConfig,
# we can create a default retaining this value to preserve this functionality.
# We may want to consider defaulting this value in our actual ProjceConfig class?
dbt_graph = DbtGraph(
self.dbt_graph = DbtGraph(
project=project_config,
render_config=render_config,
execution_config=execution_config,
profile_config=profile_config,
dbt_vars=dbt_vars,
)
dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)
self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)

task_args = {
**operator_args,
Expand All @@ -266,7 +266,7 @@ def __init__(
)

build_airflow_graph(
nodes=dbt_graph.filtered_nodes,
nodes=self.dbt_graph.filtered_nodes,
dag=dag or (task_group and task_group.dag),
task_group=task_group,
execution_mode=execution_config.execution_mode,
Expand Down
33 changes: 32 additions & 1 deletion tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import DbtResourceType, ExecutionMode, InvocationMode, LoadMode
from cosmos.converter import DbtToAirflowConverter, validate_arguments, validate_initial_user_config
from cosmos.dbt.graph import DbtNode
from cosmos.dbt.graph import DbtGraph, DbtNode
from cosmos.exceptions import CosmosValueError
from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping

Expand Down Expand Up @@ -468,3 +468,34 @@ def test_converter_invocation_mode_added_to_task_args(
assert kwargs["task_args"]["invocation_mode"] == invocation_mode
else:
assert "invocation_mode" not in kwargs["task_args"]


@pytest.mark.parametrize(
"execution_mode,operator_args",
[
(ExecutionMode.KUBERNETES, {}),
],
)
@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes)
@patch("cosmos.converter.DbtGraph.load")
def test_converter_contains_dbt_graph(mock_load_dbt_graph, execution_mode, operator_args):
"""
This test validates that DbtToAirflowConverter contains and exposes a DbtGraph instance
"""
project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
execution_config = ExecutionConfig(execution_mode=execution_mode)
render_config = RenderConfig(emit_datasets=True)
profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
)
converter = DbtToAirflowConverter(
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
operator_args=operator_args,
)
assert isinstance(converter.dbt_graph, DbtGraph)
Loading