[WIP] Fine grained serialization #4897

Closed · wants to merge 5 commits

Conversation

madsbk (Contributor) commented Jun 9, 2021

Warning: this is very much a work in progress.

This PR implements fine-grained serialization by serializing only non-msgpack-serializable objects, e.g.:

task = (add, (add, 1, 2), 3)  # Nested task
ser = (SerializedCallable(add), (SerializedCallable(add), 1, 2), 3) # Serialized 
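
As a sketch only (SerializedCallable, MSGPACK_SCALARS and serialize_task below are illustrative stand-ins, not the code in this PR), the walk looks roughly like this: leave anything msgpack can pack untouched, recurse into containers, and wrap everything else.

import pickle

class SerializedCallable:
    """Illustrative stand-in: holds a pickled object until it is needed."""
    def __init__(self, obj):
        self._data = pickle.dumps(obj)

    def deserialize(self):
        return pickle.loads(self._data)

# Types msgpack can encode without any help.
MSGPACK_SCALARS = (type(None), bool, int, float, str, bytes)

def serialize_task(obj):
    """Recurse through a task, wrapping only what msgpack cannot handle."""
    if isinstance(obj, MSGPACK_SCALARS):
        return obj                                  # msgpack handles these natively
    if isinstance(obj, (list, tuple)):
        return type(obj)(serialize_task(o) for o in obj)
    return SerializedCallable(obj)                  # callables and other opaque objects

def add(x, y):
    return x + y

serialize_task((add, (add, 1, 2), 3))
# -> (SerializedCallable(add), (SerializedCallable(add), 1, 2), 3)
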

Motivation

In main we serialize:

  • Nested tasks using to_serialize() on both the function and all its arguments
  • Non-nested tasks using dumps_function() on the function and pickle.dumps() on its arguments.

This means that once serialized, we cannot access or modify the function arguments, which can be a problem: #4673.
It also means that we have two separate code paths for nested and non-nested tasks all the way from the Scheduler to the Worker.
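
As a rough illustration of the first problem (plain pickle standing in here for dumps_function() and the real code paths), once the arguments are an opaque byte string, nothing upstream can inspect or rewrite them without a full round trip:

import pickle

def add(x, y):
    return x + y

# Non-nested path today, roughly: function and arguments become opaque blobs.
serialized_function = pickle.dumps(add)
serialized_args = pickle.dumps((1, 2))

# To change an argument (e.g. substitute a key with its computed value) the
# whole blob must be deserialized, modified and re-serialized:
args = pickle.loads(serialized_args)
args = (args[0], 42)
serialized_args = pickle.dumps(args)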

The Protocol

  • Fine-grained serialization -- only non-msgpack-serializable objects are serialized
  • Never de-serialize on the Scheduler (other than the implicit msgpack de-serialization)
  • Always de-serialize on the Client
  • Delay de-serialization on the Worker until task execution (a minimal sketch follows this list):
    • except when receiving scattered data from the Scheduler or Client,
    • or when receiving data from other Workers, e.g. when gathering data dependencies.
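
To make the Worker rule concrete, here is a minimal sketch of delaying deserialization until execution; WorkerTaskStore and its methods are made up for illustration and are not the Worker's real attributes:

import pickle

class WorkerTaskStore:
    def __init__(self):
        self.tasks = {}                 # key -> still-serialized task

    def add_task(self, key, serialized_task):
        # Received from the scheduler: keep it serialized, do not touch it.
        self.tasks[key] = serialized_task

    def execute(self, key):
        # Only now do we pay the deserialization cost.
        func, args = pickle.loads(self.tasks[key])
        return func(*args)

store = WorkerTaskStore()
store.add_task("x", pickle.dumps((sum, ([1, 2, 3],))))
assert store.execute("x") == 6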

Notice

  • Since we do not serialize task arguments, we have to handle the implicit conversion of lists to tuples by msgpack (see msgpack_persist_lists() and the sketch after this list).
  • We do not necessarily serialize all computations; a task graph such as {"x": None} will result in a task containing just None. This is a potential problem for the Scheduler that we have to handle.
  • The goal here is not performance; the goal is to simplify serialization and make it clear what we need from the HLG pack and unpack functions.
  • This PR doesn't require any changes to Dask, but in a follow-up PR I will use this new, clean serialization protocol to simplify the HLG pack and unpack functions.
  • With this PR, it is my hope that we can improve performance by implementing single pass serialization.
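
On the list/tuple point: distributed unpacks msgpack with use_list=False, so packed lists come back as tuples. The helper below is only a guess at the shape of msgpack_persist_lists() (tag lists before packing, untag after unpacking), not the PR's actual implementation:

import msgpack

# With use_list=False, msgpack returns arrays as tuples:
data = msgpack.packb([1, 2, 3])
assert msgpack.unpackb(data, use_list=False) == (1, 2, 3)

def tag_lists(obj):
    # Replace lists with a tagged dict so the distinction survives msgpack.
    if isinstance(obj, list):
        return {"__list__": [tag_lists(o) for o in obj]}
    if isinstance(obj, tuple):
        return tuple(tag_lists(o) for o in obj)
    return obj

def untag_lists(obj):
    # Restore tagged dicts back to lists after unpacking.
    if isinstance(obj, dict) and "__list__" in obj:
        return [untag_lists(o) for o in obj["__list__"]]
    if isinstance(obj, tuple):
        return tuple(untag_lists(o) for o in obj)
    return obj

packed = msgpack.packb(tag_lists([1, (2, 3)]))
assert untag_lists(msgpack.unpackb(packed, use_list=False)) == [1, (2, 3)]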

@mrocklin (Member) commented:

Some things that feel good and not good about this approach:

Good

We're reducing the number of approaches / specifications of tasks in the client/scheduler/worker. This is a big step in the right direction.

Bad

We're exposing more structural information to Dask infrastructure than we need to. What was previously a black box that got passed around is now very transparent. As mentioned this introduces some pain around tuple/list distinctions, and possibly other interactions between tasks and our messaging / serialization. In general this feels like a step in the wrong direction.

Thoughts

To me it seems like there is a tension between the following two desires:

  1. We want the client and worker to pass each other tasks as a black box, an opaque blob of bytes. This makes message passing simple
  2. The scheduler is now getting involved in task creation due to high level graphs

So we want something that is both a black box to the scheduler and also not a black box to the scheduler. That seems unfortunate. This PR takes the approach of "well, given that the scheduler is involved, let's just accept that and expose the Dask task spec to the scheduler so that it can make tasks like the client (except, of course, that things like callables will be pre-serialized)". This makes sense. It feels better than what we have today.

However, if we're going to make a big change like this, then it makes me wonder what other big changes are available to us. This is a step in a good direction, but is there a step in a better direction?

Alternative

As an alternative let me propose the following. While Dask has a task specification that looks like this: (add, (sum, [1, 2, 3]), "x"), we don't need to use that task specification in the Dask.distributed machinery. We can do something totally different.

I don't know what would be better necessarily, but to start a conversation I'll propose something different. Dask.distributed represents a task as a bytestring payload and a msgpack serializable dictionary of kwargs. Using Python syntax:

class Task:
    payload: bytes
    kwargs: dict

In the common case without high level graphs, kwargs would probably be empty most of the time. In the case of high level graph layers we would have a payload from the client, which would deserialize into some callable, and then the worker would **splat the kwargs into that callable. The scheduler would be confined to parametrizing within this dictionary, rather than within the full task.
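
A minimal sketch of that shape, with pickle standing in for whatever the client would actually use for the payload and partial_sum as a made-up example:

import pickle
from dataclasses import dataclass

@dataclass
class Task:
    payload: bytes   # opaque to the scheduler
    kwargs: dict     # msgpack-serializable; the scheduler may fill these in

# Client side: serialize the callable part once.
def partial_sum(extra, data=None):
    return sum(data) + extra

task = Task(payload=pickle.dumps(partial_sum), kwargs={"extra": 10})

# Scheduler side: only touches the kwargs dict, never the payload.
task.kwargs["data"] = [1, 2, 3]

# Worker side: deserialize the payload and **splat the kwargs into it.
func = pickle.loads(task.payload)
assert func(**task.kwargs) == 16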

I don't think that this is necessarily a good plan though, it's just a different one. I think it would be good for us to think through the different options out in the open before we go down one path or another.

Successfully merging this pull request may close these issues.

  • [DISCUSSION] Can the scheduler use pickle.dumps?
  • [Discussion] Serialize objects within tasks