Skip to content

Commit

Permalink
Revert "Call get_current_version less often in bundle refresh loop (a…
Browse files Browse the repository at this point in the history
…pache#45999)" (apache#46037)

This reverts commit c600a95.
  • Loading branch information
potiuk authored Jan 25, 2025
1 parent a3294cc commit dd2252d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 87 deletions.
28 changes: 6 additions & 22 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,21 +440,11 @@ def _refresh_dag_bundles(self):
elapsed_time_since_refresh = (
now - (bundle_model.last_refreshed or timezone.utc_epoch())
).total_seconds()
if bundle.supports_versioning:
# we will also check the version of the bundle to see if another DAG processor has seen
# a new version
pre_refresh_version = (
self._bundle_versions.get(bundle.name) or bundle.get_current_version()
)
current_version_matches_db = pre_refresh_version == bundle_model.version
else:
# With no versioning, it always "matches"
current_version_matches_db = True

pre_refresh_version = bundle.get_current_version()
previously_seen = bundle.name in self._bundle_versions
if (
elapsed_time_since_refresh < bundle.refresh_interval
and current_version_matches_db
and bundle_model.version == pre_refresh_version
and previously_seen
):
self.log.info("Not time to refresh %s", bundle.name)
Expand All @@ -468,28 +458,20 @@ def _refresh_dag_bundles(self):

bundle_model.last_refreshed = now

version_after_refresh = bundle.get_current_version()
if bundle.supports_versioning:
# We can short-circuit the rest of this if (1) bundle was seen before by
# this dag processor and (2) the version of the bundle did not change
# after refreshing it
version_after_refresh = bundle.get_current_version()
if previously_seen and pre_refresh_version == version_after_refresh:
self.log.debug(
"Bundle %s version not changed after refresh: %s",
bundle.name,
version_after_refresh,
)
self.log.debug("Bundle %s version not changed after refresh", bundle.name)
continue

bundle_model.version = version_after_refresh

self.log.info(
"Version changed for %s, new version: %s", bundle.name, version_after_refresh
)
else:
version_after_refresh = None

self._bundle_versions[bundle.name] = version_after_refresh

bundle_file_paths = self._find_files_in_bundle(bundle)

Expand All @@ -502,6 +484,8 @@ def _refresh_dag_bundles(self):
self.deactivate_deleted_dags(bundle_file_paths)
self.clear_nonexistent_import_errors()

self._bundle_versions[bundle.name] = bundle.get_current_version()

def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[str]:
"""Refresh file paths from bundle dir."""
# Build up a list of Python files that could contain DAGs
Expand Down
67 changes: 2 additions & 65 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
from tests_common.test_utils.db import (
clear_db_assets,
clear_db_callbacks,
clear_db_dag_bundles,
clear_db_dags,
clear_db_import_errors,
clear_db_runs,
Expand Down Expand Up @@ -94,7 +93,6 @@ def setup_method(self):
clear_db_dags()
clear_db_callbacks()
clear_db_import_errors()
clear_db_dag_bundles()

def teardown_class(self):
clear_db_assets()
Expand All @@ -103,7 +101,6 @@ def teardown_class(self):
clear_db_dags()
clear_db_callbacks()
clear_db_import_errors()
clear_db_dag_bundles()

def mock_processor(self) -> DagFileProcessorProcess:
proc = MagicMock()
Expand Down Expand Up @@ -530,9 +527,7 @@ def test_send_file_processing_statsd_timing(
any_order=True,
)

def test_refresh_dags_dir_doesnt_delete_zipped_dags(
self, tmp_path, testing_dag_bundle, configure_testing_dag_bundle
):
def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path, configure_testing_dag_bundle):
"""Test DagFileProcessorManager._refresh_dag_dir method"""
dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
Expand Down Expand Up @@ -795,7 +790,7 @@ def _update_bundletwo_version():
def test_bundles_versions_are_stored(self):
config = [
{
"name": "bundleone",
"name": "mybundle",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": "/dev/null", "refresh_interval": 0},
},
Expand All @@ -820,61 +815,3 @@ def test_bundles_versions_are_stored(self):
with create_session() as session:
model = session.get(DagBundleModel, "bundleone")
assert model.version == "123"

def test_non_versioned_bundle_get_version_not_called(self):
config = [
{
"name": "bundleone",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": "/dev/null", "refresh_interval": 0},
},
]

mybundle = MagicMock()
mybundle.name = "bundleone"
mybundle.refresh_interval = 0
mybundle.supports_versioning = False

with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}):
DagBundlesManager().sync_bundles_to_db()
with mock.patch(
"airflow.dag_processing.bundles.manager.DagBundlesManager"
) as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"bundleone": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
manager = DagFileProcessorManager(max_runs=1)
manager.run()

mybundle.get_current_version.assert_not_called()

def test_versioned_bundle_get_version_called_once(self):
"""Make sure in a normal "warm" loop, get_current_version is called just once after refresha"""

config = [
{
"name": "bundleone",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": "/dev/null", "refresh_interval": 0},
},
]

mybundle = MagicMock()
mybundle.name = "bundleone"
mybundle.refresh_interval = 0
mybundle.supports_versioning = True
mybundle.get_current_version.return_value = "123"

with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}):
DagBundlesManager().sync_bundles_to_db()
with mock.patch(
"airflow.dag_processing.bundles.manager.DagBundlesManager"
) as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"bundleone": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
manager = DagFileProcessorManager(max_runs=1)
manager.run() # run it once to warm up

# now run it again so we can check we only call get_current_version once
mybundle.get_current_version.reset_mock()
manager.run()
mybundle.get_current_version.assert_called_once()

0 comments on commit dd2252d

Please sign in to comment.