Skip to content

Commit

Permalink
Call get_current_version less often in bundle refresh loop (apache#…
Browse files Browse the repository at this point in the history
…46261)

In the bundle refresh loop, we can call `get_current_version` a lot less
often, as 1) we can skip it for bundles that do not support versioning
and 2) for those that do, we already know the version from the last time
we refreshed!

Since this is a local call, this isn't a huge gain. But every little bit
helps.
  • Loading branch information
jedcunningham authored and ambika-garg committed Feb 13, 2025
1 parent af1d8ee commit 15fd339
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 11 deletions.
28 changes: 22 additions & 6 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,11 +453,21 @@ def _refresh_dag_bundles(self):
elapsed_time_since_refresh = (
now - (bundle_model.last_refreshed or timezone.utc_epoch())
).total_seconds()
pre_refresh_version = bundle.get_current_version()
if bundle.supports_versioning:
# we will also check the version of the bundle to see if another DAG processor has seen
# a new version
pre_refresh_version = (
self._bundle_versions.get(bundle.name) or bundle.get_current_version()
)
current_version_matches_db = pre_refresh_version == bundle_model.version
else:
# With no versioning, it always "matches"
current_version_matches_db = True

previously_seen = bundle.name in self._bundle_versions
if (
elapsed_time_since_refresh < bundle.refresh_interval
and bundle_model.version == pre_refresh_version
and current_version_matches_db
and previously_seen
):
self.log.info("Not time to refresh %s", bundle.name)
Expand All @@ -471,20 +481,28 @@ def _refresh_dag_bundles(self):

bundle_model.last_refreshed = now

version_after_refresh = bundle.get_current_version()
if bundle.supports_versioning:
# We can short-circuit the rest of this if (1) bundle was seen before by
# this dag processor and (2) the version of the bundle did not change
# after refreshing it
version_after_refresh = bundle.get_current_version()
if previously_seen and pre_refresh_version == version_after_refresh:
self.log.debug("Bundle %s version not changed after refresh", bundle.name)
self.log.debug(
"Bundle %s version not changed after refresh: %s",
bundle.name,
version_after_refresh,
)
continue

bundle_model.version = version_after_refresh

self.log.info(
"Version changed for %s, new version: %s", bundle.name, version_after_refresh
)
else:
version_after_refresh = None

self._bundle_versions[bundle.name] = version_after_refresh

found_files = [
DagFileInfo(rel_path=p, bundle_name=bundle.name, bundle_path=bundle.path)
Expand All @@ -503,8 +521,6 @@ def _refresh_dag_bundles(self):
self.deactivate_deleted_dags(active_files=found_files)
self.clear_nonexistent_import_errors()

self._bundle_versions[bundle.name] = bundle.get_current_version()

def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
"""Get relative paths for dag files from bundle dir."""
# Build up a list of Python files that could contain DAGs
Expand Down
74 changes: 69 additions & 5 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from tests_common.test_utils.db import (
clear_db_assets,
clear_db_callbacks,
clear_db_dag_bundles,
clear_db_dags,
clear_db_import_errors,
clear_db_runs,
Expand Down Expand Up @@ -93,6 +94,7 @@ def setup_method(self):
clear_db_dags()
clear_db_callbacks()
clear_db_import_errors()
clear_db_dag_bundles()

def teardown_class(self):
clear_db_assets()
Expand All @@ -101,6 +103,7 @@ def teardown_class(self):
clear_db_dags()
clear_db_callbacks()
clear_db_import_errors()
clear_db_dag_bundles()

def mock_processor(self) -> DagFileProcessorProcess:
proc = MagicMock()
Expand Down Expand Up @@ -575,7 +578,9 @@ def test_send_file_processing_statsd_timing(
any_order=True,
)

def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path, configure_testing_dag_bundle):
def test_refresh_dags_dir_doesnt_delete_zipped_dags(
self, tmp_path, testing_dag_bundle, configure_testing_dag_bundle
):
"""Test DagFileProcessorManager._refresh_dag_dir method"""
dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
Expand Down Expand Up @@ -869,14 +874,14 @@ def _update_bundletwo_version():
def test_bundles_versions_are_stored(self, session):
config = [
{
"name": "mybundle",
"name": "bundleone",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": "/dev/null", "refresh_interval": 0},
},
]

mybundle = MagicMock()
mybundle.name = "mybundle"
mybundle.name = "bundleone"
mybundle.path = "/dev/null"
mybundle.refresh_interval = 0
mybundle.supports_versioning = True
Expand All @@ -885,11 +890,70 @@ def test_bundles_versions_are_stored(self, session):
with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}):
DagBundlesManager().sync_bundles_to_db()
with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"mybundle": None}
mock_bundle_manager.return_value._bundle_config = {"bundleone": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
manager = DagFileProcessorManager(max_runs=1)
manager.run()

with create_session() as session:
model = session.get(DagBundleModel, "mybundle")
model = session.get(DagBundleModel, "bundleone")
assert model.version == "123"

def test_non_versioned_bundle_get_version_not_called(self):
"""Check that a bundle with supports_versioning=False is refreshed without get_current_version being called."""
# Bundle configuration handed to DagBundlesManager().sync_bundles_to_db() below.
config = [
{
"name": "bundleone",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": "/dev/null", "refresh_interval": 0},
},
]

# Mock bundle that reports it does not support versioning.
bundleone = MagicMock()
bundleone.name = "bundleone"
bundleone.refresh_interval = 0
bundleone.supports_versioning = False
bundleone.path = Path("/dev/null")

with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}):
DagBundlesManager().sync_bundles_to_db()
# Patch DagBundlesManager as referenced from airflow.dag_processing.manager so that
# get_all_dag_bundles() hands back only the mock bundle.
with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"bundleone": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone]
# NOTE(review): max_runs=1 presumably limits run() to a single loop pass - confirm.
manager = DagFileProcessorManager(max_runs=1)
manager.run()

# The bundle must have been refreshed exactly once, and its version never queried.
bundleone.refresh.assert_called_once()
bundleone.get_current_version.assert_not_called()

def test_versioned_bundle_get_version_called_once(self):
"""Make sure in a normal "warm" loop, get_current_version is called just once after refresh."""

# Bundle configuration handed to DagBundlesManager().sync_bundles_to_db() below.
config = [
{
"name": "bundleone",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": "/dev/null", "refresh_interval": 0},
},
]

# Mock bundle that supports versioning and always reports version "123".
bundleone = MagicMock()
bundleone.name = "bundleone"
bundleone.refresh_interval = 0
bundleone.supports_versioning = True
bundleone.get_current_version.return_value = "123"
bundleone.path = Path("/dev/null")

with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}):
DagBundlesManager().sync_bundles_to_db()
# Patch DagBundlesManager as referenced from airflow.dag_processing.manager so that
# get_all_dag_bundles() hands back only the mock bundle.
with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"bundleone": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone]
manager = DagFileProcessorManager(max_runs=1)
manager.run()  # run it once to warm up

# now run it again so we can check we only call get_current_version once
# Reset the mocks first so the call counts below reflect only the second run.
bundleone.refresh.reset_mock()
bundleone.get_current_version.reset_mock()
manager.run()
bundleone.refresh.assert_called_once()
bundleone.get_current_version.assert_called_once()

0 comments on commit 15fd339

Please sign in to comment.