Improve type annotations in worker.py #5814
@@ -1021,7 +1039,7 @@ def __init__(
 )
 self.periodic_callbacks["keep-alive"] = pc

-pc = PeriodicCallback(self.find_missing, 1000)
+pc = PeriodicCallback(self.find_missing, 1000)  # type: ignore
Work around bug in tornado type annotations that declare that callback can only be a sync function, whereas we pass async functions (and they work fine).
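To illustrate the mismatch described in this comment, here is a minimal sketch. It does not use tornado: `SyncOnlyScheduler` is a hypothetical stand-in whose constructor is annotated to accept only a sync callable, and `find_missing` is a placeholder coroutine function. The call works at runtime, but a type checker would be expected to reject the argument unless the line is ignored.

```python
import asyncio
import inspect
from typing import Any, Callable, List


class SyncOnlyScheduler:
    """Hypothetical stand-in (not tornado) annotated as sync-only."""

    def __init__(self, callback: Callable[[], None]) -> None:
        # Stored with a looser type so the awaitable check below type-checks.
        self._callback: Callable[[], Any] = callback

    async def run_once(self) -> None:
        result = self._callback()
        # At runtime a coroutine function is handled fine: await its result.
        if inspect.isawaitable(result):
            await result


calls: List[str] = []


async def find_missing() -> None:
    # Placeholder for the real async method on the worker.
    calls.append("find_missing")


# The annotation only admits sync callables, so a type checker flags the
# argument here; the ignore comment silences that single line.
pc = SyncOnlyScheduler(find_missing)  # type: ignore

asyncio.run(pc.run_once())
```

The ignore comment affects only static checking; the runtime behaviour is identical with or without it.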
Unit Test Results 12 files ±0 12 suites ±0 7h 7m 58s ⏱️ + 3m 19s For more details on these failures, see this check. Results for commit 02ede76. ± Comparison against base commit a86f4bb. ♻️ This comment has been updated with latest results. |
@@ -1021,7 +1039,7 @@ def __init__(
 )
 self.periodic_callbacks["keep-alive"] = pc

-pc = PeriodicCallback(self.find_missing, 1000)
+pc = PeriodicCallback(self.find_missing, 1000)  # type: ignore
I think it's better to use a more specific ignore here:
-pc = PeriodicCallback(self.find_missing, 1000)  # type: ignore
+pc = PeriodicCallback(self.find_missing, 1000)  # type: ignore[arg-type]
I'm unsure how I feel about this. IMHO the generic ignore already adds a lot of clutter, and fully specifying it would only detract further from readability.
This PR conflicts with #5820. Whichever is merged last will need to be hammered a bit.
Force-pushed from a9ec0ab to e3401f0.
Feel free to merge yours first.
Agreed during an offline chat to merge #5820 first.
This is ready for final review and merge.
Re: state machine
I don't think I found any changes to the logic. You were concerned about this; did I miss anything? The one place I found with a potential impact is the run_spec deserialization but, as I commented, that's OK.
Re: active threads
I'm pretty sure this will break and I doubt we have a test for this race condition. If you revert this change, we're good to merge from my POV
+if ts.run_spec is None:
+    return None
This appears to be the only logical change in here. I checked the code again and this is safe right now.
There is a possible optimization where we store the deserialized run_spec on this attribute (function, args, kwargs, see below) in case the task needs to be recomputed and is not forgotten in between. We're not doing this right now; maybe we did in the past. For the current state of the code this is fine.
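A rough sketch of what such a cache could look like, purely for illustration. Everything here is hypothetical and not taken from worker.py: the `TaskSketch` class, its field names, and the use of `pickle` as the serialization format are all assumptions.

```python
import pickle
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple

# (function, args, kwargs), as mentioned in the comment above.
RunSpec = Tuple[Callable[..., Any], tuple, dict]


@dataclass
class TaskSketch:
    """Hypothetical task record; not the real TaskState."""

    key: str
    serialized_run_spec: Optional[bytes] = None
    _deserialized: Optional[RunSpec] = None

    def deserialize(self) -> Optional[RunSpec]:
        # Mirrors the early return in the diff: nothing to deserialize.
        if self.serialized_run_spec is None:
            return None
        # Deserialize once and keep the result for a later recompute.
        if self._deserialized is None:
            self._deserialized = pickle.loads(self.serialized_run_spec)
        return self._deserialized


ts = TaskSketch("x", pickle.dumps((pow, (2, 5), {})))
function, args, kwargs = ts.deserialize()
result = function(*args, **kwargs)
```

The trade-off is memory: the deserialized objects stay alive for as long as the task record does.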
-active_threads = self.active_threads.copy()
-frames = {k: frames[ident] for ident, k in active_threads.items()}
+sys_frames = sys._current_frames()
+frames = {key: sys_frames[tid] for tid, key in self.active_threads.items()}
I think the active_threads copy was necessary due to a threading race condition. I believe the active_threads dict is modified by a thread and this modification is not thread safe. The modification would then raise a "changed size during iteration" exception.
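For reference, the exception in question can be reproduced deterministically in a single thread, as in this small sketch. The dict contents are made up; in the scenario described above the mutation would come from another thread rather than from the loop body.

```python
from typing import Dict, List

active_threads: Dict[int, str] = {1: "task-a", 2: "task-b"}

# Mutating the dict while iterating over it raises RuntimeError.
error_message = ""
try:
    for ident in active_threads:
        del active_threads[ident]
except RuntimeError as exc:
    error_message = str(exc)

# Iterating over a snapshot instead tolerates changes to the original.
active_threads = {1: "task-a", 2: "task-b"}
visited: List[str] = []
for ident, key in active_threads.copy().items():
    del active_threads[ident]
    visited.append(key)
```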
Isn't the whole purpose of with self.active_threads_lock: to avoid this?
Also, if the lock were not effective, you'd be relying on dict.copy() to hold the GIL for the whole duration of the operation which - even if true - would be an implementation detail of CPython which can change at any moment.
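A minimal sketch of the lock-based pattern under discussion, assuming every writer takes the same lock as the reader. The module-level names and the `run_task` / `snapshot_frames` helpers are illustrative stand-ins, not the actual worker code.

```python
import sys
import threading
from types import FrameType
from typing import Dict

active_threads: Dict[int, str] = {}
active_threads_lock = threading.Lock()


def run_task(key: str, started: threading.Event, release: threading.Event) -> None:
    ident = threading.get_ident()
    # Writers mutate the dict only while holding the lock.
    with active_threads_lock:
        active_threads[ident] = key
    try:
        started.set()
        release.wait(timeout=5)
    finally:
        with active_threads_lock:
            del active_threads[ident]


def snapshot_frames() -> Dict[str, FrameType]:
    # The reader holds the same lock, so the dict cannot change size
    # during iteration and no defensive copy is needed.
    with active_threads_lock:
        sys_frames = sys._current_frames()
        return {key: sys_frames[tid] for tid, key in active_threads.items()}


started, release = threading.Event(), threading.Event()
worker = threading.Thread(target=run_task, args=("task-1", started, release))
worker.start()
started.wait(timeout=5)
frames_while_running = snapshot_frames()
release.set()
worker.join()
frames_after = snapshot_frames()
```

In this sketch `sys._current_frames()` is called inside the lock so that every registered thread id is guaranteed to still be alive when it is looked up.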
pushed a minor tweak to the method
Isn't the whole purpose of with self.active_threads_lock: to avoid this?
Sorry, I missed the lock.
I found the issue I remembered and it was a different section of the code and indeed not protected by a lock, see #4729
Sorry for holding this up
@fjetter merging this as soon as tests are finished