-
Notifications
You must be signed in to change notification settings - Fork 121
[PULP-467] Re-add timeout to immediate task with regression fix #6463
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PULP-467] Re-add timeout to immediate task with regression fix #6463
Conversation
I'm hitting a wall here on implementing an async dispatch. Not sure its a good ideal at all. The image below illustrates how I perceive the issue. If we didnt use ExitStack, we could do some hacking to handle executing the cleanup code in both sync/async context, but I'm not confident in doing the proper error handling there. |
I was thinking in another approach to avoid having to implement async dispatch, at least for now. But I wonder if that wouldn't add too much overhead to the pull-through content serving branch. |
pulpcore/app/models/repository.py
Outdated
return dispatch(add_and_remove, kwargs=body, exclusive_resources=[self], immediate=True) | ||
task = dispatch(add_and_remove, kwargs=body, exclusive_resources=[self], immediate=True) | ||
# TODO: check if task completed succesfully, as its run in the worker now | ||
return task |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we return anything at all? Should this not be a dispatch and forget action?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, maybe it should.
I assumed that the request would fail when the task failed because it is immediate, but actually it may fail or silently ignore, as it may get defered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "request" is not interested in this task. It already got the artifact delivered and is happy at this point.
967eae6
to
ca4c621
Compare
pulpcore/tasking/tasks.py
Outdated
thread_name = threading.current_thread().name | ||
# ThreadPoolExecutor names threads like: "ThreadPoolExecutor-0_0" | ||
return "ThreadPoolExecutor" in thread_name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wish there was a better way, not using magic values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another approach is to use some context variables.
I think we could use threading.local()
or something like that.
https://docs.python.org/3/library/threading.html#thread-local-data
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or even context variables. I'm not sure the best fit.
Do you prefer that we use something like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. I don't know. But after all, we want to be able to schedule the async task back to the main thread loop (once we figure out how two reliably...). So we should not over engineer this stop gap here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good reminder.
I've quickly tried contextvars. It passed the manual test locally but failed CI, so I reverted it.
The other requested changes are in place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make this comment much more scary. We really want to improve this part ASAP.
pulpcore/tasking/tasks.py
Outdated
@@ -174,6 +181,11 @@ def dispatch( | |||
|
|||
assert deferred or immediate, "A task must be at least `deferred` or `immediate`." | |||
|
|||
# Can't run short tasks immediately if running from thread pool | |||
force_defer = True if running_from_thread_pool() else False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True if True else False? This is not wrong, but way too complicated...
Where's the suggest feature when you need it...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it reads better, but not big deal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well your function returns a boolean already. And you translate a boolean into the same boolean using the ternary operator. This is very verbose for nothing. Sorry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair
pulpcore/tasking/tasks.py
Outdated
# Can't run short tasks immediately if running from thread pool | ||
force_defer = True if running_from_thread_pool() else False | ||
short = immediate | ||
immediate = short and not force_defer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What this all means is
immediate &= not running_in_thread_pool()
Also we should move the assert from above below this or we might produce improper tasks again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I was trying to enforce some semantics around that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also we should move the assert from above below this or we might produce improper tasks again.
Good point
0ee08f1
to
4e522da
Compare
FWIW, I'm testing this manually with the script in #6429 in an environment with npm plugin installed. |
4e522da
to
2c3a969
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you rebase this work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has this revert not been released?
Then we should not add any changelog at all for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this file should not be there anymore at this point in time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That has been released. I would also think that this should't be here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was just outdated
pulpcore/app/tasks/test.py
Outdated
@@ -14,6 +14,13 @@ def sleep(interval): | |||
sleep(interval) | |||
|
|||
|
|||
async def asleep(interval): | |||
"""Async function that sleeps.""" | |||
import asyncio |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way we do imports in this file is unfortunate. Would you mind cleaning it up?
Let's do "import time" at the top, e.g..
This reverts commit 2b3c88a.
2c3a969
to
d004162
Compare
@@ -111,3 +113,29 @@ def general_multi_delete(instance_ids): | |||
with transaction.atomic(): | |||
for instance in instances: | |||
instance.delete() | |||
|
|||
|
|||
async def ageneral_update(instance_id, app_label, serializer_name, *args, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I keep coming up with more and more stuff...
But i think we should deprecate the old tasks for deletion in pulp 4.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a deprecation message makes the CI unhappy.
Not sure how to handle that.
@@ -28,5 +31,5 @@ def dummy_group_task(inbetween=3, intervals=None): | |||
task_group = TaskGroup.current() | |||
for interval in intervals: | |||
dispatch(sleep, args=(interval,), task_group=task_group) | |||
sleep(inbetween) | |||
time.sleep(inbetween) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now we are calling the proper function here. Thank you!
pulpcore/tasking/tasks.py
Outdated
thread_name = threading.current_thread().name | ||
# ThreadPoolExecutor names threads like: "ThreadPoolExecutor-0_0" | ||
return "ThreadPoolExecutor" in thread_name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make this comment much more scary. We really want to improve this part ASAP.
d004162
to
db95127
Compare
Immediate/short tasks are now run inside an event loop, but that doesnt work inside a thread from a thread pool, like it happens with on the pull-through workflow from the content-app. This enforces that the short task can't run immediately if the dispatch is done from inside such a context. Closes: pulp#6429
db95127
to
8dcbc8c
Compare
def running_from_thread_pool() -> bool: | ||
# TODO: this needs an alternative approach ASAP! | ||
# Currently we rely on the weak fact that ThreadPoolExecutor names threads like: | ||
# "ThreadPoolExecutor-0_0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
Experiment with creating async version of dispatch to make async timeout work from the content app.