From bf5903307229a006fc2b227b0bc3de54f91e6b42 Mon Sep 17 00:00:00 2001 From: Ulada Zakharava Date: Fri, 22 Mar 2024 13:16:27 +0000 Subject: [PATCH 1/7] Replace dill package to use cloudpickle --- .../tutorial_taskflow_api_virtualenv.py | 2 +- airflow/models/dagpickle.py | 4 +- airflow/models/taskinstance.py | 4 +- airflow/operators/python.py | 57 ++++++++++++++----- .../cncf/kubernetes/decorators/kubernetes.py | 20 +++++-- airflow/providers/docker/decorators/docker.py | 21 +++++-- .../cloud/utils/mlengine_operator_utils.py | 4 +- .../utils/mlengine_prediction_summary.py | 10 ++-- hatch_build.py | 1 + tests/decorators/test_external_python.py | 26 ++++----- tests/decorators/test_python_virtualenv.py | 22 +++---- tests/operators/test_python.py | 36 ++++++------ .../docker/decorators/test_docker.py | 8 +-- .../utils/test_mlengine_operator_utils.py | 8 +-- .../utils/test_mlengine_prediction_summary.py | 6 +- .../example_taskflow_api_docker_virtualenv.py | 2 +- ...preexisting_python_virtualenv_decorator.py | 2 +- tests/utils/test_python_virtualenv.py | 2 +- 18 files changed, 144 insertions(+), 91 deletions(-) diff --git a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py index 44134e445891d..98127f21176c4 100644 --- a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py +++ b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py @@ -38,7 +38,7 @@ def tutorial_taskflow_api_virtualenv(): """ @task.virtualenv( - use_dill=True, + use_cloudpickle=True, system_site_packages=False, requirements=["funcsigs"], ) diff --git a/airflow/models/dagpickle.py b/airflow/models/dagpickle.py index e6f4561d8e1bf..f451050c803e7 100644 --- a/airflow/models/dagpickle.py +++ b/airflow/models/dagpickle.py @@ -19,7 +19,7 @@ from typing import TYPE_CHECKING -import dill +import cloudpickle from sqlalchemy import BigInteger, Column, Integer, PickleType from airflow.models.base import Base @@ -42,7 +42,7 @@ class DagPickle(Base): """ id = Column(Integer, primary_key=True) - pickle = Column(PickleType(pickler=dill)) + pickle = Column(PickleType(pickler=cloudpickle)) created_dttm = Column(UtcDateTime, default=timezone.utcnow) pickle_hash = Column(BigInteger) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index b4d5d5d65a1b3..79790c0e85f4a 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -34,7 +34,7 @@ from typing import TYPE_CHECKING, Any, Callable, Collection, Generator, Iterable, Mapping, Tuple from urllib.parse import quote -import dill +import cloudpickle import jinja2 import lazy_object_proxy import pendulum @@ -1353,7 +1353,7 @@ class TaskInstance(Base, LoggingMixin): queued_by_job_id = Column(Integer) pid = Column(Integer) executor = Column(String(1000)) - executor_config = Column(ExecutorConfigType(pickler=dill)) + executor_config = Column(ExecutorConfigType(pickler=cloudpickle)) updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow) rendered_map_index = Column(String(250)) diff --git a/airflow/operators/python.py b/airflow/operators/python.py index 998e47c91b53f..00b7a9535c509 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -36,12 +36,13 @@ from tempfile import TemporaryDirectory from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Mapping, NamedTuple, Sequence, cast -import dill +import cloudpickle from airflow.compat.functools import cache from airflow.exceptions import ( AirflowConfigException, 
AirflowException, + AirflowProviderDeprecationWarning, AirflowSkipException, DeserializingResultError, RemovedInAirflow3Warning, @@ -394,6 +395,7 @@ def __init__( *, python_callable: Callable, use_dill: bool = False, + use_cloudpickle: bool = False, op_args: Collection[Any] | None = None, op_kwargs: Mapping[str, Any] | None = None, string_args: Iterable[str] | None = None, @@ -420,8 +422,16 @@ def __init__( **kwargs, ) self.string_args = string_args or [] - self.use_dill = use_dill - self.pickling_library = dill if self.use_dill else pickle + if use_dill: + warnings.warn( + "The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use " + "'use_cloudpickle' instead. ", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + use_cloudpickle = use_dill + self.use_cloudpickle = use_cloudpickle + self.pickling_library = cloudpickle if self.use_cloudpickle else pickle self.expect_airflow = expect_airflow self.skip_on_exit_code = ( skip_on_exit_code @@ -552,9 +562,9 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): "requirements file" as specified by pip. :param python_version: The Python version to run the virtual environment with. Note that both 2 and 2.7 are acceptable forms. - :param use_dill: Whether to use dill to serialize - the args and result (pickle is default). This allow more complex types - but requires you to include dill in your requirements. + :param use_cloudpickle: Whether to use cloudpickle to serialize + the args and result (pickle is default). This allows more complex types + but requires you to include cloudpickle in your requirements. :param system_site_packages: Whether to include system_site_packages in your virtual environment. See virtualenv documentation for more information. @@ -597,6 +607,7 @@ def __init__( requirements: None | Iterable[str] | str = None, python_version: str | None = None, use_dill: bool = False, + use_cloudpickle: bool = False, system_site_packages: bool = True, pip_install_options: list[str] | None = None, op_args: Collection[Any] | None = None, @@ -627,6 +638,14 @@ def __init__( RemovedInAirflow3Warning, stacklevel=2, ) + if use_dill: + warnings.warn( + "The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use " + "'use_cloudpickle' instead. ", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + use_cloudpickle = use_dill if not is_venv_installed(): raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.") if not requirements: @@ -647,7 +666,7 @@ def __init__( self.venv_cache_path = venv_cache_path super().__init__( python_callable=python_callable, - use_dill=use_dill, + use_cloudpickle=use_cloudpickle, op_args=op_args, op_kwargs=op_kwargs, string_args=string_args, @@ -661,8 +680,8 @@ def __init__( def _requirements_list(self) -> list[str]: """Prepare a list of requirements that need to be installed for the virtual environment.""" requirements = [str(dependency) for dependency in self.requirements] - if not self.system_site_packages and self.use_dill and "dill" not in requirements: - requirements.append("dill") + if not self.system_site_packages and self.use_cloudpickle and "cloudpickle" not in requirements: + requirements.append("cloudpickle") requirements.sort() # Ensure a hash is stable return requirements @@ -820,10 +839,11 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator): a virtual environment that should be used (in ``VENV/bin`` folder). 
Should be absolute path (so usually start with "/" or "X:/" depending on the filesystem/os used). :param python_callable: A python function with no references to outside variables, - defined with def, which will be run in a virtual environment - :param use_dill: Whether to use dill to serialize - the args and result (pickle is default). This allow more complex types - but if dill is not preinstalled in your virtual environment, the task will fail with use_dill enabled. + defined with def, which will be run in a virtual environment. + :param use_cloudpickle: Whether to use cloudpickle to serialize + the args and result (pickle is default). This allows more complex types + but if cloudpickle is not preinstalled in your virtual environment, the task will fail + with use_cloudpickle enabled. :param op_args: A list of positional arguments to pass to python_callable. :param op_kwargs: A dict of keyword arguments to pass to python_callable. :param string_args: Strings that are present in the global var virtualenv_string_args, @@ -851,6 +871,7 @@ def __init__( python: str, python_callable: Callable, use_dill: bool = False, + use_cloudpickle: bool = False, op_args: Collection[Any] | None = None, op_kwargs: Mapping[str, Any] | None = None, string_args: Iterable[str] | None = None, @@ -863,11 +884,19 @@ def __init__( ): if not python: raise ValueError("Python Path must be defined in ExternalPythonOperator") + if use_dill: + warnings.warn( + "The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use " + "'use_cloudpickle' instead. ", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + use_cloudpickle = use_dill self.python = python self.expect_pendulum = expect_pendulum super().__init__( python_callable=python_callable, - use_dill=use_dill, + use_cloudpickle=use_cloudpickle, op_args=op_args, op_kwargs=op_kwargs, string_args=string_args, diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index 6e914a2d1f946..7dd50984d9809 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -20,14 +20,16 @@ import os import pickle import uuid +import warnings from shlex import quote from tempfile import TemporaryDirectory from typing import TYPE_CHECKING, Callable, Sequence -import dill +import cloudpickle from kubernetes.client import models as k8s from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator from airflow.providers.cncf.kubernetes.python_kubernetes_script import ( write_python_script, @@ -65,8 +67,18 @@ class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator): # there are some cases we can't deepcopy the objects (e.g protobuf). shallow_copy_attrs: Sequence[str] = ("python_callable",) - def __init__(self, namespace: str = "default", use_dill: bool = False, **kwargs) -> None: - self.use_dill = use_dill + def __init__( + self, namespace: str = "default", use_dill: bool = False, use_cloudpickle: bool = False, **kwargs + ) -> None: + if use_dill: + warnings.warn( + "The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use " + "'use_cloudpickle' instead. 
", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + self.use_cloudpickle = use_dill + self.use_cloudpickle = use_cloudpickle super().__init__( namespace=namespace, name=kwargs.pop("name", f"k8s_airflow_pod_{uuid.uuid4().hex}"), @@ -100,7 +112,7 @@ def _generate_cmds(self) -> list[str]: def execute(self, context: Context): with TemporaryDirectory(prefix="venv") as tmp_dir: - pickling_library = dill if self.use_dill else pickle + pickling_library = cloudpickle if self.use_cloudpickle else pickle script_filename = os.path.join(tmp_dir, "script.py") input_filename = os.path.join(tmp_dir, "script.in") diff --git a/airflow/providers/docker/decorators/docker.py b/airflow/providers/docker/decorators/docker.py index 9aafdd1d79bfb..fd482b20b2d50 100644 --- a/airflow/providers/docker/decorators/docker.py +++ b/airflow/providers/docker/decorators/docker.py @@ -19,12 +19,14 @@ import base64 import os import pickle +import warnings from tempfile import TemporaryDirectory from typing import TYPE_CHECKING, Callable, Sequence -import dill +import cloudpickle from airflow.decorators.base import DecoratedOperator, task_decorator_factory +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.docker.operators.docker import DockerOperator from airflow.utils.python_virtualenv import write_python_script @@ -53,7 +55,7 @@ class _DockerDecoratedOperator(DecoratedOperator, DockerOperator): :param python_callable: A reference to an object that is callable :param python: Python binary name to use - :param use_dill: Whether dill should be used to serialize the callable + :param use_cloudpickle: Whether cloudpickle should be used to serialize the callable :param expect_airflow: whether to expect airflow to be installed in the docker environment. if this one is specified, the script to run callable will attempt to load Airflow macros. :param op_kwargs: a dictionary of keyword arguments that will get unpacked @@ -72,6 +74,7 @@ class _DockerDecoratedOperator(DecoratedOperator, DockerOperator): def __init__( self, use_dill=False, + use_cloudpickle=False, python_command="python3", expect_airflow: bool = True, **kwargs, @@ -79,7 +82,15 @@ def __init__( command = "placeholder command" self.python_command = python_command self.expect_airflow = expect_airflow - self.use_dill = use_dill + if use_dill: + warnings.warn( + "The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use " + "'use_cloudpickle' instead. 
", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + use_cloudpickle = use_dill + self.use_cloudpickle = use_cloudpickle super().__init__( command=command, retrieve_output=True, retrieve_output_path="/tmp/script.out", **kwargs ) @@ -128,8 +139,8 @@ def execute(self, context: Context): @property def pickling_library(self): - if self.use_dill: - return dill + if self.use_cloudpickle: + return cloudpickle return pickle diff --git a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py index 0c94e11c741c3..ad2b2464bc858 100644 --- a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py +++ b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py @@ -25,7 +25,7 @@ from typing import TYPE_CHECKING, Callable, Iterable, TypeVar from urllib.parse import urlsplit -import dill +import cloudpickle from airflow.exceptions import AirflowException from airflow.operators.python import PythonOperator @@ -234,7 +234,7 @@ def validate_err_and_count(summary): dag=dag, ) - metric_fn_encoded = base64.b64encode(dill.dumps(metric_fn, recurse=True)).decode() + metric_fn_encoded = base64.b64encode(cloudpickle.dumps(metric_fn)).decode() evaluate_summary = BeamRunPythonPipelineOperator( task_id=(task_prefix + "-summary"), runner=BeamRunnerType.DataflowRunner, diff --git a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py index d0a13378b479d..a3037234c7cdb 100644 --- a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py +++ b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py @@ -30,7 +30,7 @@ - ``--metric_fn_encoded``: An encoded function that calculates and returns a tuple of metric(s) for a given instance (as a dictionary). It should be encoded - via ``base64.b64encode(dill.dumps(fn, recurse=True))``. + via ``base64.b64encode(cloudpickle.dumps(fn, recurse=True))``. - ``--metric_keys``: A comma-separated key(s) of the aggregated metric(s) in the summary output. The order and the size of the keys must match to the output @@ -57,7 +57,7 @@ def metric_fn(inst): squared_err = (classes-label)**2 return (log_loss, squared_err) return metric_fn - metric_fn_encoded = base64.b64encode(dill.dumps(get_metric_fn(), recurse=True)) + metric_fn_encoded = base64.b64encode(cloudpickle.dumps(get_metric_fn(), recurse=True)) DataflowCreatePythonJobOperator( task_id="summary-prediction", py_options=["-m"], @@ -116,7 +116,7 @@ def metric_fn(inst): import os import apache_beam as beam -import dill +import cloudpickle from apache_beam.coders.coders import Coder @@ -170,7 +170,7 @@ def run(argv=None): help=( "An encoded function that calculates and returns a tuple of " "metric(s) for a given instance (as a dictionary). It should be " - "encoded via base64.b64encode(dill.dumps(fn, recurse=True))." + "encoded via base64.b64encode(cloudpickle.dumps(fn))." 
), ) parser.add_argument( @@ -186,7 +186,7 @@ def run(argv=None): ) known_args, pipeline_args = parser.parse_known_args(argv) - metric_fn = dill.loads(base64.b64decode(known_args.metric_fn_encoded)) + metric_fn = cloudpickle.loads(base64.b64decode(known_args.metric_fn_encoded)) if not callable(metric_fn): raise ValueError("--metric_fn_encoded must be an encoded callable.") metric_keys = known_args.metric_keys.split(",") diff --git a/hatch_build.py b/hatch_build.py index 704883929efc2..5a27115038af4 100644 --- a/hatch_build.py +++ b/hatch_build.py @@ -416,6 +416,7 @@ "blinker>=1.6.2", # Colorlog 6.x merges TTYColoredFormatter into ColoredFormatter, breaking backwards compatibility with 4.x # Update CustomTTYColoredFormatter to remove + "cloudpickle>=2.0.0", "colorlog>=4.0.2, <5.0", "configupdater>=3.1.1", # `airflow/www/extensions/init_views` imports `connexion.decorators.validation.RequestBodyValidator` diff --git a/tests/decorators/test_external_python.py b/tests/decorators/test_external_python.py index 4ce4beebd0774..7e390e5eee5c3 100644 --- a/tests/decorators/test_external_python.py +++ b/tests/decorators/test_external_python.py @@ -54,42 +54,42 @@ def venv_python(): @pytest.fixture -def venv_python_with_dill(): +def venv_python_with_cloudpickle(): with TemporaryDirectory() as d: venv.create(d, with_pip=True) python_path = Path(d) / "bin" / "python" - subprocess.call([python_path, "-m", "pip", "install", "dill"]) + subprocess.call([python_path, "-m", "pip", "install", "cloudpickle"]) yield python_path class TestExternalPythonDecorator: - def test_with_dill_works(self, dag_maker, venv_python_with_dill): - @task.external_python(python=venv_python_with_dill, use_dill=True) + def test_with_cloudpickle_works(self, dag_maker, venv_python_with_cloudpickle): + @task.external_python(python=venv_python_with_cloudpickle, use_cloudpickle=True) def f(): - """Import dill to double-check it is installed .""" - import dill # noqa: F401 + """Import cloudpickle to double-check it is installed .""" + import cloudpickle # noqa: F401 with dag_maker(): ret = f() ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - def test_with_templated_python(self, dag_maker, venv_python_with_dill): + def test_with_templated_python(self, dag_maker, venv_python_with_cloudpickle): # add template that produces empty string when rendered - templated_python_with_dill = venv_python_with_dill.as_posix() + "{{ '' }}" + templated_python_with_cloudpickle = venv_python_with_cloudpickle.as_posix() + "{{ '' }}" - @task.external_python(python=templated_python_with_dill, use_dill=True) + @task.external_python(python=templated_python_with_cloudpickle, use_cloudpickle=True) def f(): - """Import dill to double-check it is installed .""" - import dill # noqa: F401 + """Import cloudpickle to double-check it is installed .""" + import cloudpickle # noqa: F401 with dag_maker(): ret = f() ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - def test_no_dill_installed_raises_exception_when_use_dill(self, dag_maker, venv_python): - @task.external_python(python=venv_python, use_dill=True) + def test_no_cloudpickle_installed_raises_exception_when_use_cloudpickle(self, dag_maker, venv_python): + @task.external_python(python=venv_python, use_cloudpickle=True) def f(): pass diff --git a/tests/decorators/test_python_virtualenv.py b/tests/decorators/test_python_virtualenv.py index 09631aafe8d42..548b75499e630 100644 --- a/tests/decorators/test_python_virtualenv.py +++ b/tests/decorators/test_python_virtualenv.py @@ -33,11 
+33,11 @@ class TestPythonVirtualenvDecorator: - def test_add_dill(self, dag_maker): - @task.virtualenv(use_dill=True, system_site_packages=False) + def test_add_cloudpickle(self, dag_maker): + @task.virtualenv(use_cloudpickle=True, system_site_packages=False) def f(): - """Ensure dill is correctly installed.""" - import dill # noqa: F401 + """Ensure cloudpickle is correctly installed.""" + import cloudpickle # noqa: F401 with dag_maker(): ret = f() @@ -57,7 +57,7 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) def test_no_system_site_packages(self, dag_maker): - @task.virtualenv(system_site_packages=False, python_version=PYTHON_VERSION, use_dill=True) + @task.virtualenv(system_site_packages=False, python_version=PYTHON_VERSION, use_cloudpickle=True) def f(): try: import funcsigs # noqa: F401 @@ -75,7 +75,7 @@ def test_system_site_packages(self, dag_maker): system_site_packages=False, requirements=["funcsigs"], python_version=PYTHON_VERSION, - use_dill=True, + use_cloudpickle=True, ) def f(): import funcsigs # noqa: F401 @@ -90,7 +90,7 @@ def test_with_requirements_pinned(self, dag_maker): system_site_packages=False, requirements=["funcsigs==0.4"], python_version=PYTHON_VERSION, - use_dill=True, + use_cloudpickle=True, ) def f(): import funcsigs @@ -111,7 +111,7 @@ def test_with_requirements_file(self, dag_maker, tmp_path): system_site_packages=False, requirements="requirements.txt", python_version=PYTHON_VERSION, - use_dill=True, + use_cloudpickle=True, ) def f(): import funcsigs @@ -132,9 +132,9 @@ def f(): def test_unpinned_requirements(self, dag_maker): @task.virtualenv( system_site_packages=False, - requirements=["funcsigs", "dill"], + requirements=["funcsigs", "cloudpickle"], python_version=PYTHON_VERSION, - use_dill=True, + use_cloudpickle=True, ) def f(): import funcsigs # noqa: F401 @@ -156,7 +156,7 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) def test_python_3(self, dag_maker): - @task.virtualenv(python_version=3, use_dill=False, requirements=["dill"]) + @task.virtualenv(python_version=3, use_cloudpickle=False, requirements=["cloudpickle"]) def f(): import sys diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index 5ad893b36d1b3..b98c532974ee1 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -923,12 +923,12 @@ def f(): with pytest.raises(AirflowException, match="requires virtualenv"): self.run_as_task(f) - def test_add_dill(self): + def test_add_cloudpickle(self): def f(): - """Ensure dill is correctly installed.""" - import dill # noqa: F401 + """Ensure cloudpickle is correctly installed.""" + import cloudpickle # noqa: F401 - self.run_as_task(f, use_dill=True, system_site_packages=False) + self.run_as_task(f, use_cloudpickle=True, system_site_packages=False) def test_no_requirements(self): """Tests that the python callable is invoked on task run.""" @@ -946,7 +946,7 @@ def f(): return True raise RuntimeError - self.run_as_task(f, system_site_packages=False, requirements=["dill"]) + self.run_as_task(f, system_site_packages=False, requirements=["cloudpickle"]) def test_system_site_packages(self): def f(): @@ -983,13 +983,13 @@ def test_unpinned_requirements(self): def f(): import funcsigs # noqa: F401 - self.run_as_task(f, requirements=["funcsigs", "dill"], system_site_packages=False) + self.run_as_task(f, requirements=["funcsigs", "cloudpickle"], system_site_packages=False) def test_range_requirements(self): def f(): import funcsigs # noqa: F401 - 
self.run_as_task(f, requirements=["funcsigs>1.0", "dill"], system_site_packages=False) + self.run_as_task(f, requirements=["funcsigs>1.0", "cloudpickle"], system_site_packages=False) def test_requirements_file(self): def f(): @@ -1028,7 +1028,7 @@ def f(): self.run_as_operator( f, requirements="requirements.txt", - use_dill=True, + use_cloudpickle=True, params={"environ": "templated_unit_test"}, system_site_packages=False, ) @@ -1044,13 +1044,13 @@ def f(): return raise RuntimeError - self.run_as_task(f, python_version="3", use_dill=False, requirements=["dill"]) + self.run_as_task(f, python_version="3", use_cloudpickle=False, requirements=["cloudpickle"]) - def test_without_dill(self): + def test_without_cloudpickle(self): def f(a): return a - self.run_as_task(f, system_site_packages=False, use_dill=False, op_args=[4]) + self.run_as_task(f, system_site_packages=False, use_cloudpickle=False, op_args=[4]) def test_with_index_urls(self): def f(a): @@ -1075,17 +1075,17 @@ def f(a): self.run_as_task(f, venv_cache_path=tmp_dir, op_args=[4]) # This tests might take longer than default 60 seconds as it is serializing a lot of - # context using dill (which is slow apparently). + # context using cloudpickle (which is slow apparently). @pytest.mark.execution_timeout(120) @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") @pytest.mark.skipif( - os.environ.get("PYTEST_PLAIN_ASSERTS") != "true" or PY311, - reason="assertion rewriting breaks this test because dill will try to serialize " + os.environ.get("PYTEST_PLAIN_ASSERTS") != "true", + reason="assertion rewriting breaks this test because cloudpickle will try to serialize " "AssertRewritingHook including captured stdout and we need to run " "it with `--assert=plain`pytest option and PYTEST_PLAIN_ASSERTS=true ." 
"Also this test is skipped on Python 3.11 because of impact of regression in Python 3.11 " "connected likely with CodeType behaviour https://github.com/python/cpython/issues/100316 " - "That likely causes that dill is not able to serialize the `conf` correctly " + "That likely causes that cloudpickle is not able to serialize the `conf` correctly " "Issue about fixing it is captured in https://github.com/apache/airflow/issues/35307", ) def test_airflow_context(self): @@ -1127,7 +1127,7 @@ def f( ): pass - self.run_as_operator(f, use_dill=True, system_site_packages=True, requirements=None) + self.run_as_operator(f, use_cloudpickle=True, system_site_packages=True, requirements=None) @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_pendulum_context(self): @@ -1162,7 +1162,7 @@ def f( ): pass - self.run_as_task(f, use_dill=True, system_site_packages=False, requirements=["pendulum"]) + self.run_as_task(f, use_cloudpickle=True, system_site_packages=False, requirements=["pendulum"]) @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_base_context(self): @@ -1190,7 +1190,7 @@ def f( ): pass - self.run_as_task(f, use_dill=True, system_site_packages=False, requirements=None) + self.run_as_task(f, use_cloudpickle=True, system_site_packages=False, requirements=None) # when venv tests are run in parallel to other test they create new processes and this might take diff --git a/tests/providers/docker/decorators/test_docker.py b/tests/providers/docker/decorators/test_docker.py index e4fbe15fc3243..33b682ab977e6 100644 --- a/tests/providers/docker/decorators/test_docker.py +++ b/tests/providers/docker/decorators/test_docker.py @@ -204,13 +204,13 @@ def f(): assert teardown_task.is_teardown assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun - @pytest.mark.parametrize("use_dill", [True, False]) - def test_deepcopy_with_python_operator(self, dag_maker, use_dill): + @pytest.mark.parametrize("use_cloudpickle", [True, False]) + def test_deepcopy_with_python_operator(self, dag_maker, use_cloudpickle): import copy from airflow.providers.docker.decorators.docker import _DockerDecoratedOperator - @task.docker(image="python:3.9-slim", auto_remove="force", use_dill=use_dill) + @task.docker(image="python:3.9-slim", auto_remove="force", use_cloudpickle=use_cloudpickle) def f(): import logging @@ -244,7 +244,7 @@ def g(): assert isinstance(clone_of_docker_operator, _DockerDecoratedOperator) assert some_task.command == clone_of_docker_operator.command assert some_task.expect_airflow == clone_of_docker_operator.expect_airflow - assert some_task.use_dill == clone_of_docker_operator.use_dill + assert some_task.use_cloudpickle == clone_of_docker_operator.use_cloudpickle assert some_task.pickling_library is clone_of_docker_operator.pickling_library def test_respect_docker_host_env(self, monkeypatch, dag_maker): diff --git a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py index 82260ddc247d3..4e1ea7989f9d4 100644 --- a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py +++ b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py @@ -21,7 +21,7 @@ from datetime import datetime from unittest import mock -import dill +import cloudpickle import pytest from airflow.exceptions import AirflowException @@ -116,7 +116,7 @@ def test_create_evaluate_ops(self, mock_beam_pipeline, mock_python): # importing 
apache_beam elsewhere modifies the metrics. In order to avoid metrics being modified # by apache_beam import happening after importing this test, we retrieve the metrics here rather than # at the top of the file. - METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN, recurse=True)).decode() + METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() assert TASK_PREFIX_PREDICTION == evaluate_prediction.task_id assert PROJECT_ID == evaluate_prediction.project_id @@ -162,7 +162,7 @@ def test_create_evaluate_ops_model_and_version_name(self, mock_beam_pipeline, mo # importing apache_beam elsewhere modifies the metrics. In order to avoid metrics being modified # by apache_beam import happening after importing this test, we retrieve the metrics here rather than # at the top of the file. - METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN, recurse=True)).decode() + METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() assert TASK_PREFIX_PREDICTION == evaluate_prediction.task_id assert PROJECT_ID == evaluate_prediction.project_id @@ -205,7 +205,7 @@ def test_create_evaluate_ops_dag(self, mock_dataflow, mock_python): # importing apache_beam elsewhere modifies the metrics. In order to avoid metrics being modified # by apache_beam import happening after importing this test, we retrieve the metrics here rather than # at the top of the file. - METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN, recurse=True)).decode() + METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() assert TASK_PREFIX_PREDICTION == evaluate_prediction.task_id assert PROJECT_ID == evaluate_prediction.project_id diff --git a/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py b/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py index 7d28cf1e68413..acc96f3899cc0 100644 --- a/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py +++ b/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py @@ -21,7 +21,7 @@ import sys from unittest import mock -import dill +import cloudpickle import pytest if sys.version_info < (3, 12): @@ -75,7 +75,7 @@ def test_run_should_fail_for_invalid_encoded_fn(self): def test_run_should_fail_if_enc_fn_is_not_callable(self): non_callable_value = 1 - fn_enc = base64.b64encode(dill.dumps(non_callable_value)).decode("utf-8") + fn_enc = base64.b64encode(cloudpickle.dumps(non_callable_value)).decode("utf-8") with pytest.raises(ValueError): mlengine_prediction_summary.run( @@ -98,7 +98,7 @@ def test_run_should_not_fail_with_valid_fn(self): def metric_function(): return 1 - fn_enc = base64.b64encode(dill.dumps(metric_function)).decode("utf-8") + fn_enc = base64.b64encode(cloudpickle.dumps(metric_function)).decode("utf-8") mlengine_prediction_summary.run( [ diff --git a/tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py b/tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py index 95c502a221073..7b172af11d3f7 100644 --- a/tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py +++ b/tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py @@ -44,7 +44,7 @@ def tutorial_taskflow_api_docker_virtualenv(): # [START extract_virtualenv] @task.virtualenv( - use_dill=True, + use_cloudpickle=True, system_site_packages=False, requirements=["funcsigs"], ) diff --git a/tests/utils/test_preexisting_python_virtualenv_decorator.py b/tests/utils/test_preexisting_python_virtualenv_decorator.py index 
2e97469958fd3..c15c9479f2576 100644 --- a/tests/utils/test_preexisting_python_virtualenv_decorator.py +++ b/tests/utils/test_preexisting_python_virtualenv_decorator.py @@ -22,7 +22,7 @@ class TestExternalPythonDecorator: def test_remove_task_decorator(self): - py_source = "@task.external_python(use_dill=True)\ndef f():\nimport funcsigs" + py_source = "@task.external_python(use_cloudpickle=True)\ndef f():\nimport funcsigs" res = remove_task_decorator(python_source=py_source, task_decorator_name="@task.external_python") assert res == "def f():\nimport funcsigs" diff --git a/tests/utils/test_python_virtualenv.py b/tests/utils/test_python_virtualenv.py index 38cda4854baf6..6bf03d623cff9 100644 --- a/tests/utils/test_python_virtualenv.py +++ b/tests/utils/test_python_virtualenv.py @@ -116,7 +116,7 @@ def test_should_create_virtualenv_with_extra_packages(self, mock_execute_in_subp mock_execute_in_subprocess.assert_called_with(["/VENV/bin/pip", "install", "apache-beam[gcp]"]) def test_remove_task_decorator(self): - py_source = "@task.virtualenv(use_dill=True)\ndef f():\nimport funcsigs" + py_source = "@task.virtualenv(use_cloudpickle=True)\ndef f():\nimport funcsigs" res = remove_task_decorator(python_source=py_source, task_decorator_name="@task.virtualenv") assert res == "def f():\nimport funcsigs" From f832498feb15fc829d72dcf9588d4b239ea94ee9 Mon Sep 17 00:00:00 2001 From: Ulada Zakharava Date: Fri, 29 Mar 2024 12:09:16 +0000 Subject: [PATCH 2/7] Address comments --- INSTALL | 6 +- airflow/operators/python.py | 54 +++-- .../cncf/kubernetes/decorators/kubernetes.py | 33 ++- airflow/providers/docker/decorators/docker.py | 33 ++- .../cloud/utils/mlengine_operator_utils.py | 18 +- .../utils/mlengine_prediction_summary.py | 18 +- .../12_airflow_dependencies_and_extras.rst | 6 +- docs/apache-airflow/extra-packages-ref.rst | 4 + docs/apache-airflow/howto/operator/python.rst | 6 + docs/spelling_wordlist.txt | 1 + hatch_build.py | 8 +- pyproject.toml | 6 +- tests/decorators/test_external_python.py | 76 ++++++- tests/decorators/test_python_virtualenv.py | 120 +++++++++- tests/operators/test_python.py | 205 +++++++++++++++++- .../utils/test_mlengine_operator_utils.py | 43 +++- .../utils/test_mlengine_prediction_summary.py | 37 +++- 17 files changed, 587 insertions(+), 87 deletions(-) diff --git a/INSTALL b/INSTALL index 38434d9192d40..0dc388788510e 100644 --- a/INSTALL +++ b/INSTALL @@ -262,9 +262,9 @@ Those extras are available as regular core airflow extras - they install optiona # START CORE EXTRAS HERE -aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, deprecated-api, github-enterprise, -google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, rabbitmq, s3fs, -saml, sentry, statsd, uv, virtualenv +aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, dill, +github-enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, +rabbitmq, s3fs, saml, sentry, statsd, uv, virtualenv # END CORE EXTRAS HERE diff --git a/airflow/operators/python.py b/airflow/operators/python.py index 00b7a9535c509..3e89e55457bac 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -23,7 +23,6 @@ import json import logging import os -import pickle import shutil import subprocess import sys @@ -36,13 +35,10 @@ from tempfile import TemporaryDirectory from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Mapping, NamedTuple, Sequence, cast -import cloudpickle - from airflow.compat.functools 
import cache
 from airflow.exceptions import (
     AirflowConfigException,
     AirflowException,
-    AirflowProviderDeprecationWarning,
     AirflowSkipException,
     DeserializingResultError,
     RemovedInAirflow3Warning,
@@ -59,6 +55,16 @@
 from airflow.utils.process_utils import execute_in_subprocess
 from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
 
+import importlib.util
+import pickle
+
+log = logging.getLogger(__name__)
+
+if importlib.util.find_spec("cloudpickle"):
+    import cloudpickle as serialization_library
+elif importlib.util.find_spec("dill"):
+    import dill as serialization_library
+else:
+    log.warning("Neither dill nor cloudpickle is installed. Please install one with: pip install cloudpickle")
+    serialization_library = pickle
+
 if TYPE_CHECKING:
     from pendulum.datetime import DateTime
 
@@ -422,16 +428,15 @@ def __init__(
             **kwargs,
         )
         self.string_args = string_args or []
-        if use_dill:
-            warnings.warn(
-                "The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use "
-                "'use_cloudpickle' instead. ",
-                AirflowProviderDeprecationWarning,
-                stacklevel=2,
+        if use_dill and use_cloudpickle:
+            raise AirflowException(
+                "Both 'use_dill' and 'use_cloudpickle' parameters are set to True. Please"
+                " choose only one."
             )
+        if use_dill:
             use_cloudpickle = use_dill
         self.use_cloudpickle = use_cloudpickle
-        self.pickling_library = cloudpickle if self.use_cloudpickle else pickle
+        self.pickling_library = serialization_library if self.use_cloudpickle else pickle
         self.expect_airflow = expect_airflow
         self.skip_on_exit_code = (
             skip_on_exit_code
@@ -562,6 +567,9 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
         "requirements file" as specified by pip.
     :param python_version: The Python version to run the virtual environment with.
         Note that both 2 and 2.7 are acceptable forms.
+    :param use_dill: Whether to use dill to serialize
+        the args and result (pickle is default). This allows more complex types
+        but requires you to include dill in your requirements.
     :param use_cloudpickle: Whether to use cloudpickle to serialize
         the args and result (pickle is default). This allows more complex types
         but requires you to include cloudpickle in your requirements.
@@ -638,13 +646,12 @@ def __init__(
                 RemovedInAirflow3Warning,
                 stacklevel=2,
             )
-        if use_dill:
-            warnings.warn(
-                "The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use "
-                "'use_cloudpickle' instead. ",
-                AirflowProviderDeprecationWarning,
-                stacklevel=2,
+        if use_dill and use_cloudpickle:
+            raise AirflowException(
+                "Both 'use_dill' and 'use_cloudpickle' parameters are set to True. Please "
+                "choose only one."
             )
+        if use_dill:
             use_cloudpickle = use_dill
         if not is_venv_installed():
             raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.")
@@ -840,6 +847,9 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
         (so usually start with "/" or "X:/" depending on the filesystem/os used).
     :param python_callable: A python function with no references to outside variables,
         defined with def, which will be run in a virtual environment.
+    :param use_dill: Whether to use dill to serialize
+        the args and result (pickle is default). This allows more complex types
+        but requires you to include dill in your requirements.
     :param use_cloudpickle: Whether to use cloudpickle to serialize
         the args and result (pickle is default). This allows more complex types
         but if cloudpickle is not preinstalled in your virtual environment, the task will fail
         with use_cloudpickle enabled.
@@ -884,13 +894,11 @@ def __init__(
     ):
         if not python:
             raise ValueError("Python Path must be defined in ExternalPythonOperator")
-        if use_dill:
-            warnings.warn(
-                "The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use "
-                "'use_cloudpickle' instead. ",
-                AirflowProviderDeprecationWarning,
-                stacklevel=2,
+        if use_dill and use_cloudpickle:
+            raise AirflowException(
+                "Both 'use_dill' and 'use_cloudpickle' parameters are set to True. Please choose only one."
             )
+        if use_dill:
             use_cloudpickle = use_dill
         self.python = python
         self.expect_pendulum = expect_pendulum
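For reviewers, the net effect of the use_dill/use_cloudpickle handling above is easiest to see from the caller's side. A minimal usage sketch (the DAG and callables are illustrative, not part of the patch): either flag selects the advanced serializer picked at import time (cloudpickle preferred, dill as fallback), while setting both raises AirflowException.

    import pendulum

    from airflow.decorators import dag, task


    @dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
    def serializer_example():
        # New-style flag: opt in to the advanced serializer explicitly.
        @task.virtualenv(use_cloudpickle=True, system_site_packages=False)
        def with_cloudpickle():
            import cloudpickle  # noqa: F401

        # Legacy flag: still accepted and treated like use_cloudpickle=True.
        # Passing both flags at once raises
        # AirflowException("Both 'use_dill' and 'use_cloudpickle' ...").
        @task.virtualenv(use_dill=True, system_site_packages=False)
        def with_legacy_flag():
            return "still works"

        with_cloudpickle() >> with_legacy_flag()


    serializer_example()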
diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
index 7dd50984d9809..df565f11870b1 100644
--- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
@@ -17,19 +17,19 @@
 from __future__ import annotations
 
 import base64
+import importlib.util
+import logging
 import os
 import pickle
 import uuid
-import warnings
 from shlex import quote
 from tempfile import TemporaryDirectory
 from typing import TYPE_CHECKING, Callable, Sequence
 
-import cloudpickle
 from kubernetes.client import models as k8s
 
 from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException
 from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
 from airflow.providers.cncf.kubernetes.python_kubernetes_script import (
     write_python_script,
@@ -38,6 +38,16 @@
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+log = logging.getLogger(__name__)
+
+if importlib.util.find_spec("cloudpickle"):
+    import cloudpickle as serialization_library
+elif importlib.util.find_spec("dill"):
+    import dill as serialization_library
+else:
+    log.warning("Neither dill nor cloudpickle is installed. Please install one with: pip install cloudpickle")
+    serialization_library = pickle
+
 _PYTHON_SCRIPT_ENV = "__PYTHON_SCRIPT"
 _PYTHON_INPUT_ENV = "__PYTHON_INPUT"
 
@@ -70,14 +80,13 @@ class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
     def __init__(
         self, namespace: str = "default", use_dill: bool = False, use_cloudpickle: bool = False, **kwargs
     ) -> None:
-        if use_dill:
-            warnings.warn(
-                "The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use "
-                "'use_cloudpickle' instead. ",
-                AirflowProviderDeprecationWarning,
-                stacklevel=2,
+        if use_dill and use_cloudpickle:
+            raise AirflowException(
+                "Both 'use_dill' and 'use_cloudpickle' parameters are set to True. Please"
+                " choose only one."
             )
-            self.use_cloudpickle = use_dill
+        if use_dill:
+            use_cloudpickle = use_dill
         self.use_cloudpickle = use_cloudpickle
         super().__init__(
             namespace=namespace,
@@ -112,7 +121,7 @@ def _generate_cmds(self) -> list[str]:
 
     def execute(self, context: Context):
         with TemporaryDirectory(prefix="venv") as tmp_dir:
-            pickling_library = cloudpickle if self.use_cloudpickle else pickle
+            pickling_library = serialization_library if self.use_cloudpickle else pickle
             script_filename = os.path.join(tmp_dir, "script.py")
             input_filename = os.path.join(tmp_dir, "script.in")
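As background for the cloudpickle-over-dill-over-pickle preference wired in above, a standalone illustration (not part of the patch) of why the stdlib pickler is only a last resort: it serializes functions by reference and rejects lambdas, while cloudpickle serializes them by value.

    import pickle

    import cloudpickle

    factor = 3
    scale = lambda x: factor * x  # noqa: E731 - closes over a module-level name

    # cloudpickle captures the function body plus the global it references.
    restored = cloudpickle.loads(cloudpickle.dumps(scale))
    assert restored(2) == 6

    # The stdlib pickler fails: it can only store a reference to a named,
    # importable function, which a lambda is not.
    try:
        pickle.dumps(scale)
    except (pickle.PicklingError, AttributeError) as exc:
        print(f"pickle refused the lambda: {exc}")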
diff --git a/airflow/providers/docker/decorators/docker.py b/airflow/providers/docker/decorators/docker.py
index fd482b20b2d50..71f6836eee693 100644
--- a/airflow/providers/docker/decorators/docker.py
+++ b/airflow/providers/docker/decorators/docker.py
@@ -17,19 +17,28 @@
 from __future__ import annotations
 
 import base64
+import importlib.util
+import logging
 import os
 import pickle
-import warnings
 from tempfile import TemporaryDirectory
 from typing import TYPE_CHECKING, Callable, Sequence
 
-import cloudpickle
-
 from airflow.decorators.base import DecoratedOperator, task_decorator_factory
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException
 from airflow.providers.docker.operators.docker import DockerOperator
 from airflow.utils.python_virtualenv import write_python_script
 
+log = logging.getLogger(__name__)
+
+if importlib.util.find_spec("cloudpickle"):
+    import cloudpickle as serialization_library
+elif importlib.util.find_spec("dill"):
+    import dill as serialization_library
+else:
+    log.warning("Neither dill nor cloudpickle is installed. Please install one with: pip install cloudpickle")
+    serialization_library = pickle
+
 if TYPE_CHECKING:
     from airflow.decorators.base import TaskDecorator
     from airflow.utils.context import Context
@@ -55,6 +64,7 @@ class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
 
     :param python_callable: A reference to an object that is callable
     :param python: Python binary name to use
+    :param use_dill: Whether dill should be used to serialize the callable
     :param use_cloudpickle: Whether cloudpickle should be used to serialize the callable
     :param expect_airflow: whether to expect airflow to be installed in the docker environment.
         if this one is specified, the script to run callable will attempt to load Airflow macros.
     :param op_kwargs: a dictionary of keyword arguments that will get unpacked
@@ -82,13 +92,12 @@ def __init__(
         command = "placeholder command"
         self.python_command = python_command
         self.expect_airflow = expect_airflow
-        if use_dill:
-            warnings.warn(
-                "The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use "
-                "'use_cloudpickle' instead. ",
-                AirflowProviderDeprecationWarning,
-                stacklevel=2,
+        if use_dill and use_cloudpickle:
+            raise AirflowException(
+                "Both 'use_dill' and 'use_cloudpickle' parameters are set to True. Please"
+                " choose only one."
             )
+        if use_dill:
             use_cloudpickle = use_dill
         self.use_cloudpickle = use_cloudpickle
         super().__init__(
@@ -140,7 +149,7 @@ def execute(self, context: Context):
 
     @property
     def pickling_library(self):
         if self.use_cloudpickle:
-            return cloudpickle
+            return serialization_library
         return pickle
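The docker and kubernetes decorators share the same round trip: pick a pickler, dump the inputs next to a generated script, run the script in the container, and read the dumped result back. A simplified in-process sketch of that flow (the file names mirror the patch; everything else is illustrative):

    import os
    import pickle
    from tempfile import TemporaryDirectory

    import cloudpickle


    def run_out_of_process(fn, op_args, op_kwargs, use_cloudpickle=True):
        pickling_library = cloudpickle if use_cloudpickle else pickle
        with TemporaryDirectory(prefix="venv") as tmp_dir:
            input_filename = os.path.join(tmp_dir, "script.in")
            output_filename = os.path.join(tmp_dir, "script.out")
            # The operator dumps op_args/op_kwargs with the chosen pickler ...
            with open(input_filename, "wb") as file:
                pickling_library.dump({"args": op_args, "kwargs": op_kwargs}, file)
            # ... and the generated script.py loads them, calls the function, and
            # dumps the return value; here both halves run in a single process.
            with open(input_filename, "rb") as file:
                payload = pickling_library.load(file)
            with open(output_filename, "wb") as file:
                pickling_library.dump(fn(*payload["args"], **payload["kwargs"]), file)
            with open(output_filename, "rb") as file:
                return pickling_library.load(file)


    assert run_out_of_process(lambda a, b: a + b, (1, 2), {}) == 3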
diff --git a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py
index ad2b2464bc858..7f8b4d6adc934 100644
--- a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py
+++ b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py
@@ -19,14 +19,15 @@
 from __future__ import annotations
 
 import base64
+import importlib.util
 import json
+import logging
 import os
 import re
 from typing import TYPE_CHECKING, Callable, Iterable, TypeVar
 from urllib.parse import urlsplit
 
-import cloudpickle
-
 from airflow.exceptions import AirflowException
 from airflow.operators.python import PythonOperator
 from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
@@ -37,6 +38,16 @@
 if TYPE_CHECKING:
     from airflow import DAG
 
+log = logging.getLogger(__name__)
+
+if importlib.util.find_spec("cloudpickle"):
+    import cloudpickle as serialization_library
+elif importlib.util.find_spec("dill"):
+    import dill as serialization_library
+else:
+    log.warning("Neither dill nor cloudpickle is installed. Please install one with: pip install cloudpickle")
+    import pickle as serialization_library
+
 T = TypeVar("T", bound=Callable)
 
 
@@ -233,8 +244,7 @@ def validate_err_and_count(summary):
         version_name=version_name,
         dag=dag,
     )
-
-    metric_fn_encoded = base64.b64encode(cloudpickle.dumps(metric_fn)).decode()
+    metric_fn_encoded = base64.b64encode(serialization_library.dumps(metric_fn)).decode()
     evaluate_summary = BeamRunPythonPipelineOperator(
         task_id=(task_prefix + "-summary"),
         runner=BeamRunnerType.DataflowRunner,
diff --git a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py
index a3037234c7cdb..bcc2069f1e865 100644
--- a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py
+++ b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py
@@ -111,14 +111,25 @@ def metric_fn(inst):
 
 import argparse
 import base64
+import importlib.util
 import json
 import logging
 import os
 
 import apache_beam as beam
-import cloudpickle
 from apache_beam.coders.coders import Coder
 
+log = logging.getLogger(__name__)
+
+if importlib.util.find_spec("cloudpickle"):
+    import cloudpickle as serialization_library
+elif importlib.util.find_spec("dill"):
+    import dill as serialization_library
+else:
+    log.warning("Neither dill nor cloudpickle is installed. Please install one with: pip install cloudpickle")
+    import pickle as serialization_library
+
 
 class JsonCoder(Coder):
     """JSON encoder/decoder."""
@@ -170,7 +181,7 @@ def run(argv=None):
         help=(
             "An encoded function that calculates and returns a tuple of "
             "metric(s) for a given instance (as a dictionary). It should be "
-            "encoded via base64.b64encode(cloudpickle.dumps(fn))."
+            "encoded via base64.b64encode(cloudpickle.dumps(fn)) or base64.b64encode(dill.dumps(fn))."
         ),
     )
     parser.add_argument(
@@ -185,8 +196,7 @@ def run(argv=None):
         ),
     )
     known_args, pipeline_args = parser.parse_known_args(argv)
-
-    metric_fn = cloudpickle.loads(base64.b64decode(known_args.metric_fn_encoded))
+    metric_fn = serialization_library.loads(base64.b64decode(known_args.metric_fn_encoded))
     if not callable(metric_fn):
         raise ValueError("--metric_fn_encoded must be an encoded callable.")
     metric_keys = known_args.metric_keys.split(",")
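To make the --metric_fn_encoded contract above concrete, a small round-trip sketch covering both sides of the flag (the metric itself is illustrative):

    import base64

    import cloudpickle


    def get_metric_fn():
        def metric_fn(inst):
            # One tuple of per-instance metrics, e.g. a squared error.
            return ((inst["prediction"] - inst["label"]) ** 2,)

        return metric_fn


    # Producer side: what create_evaluate_ops() passes to the Beam job.
    metric_fn_encoded = base64.b64encode(cloudpickle.dumps(get_metric_fn())).decode()

    # Consumer side: what mlengine_prediction_summary.run() does with the flag.
    metric_fn = cloudpickle.loads(base64.b64decode(metric_fn_encoded))
    assert callable(metric_fn)
    assert metric_fn({"prediction": 3.0, "label": 1.0}) == (4.0,)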
diff --git a/contributing-docs/12_airflow_dependencies_and_extras.rst b/contributing-docs/12_airflow_dependencies_and_extras.rst
index 91328a24abb39..4e9be81bd44ad 100644
--- a/contributing-docs/12_airflow_dependencies_and_extras.rst
+++ b/contributing-docs/12_airflow_dependencies_and_extras.rst
@@ -164,9 +164,9 @@ Those extras are available as regular core airflow extras - they install optiona
 
 .. START CORE EXTRAS HERE
 
-aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, deprecated-api, github-enterprise,
-google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, rabbitmq, s3fs,
-saml, sentry, statsd, uv, virtualenv
+aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, dill,
+github-enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic,
+rabbitmq, s3fs, saml, sentry, statsd, uv, virtualenv
 
 .. END CORE EXTRAS HERE
 
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index 0a9a73600d9cc..de10d5547abf6 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -87,6 +87,10 @@ python dependencies for the provided package.
 +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
 | virtualenv          | ``pip install 'apache-airflow[virtualenv]'``        | Running python tasks in local virtualenv                                    |
 +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| cloudpickle         | ``pip install 'apache-airflow[cloudpickle]'``       | Serializing task callables with cloudpickle                                 |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| dill                | ``pip install 'apache-airflow[dill]'``              | Serializing task callables with dill                                        |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
 
 
 Providers extras
diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst
index eab22073de870..cd2a78185e96a 100644
--- a/docs/apache-airflow/howto/operator/python.rst
+++ b/docs/apache-airflow/howto/operator/python.rst
@@ -111,6 +111,12 @@ Use the :class:`~airflow.operators.python.PythonVirtualenvOperator` decorator to
 inside a new Python virtual environment. The ``virtualenv`` package needs to be installed in the environment
 that runs Airflow (as optional dependency ``pip install apache-airflow[virtualenv] --constraint ...``).
 
+Additionally, the ``cloudpickle`` package needs to be installed as an optional dependency using command
+``pip install 'apache-airflow[cloudpickle]' --constraint ...``. This package is a replacement for the previously
+used ``dill`` package, which has been converted to an optional dependency. Cloudpickle offers a strong advantage
+through its focus on the standard pickling protocol, ensuring wider compatibility and smoother data exchange,
+while still effectively handling common Python objects and global variables within functions.
+
 .. tip::
 
     The ``@task.virtualenv`` decorator is recommended over the classic ``PythonVirtualenvOperator``
     to execute Python callables inside new Python virtual environments.
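For reference, the opt-in documented above matches how the tutorial DAG in this series uses the flag; a trimmed sketch of the task from airflow/example_dags/tutorial_taskflow_api_virtualenv.py (the body is abbreviated from the tutorial):

    from airflow.decorators import task


    @task.virtualenv(
        use_cloudpickle=True,
        system_site_packages=False,
        requirements=["funcsigs"],
    )
    def extract():
        """Run in a fresh virtualenv; arguments and results cross via cloudpickle."""
        import json

        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        return json.loads(data_string)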
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index eb7dcb7f47225..da04944231852 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -255,6 +255,7 @@ cloudant
 cloudbuild
 CloudBuildClient
 cloudml
+cloudpickle
 cloudsqldatabehook
 CloudTasksClient
 Cloudwatch
diff --git a/hatch_build.py b/hatch_build.py
index 5a27115038af4..1a4eccd1b79bc 100644
--- a/hatch_build.py
+++ b/hatch_build.py
@@ -145,6 +145,12 @@
     "virtualenv": [
         "virtualenv",
     ],
+    "dill": [
+        "dill",
+    ],
+    "cloudpickle": [
+        "cloudpickle",
+    ],
 }
 
 DOC_EXTRAS: dict[str, list[str]] = {
@@ -416,7 +422,6 @@
     "blinker>=1.6.2",
     # Colorlog 6.x merges TTYColoredFormatter into ColoredFormatter, breaking backwards compatibility with 4.x
     # Update CustomTTYColoredFormatter to remove
-    "cloudpickle>=2.0.0",
     "colorlog>=4.0.2, <5.0",
     "configupdater>=3.1.1",
     # `airflow/www/extensions/init_views` imports `connexion.decorators.validation.RequestBodyValidator`
@@ -430,7 +435,6 @@
     "croniter>=2.0.2",
     "cryptography>=39.0.0",
     "deprecated>=1.2.13",
-    "dill>=0.2.2",
     "flask-caching>=1.5.0",
     # Flask-Session 0.6 add new arguments into the SqlAlchemySessionInterface constructor as well as
     # all parameters now are mandatory which make AirflowDatabaseSessionInterface incopatible with this version.
diff --git a/pyproject.toml b/pyproject.toml
index 9329ed6e8b830..a2a67eb6eda92 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -78,9 +78,9 @@ dynamic = ["version", "optional-dependencies", "dependencies"]
 #
 # START CORE EXTRAS HERE
 #
-# aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, deprecated-api, github-enterprise,
-# google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, rabbitmq, s3fs,
-# saml, sentry, statsd, uv, virtualenv
+# aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, dill,
+# github-enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic,
+# rabbitmq, s3fs, saml, sentry, statsd, uv, virtualenv
 #
 # END CORE EXTRAS HERE
 #
diff --git a/tests/decorators/test_external_python.py b/tests/decorators/test_external_python.py
index 7e390e5eee5c3..41a30bbe50362 100644
--- a/tests/decorators/test_external_python.py
+++ b/tests/decorators/test_external_python.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import datetime
+import logging
 import subprocess
 import venv
 from datetime import timedelta
@@ -30,6 +31,8 @@
 from airflow.decorators import setup, task, teardown
 from airflow.utils import timezone
 
+log = logging.getLogger(__name__)
+
 pytestmark = pytest.mark.db_test
 
 
@@ -62,26 +65,47 @@ def venv_python_with_cloudpickle():
         yield python_path
 
 
+@pytest.fixture
+def venv_python_with_dill():
+    with TemporaryDirectory() as d:
+        venv.create(d, with_pip=True)
+        python_path = Path(d) / "bin" / "python"
+        subprocess.call([python_path, "-m", "pip", "install", "dill"])
+        yield python_path
+
+
 class TestExternalPythonDecorator:
     def test_with_cloudpickle_works(self, dag_maker, venv_python_with_cloudpickle):
         @task.external_python(python=venv_python_with_cloudpickle, use_cloudpickle=True)
         def f():
             """Import 
cloudpickle to double-check it is installed .""" - import cloudpickle # noqa: F401 + try: + import cloudpickle # noqa: F401 + except ImportError: + log.warning( + "Cloudpickle package is required to be installed." + " Please install it with: pip install [cloudpickle]" + ) with dag_maker(): ret = f() ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - def test_with_templated_python(self, dag_maker, venv_python_with_cloudpickle): + def test_with_templated_python_cloudpickle(self, dag_maker, venv_python_with_cloudpickle): # add template that produces empty string when rendered templated_python_with_cloudpickle = venv_python_with_cloudpickle.as_posix() + "{{ '' }}" @task.external_python(python=templated_python_with_cloudpickle, use_cloudpickle=True) def f(): """Import cloudpickle to double-check it is installed .""" - import cloudpickle # noqa: F401 + try: + import cloudpickle # noqa: F401 + except ImportError: + log.warning( + "Cloudpickle package is required to be installed." + " Please install it with: pip install [cloudpickle]" + ) with dag_maker(): ret = f() @@ -99,6 +123,52 @@ def f(): with pytest.raises(CalledProcessError): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + def test_with_dill_works(self, dag_maker, venv_python_with_dill): + @task.external_python(python=venv_python_with_dill, use_dill=True) + def f(): + """Import dill to double-check it is installed .""" + try: + import dill # noqa: F401 + except ImportError: + log.warning( + "Dill package is required to be installed. Please install it with: pip install [dill]" + ) + + with dag_maker(): + ret = f() + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + def test_with_templated_python_dill(self, dag_maker, venv_python_with_dill): + # add template that produces empty string when rendered + templated_python_with_dill = venv_python_with_dill.as_posix() + "{{ '' }}" + + @task.external_python(python=templated_python_with_dill, use_dill=True) + def f(): + """Import dill to double-check it is installed .""" + try: + import dill # noqa: F401 + except ImportError: + log.warning( + "Dill package is required to be installed. 
Please install it with: pip install [dill]" + ) + + with dag_maker(): + ret = f() + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + def test_no_dill_installed_raises_exception_when_use_dill(self, dag_maker, venv_python): + @task.external_python(python=venv_python, use_dill=True) + def f(): + pass + + with dag_maker(): + ret = f() + + with pytest.raises(CalledProcessError): + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + def test_exception_raises_error(self, dag_maker, venv_python): @task.external_python(python=venv_python) def f(): diff --git a/tests/decorators/test_python_virtualenv.py b/tests/decorators/test_python_virtualenv.py index 548b75499e630..bb18dcacabf17 100644 --- a/tests/decorators/test_python_virtualenv.py +++ b/tests/decorators/test_python_virtualenv.py @@ -18,6 +18,7 @@ from __future__ import annotations import datetime +import logging import sys from subprocess import CalledProcessError @@ -26,6 +27,8 @@ from airflow.decorators import setup, task, teardown from airflow.utils import timezone +log = logging.getLogger(__name__) + pytestmark = pytest.mark.db_test DEFAULT_DATE = timezone.datetime(2016, 1, 1) @@ -44,6 +47,22 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + def test_add_dill(self, dag_maker): + @task.virtualenv(use_dill=True, system_site_packages=False) + def f(): + """Ensure dill is correctly installed.""" + try: + import dill # noqa: F401 + except ImportError: + log.warning( + "Dill package is required to be installed. Please install it with: pip install [dill]" + ) + + with dag_maker(): + ret = f() + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + def test_no_requirements(self, dag_maker): """Tests that the python callable is invoked on task run.""" @@ -70,7 +89,7 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - def test_system_site_packages(self, dag_maker): + def test_system_site_packages_cloudpickle(self, dag_maker): @task.virtualenv( system_site_packages=False, requirements=["funcsigs"], @@ -85,7 +104,22 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - def test_with_requirements_pinned(self, dag_maker): + def test_system_site_packages_dill(self, dag_maker): + @task.virtualenv( + system_site_packages=False, + requirements=["funcsigs"], + python_version=PYTHON_VERSION, + use_dill=True, + ) + def f(): + import funcsigs # noqa: F401 + + with dag_maker(): + ret = f() + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + def test_with_requirements_pinned_cloudpickle(self, dag_maker): @task.virtualenv( system_site_packages=False, requirements=["funcsigs==0.4"], @@ -103,7 +137,25 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - def test_with_requirements_file(self, dag_maker, tmp_path): + def test_with_requirements_pinned_dill(self, dag_maker): + @task.virtualenv( + system_site_packages=False, + requirements=["funcsigs==0.4"], + python_version=PYTHON_VERSION, + use_dill=True, + ) + def f(): + import funcsigs + + if funcsigs.__version__ != "0.4": + raise Exception + + with dag_maker(): + ret = f() + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + def test_with_requirements_file_cloudpickle(self, dag_maker, tmp_path): requirements_file = tmp_path / "requirements.txt" requirements_file.write_text("funcsigs==0.4\nattrs==23.1.0") @@ -129,7 +181,33 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - def 
test_unpinned_requirements(self, dag_maker): + def test_with_requirements_file_dill(self, dag_maker, tmp_path): + requirements_file = tmp_path / "requirements.txt" + requirements_file.write_text("funcsigs==0.4\nattrs==23.1.0") + + @task.virtualenv( + system_site_packages=False, + requirements="requirements.txt", + python_version=PYTHON_VERSION, + use_dill=True, + ) + def f(): + import funcsigs + + if funcsigs.__version__ != "0.4": + raise Exception + + import attrs + + if attrs.__version__ != "23.1.0": + raise Exception + + with dag_maker(template_searchpath=tmp_path.as_posix()): + ret = f() + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + def test_unpinned_requirements_cloudpickle(self, dag_maker): @task.virtualenv( system_site_packages=False, requirements=["funcsigs", "cloudpickle"], @@ -144,6 +222,21 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + def test_unpinned_requirements_dill(self, dag_maker): + @task.virtualenv( + system_site_packages=False, + requirements=["funcsigs", "dill"], + python_version=PYTHON_VERSION, + use_dill=True, + ) + def f(): + import funcsigs # noqa: F401 + + with dag_maker(): + ret = f() + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + def test_fail(self, dag_maker): @task.virtualenv() def f(): @@ -155,7 +248,7 @@ def f(): with pytest.raises(CalledProcessError): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - def test_python_3(self, dag_maker): + def test_python_3_cloudpickle(self, dag_maker): @task.virtualenv(python_version=3, use_cloudpickle=False, requirements=["cloudpickle"]) def f(): import sys @@ -172,6 +265,23 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + def test_python_3_dill(self, dag_maker): + @task.virtualenv(python_version=3, use_dill=False, requirements=["dill"]) + def f(): + import sys + + print(sys.version) + try: + {}.iteritems() + except AttributeError: + return + raise Exception + + with dag_maker(): + ret = f() + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + def test_with_args(self, dag_maker): @task.virtualenv def f(a, b, c=False, d=False): diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index b98c532974ee1..57435e8735400 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -65,6 +65,8 @@ from tests.test_utils import AIRFLOW_MAIN_FOLDER from tests.test_utils.db import clear_db_runs +log = logging.getLogger(__name__) + pytestmark = pytest.mark.db_test @@ -926,10 +928,28 @@ def f(): def test_add_cloudpickle(self): def f(): """Ensure cloudpickle is correctly installed.""" - import cloudpickle # noqa: F401 + try: + import cloudpickle # noqa: F401 + except ImportError: + log.warning( + "Cloudpickle package is required to be installed." + " Please install it with: pip install [cloudpickle]" + ) self.run_as_task(f, use_cloudpickle=True, system_site_packages=False) + def test_add_dill(self): + def f(): + """Ensure dill is correctly installed.""" + try: + import dill # noqa: F401 + except ImportError: + log.warning( + "Dill package is required to be installed. 
Please install it with: pip install [dill]" + ) + + self.run_as_task(f, use_dill=True, system_site_packages=False) + def test_no_requirements(self): """Tests that the python callable is invoked on task run.""" @@ -938,7 +958,7 @@ def f(): self.run_as_task(f) - def test_no_system_site_packages(self): + def test_no_system_site_packages_cloudpickle(self): def f(): try: import funcsigs # noqa: F401 @@ -948,6 +968,16 @@ def f(): self.run_as_task(f, system_site_packages=False, requirements=["cloudpickle"]) + def test_no_system_site_packages_dill(self): + def f(): + try: + import funcsigs # noqa: F401 + except ImportError: + return True + raise RuntimeError + + self.run_as_task(f, system_site_packages=False, requirements=["dill"]) + def test_system_site_packages(self): def f(): import funcsigs # noqa: F401 @@ -979,18 +1009,30 @@ def f(): self.run_as_task(f, requirements=["funcsigs==0.4"], do_not_use_caching=True) - def test_unpinned_requirements(self): + def test_unpinned_requirements_cloudpickle(self): def f(): import funcsigs # noqa: F401 self.run_as_task(f, requirements=["funcsigs", "cloudpickle"], system_site_packages=False) - def test_range_requirements(self): + def test_unpinned_requirements_dill(self): + def f(): + import funcsigs # noqa: F401 + + self.run_as_task(f, requirements=["funcsigs", "dill"], system_site_packages=False) + + def test_range_requirements_cloudpickle(self): def f(): import funcsigs # noqa: F401 self.run_as_task(f, requirements=["funcsigs>1.0", "cloudpickle"], system_site_packages=False) + def test_range_requirements_dill(self): + def f(): + import funcsigs # noqa: F401 + + self.run_as_task(f, requirements=["funcsigs>1.0", "dill"], system_site_packages=False) + def test_requirements_file(self): def f(): import funcsigs # noqa: F401 @@ -1019,7 +1061,7 @@ def f(): pip_install_options=["--no-deps"], ) - def test_templated_requirements_file(self): + def test_templated_requirements_file_cloudpickle(self): def f(): import funcsigs @@ -1033,7 +1075,21 @@ def f(): system_site_packages=False, ) - def test_python_3(self): + def test_templated_requirements_file_dill(self): + def f(): + import funcsigs + + assert funcsigs.__version__ == "1.0.2" + + self.run_as_operator( + f, + requirements="requirements.txt", + use_dill=True, + params={"environ": "templated_unit_test"}, + system_site_packages=False, + ) + + def test_python_3_cloudpickle(self): def f(): import sys @@ -1046,12 +1102,31 @@ def f(): self.run_as_task(f, python_version="3", use_cloudpickle=False, requirements=["cloudpickle"]) + def test_python_3_dill(self): + def f(): + import sys + + print(sys.version) + try: + {}.iteritems() + except AttributeError: + return + raise RuntimeError + + self.run_as_task(f, python_version="3", use_dill=False, requirements=["dill"]) + def test_without_cloudpickle(self): def f(a): return a self.run_as_task(f, system_site_packages=False, use_cloudpickle=False, op_args=[4]) + def test_without_dill(self): + def f(a): + return a + + self.run_as_task(f, system_site_packages=False, use_dill=False, op_args=[4]) + def test_with_index_urls(self): def f(a): import sys @@ -1129,6 +1204,61 @@ def f( self.run_as_operator(f, use_cloudpickle=True, system_site_packages=True, requirements=None) + # This tests might take longer than default 60 seconds as it is serializing a lot of + # context using dill (which is slow apparently). 
+ @pytest.mark.execution_timeout(120) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") + @pytest.mark.skipif( + os.environ.get("PYTEST_PLAIN_ASSERTS") != "true", + reason="assertion rewriting breaks this test because dill will try to serialize " + "AssertRewritingHook including captured stdout and we need to run " + "it with `--assert=plain`pytest option and PYTEST_PLAIN_ASSERTS=true ." + "Also this test is skipped on Python 3.11 because of impact of regression in Python 3.11 " + "connected likely with CodeType behaviour https://github.com/python/cpython/issues/100316 " + "That likely causes that dill is not able to serialize the `conf` correctly " + "Issue about fixing it is captured in https://github.com/apache/airflow/issues/35307", + ) + def test_airflow_context_dill(self): + def f( + # basic + ds_nodash, + inlets, + next_ds, + next_ds_nodash, + outlets, + params, + prev_ds, + prev_ds_nodash, + run_id, + task_instance_key_str, + test_mode, + tomorrow_ds, + tomorrow_ds_nodash, + ts, + ts_nodash, + ts_nodash_with_tz, + yesterday_ds, + yesterday_ds_nodash, + # pendulum-specific + execution_date, + next_execution_date, + prev_execution_date, + prev_execution_date_success, + prev_start_date_success, + prev_end_date_success, + # airflow-specific + macros, + conf, + dag, + dag_run, + task, + # other + **context, + ): + pass + + self.run_as_operator(f, use_dill=True, system_site_packages=True, requirements=None) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_pendulum_context(self): def f( @@ -1164,6 +1294,41 @@ def f( self.run_as_task(f, use_cloudpickle=True, system_site_packages=False, requirements=["pendulum"]) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") + def test_pendulum_context_dill(self): + def f( + # basic + ds_nodash, + inlets, + next_ds, + next_ds_nodash, + outlets, + prev_ds, + prev_ds_nodash, + run_id, + task_instance_key_str, + test_mode, + tomorrow_ds, + tomorrow_ds_nodash, + ts, + ts_nodash, + ts_nodash_with_tz, + yesterday_ds, + yesterday_ds_nodash, + # pendulum-specific + execution_date, + next_execution_date, + prev_execution_date, + prev_execution_date_success, + prev_start_date_success, + prev_end_date_success, + # other + **context, + ): + pass + + self.run_as_task(f, use_dill=True, system_site_packages=False, requirements=["pendulum"]) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_base_context(self): def f( @@ -1192,6 +1357,34 @@ def f( self.run_as_task(f, use_cloudpickle=True, system_site_packages=False, requirements=None) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") + def test_base_context_dill(self): + def f( + # basic + ds_nodash, + inlets, + next_ds, + next_ds_nodash, + outlets, + prev_ds, + prev_ds_nodash, + run_id, + task_instance_key_str, + test_mode, + tomorrow_ds, + tomorrow_ds_nodash, + ts, + ts_nodash, + ts_nodash_with_tz, + yesterday_ds, + yesterday_ds_nodash, + # other + **context, + ): + pass + + self.run_as_task(f, use_dill=True, system_site_packages=False, requirements=None) + # when venv tests are run in parallel to other test they create new processes and this might take # quite some time in shared docker environment and get some contention even between different containers diff --git a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py 
b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py index 4e1ea7989f9d4..ac3ae634aa08e 100644 --- a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py +++ b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py @@ -17,11 +17,13 @@ from __future__ import annotations import base64 +import importlib import json +import logging +import shutil from datetime import datetime from unittest import mock -import cloudpickle import pytest from airflow.exceptions import AirflowException @@ -31,6 +33,30 @@ from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.utils.mlengine_operator_utils import create_evaluate_ops +log = logging.getLogger(__name__) + +try: + import cloudpickle + import dill +except ImportError: + log.warning( + "Dill or cloudpickle packages are required to be installed." + " Please install one of them with: pip install [dill] or pip install [cloudpickle]" + ) + + +def is_cloudpickle_installed() -> bool: + """ + Check if the dill or cloudpickle package is installed via checking if it is on the + path or installed as package. + + :return: True if it is. Whichever way of checking it works, is fine. + """ + if shutil.which("cloudpickle") or importlib.util.find_spec("cloudpickle"): + return True + return False + + TASK_PREFIX = "test-task-prefix" TASK_PREFIX_PREDICTION = TASK_PREFIX + "-prediction" TASK_PREFIX_SUMMARY = TASK_PREFIX + "-summary" @@ -116,7 +142,10 @@ def test_create_evaluate_ops(self, mock_beam_pipeline, mock_python): # importing apache_beam elsewhere modifies the metrics. In order to avoid metrics being modified # by apache_beam import happening after importing this test, we retrieve the metrics here rather than # at the top of the file. - METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() + if is_cloudpickle_installed(): + METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() + else: + METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN)).decode() assert TASK_PREFIX_PREDICTION == evaluate_prediction.task_id assert PROJECT_ID == evaluate_prediction.project_id @@ -162,7 +191,10 @@ def test_create_evaluate_ops_model_and_version_name(self, mock_beam_pipeline, mo # importing apache_beam elsewhere modifies the metrics. In order to avoid metrics being modified # by apache_beam import happening after importing this test, we retrieve the metrics here rather than # at the top of the file. - METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() + if is_cloudpickle_installed(): + METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() + else: + METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN)).decode() assert TASK_PREFIX_PREDICTION == evaluate_prediction.task_id assert PROJECT_ID == evaluate_prediction.project_id @@ -205,7 +237,10 @@ def test_create_evaluate_ops_dag(self, mock_dataflow, mock_python): # importing apache_beam elsewhere modifies the metrics. In order to avoid metrics being modified # by apache_beam import happening after importing this test, we retrieve the metrics here rather than # at the top of the file. 
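+        # (The branch below mirrors the runtime choice in mlengine_operator_utils:
+        # cloudpickle is preferred when it is importable, otherwise dill is used.)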
- METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() + if is_cloudpickle_installed(): + METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() + else: + METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN)).decode() assert TASK_PREFIX_PREDICTION == evaluate_prediction.task_id assert PROJECT_ID == evaluate_prediction.project_id diff --git a/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py b/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py index acc96f3899cc0..526cfc3c0ad20 100644 --- a/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py +++ b/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py @@ -18,10 +18,12 @@ import base64 import binascii +import importlib +import logging +import shutil import sys from unittest import mock -import cloudpickle import pytest if sys.version_info < (3, 12): @@ -31,6 +33,29 @@ f"package apache_beam is not available for Python 3.12. Skipping all tests in {__name__}" ) +log = logging.getLogger(__name__) + +try: + import cloudpickle + import dill +except ImportError: + log.warning( + "Dill or cloudpickle packages are required to be installed." + " Please install one of them with: pip install [dill] or pip install [cloudpickle]" + ) + + +def is_cloudpickle_installed() -> bool: + """ + Check if the dill or cloudpickle package is installed via checking if it is on the + path or installed as package. + + :return: True if it is. Whichever way of checking it works, is fine. + """ + if shutil.which("cloudpickle") or importlib.util.find_spec("cloudpickle"): + return True + return False + class TestJsonCode: def test_encode(self): @@ -75,7 +100,10 @@ def test_run_should_fail_for_invalid_encoded_fn(self): def test_run_should_fail_if_enc_fn_is_not_callable(self): non_callable_value = 1 - fn_enc = base64.b64encode(cloudpickle.dumps(non_callable_value)).decode("utf-8") + if is_cloudpickle_installed(): + fn_enc = base64.b64encode(cloudpickle.dumps(non_callable_value)).decode("utf-8") + else: + fn_enc = base64.b64encode(dill.dumps(non_callable_value)).decode("utf-8") with pytest.raises(ValueError): mlengine_prediction_summary.run( @@ -98,7 +126,10 @@ def test_run_should_not_fail_with_valid_fn(self): def metric_function(): return 1 - fn_enc = base64.b64encode(cloudpickle.dumps(metric_function)).decode("utf-8") + if is_cloudpickle_installed(): + fn_enc = base64.b64encode(cloudpickle.dumps(metric_function)).decode("utf-8") + else: + fn_enc = base64.b64encode(dill.dumps(metric_function)).decode("utf-8") mlengine_prediction_summary.run( [ From 9c0d922fa62e4701c69e59244999f55ad019d364 Mon Sep 17 00:00:00 2001 From: Ulada Zakharava Date: Mon, 8 Apr 2024 10:56:22 +0000 Subject: [PATCH 3/7] Update hatch_build.py file to set version for dill --- hatch_build.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hatch_build.py b/hatch_build.py index 1a4eccd1b79bc..803a012588ad8 100644 --- a/hatch_build.py +++ b/hatch_build.py @@ -146,7 +146,7 @@ "virtualenv", ], "dill": [ - "dill", + "dill>=0.2.2", ], "cloudpickle": [ "cloudpickle", From be908ac8cec7694fcc8e726e6318933c0e1e4bed Mon Sep 17 00:00:00 2001 From: Maksim Moiseenkov Date: Wed, 17 Apr 2024 09:50:54 +0000 Subject: [PATCH 4/7] fix unit test indentation --- tests/operators/test_python.py | 108 ++++++++++++++++----------------- 1 file changed, 54 insertions(+), 54 deletions(-) diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index 57435e8735400..7213014e46755 
100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -1204,60 +1204,60 @@ def f( self.run_as_operator(f, use_cloudpickle=True, system_site_packages=True, requirements=None) - # This tests might take longer than default 60 seconds as it is serializing a lot of - # context using dill (which is slow apparently). - @pytest.mark.execution_timeout(120) - @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") - @pytest.mark.skipif( - os.environ.get("PYTEST_PLAIN_ASSERTS") != "true", - reason="assertion rewriting breaks this test because dill will try to serialize " - "AssertRewritingHook including captured stdout and we need to run " - "it with `--assert=plain`pytest option and PYTEST_PLAIN_ASSERTS=true ." - "Also this test is skipped on Python 3.11 because of impact of regression in Python 3.11 " - "connected likely with CodeType behaviour https://github.com/python/cpython/issues/100316 " - "That likely causes that dill is not able to serialize the `conf` correctly " - "Issue about fixing it is captured in https://github.com/apache/airflow/issues/35307", - ) - def test_airflow_context_dill(self): - def f( - # basic - ds_nodash, - inlets, - next_ds, - next_ds_nodash, - outlets, - params, - prev_ds, - prev_ds_nodash, - run_id, - task_instance_key_str, - test_mode, - tomorrow_ds, - tomorrow_ds_nodash, - ts, - ts_nodash, - ts_nodash_with_tz, - yesterday_ds, - yesterday_ds_nodash, - # pendulum-specific - execution_date, - next_execution_date, - prev_execution_date, - prev_execution_date_success, - prev_start_date_success, - prev_end_date_success, - # airflow-specific - macros, - conf, - dag, - dag_run, - task, - # other - **context, - ): - pass - - self.run_as_operator(f, use_dill=True, system_site_packages=True, requirements=None) + # This tests might take longer than default 60 seconds as it is serializing a lot of + # context using dill (which is slow apparently). + @pytest.mark.execution_timeout(120) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") + @pytest.mark.skipif( + os.environ.get("PYTEST_PLAIN_ASSERTS") != "true", + reason="assertion rewriting breaks this test because dill will try to serialize " + "AssertRewritingHook including captured stdout and we need to run " + "it with `--assert=plain`pytest option and PYTEST_PLAIN_ASSERTS=true ." 
+ "Also this test is skipped on Python 3.11 because of impact of regression in Python 3.11 " + "connected likely with CodeType behaviour https://github.com/python/cpython/issues/100316 " + "That likely causes that dill is not able to serialize the `conf` correctly " + "Issue about fixing it is captured in https://github.com/apache/airflow/issues/35307", + ) + def test_airflow_context_dill(self): + def f( + # basic + ds_nodash, + inlets, + next_ds, + next_ds_nodash, + outlets, + params, + prev_ds, + prev_ds_nodash, + run_id, + task_instance_key_str, + test_mode, + tomorrow_ds, + tomorrow_ds_nodash, + ts, + ts_nodash, + ts_nodash_with_tz, + yesterday_ds, + yesterday_ds_nodash, + # pendulum-specific + execution_date, + next_execution_date, + prev_execution_date, + prev_execution_date_success, + prev_start_date_success, + prev_end_date_success, + # airflow-specific + macros, + conf, + dag, + dag_run, + task, + # other + **context, + ): + pass + + self.run_as_operator(f, use_dill=True, system_site_packages=True, requirements=None) @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_pendulum_context(self): From d1f577d30836aa7836f707b44482cdba15b45bea Mon Sep 17 00:00:00 2001 From: Maksim Moiseenkov Date: Wed, 17 Apr 2024 14:13:18 +0000 Subject: [PATCH 5/7] Undo core and providers changes --- INSTALL | 4 +- .../tutorial_taskflow_api_virtualenv.py | 2 +- airflow/models/dagpickle.py | 4 +- airflow/models/taskinstance.py | 4 +- airflow/operators/python.py | 2 +- .../cncf/kubernetes/decorators/kubernetes.py | 31 +++---------- airflow/providers/docker/decorators/docker.py | 32 +++----------- .../cloud/utils/mlengine_operator_utils.py | 18 ++------ .../utils/mlengine_prediction_summary.py | 22 +++------- .../12_airflow_dependencies_and_extras.rst | 4 +- docs/apache-airflow/extra-packages-ref.rst | 2 - docs/apache-airflow/howto/operator/python.rst | 7 ++- hatch_build.py | 8 +--- pyproject.toml | 4 +- tests/decorators/test_external_python.py | 39 ++++++++--------- tests/operators/test_python.py | 22 +++++----- .../docker/decorators/test_docker.py | 8 ++-- .../utils/test_mlengine_operator_utils.py | 43 ++----------------- .../utils/test_mlengine_prediction_summary.py | 37 ++-------------- .../example_taskflow_api_docker_virtualenv.py | 2 +- ...preexisting_python_virtualenv_decorator.py | 2 +- tests/utils/test_python_virtualenv.py | 2 +- 22 files changed, 80 insertions(+), 219 deletions(-) diff --git a/INSTALL b/INSTALL index 0dc388788510e..d237a79d040c1 100644 --- a/INSTALL +++ b/INSTALL @@ -262,8 +262,8 @@ Those extras are available as regular core airflow extras - they install optiona # START CORE EXTRAS HERE -aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, dill, -github-enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, +aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, github- +enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, rabbitmq, s3fs, saml, sentry, statsd, uv, virtualenv # END CORE EXTRAS HERE diff --git a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py index 98127f21176c4..44134e445891d 100644 --- a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py +++ b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py @@ -38,7 +38,7 @@ def tutorial_taskflow_api_virtualenv(): """ @task.virtualenv( - 
use_cloudpickle=True, + use_dill=True, system_site_packages=False, requirements=["funcsigs"], ) diff --git a/airflow/models/dagpickle.py b/airflow/models/dagpickle.py index f451050c803e7..e6f4561d8e1bf 100644 --- a/airflow/models/dagpickle.py +++ b/airflow/models/dagpickle.py @@ -19,7 +19,7 @@ from typing import TYPE_CHECKING -import cloudpickle +import dill from sqlalchemy import BigInteger, Column, Integer, PickleType from airflow.models.base import Base @@ -42,7 +42,7 @@ class DagPickle(Base): """ id = Column(Integer, primary_key=True) - pickle = Column(PickleType(pickler=cloudpickle)) + pickle = Column(PickleType(pickler=dill)) created_dttm = Column(UtcDateTime, default=timezone.utcnow) pickle_hash = Column(BigInteger) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 79790c0e85f4a..b4d5d5d65a1b3 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -34,7 +34,7 @@ from typing import TYPE_CHECKING, Any, Callable, Collection, Generator, Iterable, Mapping, Tuple from urllib.parse import quote -import cloudpickle +import dill import jinja2 import lazy_object_proxy import pendulum @@ -1353,7 +1353,7 @@ class TaskInstance(Base, LoggingMixin): queued_by_job_id = Column(Integer) pid = Column(Integer) executor = Column(String(1000)) - executor_config = Column(ExecutorConfigType(pickler=cloudpickle)) + executor_config = Column(ExecutorConfigType(pickler=dill)) updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow) rendered_map_index = Column(String(250)) diff --git a/airflow/operators/python.py b/airflow/operators/python.py index 3e89e55457bac..cfaf5f16a6074 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -23,6 +23,7 @@ import json import logging import os +import pickle import shutil import subprocess import sys @@ -63,7 +64,6 @@ import dill as serialization_library else: log.warning("Neither dill and cloudpickle are installed. Please install one with: pip install [name]") - import pickle if TYPE_CHECKING: from pendulum.datetime import DateTime diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index df565f11870b1..6e914a2d1f946 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -17,19 +17,17 @@ from __future__ import annotations import base64 -import importlib -import logging import os -import shutil +import pickle import uuid from shlex import quote from tempfile import TemporaryDirectory from typing import TYPE_CHECKING, Callable, Sequence +import dill from kubernetes.client import models as k8s from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory -from airflow.exceptions import AirflowException from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator from airflow.providers.cncf.kubernetes.python_kubernetes_script import ( write_python_script, @@ -38,16 +36,6 @@ if TYPE_CHECKING: from airflow.utils.context import Context -log = logging.getLogger(__name__) - -if shutil.which("cloudpickle") or importlib.util.find_spec("cloudpickle"): - import cloudpickle as serialization_library -elif shutil.which("dill") or importlib.util.find_spec("dill"): - import dill as serialization_library -else: - log.warning("Neither dill and cloudpickle are installed. 
Please install one with: pip install [name]") - import pickle - _PYTHON_SCRIPT_ENV = "__PYTHON_SCRIPT" _PYTHON_INPUT_ENV = "__PYTHON_INPUT" @@ -77,17 +65,8 @@ class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator): # there are some cases we can't deepcopy the objects (e.g protobuf). shallow_copy_attrs: Sequence[str] = ("python_callable",) - def __init__( - self, namespace: str = "default", use_dill: bool = False, use_cloudpickle: bool = False, **kwargs - ) -> None: - if use_dill and use_cloudpickle: - raise AirflowException( - "Both 'use_dill' and 'use_cloudpickle' parameters are set to True. Please," - " choose only one." - ) - if use_dill: - use_cloudpickle = use_dill - self.use_cloudpickle = use_cloudpickle + def __init__(self, namespace: str = "default", use_dill: bool = False, **kwargs) -> None: + self.use_dill = use_dill super().__init__( namespace=namespace, name=kwargs.pop("name", f"k8s_airflow_pod_{uuid.uuid4().hex}"), @@ -121,7 +100,7 @@ def _generate_cmds(self) -> list[str]: def execute(self, context: Context): with TemporaryDirectory(prefix="venv") as tmp_dir: - pickling_library = serialization_library if self.use_cloudpickle else pickle + pickling_library = dill if self.use_dill else pickle script_filename = os.path.join(tmp_dir, "script.py") input_filename = os.path.join(tmp_dir, "script.in") diff --git a/airflow/providers/docker/decorators/docker.py b/airflow/providers/docker/decorators/docker.py index 71f6836eee693..9aafdd1d79bfb 100644 --- a/airflow/providers/docker/decorators/docker.py +++ b/airflow/providers/docker/decorators/docker.py @@ -17,28 +17,17 @@ from __future__ import annotations import base64 -import importlib -import logging import os -import shutil +import pickle from tempfile import TemporaryDirectory from typing import TYPE_CHECKING, Callable, Sequence +import dill + from airflow.decorators.base import DecoratedOperator, task_decorator_factory -from airflow.exceptions import AirflowException from airflow.providers.docker.operators.docker import DockerOperator from airflow.utils.python_virtualenv import write_python_script -log = logging.getLogger(__name__) - -if shutil.which("cloudpickle") or importlib.util.find_spec("cloudpickle"): - import cloudpickle as serialization_library -elif shutil.which("dill") or importlib.util.find_spec("dill"): - import dill as serialization_library -else: - log.warning("Neither dill and cloudpickle are installed. Please install one with: pip install [name]") - import pickle - if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator from airflow.utils.context import Context @@ -65,7 +54,6 @@ class _DockerDecoratedOperator(DecoratedOperator, DockerOperator): :param python_callable: A reference to an object that is callable :param python: Python binary name to use :param use_dill: Whether dill should be used to serialize the callable - :param use_cloudpickle: Whether cloudpickle should be used to serialize the callable :param expect_airflow: whether to expect airflow to be installed in the docker environment. if this one is specified, the script to run callable will attempt to load Airflow macros. 
:param op_kwargs: a dictionary of keyword arguments that will get unpacked @@ -84,7 +72,6 @@ class _DockerDecoratedOperator(DecoratedOperator, DockerOperator): def __init__( self, use_dill=False, - use_cloudpickle=False, python_command="python3", expect_airflow: bool = True, **kwargs, @@ -92,14 +79,7 @@ def __init__( command = "placeholder command" self.python_command = python_command self.expect_airflow = expect_airflow - if use_dill and use_cloudpickle: - raise AirflowException( - "Both 'use_dill' and 'use_cloudpickle' parameters are set to True. Please," - " choose only one." - ) - if use_dill: - use_cloudpickle = use_dill - self.use_cloudpickle = use_cloudpickle + self.use_dill = use_dill super().__init__( command=command, retrieve_output=True, retrieve_output_path="/tmp/script.out", **kwargs ) @@ -148,8 +128,8 @@ def execute(self, context: Context): @property def pickling_library(self): - if self.use_cloudpickle: - return serialization_library + if self.use_dill: + return dill return pickle diff --git a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py index 7f8b4d6adc934..0c94e11c741c3 100644 --- a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py +++ b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py @@ -19,15 +19,14 @@ from __future__ import annotations import base64 -import importlib import json -import logging import os import re -import shutil from typing import TYPE_CHECKING, Callable, Iterable, TypeVar from urllib.parse import urlsplit +import dill + from airflow.exceptions import AirflowException from airflow.operators.python import PythonOperator from airflow.providers.apache.beam.hooks.beam import BeamRunnerType @@ -38,16 +37,6 @@ if TYPE_CHECKING: from airflow import DAG -log = logging.getLogger(__name__) - -if shutil.which("cloudpickle") or importlib.util.find_spec("cloudpickle"): - import cloudpickle as serialization_library -elif shutil.which("dill") or importlib.util.find_spec("dill"): - import dill as serialization_library -else: - log.warning("Neither dill and cloudpickle are installed. Please install one with: pip install [name]") - import pickle as serialization_library - T = TypeVar("T", bound=Callable) @@ -244,7 +233,8 @@ def validate_err_and_count(summary): version_name=version_name, dag=dag, ) - metric_fn_encoded = base64.b64encode(serialization_library.dumps(metric_fn)).decode() + + metric_fn_encoded = base64.b64encode(dill.dumps(metric_fn, recurse=True)).decode() evaluate_summary = BeamRunPythonPipelineOperator( task_id=(task_prefix + "-summary"), runner=BeamRunnerType.DataflowRunner, diff --git a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py index bcc2069f1e865..d0a13378b479d 100644 --- a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py +++ b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py @@ -30,7 +30,7 @@ - ``--metric_fn_encoded``: An encoded function that calculates and returns a tuple of metric(s) for a given instance (as a dictionary). It should be encoded - via ``base64.b64encode(cloudpickle.dumps(fn, recurse=True))``. + via ``base64.b64encode(dill.dumps(fn, recurse=True))``. - ``--metric_keys``: A comma-separated key(s) of the aggregated metric(s) in the summary output. 
The order and the size of the keys must match to the output @@ -57,7 +57,7 @@ def metric_fn(inst): squared_err = (classes-label)**2 return (log_loss, squared_err) return metric_fn - metric_fn_encoded = base64.b64encode(cloudpickle.dumps(get_metric_fn(), recurse=True)) + metric_fn_encoded = base64.b64encode(dill.dumps(get_metric_fn(), recurse=True)) DataflowCreatePythonJobOperator( task_id="summary-prediction", py_options=["-m"], @@ -111,25 +111,14 @@ def metric_fn(inst): import argparse import base64 -import importlib import json import logging import os -import shutil import apache_beam as beam +import dill from apache_beam.coders.coders import Coder -log = logging.getLogger(__name__) - -if shutil.which("cloudpickle") or importlib.util.find_spec("cloudpickle"): - import cloudpickle as serialization_library -elif shutil.which("dill") or importlib.util.find_spec("dill"): - import dill as serialization_library -else: - log.warning("Neither dill and cloudpickle are installed. Please install one with: pip install [name]") - import pickle as serialization_library - class JsonCoder(Coder): """JSON encoder/decoder.""" @@ -181,7 +170,7 @@ def run(argv=None): help=( "An encoded function that calculates and returns a tuple of " "metric(s) for a given instance (as a dictionary). It should be " - "encoded via base64.b64encode(cloudpickle.dumps(fn)) or base64.b64encode(dill.dumps(fn))." + "encoded via base64.b64encode(dill.dumps(fn, recurse=True))." ), ) parser.add_argument( @@ -196,7 +185,8 @@ def run(argv=None): ), ) known_args, pipeline_args = parser.parse_known_args(argv) - metric_fn = serialization_library.loads(base64.b64decode(known_args.metric_fn_encoded)) + + metric_fn = dill.loads(base64.b64decode(known_args.metric_fn_encoded)) if not callable(metric_fn): raise ValueError("--metric_fn_encoded must be an encoded callable.") metric_keys = known_args.metric_keys.split(",") diff --git a/contributing-docs/12_airflow_dependencies_and_extras.rst b/contributing-docs/12_airflow_dependencies_and_extras.rst index 4e9be81bd44ad..3303f4fafb75f 100644 --- a/contributing-docs/12_airflow_dependencies_and_extras.rst +++ b/contributing-docs/12_airflow_dependencies_and_extras.rst @@ -164,8 +164,8 @@ Those extras are available as regular core airflow extras - they install optiona .. START CORE EXTRAS HERE -aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, dill, -github-enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, +aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, github- +enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, rabbitmq, s3fs, saml, sentry, statsd, uv, virtualenv .. END CORE EXTRAS HERE diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst index de10d5547abf6..f16fa908843df 100644 --- a/docs/apache-airflow/extra-packages-ref.rst +++ b/docs/apache-airflow/extra-packages-ref.rst @@ -89,8 +89,6 @@ python dependencies for the provided package. 
 +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
 | cloudpickle         | pip install apache-airflow[cloudpickle]             | Cloudpickle hooks and operators                                            |
 +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| dill                | pip install apache-airflow[dill]                    | Dill hooks and operators                                                   |
-+-------------+-----------------------------------------+------------------------------------------------------------------------------------------------+
 
 
 Providers extras
diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst
index cd2a78185e96a..b8619cd38bce8 100644
--- a/docs/apache-airflow/howto/operator/python.rst
+++ b/docs/apache-airflow/howto/operator/python.rst
@@ -112,10 +112,9 @@ inside a new Python virtual environment. The ``virtualenv`` package needs to be
 that runs Airflow (as optional dependency ``pip install apache-airflow[virtualenv] --constraint ...``).
 
 Additionally, the ``cloudpickle`` package needs to be installed as an optional dependency using command
-``pip install [cloudpickle] --constraint ...``. This package is a replacement for currently used ``dill`` package,
-which is currently converted to an optional dependency. Cloudpickle offers a strong advantage for its focus on
-standard pickling protocol, ensuring wider compatibility and smoother data exchange, while still effectively
-handling common Python objects and global variables within functions.
+``pip install apache-airflow[cloudpickle] --constraint ...``. This package is a replacement for the currently used
+``dill`` package. Cloudpickle offers a strong advantage through its focus on the standard pickling protocol, ensuring
+wider compatibility and smoother data exchange, while still effectively handling common Python objects and global variables within functions.
 
 .. tip::
     The ``@task.virtualenv`` decorator is recommended over the classic ``PythonVirtualenvOperator``
diff --git a/hatch_build.py b/hatch_build.py
index 803a012588ad8..6b87f3b0c0981 100644
--- a/hatch_build.py
+++ b/hatch_build.py
@@ -145,12 +145,6 @@
     "virtualenv": [
         "virtualenv",
     ],
-    "dill": [
-        "dill>=0.2.2",
-    ],
-    "cloudpickle": [
-        "cloudpickle",
-    ],
 }
 
 DOC_EXTRAS: dict[str, list[str]] = {
@@ -420,6 +414,7 @@
     # Blinker use for signals in Flask, this is an optional dependency in Flask 2.2 and lower.
     # In Flask 2.3 it becomes a mandatory dependency, and flask signals are always available.
     "blinker>=1.6.2",
+    "cloudpickle",
     # Colorlog 6.x merges TTYColoredFormatter into ColoredFormatter, breaking backwards compatibility with 4.x
     # Update CustomTTYColoredFormatter to remove
     "colorlog>=4.0.2, <5.0",
@@ -435,6 +430,7 @@
     "croniter>=2.0.2",
     "cryptography>=39.0.0",
     "deprecated>=1.2.13",
+    "dill>=0.2.2",
     "flask-caching>=1.5.0",
     # Flask-Session 0.6 add new arguments into the SqlAlchemySessionInterface constructor as well as
     # all parameters now are mandatory which make AirflowDatabaseSessionInterface incopatible with this version.
diff --git a/pyproject.toml b/pyproject.toml index a2a67eb6eda92..bb4cf71d801f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,8 +78,8 @@ dynamic = ["version", "optional-dependencies", "dependencies"] # # START CORE EXTRAS HERE # -# aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, dill, -# github-enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, +# aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, github- +# enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, # rabbitmq, s3fs, saml, sentry, statsd, uv, virtualenv # # END CORE EXTRAS HERE diff --git a/tests/decorators/test_external_python.py b/tests/decorators/test_external_python.py index 41a30bbe50362..fe5c76101c6d2 100644 --- a/tests/decorators/test_external_python.py +++ b/tests/decorators/test_external_python.py @@ -57,26 +57,17 @@ def venv_python(): @pytest.fixture -def venv_python_with_cloudpickle(): +def venv_python_with_cloudpickle_and_dill(): with TemporaryDirectory() as d: venv.create(d, with_pip=True) python_path = Path(d) / "bin" / "python" - subprocess.call([python_path, "-m", "pip", "install", "cloudpickle"]) - yield python_path - - -@pytest.fixture -def venv_python_with_dill(): - with TemporaryDirectory() as d: - venv.create(d, with_pip=True) - python_path = Path(d) / "bin" / "python" - subprocess.call([python_path, "-m", "pip", "install", "dill"]) + subprocess.call([python_path, "-m", "pip", "install", "cloudpickle", "dill"]) yield python_path class TestExternalPythonDecorator: - def test_with_cloudpickle_works(self, dag_maker, venv_python_with_cloudpickle): - @task.external_python(python=venv_python_with_cloudpickle, use_cloudpickle=True) + def test_with_cloudpickle_works(self, dag_maker, venv_python_with_cloudpickle_and_dill): + @task.external_python(python=venv_python_with_cloudpickle_and_dill, use_cloudpickle=True) def f(): """Import cloudpickle to double-check it is installed .""" try: @@ -92,9 +83,9 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - def test_with_templated_python_cloudpickle(self, dag_maker, venv_python_with_cloudpickle): + def test_with_templated_python_cloudpickle(self, dag_maker, venv_python_with_cloudpickle_and_dill): # add template that produces empty string when rendered - templated_python_with_cloudpickle = venv_python_with_cloudpickle.as_posix() + "{{ '' }}" + templated_python_with_cloudpickle = venv_python_with_cloudpickle_and_dill.as_posix() + "{{ '' }}" @task.external_python(python=templated_python_with_cloudpickle, use_cloudpickle=True) def f(): @@ -123,14 +114,17 @@ def f(): with pytest.raises(CalledProcessError): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - def test_with_dill_works(self, dag_maker, venv_python_with_dill): - @task.external_python(python=venv_python_with_dill, use_dill=True) + def test_with_dill_works(self, dag_maker, venv_python_with_cloudpickle_and_dill): + @task.external_python(python=venv_python_with_cloudpickle_and_dill, use_dill=True) def f(): """Import dill to double-check it is installed .""" try: import dill # noqa: F401 except ImportError: - log.warning( + import logging + + _log = logging.getLogger(__name__) + _log.warning( "Dill package is required to be installed. 
Please install it with: pip install [dill]" ) @@ -139,9 +133,9 @@ def f(): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - def test_with_templated_python_dill(self, dag_maker, venv_python_with_dill): + def test_with_templated_python_dill(self, dag_maker, venv_python_with_cloudpickle_and_dill): # add template that produces empty string when rendered - templated_python_with_dill = venv_python_with_dill.as_posix() + "{{ '' }}" + templated_python_with_dill = venv_python_with_cloudpickle_and_dill.as_posix() + "{{ '' }}" @task.external_python(python=templated_python_with_dill, use_dill=True) def f(): @@ -149,7 +143,10 @@ def f(): try: import dill # noqa: F401 except ImportError: - log.warning( + import logging + + _log = logging.getLogger(__name__) + _log.warning( "Dill package is required to be installed. Please install it with: pip install [dill]" ) diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index 7213014e46755..cf9630d141f92 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -931,7 +931,10 @@ def f(): try: import cloudpickle # noqa: F401 except ImportError: - log.warning( + import logging + + _log = logging.getLogger(__name__) + _log.warning( "Cloudpickle package is required to be installed." " Please install it with: pip install [cloudpickle]" ) @@ -944,7 +947,10 @@ def f(): try: import dill # noqa: F401 except ImportError: - log.warning( + import logging + + _log = logging.getLogger(__name__) + _log.warning( "Dill package is required to be installed. Please install it with: pip install [dill]" ) @@ -1157,11 +1163,7 @@ def f(a): os.environ.get("PYTEST_PLAIN_ASSERTS") != "true", reason="assertion rewriting breaks this test because cloudpickle will try to serialize " "AssertRewritingHook including captured stdout and we need to run " - "it with `--assert=plain`pytest option and PYTEST_PLAIN_ASSERTS=true ." - "Also this test is skipped on Python 3.11 because of impact of regression in Python 3.11 " - "connected likely with CodeType behaviour https://github.com/python/cpython/issues/100316 " - "That likely causes that cloudpickle is not able to serialize the `conf` correctly " - "Issue about fixing it is captured in https://github.com/apache/airflow/issues/35307", + "it with `--assert=plain`pytest option and PYTEST_PLAIN_ASSERTS=true .", ) def test_airflow_context(self): def f( @@ -1212,11 +1214,7 @@ def f( os.environ.get("PYTEST_PLAIN_ASSERTS") != "true", reason="assertion rewriting breaks this test because dill will try to serialize " "AssertRewritingHook including captured stdout and we need to run " - "it with `--assert=plain`pytest option and PYTEST_PLAIN_ASSERTS=true ." 
- "Also this test is skipped on Python 3.11 because of impact of regression in Python 3.11 " - "connected likely with CodeType behaviour https://github.com/python/cpython/issues/100316 " - "That likely causes that dill is not able to serialize the `conf` correctly " - "Issue about fixing it is captured in https://github.com/apache/airflow/issues/35307", + "it with `--assert=plain`pytest option and PYTEST_PLAIN_ASSERTS=true .", ) def test_airflow_context_dill(self): def f( diff --git a/tests/providers/docker/decorators/test_docker.py b/tests/providers/docker/decorators/test_docker.py index 33b682ab977e6..e4fbe15fc3243 100644 --- a/tests/providers/docker/decorators/test_docker.py +++ b/tests/providers/docker/decorators/test_docker.py @@ -204,13 +204,13 @@ def f(): assert teardown_task.is_teardown assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun - @pytest.mark.parametrize("use_cloudpickle", [True, False]) - def test_deepcopy_with_python_operator(self, dag_maker, use_cloudpickle): + @pytest.mark.parametrize("use_dill", [True, False]) + def test_deepcopy_with_python_operator(self, dag_maker, use_dill): import copy from airflow.providers.docker.decorators.docker import _DockerDecoratedOperator - @task.docker(image="python:3.9-slim", auto_remove="force", use_cloudpickle=use_cloudpickle) + @task.docker(image="python:3.9-slim", auto_remove="force", use_dill=use_dill) def f(): import logging @@ -244,7 +244,7 @@ def g(): assert isinstance(clone_of_docker_operator, _DockerDecoratedOperator) assert some_task.command == clone_of_docker_operator.command assert some_task.expect_airflow == clone_of_docker_operator.expect_airflow - assert some_task.use_cloudpickle == clone_of_docker_operator.use_cloudpickle + assert some_task.use_dill == clone_of_docker_operator.use_dill assert some_task.pickling_library is clone_of_docker_operator.pickling_library def test_respect_docker_host_env(self, monkeypatch, dag_maker): diff --git a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py index ac3ae634aa08e..82260ddc247d3 100644 --- a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py +++ b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py @@ -17,13 +17,11 @@ from __future__ import annotations import base64 -import importlib import json -import logging -import shutil from datetime import datetime from unittest import mock +import dill import pytest from airflow.exceptions import AirflowException @@ -33,30 +31,6 @@ from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.utils.mlengine_operator_utils import create_evaluate_ops -log = logging.getLogger(__name__) - -try: - import cloudpickle - import dill -except ImportError: - log.warning( - "Dill or cloudpickle packages are required to be installed." - " Please install one of them with: pip install [dill] or pip install [cloudpickle]" - ) - - -def is_cloudpickle_installed() -> bool: - """ - Check if the dill or cloudpickle package is installed via checking if it is on the - path or installed as package. - - :return: True if it is. Whichever way of checking it works, is fine. 
- """ - if shutil.which("cloudpickle") or importlib.util.find_spec("cloudpickle"): - return True - return False - - TASK_PREFIX = "test-task-prefix" TASK_PREFIX_PREDICTION = TASK_PREFIX + "-prediction" TASK_PREFIX_SUMMARY = TASK_PREFIX + "-summary" @@ -142,10 +116,7 @@ def test_create_evaluate_ops(self, mock_beam_pipeline, mock_python): # importing apache_beam elsewhere modifies the metrics. In order to avoid metrics being modified # by apache_beam import happening after importing this test, we retrieve the metrics here rather than # at the top of the file. - if is_cloudpickle_installed(): - METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() - else: - METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN)).decode() + METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN, recurse=True)).decode() assert TASK_PREFIX_PREDICTION == evaluate_prediction.task_id assert PROJECT_ID == evaluate_prediction.project_id @@ -191,10 +162,7 @@ def test_create_evaluate_ops_model_and_version_name(self, mock_beam_pipeline, mo # importing apache_beam elsewhere modifies the metrics. In order to avoid metrics being modified # by apache_beam import happening after importing this test, we retrieve the metrics here rather than # at the top of the file. - if is_cloudpickle_installed(): - METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() - else: - METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN)).decode() + METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN, recurse=True)).decode() assert TASK_PREFIX_PREDICTION == evaluate_prediction.task_id assert PROJECT_ID == evaluate_prediction.project_id @@ -237,10 +205,7 @@ def test_create_evaluate_ops_dag(self, mock_dataflow, mock_python): # importing apache_beam elsewhere modifies the metrics. In order to avoid metrics being modified # by apache_beam import happening after importing this test, we retrieve the metrics here rather than # at the top of the file. - if is_cloudpickle_installed(): - METRIC_FN_ENCODED = base64.b64encode(cloudpickle.dumps(METRIC_FN)).decode() - else: - METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN)).decode() + METRIC_FN_ENCODED = base64.b64encode(dill.dumps(METRIC_FN, recurse=True)).decode() assert TASK_PREFIX_PREDICTION == evaluate_prediction.task_id assert PROJECT_ID == evaluate_prediction.project_id diff --git a/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py b/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py index 526cfc3c0ad20..7d28cf1e68413 100644 --- a/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py +++ b/tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py @@ -18,12 +18,10 @@ import base64 import binascii -import importlib -import logging -import shutil import sys from unittest import mock +import dill import pytest if sys.version_info < (3, 12): @@ -33,29 +31,6 @@ f"package apache_beam is not available for Python 3.12. Skipping all tests in {__name__}" ) -log = logging.getLogger(__name__) - -try: - import cloudpickle - import dill -except ImportError: - log.warning( - "Dill or cloudpickle packages are required to be installed." - " Please install one of them with: pip install [dill] or pip install [cloudpickle]" - ) - - -def is_cloudpickle_installed() -> bool: - """ - Check if the dill or cloudpickle package is installed via checking if it is on the - path or installed as package. - - :return: True if it is. Whichever way of checking it works, is fine. 
- """ - if shutil.which("cloudpickle") or importlib.util.find_spec("cloudpickle"): - return True - return False - class TestJsonCode: def test_encode(self): @@ -100,10 +75,7 @@ def test_run_should_fail_for_invalid_encoded_fn(self): def test_run_should_fail_if_enc_fn_is_not_callable(self): non_callable_value = 1 - if is_cloudpickle_installed(): - fn_enc = base64.b64encode(cloudpickle.dumps(non_callable_value)).decode("utf-8") - else: - fn_enc = base64.b64encode(dill.dumps(non_callable_value)).decode("utf-8") + fn_enc = base64.b64encode(dill.dumps(non_callable_value)).decode("utf-8") with pytest.raises(ValueError): mlengine_prediction_summary.run( @@ -126,10 +98,7 @@ def test_run_should_not_fail_with_valid_fn(self): def metric_function(): return 1 - if is_cloudpickle_installed(): - fn_enc = base64.b64encode(cloudpickle.dumps(metric_function)).decode("utf-8") - else: - fn_enc = base64.b64encode(dill.dumps(metric_function)).decode("utf-8") + fn_enc = base64.b64encode(dill.dumps(metric_function)).decode("utf-8") mlengine_prediction_summary.run( [ diff --git a/tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py b/tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py index 7b172af11d3f7..95c502a221073 100644 --- a/tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py +++ b/tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py @@ -44,7 +44,7 @@ def tutorial_taskflow_api_docker_virtualenv(): # [START extract_virtualenv] @task.virtualenv( - use_cloudpickle=True, + use_dill=True, system_site_packages=False, requirements=["funcsigs"], ) diff --git a/tests/utils/test_preexisting_python_virtualenv_decorator.py b/tests/utils/test_preexisting_python_virtualenv_decorator.py index c15c9479f2576..2e97469958fd3 100644 --- a/tests/utils/test_preexisting_python_virtualenv_decorator.py +++ b/tests/utils/test_preexisting_python_virtualenv_decorator.py @@ -22,7 +22,7 @@ class TestExternalPythonDecorator: def test_remove_task_decorator(self): - py_source = "@task.external_python(use_cloudpickle=True)\ndef f():\nimport funcsigs" + py_source = "@task.external_python(use_dill=True)\ndef f():\nimport funcsigs" res = remove_task_decorator(python_source=py_source, task_decorator_name="@task.external_python") assert res == "def f():\nimport funcsigs" diff --git a/tests/utils/test_python_virtualenv.py b/tests/utils/test_python_virtualenv.py index 6bf03d623cff9..38cda4854baf6 100644 --- a/tests/utils/test_python_virtualenv.py +++ b/tests/utils/test_python_virtualenv.py @@ -116,7 +116,7 @@ def test_should_create_virtualenv_with_extra_packages(self, mock_execute_in_subp mock_execute_in_subprocess.assert_called_with(["/VENV/bin/pip", "install", "apache-beam[gcp]"]) def test_remove_task_decorator(self): - py_source = "@task.virtualenv(use_cloudpickle=True)\ndef f():\nimport funcsigs" + py_source = "@task.virtualenv(use_dill=True)\ndef f():\nimport funcsigs" res = remove_task_decorator(python_source=py_source, task_decorator_name="@task.virtualenv") assert res == "def f():\nimport funcsigs" From 863c5c37f6ce59c771a6f7f9f328d5e79990cf35 Mon Sep 17 00:00:00 2001 From: Maksim Moiseenkov Date: Thu, 18 Apr 2024 13:20:31 +0000 Subject: [PATCH 6/7] fixes --- airflow/operators/python.py | 2 +- hatch_build.py | 4 +++- tests/decorators/test_python_virtualenv.py | 5 ++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/airflow/operators/python.py b/airflow/operators/python.py index cfaf5f16a6074..cc76b5d2c094e 100644 --- a/airflow/operators/python.py 
+++ b/airflow/operators/python.py
@@ -63,7 +63,7 @@
 elif shutil.which("dill") or importlib.util.find_spec("dill"):
     import dill as serialization_library
 else:
-    log.warning("Neither dill and cloudpickle are installed. Please install one with: pip install [name]")
+    log.debug("Neither dill nor cloudpickle is installed. Please install one with: pip install [name]")
 
 if TYPE_CHECKING:
     from pendulum.datetime import DateTime
diff --git a/hatch_build.py b/hatch_build.py
index 6b87f3b0c0981..7fddae5bb2545 100644
--- a/hatch_build.py
+++ b/hatch_build.py
@@ -75,6 +75,9 @@
         # Cgroupspy 0.2.2 added Python 3.10 compatibility
         "cgroupspy>=0.2.2",
     ],
+    "cloudpickle": [
+        "cloudpickle",
+    ],
     "deprecated-api": [
         "requests>=2.27.0,<3",
     ],
@@ -414,7 +417,6 @@
     # Blinker use for signals in Flask, this is an optional dependency in Flask 2.2 and lower.
     # In Flask 2.3 it becomes a mandatory dependency, and flask signals are always available.
     "blinker>=1.6.2",
-    "cloudpickle",
     # Colorlog 6.x merges TTYColoredFormatter into ColoredFormatter, breaking backwards compatibility with 4.x
     # Update CustomTTYColoredFormatter to remove
     "colorlog>=4.0.2, <5.0",
diff --git a/tests/decorators/test_python_virtualenv.py b/tests/decorators/test_python_virtualenv.py
index bb18dcacabf17..aff442589e7ba 100644
--- a/tests/decorators/test_python_virtualenv.py
+++ b/tests/decorators/test_python_virtualenv.py
@@ -54,7 +54,10 @@ def f():
             try:
                 import dill  # noqa: F401
             except ImportError:
-                log.warning(
+                import logging
+
+                _log = logging.getLogger(__name__)
+                _log.warning(
                     "Dill package is required to be installed. Please install it with: pip install [dill]"
                 )
 

From e2eabbb0b057d68908107deb70ca58336b80cec1 Mon Sep 17 00:00:00 2001
From: Maksim Moiseenkov
Date: Thu, 18 Apr 2024 14:15:30 +0000
Subject: [PATCH 7/7] Handle case when cached venv is used after the operator's upgrade

---
 airflow/operators/python.py | 35 ++++++++++++++++++++++-------------
 1 file changed, 22 insertions(+), 13 deletions(-)

diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index cc76b5d2c094e..2368d78d80ce4 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -684,11 +684,12 @@ def __init__(
             **kwargs,
         )
 
-    def _requirements_list(self) -> list[str]:
+    def _requirements_list(self, exclude_cloudpickle: bool = False) -> list[str]:
         """Prepare a list of requirements that need to be installed for the virtual environment."""
         requirements = [str(dependency) for dependency in self.requirements]
-        if not self.system_site_packages and self.use_cloudpickle and "cloudpickle" not in requirements:
-            requirements.append("cloudpickle")
+        if not exclude_cloudpickle:
+            if not self.system_site_packages and self.use_cloudpickle and "cloudpickle" not in requirements:
+                requirements.append("cloudpickle")
         requirements.sort()  # Ensure a hash is stable
         return requirements
 
@@ -705,7 +706,7 @@ def _prepare_venv(self, venv_path: Path) -> None:
             index_urls=self.index_urls,
         )
 
-    def _calculate_cache_hash(self) -> tuple[str, str]:
+    def _calculate_cache_hash(self, exclude_cloudpickle: bool = False) -> tuple[str, str]:
         """Generate the hash of the cache folder to use.
 
         The following factors are used as input for the hash:
@@ -719,7 +720,7 @@ def _calculate_cache_hash(self) -> tuple[str, str]:
 
         Returns a hash and the data dict which is the base for the hash as text.
""" hash_dict = { - "requirements_list": self._requirements_list(), + "requirements_list": self._requirements_list(exclude_cloudpickle=exclude_cloudpickle), "pip_install_options": self.pip_install_options, "index_urls": self.index_urls, "cache_key": str(Variable.get("PythonVirtualenvOperator.cache_key", "")), @@ -750,14 +751,22 @@ def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path: self.log.info("Re-using cached Python virtual environment in %s", venv_path) return venv_path - self.log.error( - "Unicorn alert: Found a previous virtual environment in %s " - "with the same hash but different parameters. Previous setup: '%s' / " - "Requested venv setup: '%s'. Please report a bug to airflow!", - venv_path, - previous_hash_data, - hash_data, - ) + _, hash_data_before_upgrade = self._calculate_cache_hash(exclude_cloudpickle=True) + if previous_hash_data == hash_data_before_upgrade: + self.log.warning( + "Found a previous virtual environment in with outdated dependencies %s, " + "deleting and re-creating.", + venv_path, + ) + else: + self.log.error( + "Unicorn alert: Found a previous virtual environment in %s " + "with the same hash but different parameters. Previous setup: '%s' / " + "Requested venv setup: '%s'. Please report a bug to airflow!", + venv_path, + previous_hash_data, + hash_data, + ) else: self.log.warning( "Found a previous (probably partial installed) virtual environment in %s, "