Skip to content

Commit 7b5962e

Browse files
authored
Refactor cloudpickle support in Python operators/decorators (#39270)
* Refactor cloudpickle support in Python operators/decorators * Fixup missing marker * Return back skip TestPythonVirtualenvOperator::test_airflow_context for dill * TestPythonVirtualenvOperator::test_airflow_context xfail instead of skip * Catch only on ModuleNotFound error and simple reraise with warning * Limit test_airflow_context only for python 3.11
1 parent 377a915 commit 7b5962e

File tree

7 files changed

+566
-513
lines changed

7 files changed

+566
-513
lines changed

airflow/decorators/__init__.pyi

+48-13
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,15 @@ class TaskDecoratorCollection:
111111
# _PythonVirtualenvDecoratedOperator.
112112
requirements: None | Iterable[str] | str = None,
113113
python_version: None | str | int | float = None,
114-
use_dill: bool = False,
114+
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
115115
system_site_packages: bool = True,
116116
templates_dict: Mapping[str, Any] | None = None,
117117
pip_install_options: list[str] | None = None,
118118
skip_on_exit_code: int | Container[int] | None = None,
119119
index_urls: None | Collection[str] | str = None,
120120
venv_cache_path: None | str = None,
121121
show_return_value_in_logs: bool = True,
122+
use_dill: bool = False,
122123
**kwargs,
123124
) -> TaskDecorator:
124125
"""Create a decorator to convert the decorated callable to a virtual environment task.
@@ -129,6 +130,13 @@ class TaskDecoratorCollection:
129130
"requirements file" as specified by pip.
130131
:param python_version: The Python version to run the virtual environment with. Note that
131132
both 2 and 2.7 are acceptable forms.
133+
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
134+
135+
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
136+
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
137+
this requires to include cloudpickle in your requirements.
138+
- ``"dill"``: Use dill for serialize more complex types,
139+
this requires to include dill in your requirements.
132140
:param use_dill: Whether to use dill to serialize
133141
the args and result (pickle is default). This allow more complex types
134142
but requires you to include dill in your requirements.
@@ -154,6 +162,9 @@ class TaskDecoratorCollection:
154162
logs. Defaults to True, which allows return value log output.
155163
It can be set to False to prevent log output of return value when you return huge data
156164
such as transmission a large amount of XCom to TaskAPI.
165+
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
166+
the args and result (pickle is default). This allows more complex types
167+
but requires you to include dill in your requirements.
157168
"""
158169
@overload
159170
def virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
@@ -164,9 +175,10 @@ class TaskDecoratorCollection:
164175
multiple_outputs: bool | None = None,
165176
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
166177
# _PythonVirtualenvDecoratedOperator.
167-
use_dill: bool = False,
178+
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
168179
templates_dict: Mapping[str, Any] | None = None,
169180
show_return_value_in_logs: bool = True,
181+
use_dill: bool = False,
170182
**kwargs,
171183
) -> TaskDecorator:
172184
"""Create a decorator to convert the decorated callable to a virtual environment task.
@@ -176,9 +188,13 @@ class TaskDecoratorCollection:
176188
(so usually start with "/" or "X:/" depending on the filesystem/os used).
177189
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
178190
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
179-
:param use_dill: Whether to use dill to serialize
180-
the args and result (pickle is default). This allow more complex types
181-
but requires you to include dill in your requirements.
191+
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
192+
193+
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
194+
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
195+
this requires to include cloudpickle in your requirements.
196+
- ``"dill"``: Use dill for serialize more complex types,
197+
this requires to include dill in your requirements.
182198
:param templates_dict: a dictionary where the values are templates that
183199
will get templated by the Airflow engine sometime between
184200
``__init__`` and ``execute`` takes place and are made available
@@ -187,6 +203,9 @@ class TaskDecoratorCollection:
187203
logs. Defaults to True, which allows return value log output.
188204
It can be set to False to prevent log output of return value when you return huge data
189205
such as transmission a large amount of XCom to TaskAPI.
206+
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
207+
the args and result (pickle is default). This allows more complex types
208+
but requires you to include dill in your requirements.
190209
"""
191210
@overload
192211
def branch( # type: ignore[misc]
@@ -211,14 +230,15 @@ class TaskDecoratorCollection:
211230
# _PythonVirtualenvDecoratedOperator.
212231
requirements: None | Iterable[str] | str = None,
213232
python_version: None | str | int | float = None,
214-
use_dill: bool = False,
233+
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
215234
system_site_packages: bool = True,
216235
templates_dict: Mapping[str, Any] | None = None,
217236
pip_install_options: list[str] | None = None,
218237
skip_on_exit_code: int | Container[int] | None = None,
219238
index_urls: None | Collection[str] | str = None,
220239
venv_cache_path: None | str = None,
221240
show_return_value_in_logs: bool = True,
241+
use_dill: bool = False,
222242
**kwargs,
223243
) -> TaskDecorator:
224244
"""Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator.
@@ -232,9 +252,13 @@ class TaskDecoratorCollection:
232252
"requirements file" as specified by pip.
233253
:param python_version: The Python version to run the virtual environment with. Note that
234254
both 2 and 2.7 are acceptable forms.
235-
:param use_dill: Whether to use dill to serialize
236-
the args and result (pickle is default). This allow more complex types
237-
but requires you to include dill in your requirements.
255+
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
256+
257+
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
258+
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
259+
this requires to include cloudpickle in your requirements.
260+
- ``"dill"``: Use dill for serialize more complex types,
261+
this requires to include dill in your requirements.
238262
:param system_site_packages: Whether to include
239263
system_site_packages in your virtual environment.
240264
See virtualenv documentation for more information.
@@ -253,6 +277,9 @@ class TaskDecoratorCollection:
253277
logs. Defaults to True, which allows return value log output.
254278
It can be set to False to prevent log output of return value when you return huge data
255279
such as transmission a large amount of XCom to TaskAPI.
280+
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
281+
the args and result (pickle is default). This allows more complex types
282+
but requires you to include dill in your requirements.
256283
"""
257284
@overload
258285
def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
@@ -264,9 +291,10 @@ class TaskDecoratorCollection:
264291
multiple_outputs: bool | None = None,
265292
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
266293
# _PythonVirtualenvDecoratedOperator.
267-
use_dill: bool = False,
294+
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
268295
templates_dict: Mapping[str, Any] | None = None,
269296
show_return_value_in_logs: bool = True,
297+
use_dill: bool = False,
270298
**kwargs,
271299
) -> TaskDecorator:
272300
"""Create a decorator to wrap the decorated callable into a BranchExternalPythonOperator.
@@ -279,9 +307,13 @@ class TaskDecoratorCollection:
279307
(so usually start with "/" or "X:/" depending on the filesystem/os used).
280308
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
281309
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
282-
:param use_dill: Whether to use dill to serialize
283-
the args and result (pickle is default). This allow more complex types
284-
but requires you to include dill in your requirements.
310+
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
311+
312+
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
313+
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
314+
this requires to include cloudpickle in your requirements.
315+
- ``"dill"``: Use dill for serialize more complex types,
316+
this requires to include dill in your requirements.
285317
:param templates_dict: a dictionary where the values are templates that
286318
will get templated by the Airflow engine sometime between
287319
``__init__`` and ``execute`` takes place and are made available
@@ -290,6 +322,9 @@ class TaskDecoratorCollection:
290322
logs. Defaults to True, which allows return value log output.
291323
It can be set to False to prevent log output of return value when you return huge data
292324
such as transmission a large amount of XCom to TaskAPI.
325+
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
326+
the args and result (pickle is default). This allows more complex types
327+
but requires you to include dill in your requirements.
293328
"""
294329
@overload
295330
def branch_external_python(

airflow/example_dags/tutorial_taskflow_api_virtualenv.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def tutorial_taskflow_api_virtualenv():
3838
"""
3939

4040
@task.virtualenv(
41-
use_dill=True,
41+
serializer="dill", # Use `dill` for advanced serialization.
4242
system_site_packages=False,
4343
requirements=["funcsigs"],
4444
)

0 commit comments

Comments
 (0)