Shuffle Service with Scheduler Logic #6007
Conversation
This manages memory more smoothly. We still have issues, though, in that we're still passing around slices of Arrow tables, which hold onto large references.
This helps to reduce lots of extra unmanaged memory. This flows pretty well right now. I'm finding that it's useful to blend between the disk and comm buffer sizes. The abstractions in multi_file and multi_comm are getting a little bit worn down (it would be awkward to shift back to pandas), but maybe that's ok.
Isn't solid yet
This avoids a race
We don't need a lot of comm buffer, and we also don't want more connections than machines (too much sitting in buffers). We also improve some printing.
To enable better diagnostics, it would be useful to allow worker extensions to piggy-back on the standard heartbeat. This adds an optional "heartbeat" method to extensions and, if present, calls it; the result gets sent to the scheduler and processed by an extension of the same name. This also starts to store the extensions on the worker in a named dictionary. Previously this was a list, but I'm not sure that it was actually used anywhere. This is a breaking change without deprecation, but in a space that I suspect no one will care about. I'm happy to provide a fallback if desired.
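A minimal sketch of that idea, with hypothetical names (the extension class, payload fields, and the collect_extension_heartbeats helper are illustrative, not the exact code in this PR):

class ShuffleWorkerExtension:
    def heartbeat(self) -> dict:
        # Small, serializable payload describing the extension's current state;
        # the fields here are made up for illustration.
        return {"shuffles": 3, "comm_buffer": 123_456}

def collect_extension_heartbeats(worker) -> dict:
    # Worker side: extensions live in a named dict, so any extension that
    # defines `heartbeat` contributes a payload keyed by its name. The worker
    # ships this with its regular heartbeat, and the scheduler routes each
    # entry to a scheduler extension of the same name.
    payload = {}
    for name, extension in worker.extensions.items():
        if hasattr(extension, "heartbeat"):
            payload[name] = extension.heartbeat()
    return payload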
Tests are failing. I can't reproduce locally. This is just blind praying that it fixes the problem. It should be innocuous.
Make sensitive to failed workers during input phase
This should help avoid some Windows permissions errors.
Tests are green(ish). This could use some review and some help in breaking things. I haven't gotten too creative in trying to break this, but so far it holds up pretty well (better than what's currently in main) in situations where we don't lose any workers.
This is at a point where, I think, it could be merged. It does not succeed in cases where workers fail, or where outputs shift during execution (although neither does the solution in main), but it feels pretty solid in the common case. There is a proposed plan for a next step in #6105. This PR hasn't had deep review yet (no one is keeping me honest around docstrings, code cleanliness, and so on), but it's been about a month so far without that review, and I do plan to keep working on this. I'm inclined to strip out the link to my mrocklin/dask fork, merge that in, and then merge this in. I won't do this solo (this is big enough that that doesn't seem right), but I'll probably start pestering people to get this in by early next week.
Merging in one week if there are no further comments.
I've made it mostly through shuffle_extension.py so far. These are some preliminary comments. Some are nits/tweaks/asks for documentation, but there are at least 2-3 that I think are serious issues (generally around not handling errors in concurrency #6201) that could result in deadlocks or silently incorrect results.
    ShuffleWorkerExtension,
)

__all__ = [
__all__ usually makes flake8 and typecheckers happier?
I'm curious about this. My understanding is that there isn't anything defined in this module that isn't listed in __all__. Why would __all__ be informative in this case?
if file.tell() == 0:
    file.write(schema.serialize())
This doesn't seem thread-safe. Probably good to mention.
Sure. Done.
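For illustration, a guarded version might look like the sketch below, assuming a per-file threading.Lock (the helper name and lock are hypothetical; per the exchange above, the PR itself only documents the single-threaded assumption rather than adding locking):

import threading

# Illustrative only: protect the "write the schema exactly once" check if the
# same file handle were ever shared across threads.
_file_lock = threading.Lock()

def append_with_schema(file, schema, payload: bytes) -> None:
    with _file_lock:
        if file.tell() == 0:            # first write: prepend the schema
            file.write(schema.serialize())
        file.write(payload)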
bio = io.BytesIO()
bio.write(schema.serialize())
for batch in data:
    bio.write(batch)
This seems like a lot more memory copying than I would expect with Arrow. I assume improving this isn't important here, and would just be in scope for future performance tuning?
You and me both. Yeah, Arrow is great if you use all Arrow primitives, but if you want to compose it with other things it's not that great because of all of the views and the lack of a deserialize function. If we stick with Arrow (maybe?) then we should upstream a bunch of issues for this use case so that it gets cleaner in future releases.
I actually did a bit of performance tuning here. Memory copies aren't significant yet, but I've reduced them to the extent that I can.
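For reference, a sketch of the more Arrow-native route, assuming the data is available as a pyarrow.Table; whether this actually reduces copies in this code path is exactly the future tuning question discussed above:

import io

import pyarrow as pa

def write_table(sink: io.BytesIO, table: pa.Table) -> None:
    # Let Arrow's IPC stream writer emit the schema and batches directly into
    # the sink, instead of hand-concatenating serialized messages.
    with pa.ipc.new_stream(sink, table.schema) as writer:
        for batch in table.to_batches():
            writer.write_batch(batch)

# Round trip for reading back, without a separate deserialize helper:
#   table = pa.ipc.open_stream(sink.getvalue()).read_all()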
async def shuffle_inputs_done(self, comm: object, shuffle_id: ShuffleId) -> None:
    shuffle = await self._get_shuffle(shuffle_id)
    future = asyncio.ensure_future(shuffle.receive(data))
What if this task fails, but nothing's awaiting it? receive can raise (and even intentionally raises) exceptions. Something should be done to keep track of it. I think that piece of data would just be silently lost, producing incorrect output.
This can also allow memory to build up in the receive tasks before they've started writing to the multi_file. I don't think there's any limitation on concurrent receive calls? At the beginning of a shuffle with 1000s of peer workers, couldn't you even start backing up behind the offload threadpool?
Those two issues make me wonder if we should always be awaiting something here, both because if the data can't be successfully written to disk, that should probably be an error that's propagated back to the sender, and because it would give more useful backpressure.
I get not wanting to block sends on data fully writing to disk. That would probably slow us down a bit, especially at the beginning of a large shuffle. But as I understand it, performance tuning is a separate, later step, and not always awaiting shuffle.receive here feels like a tricky optimization that currently risks both incorrectness and blowing up memory.
Yeah, that's a valid concern. I don't have a great answer to this yet. We can raise the exception here early if we want to (that avoids hiding the exceptions that we're intentionally raising, but doesn't handle the unknown ones).
If you're cool with it, I'd like to put in a TODO here so that we don't forget, and then defer this to future work.
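To make that TODO concrete, here is a rough sketch of what "keep track of it" could look like, with a semaphore as crude backpressure (the class and attribute names are hypothetical, not part of this PR):

import asyncio

class ReceiveTracker:
    """Track fire-and-forget receive tasks so failures are not silently lost."""

    def __init__(self, max_concurrent: int = 10):
        self._tasks = set()
        self._limit = asyncio.Semaphore(max_concurrent)  # crude backpressure

    async def _run(self, coro):
        async with self._limit:
            await coro

    def submit(self, coro) -> asyncio.Task:
        task = asyncio.ensure_future(self._run(coro))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def flush(self) -> None:
        # Await everything in flight and re-raise the first failure instead of
        # letting it disappear; a caller could do this before declaring the
        # input phase done.
        if self._tasks:
            await asyncio.gather(*list(self._tasks))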
)

def close(self):
    self.executor.shutdown()
I'd think there's more to do than this? Stop any active shuffles? Clean up disk?
Good thought. I've changed this to close all shuffles (which in turn clean up multi_files (which in turn clean up disk)).
There is now also a test for closing workers mid-shuffle (with a couple of TODOs as well for the next round).
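Roughly, the layered cleanup described above looks like this (attribute and method names are illustrative, not the exact implementation):

def close(self):
    # Closing each shuffle closes its MultiFile, which in turn removes its
    # on-disk spill data, before the executor itself is shut down.
    for shuffle in list(self.shuffles.values()):
        shuffle.close()
    self.shuffles.clear()
    self.executor.shutdown()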
except KeyError:
    queue = asyncio.Queue()
    for _ in range(MultiComm.max_connections):
        queue.put_nowait(None)
It took me a bit to understand that this queue works the opposite way you'd expect: it's not pushing data in, it's pushing permission to write more back out. Sort of a form of CapacityLimiter? I think it would help to document this.
I've added a note in the docstring.
I agree with the comment. Took me a bit to understand this myself.
I think factoring this out should not be in scope for this PR, but I could see value in doing it regardless.
I think this is a case for a follow-up ticket. @gjoseph92 would you mind opening one?
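For that follow-up ticket, a small sketch of the pattern under discussion: the queue holds permission tokens rather than data, so acquiring one bounds how many sends run at once, much like an anyio/trio CapacityLimiter (the function names and default max_connections value are illustrative):

import asyncio

def make_limiter(max_connections: int = 10) -> asyncio.Queue:
    queue = asyncio.Queue()
    for _ in range(max_connections):
        queue.put_nowait(None)      # preload one token per allowed connection
    return queue

async def send_with_limit(queue: asyncio.Queue, send, payload) -> None:
    token = await queue.get()       # wait until a connection slot is free
    try:
        await send(payload)
    finally:
        queue.put_nowait(token)     # hand the slot back to the next sender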
Co-authored-by: Gabe Joseph <[email protected]>
@gjoseph92 I know that you're travelling, so no pressure, but if you have time for another pass that would be welcome.
from distributed.utils import log_errors


class MultiFile:
I agree with both. I'm not entirely sure if the classes themselves should be the same or if some common functionality can be shared (e.g. factor out the queue stuff as a CapacityLimiter, xref https://github.com/dask/distributed/pull/6007/files#r862239876).
Good to go?
Builds off of #5976, which in turn builds off of #5957.
Commit message of the first novel commit:
Previously we assigned a partition to a worker based on a simple
formula. This was good, but also error prone in a few ways.
Now we ask the scheduler to manage this process, and all of the workers
check in with it to get assignments. Currently the actual logic is the
same, but now things get to live in a single location where we can make
smarter decisions to avoid conflicts.
This removes the previous shuffle setup steps on the worker, simplifying
the code a bit (subjectively anyway). It makes the compute steps a bit
worse because now we have a small pandas join in the middle of things.
It should already avoid excess labor. It does not yet avoid restriction
conflicts.
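A rough sketch of the scheduler-managed assignment described in this commit message; the class name, method names, and round-robin rule are illustrative stand-ins for the actual logic:

class ShuffleSchedulerExtension:
    """Hand out partition -> worker assignments from a single place."""

    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.worker_for = {}    # shuffle_id -> {output partition: worker address}

    def get_or_create(self, shuffle_id: str, npartitions: int) -> dict:
        if shuffle_id not in self.worker_for:
            workers = sorted(self.scheduler.workers)
            # Same simple formula as before (round-robin over workers), but
            # computed once, centrally, so it can later account for worker
            # restrictions and failures.
            self.worker_for[shuffle_id] = {
                i: workers[i % len(workers)] for i in range(npartitions)
            }
        return self.worker_for[shuffle_id]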