Skip to content

Commit 0ee08f1

Browse files
committed
wip: review iteration
1 parent ca4c621 commit 0ee08f1

File tree

3 files changed

+12
-13
lines changed

3 files changed

+12
-13
lines changed

pulpcore/app/models/repository.py

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
get_domain_pk,
2626
cache_key,
2727
reverse,
28+
force_deferred_execution,
2829
)
2930
from pulpcore.constants import ALL_KNOWN_CONTENT_CHECKSUMS, PROTECTED_REPO_VERSION_MESSAGE
3031
from pulpcore.download.factory import DownloaderFactory
@@ -379,6 +380,7 @@ def pull_through_add_content(self, content_artifact):
379380
from pulpcore.plugin.tasking import dispatch, add_and_remove
380381

381382
body = {"repository_pk": self.pk, "add_content_units": [cpk], "remove_content_units": []}
383+
force_deferred_execution.set(True)
382384
return dispatch(add_and_remove, kwargs=body, exclusive_resources=[self], immediate=True)
383385

384386
@hook(AFTER_UPDATE, when="retain_repo_versions", has_changed=True)

pulpcore/app/util.py

+1
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,7 @@ def get_artifact_url(artifact, headers=None, http_method=None):
544544
return url
545545

546546

547+
force_deferred_execution = ContextVar("force_deferred_execution", default=False)
547548
current_task = ContextVar("current_task", default=None)
548549
_current_user_func = ContextVar("current_user", default=lambda: None)
549550

pulpcore/tasking/tasks.py

+9-13
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import sys
88
import traceback
99
import tempfile
10-
import threading
1110
from asgiref.sync import sync_to_async
1211
from datetime import timedelta
1312
from gettext import gettext as _
@@ -18,7 +17,13 @@
1817
from django_guid import get_guid
1918
from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS
2019
from pulpcore.app.models import Task, TaskGroup
21-
from pulpcore.app.util import current_task, get_domain, get_prn, deprecation_logger
20+
from pulpcore.app.util import (
21+
current_task,
22+
get_domain,
23+
get_prn,
24+
deprecation_logger,
25+
force_deferred_execution,
26+
)
2227
from pulpcore.constants import (
2328
TASK_FINAL_STATES,
2429
TASK_INCOMPLETE_STATES,
@@ -126,12 +131,6 @@ def _execute_task(task):
126131
send_task_notification(task)
127132

128133

129-
def running_from_thread_pool() -> bool:
130-
thread_name = threading.current_thread().name
131-
# ThreadPoolExecutor names threads like: "ThreadPoolExecutor-0_0"
132-
return "ThreadPoolExecutor" in thread_name
133-
134-
135134
def dispatch(
136135
func,
137136
args=None,
@@ -179,13 +178,10 @@ def dispatch(
179178
ValueError: When `resources` is an unsupported type.
180179
"""
181180

181+
# Can't run short tasks immediately from certain contexts (e.g, if running from thread pool)
182+
immediate = immediate and not force_deferred_execution
182183
assert deferred or immediate, "A task must be at least `deferred` or `immediate`."
183184

184-
# Can't run short tasks immediately if running from thread pool
185-
force_defer = True if running_from_thread_pool() else False
186-
short = immediate
187-
immediate = short and not force_defer
188-
189185
if callable(func):
190186
function_name = f"{func.__module__}.{func.__name__}"
191187
else:

0 commit comments

Comments
 (0)