Skip to content

Commit

Permalink
More test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jedcunningham committed Jan 31, 2025
1 parent 3f23db3 commit c9fb76f
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ def test_bundles_versions_are_stored(self, session):
]

mybundle = MagicMock()
mybundle.name = "mybundle"
mybundle.name = "bundleone"
mybundle.path = "/dev/null"
mybundle.refresh_interval = 0
mybundle.supports_versioning = True
Expand All @@ -873,13 +873,13 @@ def test_bundles_versions_are_stored(self, session):
with mock.patch(
"airflow.dag_processing.bundles.manager.DagBundlesManager"
) as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"mybundle": None}
mock_bundle_manager.return_value._bundle_config = {"bundleone": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
manager = DagFileProcessorManager(max_runs=1)
manager.run()

with create_session() as session:
model = session.get(DagBundleModel, "mybundle")
model = session.get(DagBundleModel, "bundleone")
assert model.version == "123"

def test_non_versioned_bundle_get_version_not_called(self):
Expand All @@ -891,22 +891,23 @@ def test_non_versioned_bundle_get_version_not_called(self):
},
]

mybundle = MagicMock()
mybundle.name = "bundleone"
mybundle.refresh_interval = 0
mybundle.supports_versioning = False
bundleone = MagicMock()
bundleone.name = "bundleone"
bundleone.refresh_interval = 0
bundleone.supports_versioning = False
bundleone.path = Path("/dev/null")

with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}):
DagBundlesManager().sync_bundles_to_db()
with mock.patch(
"airflow.dag_processing.bundles.manager.DagBundlesManager"
) as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"bundleone": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone]
manager = DagFileProcessorManager(max_runs=1)
manager.run()

mybundle.get_current_version.assert_not_called()
bundleone.get_current_version.assert_not_called()

def test_versioned_bundle_get_version_called_once(self):
"""Make sure in a normal "warm" loop, get_current_version is called just once after refresha"""
Expand All @@ -919,23 +920,24 @@ def test_versioned_bundle_get_version_called_once(self):
},
]

mybundle = MagicMock()
mybundle.name = "bundleone"
mybundle.refresh_interval = 0
mybundle.supports_versioning = True
mybundle.get_current_version.return_value = "123"
bundleone = MagicMock()
bundleone.name = "bundleone"
bundleone.refresh_interval = 0
bundleone.supports_versioning = True
bundleone.get_current_version.return_value = "123"
bundleone.path = Path("/dev/null")

with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}):
DagBundlesManager().sync_bundles_to_db()
with mock.patch(
"airflow.dag_processing.bundles.manager.DagBundlesManager"
) as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"bundleone": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone]
manager = DagFileProcessorManager(max_runs=1)
manager.run() # run it once to warm up

# now run it again so we can check we only call get_current_version once
mybundle.get_current_version.reset_mock()
bundleone.get_current_version.reset_mock()
manager.run()
mybundle.get_current_version.assert_called_once()
bundleone.get_current_version.assert_called_once()

0 comments on commit c9fb76f

Please sign in to comment.