diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index fd63f51a0d21a..536113b3dd405 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -440,11 +440,21 @@ def _refresh_dag_bundles(self): elapsed_time_since_refresh = ( now - (bundle_model.last_refreshed or timezone.utc_epoch()) ).total_seconds() - pre_refresh_version = bundle.get_current_version() + if bundle.supports_versioning: + # we will also check the version of the bundle to see if another DAG processor has seen + # a new version + pre_refresh_version = ( + self._bundle_versions.get(bundle.name) or bundle.get_current_version() + ) + current_version_matches_db = pre_refresh_version == bundle_model.version + else: + # With no versioning, it always "matches" + current_version_matches_db = True + previously_seen = bundle.name in self._bundle_versions if ( elapsed_time_since_refresh < bundle.refresh_interval - and bundle_model.version == pre_refresh_version + and current_version_matches_db and previously_seen ): self.log.info("Not time to refresh %s", bundle.name) @@ -458,13 +468,17 @@ def _refresh_dag_bundles(self): bundle_model.last_refreshed = now - version_after_refresh = bundle.get_current_version() if bundle.supports_versioning: # We can short-circuit the rest of this if (1) bundle was seen before by # this dag processor and (2) the version of the bundle did not change # after refreshing it + version_after_refresh = bundle.get_current_version() if previously_seen and pre_refresh_version == version_after_refresh: - self.log.debug("Bundle %s version not changed after refresh", bundle.name) + self.log.debug( + "Bundle %s version not changed after refresh: %s", + bundle.name, + version_after_refresh, + ) continue bundle_model.version = version_after_refresh @@ -472,6 +486,10 @@ def _refresh_dag_bundles(self): self.log.info( "Version changed for %s, new version: %s", bundle.name, version_after_refresh ) + else: + version_after_refresh = None + 
+ self._bundle_versions[bundle.name] = version_after_refresh bundle_file_paths = self._find_files_in_bundle(bundle) @@ -484,8 +502,6 @@ def _refresh_dag_bundles(self): self.deactivate_deleted_dags(bundle_file_paths) self.clear_nonexistent_import_errors() - self._bundle_versions[bundle.name] = bundle.get_current_version() - def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[str]: """Refresh file paths from bundle dir.""" # Build up a list of Python files that could contain DAGs diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index a1915899203c4..3f58051d03b8e 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -62,6 +62,7 @@ from tests_common.test_utils.db import ( clear_db_assets, clear_db_callbacks, + clear_db_dag_bundles, clear_db_dags, clear_db_import_errors, clear_db_runs, @@ -93,6 +94,7 @@ def setup_method(self): clear_db_dags() clear_db_callbacks() clear_db_import_errors() + clear_db_dag_bundles() def teardown_class(self): clear_db_assets() @@ -101,6 +103,7 @@ def teardown_class(self): clear_db_dags() clear_db_callbacks() clear_db_import_errors() + clear_db_dag_bundles() def mock_processor(self) -> DagFileProcessorProcess: proc = MagicMock() @@ -527,7 +530,9 @@ def test_send_file_processing_statsd_timing( any_order=True, ) - def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path, configure_testing_dag_bundle): + def test_refresh_dags_dir_doesnt_delete_zipped_dags( + self, tmp_path, testing_dag_bundle, configure_testing_dag_bundle + ): """Test DagFileProcessorManager._refresh_dag_dir method""" dagbag = DagBag(dag_folder=tmp_path, include_examples=False) zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip") @@ -790,7 +795,7 @@ def _update_bundletwo_version(): def test_bundles_versions_are_stored(self): config = [ { - "name": "mybundle", + "name": "bundleone", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": 
{"path": "/dev/null", "refresh_interval": 0}, }, @@ -815,3 +820,61 @@ def test_bundles_versions_are_stored(self): with create_session() as session: model = session.get(DagBundleModel, "bundleone") assert model.version == "123" + + def test_non_versioned_bundle_get_version_not_called(self): + config = [ + { + "name": "bundleone", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 0}, + }, + ] + + mybundle = MagicMock() + mybundle.name = "bundleone" + mybundle.refresh_interval = 0 + mybundle.supports_versioning = False + + with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}): + DagBundlesManager().sync_bundles_to_db() + with mock.patch( + "airflow.dag_processing.bundles.manager.DagBundlesManager" + ) as mock_bundle_manager: + mock_bundle_manager.return_value._bundle_config = {"bundleone": None} + mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] + manager = DagFileProcessorManager(max_runs=1) + manager.run() + + mybundle.get_current_version.assert_not_called() + + def test_versioned_bundle_get_version_called_once(self): + """Make sure in a normal "warm" loop, get_current_version is called just once after refresh""" + + config = [ + { + "name": "bundleone", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 0}, + }, + ] + + mybundle = MagicMock() + mybundle.name = "bundleone" + mybundle.refresh_interval = 0 + mybundle.supports_versioning = True + mybundle.get_current_version.return_value = "123" + + with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}): + DagBundlesManager().sync_bundles_to_db() + with mock.patch( + "airflow.dag_processing.bundles.manager.DagBundlesManager" + ) as mock_bundle_manager: + mock_bundle_manager.return_value._bundle_config = {"bundleone": None} + mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] + manager = 
DagFileProcessorManager(max_runs=1) + manager.run() # run it once to warm up + + # now run it again so we can check we only call get_current_version once + mybundle.get_current_version.reset_mock() + manager.run() + mybundle.get_current_version.assert_called_once()