From eb7ea0eab4c0847023b3b1aa185d23a19ad4bd06 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Thu, 23 Jan 2025 20:01:02 -0700 Subject: [PATCH] Call `get_current_version` less often in bundle refresh loop In the bundle refresh loop, we can call `get_current_version` a lot less often, as 1) we can skip it for bundles that do not support versioning and 2) for those that do, we already know the version from the last time we refreshed! Since this is a local call, this isn't a huge gain. But every little bit helps. --- airflow/dag_processing/manager.py | 28 +++++++++--- tests/dag_processing/test_manager.py | 67 +++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 8 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index fd63f51a0d21a..536113b3dd405 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -440,11 +440,21 @@ def _refresh_dag_bundles(self): elapsed_time_since_refresh = ( now - (bundle_model.last_refreshed or timezone.utc_epoch()) ).total_seconds() - pre_refresh_version = bundle.get_current_version() + if bundle.supports_versioning: + # we will also check the version of the bundle to see if another DAG processor has seen + # a new version + pre_refresh_version = ( + self._bundle_versions.get(bundle.name) or bundle.get_current_version() + ) + current_version_matches_db = pre_refresh_version == bundle_model.version + else: + # With no versioning, it always "matches" + current_version_matches_db = True + previously_seen = bundle.name in self._bundle_versions if ( elapsed_time_since_refresh < bundle.refresh_interval - and bundle_model.version == pre_refresh_version + and current_version_matches_db and previously_seen ): self.log.info("Not time to refresh %s", bundle.name) @@ -458,13 +468,17 @@ def _refresh_dag_bundles(self): bundle_model.last_refreshed = now - version_after_refresh = bundle.get_current_version() if bundle.supports_versioning: # We can short-circuit the 
rest of this if (1) bundle was seen before by # this dag processor and (2) the version of the bundle did not change # after refreshing it + version_after_refresh = bundle.get_current_version() if previously_seen and pre_refresh_version == version_after_refresh: - self.log.debug("Bundle %s version not changed after refresh", bundle.name) + self.log.debug( + "Bundle %s version not changed after refresh: %s", + bundle.name, + version_after_refresh, + ) continue bundle_model.version = version_after_refresh @@ -472,6 +486,10 @@ def _refresh_dag_bundles(self): self.log.info( "Version changed for %s, new version: %s", bundle.name, version_after_refresh ) + else: + version_after_refresh = None + + self._bundle_versions[bundle.name] = version_after_refresh bundle_file_paths = self._find_files_in_bundle(bundle) @@ -484,8 +502,6 @@ def _refresh_dag_bundles(self): self.deactivate_deleted_dags(bundle_file_paths) self.clear_nonexistent_import_errors() - self._bundle_versions[bundle.name] = bundle.get_current_version() - def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[str]: """Refresh file paths from bundle dir.""" # Build up a list of Python files that could contain DAGs diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index a1915899203c4..3f58051d03b8e 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -62,6 +62,7 @@ from tests_common.test_utils.db import ( clear_db_assets, clear_db_callbacks, + clear_db_dag_bundles, clear_db_dags, clear_db_import_errors, clear_db_runs, @@ -93,6 +94,7 @@ def setup_method(self): clear_db_dags() clear_db_callbacks() clear_db_import_errors() + clear_db_dag_bundles() def teardown_class(self): clear_db_assets() @@ -101,6 +103,7 @@ def teardown_class(self): clear_db_dags() clear_db_callbacks() clear_db_import_errors() + clear_db_dag_bundles() def mock_processor(self) -> DagFileProcessorProcess: proc = MagicMock() @@ -527,7 +530,9 @@ def 
test_send_file_processing_statsd_timing( any_order=True, ) - def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path, configure_testing_dag_bundle): + def test_refresh_dags_dir_doesnt_delete_zipped_dags( + self, tmp_path, testing_dag_bundle, configure_testing_dag_bundle + ): """Test DagFileProcessorManager._refresh_dag_dir method""" dagbag = DagBag(dag_folder=tmp_path, include_examples=False) zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip") @@ -790,7 +795,7 @@ def _update_bundletwo_version(): def test_bundles_versions_are_stored(self): config = [ { - "name": "mybundle", + "name": "bundleone", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/dev/null", "refresh_interval": 0}, }, @@ -815,3 +820,61 @@ def test_bundles_versions_are_stored(self): with create_session() as session: model = session.get(DagBundleModel, "bundleone") assert model.version == "123" + + def test_non_versioned_bundle_get_version_not_called(self): + config = [ + { + "name": "bundleone", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 0}, + }, + ] + + mybundle = MagicMock() + mybundle.name = "bundleone" + mybundle.refresh_interval = 0 + mybundle.supports_versioning = False + + with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}): + DagBundlesManager().sync_bundles_to_db() + with mock.patch( + "airflow.dag_processing.bundles.manager.DagBundlesManager" + ) as mock_bundle_manager: + mock_bundle_manager.return_value._bundle_config = {"bundleone": None} + mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] + manager = DagFileProcessorManager(max_runs=1) + manager.run() + + mybundle.get_current_version.assert_not_called() + + def test_versioned_bundle_get_version_called_once(self): + """Make sure in a normal "warm" loop, get_current_version is called just once after refresh""" + + config = [ + { + "name": "bundleone", + 
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 0}, + }, + ] + + mybundle = MagicMock() + mybundle.name = "bundleone" + mybundle.refresh_interval = 0 + mybundle.supports_versioning = True + mybundle.get_current_version.return_value = "123" + + with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}): + DagBundlesManager().sync_bundles_to_db() + with mock.patch( + "airflow.dag_processing.bundles.manager.DagBundlesManager" + ) as mock_bundle_manager: + mock_bundle_manager.return_value._bundle_config = {"bundleone": None} + mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] + manager = DagFileProcessorManager(max_runs=1) + manager.run() # run it once to warm up + + # now run it again so we can check we only call get_current_version once + mybundle.get_current_version.reset_mock() + manager.run() + mybundle.get_current_version.assert_called_once()