
Support non-blocking MailboxReceivedOperator #9615

Closed · Tracked by #9273
walterddr opened this issue Oct 18, 2022 · 6 comments · Fixed by #9753
Labels: Design Review, PEP-Request (Pinot Enhancement Proposal request to be reviewed)

@walterddr (Contributor) commented Oct 18, 2022

Currently, MailboxReceivedOperator

  • does a busy wait on all mailboxes
  • always picks the next mailbox to check in exact order, instead of a round-robin or random pick

I propose creating a non-blocking mailbox receive operator. Several thoughts:

  • consolidate the multiple ArrayBlockingQueues (one per observer) into a single shared queue, so the mailbox receive operator waits for signals from the blocking-queue offer
  • create a signaling mechanism to inform the mailbox receive operator thread to start reading directly from the notifying observer

The same mechanism should also work for the local passthrough mailbox design; see #9484.
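Here's a rough sketch of the shared-queue idea (class and method names are illustrative, not the current Pinot code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative only: all receiving mailboxes offer into one shared queue, so
// the receive operator blocks on a single queue instead of busy-polling each
// mailbox in turn.
public class SharedMailboxQueue {
  /** Stand-in for whatever block type the mailboxes carry. */
  public interface MailboxContent {}

  private final BlockingQueue<MailboxContent> _sharedQueue = new ArrayBlockingQueue<>(1024);

  /** Called by each observer instead of writing to its own per-mailbox queue. */
  public boolean offer(MailboxContent content, long timeoutMs) throws InterruptedException {
    return _sharedQueue.offer(content, timeoutMs, TimeUnit.MILLISECONDS);
  }

  /** Called by the receive operator: blocks until *any* mailbox has data. */
  public MailboxContent poll(long timeoutMs) throws InterruptedException {
    return _sharedQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
  }
}
```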

walterddr changed the title from "Support non-blocking MailboxReceivedOperator and non-blocking final mailbox reducer at broker" to "Support non-blocking MailboxReceivedOperator" on Oct 18, 2022
@agavra (Contributor) commented Oct 18, 2022

I think we should consider our thread utilization end-to-end. Specifically, in our multistage engine, if we consider all the operators between a MailboxReceiveOperator and a MailboxSendOperator as a single "stage", we need to make sure that the stage only runs when there's work to be done and that the threads that are doing that work are efficiently shared between potentially many concurrent queries.

While investigating #9563, I discovered that in order to really get non-blocking behavior we need the entire stage to be non-blocking (i.e. the entire stage should be deferred while there's no work to do, so that the thread pool running it can be freed up to schedule another task).

Ideally, I'd suggest something like:

[architecture diagram]

This gives us a few improvements over the existing architecture.

  1. the obvious advantage is that it's reactive and will only schedule execution when there's work to be done (we don't block threads / worker pools unless work is being done)
  2. it allows us to control memory and thread utilization across multiple queries running on a single node, while applying backpressure on servers if we aren't able to keep up (e.g. we're doing some complex aggregation on the intermediate stage nodes)
  3. I'm not totally sure this is possible with gRPC the way it's written today, but if we could figure out a way to reuse byte buffers from incoming MailboxContent we could likely be more efficient in terms of GC
  4. debuggability and observability are pretty awesome if this is done correctly, because you can easily introspect each part of the pipeline (data was received, a task was scheduled, a task finished processing the data passed to it, a task is idle)

We could certainly do this in stages - commenting on your specific considerations:

consolidate the multiple ArrayBlockingQueues (one per observer) into a single shared queue, so the mailbox receive operator waits for signals from the blocking-queue offer

I think this is exactly right - I'd suggest we even consolidate it across queries (and then use a simple in-memory scheduler to forward the right data to the right tasks) but that's probably a bigger change.

create a signaling mechanism to inform the mailbox receive operator thread to start reading directly from the notifying observer

Also +1, but the subtle difference from what I suggest here is that it doesn't signal the mailbox receive operator specifically, but rather the executor that schedules the receive operator in the first place. Basically, no mailbox receive operator thread would be allocated unless there's work to be done.

Happy to start prototyping some of this stuff up :)

@siddharthteotia (Contributor) commented:
Maybe this is also a good time / opportunity to rethink things at the operator level. Here are some thoughts...

  • Think of the operator chain as a pipeline, with data pumped into and out of operators.
  • Some operators can produce output data (technically data blocks / record batches) as soon as they consume input. Example: PROJECT.
  • Some operators need to consume everything before producing output data. Example: GROUP BY.

Operators would have a state machine, something along the lines of:

  • SETUP / INIT
  • CAN_CONSUME
  • CAN_CONSUME_FROM_LEFT
  • CAN_CONSUME_FROM_RIGHT
  • CAN_PRODUCE
  • BLOCKED
  • DONE
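As a hypothetical Java enum, purely to make these states concrete:

```java
// Hypothetical operator state machine, mirroring the states listed above.
public enum OperatorState {
  SETUP,                   // INIT: operator is being constructed / configured
  CAN_CONSUME,             // ready to accept the next input block
  CAN_CONSUME_FROM_LEFT,   // e.g. the build side of a hash join
  CAN_CONSUME_FROM_RIGHT,  // e.g. the probe side of a hash join
  CAN_PRODUCE,             // has an output block ready to emit
  BLOCKED,                 // waiting on upstream data or downstream capacity
  DONE                     // finished; no more input or output
}
```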

Operator abstraction with sub-interfaces:

  • Producer (e.g. APIs produceData, outputData)
  • SingleConsumer (e.g. APIs consumeData, noMoreToConsume). Can also be a Producer.
  • DualConsumer (e.g. HashJoin). Can also be a Producer.
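Sketched as Java interfaces (hypothetical; TransferableBlock is just a stand-in for the engine's data-block type):

```java
// Stand-in for the data block / record batch passed between operators.
interface TransferableBlock {}

interface Producer {
  /** Returns the next output block, or null if nothing can be produced yet. */
  TransferableBlock produceData();
}

interface SingleConsumer {
  void consumeData(TransferableBlock block);
  /** The upstream signals it is done producing. */
  void noMoreToConsume();
}

/** e.g. HashJoin; like SingleConsumer, an implementation can also be a Producer. */
interface DualConsumer {
  void consumeLeft(TransferableBlock block);
  void consumeRight(TransferableBlock block);
  void noMoreToConsume();
}
```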

Pipe:

  • An abstraction representing a collection of operators, enabling data to be pumped between operators
  • Provides a method pump()
  • Works with a source / sink concept for upstream / downstream
  • pump() can return (sketched after this list):
    -- DONE: essentially also setting noMoreToConsume on the sink, because the upstream is done producing
    -- PUMPED / IN_PROGRESS: the upstream / source has an output batch for the downstream / sink to consume
    -- UPSTREAM_NOT_READY: the sink is CAN_CONSUME, but the source is not ready to produce
    -- DOWNSTREAM_NOT_READY: the sink itself is blocked, or is holding batches it has yet to produce / output and thus can't consume more (yet) from the upstream
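Putting the pieces above together, a hypothetical pump() could look like this (building on the sketched interfaces; the state checks are stubbed out):

```java
// Possible results of a single pump() call.
enum PumpResult { DONE, PUMPED, UPSTREAM_NOT_READY, DOWNSTREAM_NOT_READY }

// Hypothetical Pipe: moves at most one batch from source to sink per pump() call.
class Pipe {
  private final Producer _source;
  private final SingleConsumer _sink;

  Pipe(Producer source, SingleConsumer sink) {
    _source = source;
    _sink = sink;
  }

  PumpResult pump() {
    if (!sinkCanConsume()) {
      return PumpResult.DOWNSTREAM_NOT_READY;  // sink is blocked or holding batches
    }
    TransferableBlock block = _source.produceData();
    if (block == null) {
      return PumpResult.UPSTREAM_NOT_READY;    // source has nothing to output yet
    }
    if (isEndOfStream(block)) {
      _sink.noMoreToConsume();                 // upstream is done producing
      return PumpResult.DONE;
    }
    _sink.consumeData(block);
    return PumpResult.PUMPED;
  }

  // Stubs: a real implementation would consult the operator state machine.
  private boolean sinkCanConsume() { return true; }
  private boolean isEndOfStream(TransferableBlock block) { return false; }
}
```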

The Pipeline is set up during execution planning, when the physical operators are created.

we need to make sure that the stage only runs when there's work to be done and that the threads that are doing that work are efficiently shared between potentially many concurrent queries

+1

  • Yes, I think we need executors at the inner-stage level to execute the pipelines inside the stage.
  • Each such executor executes a pipeline inside a single thread. It calls pump() on the pipeline (described above).
  • It has the capability to be scheduled, rescheduled, yield, etc.
  • Through the state machine, we know when work is available.
  • We need to wake up this executor when work is available for the pipeline (see the sketch below).
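One way to make the wake-up contract concrete (a sketch reusing the hypothetical Pipe above; the work-in-progress counter ensures wakeups are never lost and the pipeline never runs on two pool threads at once):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor glue: the pipeline only occupies a shared pool thread
// while it has work; wakeUp() reschedules it when new data arrives.
class PipelineTask {
  private final Pipe _pipe;
  private final ExecutorService _sharedPool;
  private final AtomicInteger _wip = new AtomicInteger();  // pending wakeup signals

  PipelineTask(Pipe pipe, ExecutorService sharedPool) {
    _pipe = pipe;
    _sharedPool = sharedPool;
  }

  /** Called from the mailbox / observer side whenever work may be available. */
  void wakeUp() {
    if (_wip.getAndIncrement() == 0) {
      _sharedPool.submit(this::drain);  // only schedule if not already running
    }
  }

  private void drain() {
    int missed = 1;
    while (true) {
      PumpResult result;
      do {
        result = _pipe.pump();  // do all the work currently available
      } while (result == PumpResult.PUMPED);
      if (result == PumpResult.DONE) {
        return;  // pipeline finished
      }
      missed = _wip.addAndGet(-missed);
      if (missed == 0) {
        return;  // yield the thread; a later wakeUp() reschedules us
      }
    }
  }
}
```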

@61yao (Contributor) commented Oct 18, 2022

I like the idea of making each operator a state machine. It is more extensible: if an operator needs to be retried, timed out, or checkpointed in the future, we can easily extend it.

Since we are thinking from an architecture point of view, we should also consider isolation / priority in the thread pool. Otherwise, it will easily exhaust the server's resources.

This sounds like a fit for the actor model? :)

@agavra (Contributor) commented Oct 18, 2022

Had a quick discussion with @walterddr and wanted to jot down those thoughts here so I don't forget:

  • There are two orthogonal design considerations: parallelism within a pipe/stage, and scheduling across different pipes/stages (potentially across queries as well). The comment I posted is mostly about the latter; I think @siddharthteotia's comment is mostly about the former (though it has good thoughts on both).
  • Implementing backpressure via gRPC on a per-mailbox level is possible, but we need to take care that the backpressure is piped all the way back to the leaf server execution itself. If you look at QueryRunner, there's currently no way to apply backpressure, as it executes the entire leaf-node request before creating/sending data in the MailboxSendOperator. This is possible, but just requires some work.
  • "Some operators need to consume everything before producing output data. Example: GROUP BY": for now I'll refer to these as stateful operators, as opposed to stateless ones (SORT and the broadcast side of HASH_JOIN fall into the stateful category as well).
  • I think it's important that all pipes/stages support partial execution: basically, you can schedule an op chain and it will do all the work it can, then terminate, even if it doesn't see an EOS block or produce any data (in the case of stateful operators; stateless ones will always produce data). The partial state is maintained so that when the chain is rescheduled (when another block is available) it can continue where it left off.

Some thoughts on @siddharthteotia's comments specifically:

Yes, I think we need executors at the inner-stage level to execute the pipelines inside the stage.

I'm a bit confused about the terminology here. Perhaps we can standardize on "operator" as a single unit of work, "operator chain" as the smallest schedulable set of operators, and "stage" as a complete remote-receive-to-remote-send set of pipelines. Today, an "operator chain" is always equivalent to a stage. I think that is how you were using the terminology.

With these definitions, I think we want the number of executors to be independent of the number of stages/chains currently running on a multistage intermediate server. That might make QoS (quality of service) and thread-pool management a little difficult; IMO one fixed thread pool and a priority-aware scheduler can get us pretty far, so long as each pipeline can be scheduled independently and we have (see below) a mechanism to split chains.

Operators would have a state machine, something along the lines of

I really like this idea; it will also help us in the case where we want to split stages into additional local stages, and we can leverage the work @ankitsultana was doing in #9484 to increase parallelism without needing to introduce a parallel-processing framework within a single stage/task.

@agavra (Contributor) commented Oct 19, 2022

Adding notes from offline discussion with @walterddr @61yao @siddharthteotia. Feel free to add additional details if I missed anything.

Design notes:

  1. the shared buffer itself should have a notion of fairness, to make sure it doesn't fill up with data from a single query (a sketch of this idea follows the list)
  2. the operator chain task pool should wake up on either (a) new incoming data for the corresponding mailbox or (b) a new operator chain being registered for execution, so that there is no race between registering a new operator chain and receiving data from an upstream sending node
  3. the shared buffer should have configurable limits, ideally on both data size and number of blocks
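A sketch of point 1 (hypothetical names; per-query bounded queues drained round-robin, so one query can neither starve others nor blow past the buffer limit):

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical fair shared buffer: one bounded queue per query so a single
// query cannot fill the whole buffer; consumers drain queries round-robin.
class FairSharedBuffer<T> {
  private final int _perQueryCapacity;
  private final Map<String, Queue<T>> _queuesByQuery = new ConcurrentHashMap<>();
  private final Queue<String> _rotation = new ConcurrentLinkedQueue<>();

  FairSharedBuffer(int perQueryCapacity) {
    _perQueryCapacity = perQueryCapacity;
  }

  /** Returns false (i.e. apply backpressure) when this query's quota is full. */
  boolean offer(String queryId, T block) {
    Queue<T> queue = _queuesByQuery.computeIfAbsent(queryId, id -> {
      _rotation.add(id);
      return new ArrayBlockingQueue<>(_perQueryCapacity);
    });
    return queue.offer(block);
  }

  /** Polls queries in round-robin order; returns null if every queue is empty. */
  T poll() {
    int attempts = _rotation.size();
    for (int i = 0; i < attempts; i++) {
      String queryId = _rotation.poll();
      if (queryId == null) {
        return null;
      }
      _rotation.add(queryId);  // rotate this query to the back
      T block = _queuesByQuery.get(queryId).poll();
      if (block != null) {
        return block;
      }
    }
    return null;
  }
}
```

Note this only bounds the number of blocks per query; the size-based limit from point 3 would need byte accounting on top.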

Discussion around out-of-scope considerations:

  1. retrying failed operator chains/queries (note: cascading a failure is in scope and should leverage Query Preemption)
  2. implementing parallelism within an operator-chain
  3. implementing pipelining or partition-level parallelism of operator chains
  4. for v1, the operator chain scheduler will be round-robin; it should be pluggable (a sketch of the interface follows) to support priority scheduling that can ensure fairness, but those implementations are out of scope
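For item 4, the pluggable surface could be as small as this (hypothetical interface, not the final API):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Stand-in for the smallest schedulable unit of operators. */
interface OpChain {}

// Hypothetical pluggable scheduler contract: v1 ships round-robin,
// and priority-aware implementations can be swapped in later.
interface OpChainScheduler {
  void register(OpChain chain);         // a new operator chain is submitted
  void onDataAvailable(OpChain chain);  // new data arrived for the chain's mailbox
  OpChain next();                       // next chain to run, or null if none ready
}

class RoundRobinScheduler implements OpChainScheduler {
  private final Queue<OpChain> _ready = new ConcurrentLinkedQueue<>();

  @Override
  public void register(OpChain chain) {
    _ready.add(chain);
  }

  @Override
  public void onDataAvailable(OpChain chain) {
    _ready.add(chain);  // a real version must de-duplicate re-enqueues
  }

  @Override
  public OpChain next() {
    return _ready.poll();
  }
}
```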

Next steps: @agavra to come up with a PEP document with more implementation and design details, after some prototyping.

@agavra (Contributor) commented Oct 27, 2022

Putting a link to a design doc here: https://docs.google.com/document/d/1XAMHAlhFbINvX-kK1ANlzbRz4_RkS0map4qhqs1yDtE

I'll schedule a design review; if anyone following this ticket would like to be a part of it, just ping me on the OSS Slack with your email and I can add you.
