Skip to content

Commit 6eda2af

Browse files
pedro-psbmdellweg
authored andcommitted
Prevent dispatch from non-main thread to run immediately
Immediate/short tasks are now run inside an event loop, but that doesnt work inside a thread from a thread pool, like it happens with on the pull-through workflow from the content-app. This enforces that the short task can't run immediately if the dispatch is done from inside such a context. Closes: #6429
1 parent e16ecdb commit 6eda2af

File tree

4 files changed

+32
-9
lines changed

4 files changed

+32
-9
lines changed

CHANGES/plugin_api/6429.bugfix

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
Re-added* timeout to tasks dispatched as immediate for both deferred and non-deferred runs.
2+
3+
Also, immediate tasks must be coroutines from now on.
4+
This is to enable immediate tasks to run on the workers foreground without completely blocking heartbeats.
5+
Support for legacy non-coroutines immediate task will be dropped in pulpcore 3.85.
6+
7+
\* This was added in pulpcore 3.75.0, reverted in 3.75.1 due to a regression and re-applied here with the regression fix.

pulpcore/app/tasks/base.py

+9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from pulpcore.app.apps import get_plugin_config
44
from pulpcore.app.models import CreatedResource
5+
from pulpcore.app.loggers import deprecation_logger
56
from pulpcore.plugin.models import MasterModel
67

78
from asgiref.sync import sync_to_async
@@ -65,6 +66,10 @@ def general_update(instance_id, app_label, serializer_name, *args, **kwargs):
6566
due to validation error. This theoretically should never occur since validation is
6667
performed before the task is dispatched.
6768
"""
69+
deprecation_logger.warning(
70+
"`pulpcore.app.tasks.base.general_update` is deprecated and will be removed in Pulp 4. "
71+
"Use `pulpcore.app.tasks.base.ageneral_update` instead."
72+
)
6873
data = kwargs.pop("data", None)
6974
partial = kwargs.pop("partial", False)
7075
serializer_class = get_plugin_config(app_label).named_serializers[serializer_name]
@@ -87,6 +92,10 @@ def general_delete(instance_id, app_label, serializer_name):
8792
app_label (str): the Django app label of the plugin that provides the model
8893
serializer_name (str): name of the serializer class for the model
8994
"""
95+
deprecation_logger.warning(
96+
"`pulpcore.app.tasks.base.general_delete` is deprecated and will be removed in Pulp 4. "
97+
"Use `pulpcore.app.tasks.base.ageneral_delete` instead."
98+
)
9099
serializer_class = get_plugin_config(app_label).named_serializers[serializer_name]
91100
instance = serializer_class.Meta.model.objects.get(pk=instance_id)
92101
if isinstance(instance, MasterModel):

pulpcore/app/tasks/test.py

+5-9
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import asyncio
12
import backoff
3+
import time
24
from pulpcore.app.models import TaskGroup
35
from pulpcore.tasking.tasks import dispatch
46

@@ -9,24 +11,18 @@ def dummy_task():
911

1012

1113
def sleep(interval):
12-
from time import sleep
13-
14-
sleep(interval)
14+
time.sleep(interval)
1515

1616

1717
async def asleep(interval):
1818
"""Async function that sleeps."""
19-
import asyncio
20-
2119
await asyncio.sleep(interval)
2220

2321

2422
@backoff.on_exception(backoff.expo, BaseException)
2523
def gooey_task(interval):
2624
"""A sleep task that tries to avoid being killed by ignoring all exceptions."""
27-
from time import sleep
28-
29-
sleep(interval)
25+
time.sleep(interval)
3026

3127

3228
def dummy_group_task(inbetween=3, intervals=None):
@@ -35,5 +31,5 @@ def dummy_group_task(inbetween=3, intervals=None):
3531
task_group = TaskGroup.current()
3632
for interval in intervals:
3733
dispatch(sleep, args=(interval,), task_group=task_group)
38-
sleep(inbetween)
34+
time.sleep(inbetween)
3935
task_group.finish()

pulpcore/tasking/tasks.py

+11
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import sys
88
import traceback
99
import tempfile
10+
import threading
1011
from asgiref.sync import sync_to_async
1112
from datetime import timedelta
1213
from gettext import gettext as _
@@ -130,6 +131,14 @@ def _execute_task(task):
130131
send_task_notification(task)
131132

132133

134+
def running_from_thread_pool() -> bool:
135+
# TODO: this needs an alternative approach ASAP!
136+
# Currently we rely on the weak fact that ThreadPoolExecutor names threads like:
137+
# "ThreadPoolExecutor-0_0"
138+
thread_name = threading.current_thread().name
139+
return "ThreadPoolExecutor" in thread_name
140+
141+
133142
def dispatch(
134143
func,
135144
args=None,
@@ -177,6 +186,8 @@ def dispatch(
177186
ValueError: When `resources` is an unsupported type.
178187
"""
179188

189+
# Can't run short tasks immediately if running from thread pool
190+
immediate = immediate and not running_from_thread_pool()
180191
assert deferred or immediate, "A task must be at least `deferred` or `immediate`."
181192

182193
if callable(func):

0 commit comments

Comments
 (0)