Explore support for multiproducer Sinks.Many with heavy contention #2850

Open
simonbasle opened this issue Dec 3, 2021 · 4 comments
Labels
for/user-attention This issue needs user attention (feedback, rework, etc...) status/need-decision This needs a decision from the team status/need-design This needs more in depth design work type/enhancement A general enhancement

Comments

@simonbasle
Contributor

simonbasle commented Dec 3, 2021

Motivation

Currently, Sinks.many() produces sinks that fail fast in case parallel use is detected.

  • This allows users to get immediate and explicit feedback from the tryEmitXxx API.
  • The downside is that it puts the onus on the user to ensure calls do not overlap.
    • So far, general recommendation has been to busy-loop.
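The busy-loop recommendation can be modeled as follows. This is a stdlib-only sketch, not Reactor code: `FailFastSink.trySend` stands in for `tryEmitNext` returning `FAIL_NON_SERIALIZED` on parallel use, and all the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Models a fail-fast single-producer sink: a concurrent emission attempt is
// rejected outright rather than queued, mirroring FAIL_NON_SERIALIZED.
class FailFastSink<T> {
    private final AtomicBoolean busy = new AtomicBoolean();
    final List<T> delivered = new ArrayList<>();

    // Returns false when another producer is mid-emission (parallel use detected).
    boolean trySend(T value) {
        if (!busy.compareAndSet(false, true)) return false;
        try {
            delivered.add(value);
            return true;
        } finally {
            busy.set(false);
        }
    }
}

public class BusyLoopDemo {
    // The recommended workaround: retry until the emission is accepted.
    // Under heavy contention this spins, which is the reported problem.
    static <T> void emitBusyLooping(FailFastSink<T> sink, T value) {
        while (!sink.trySend(value)) {
            Thread.onSpinWait();
        }
    }

    static int demo() {
        FailFastSink<Integer> sink = new FailFastSink<>();
        Thread[] producers = new Thread[4];
        for (int t = 0; t < 4; t++) {
            final int base = t * 100;
            producers[t] = new Thread(() -> {
                for (int i = 0; i < 100; i++) emitBusyLooping(sink, base + i);
            });
            producers[t].start();
        }
        try {
            for (Thread p : producers) p.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink.delivered.size(); // all 4 * 100 emissions eventually succeed
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```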

By contrast, the Processors' FluxSink API didn't provide explicit feedback.
The default FluxSink implementations were hardened against parallel usage by offering elements to a queue and ensuring a single producer would "win" and drain that queue.
This only worked because of the "no feedback" aspect, which was also the greatest drawback of this approach:

  • You could think the value you just emitted had been correctly processed, when in fact it was just offered to a queue.
  • Said queue could fail to drain entirely, e.g. due to the underlying FluxSink terminating.

But we've received feedback that for Sinks.Many, the busy-loop approach is problematic when there is a lot of contention.

So we gained predictability and insight into each individual tryEmitXxx call, at the expense of an all-purpose multithreaded approach that can tolerate heavy contention.

Desired solution

Offer an alternative to the busy loop strategy to deal with multiple producers, without having each user reinvent the wheel.

Considered alternatives

This can come in the form of either of:

  1. guidance and documentation?
  2. new Many factory methods (e.g. an overload of Sinks.many().multicast().onBackpressureBuffer())
  3. a new category under ManySpec (like multicast(), replay(), etc...)
  4. a new ManySpec (like unsafe())
  5. utility class(es) or wrappers (to some extent, a new API)
  6. adding methods to the API

Additional Context

Option 2: Add partial support as factory methods

It would likely mean we can incrementally offer partial support based on multiproducer queues.
The unicast() spec already has an option which can use Queues.multiproducerUnbounded().

In multicast(), direct* flavors would be excluded. onBackpressureBuffer flavors could support multiproducer case but we would need a bounded MPSC queue implementation (since onBackpressureBuffer uses the bounded aspect of the queue).

In replay(), we'd probably need a complete parallel implementation of the concrete class, which implements a linked-list-like data structure (or at least make 100% sure this structure is thread safe and lock-free).

Option 5a: Lightweight queue-draining utility

This could maybe take the form of a MPSC queue + a single-threaded object which holds the Sinks.Many and drains the queue to it.

This assumes that in case of contention, producers just offer to the queue and so they DON'T get the immediate feedback aspect of tryEmitNext (which is invoked by the drainer).

The drainer can be customized by users in order to decide what to do in case of EmitResult.isFailure() (with access to the remainder of the queue for retrying, etc...).
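A minimal sketch of this queue-draining idea, using only the JDK: producers offer to a concurrent queue, and whichever caller wins a work-in-progress race drains it single-threadedly to the downstream. The `Consumer<T>` downstream is a hypothetical stand-in for `Sinks.Many::tryEmitNext`, and error handling is elided.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Sketch of option 5a: producers always succeed in enqueueing, but get NO
// per-value feedback (the documented trade-off). A "wip" counter ensures
// exactly one thread drains the queue at a time.
class QueueDrainingSink<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();
    private final Consumer<T> downstream; // stand-in for the wrapped Sinks.Many

    QueueDrainingSink(Consumer<T> downstream) { this.downstream = downstream; }

    void emit(T value) {
        queue.offer(value);
        if (wip.getAndIncrement() != 0) return; // another caller is draining
        int missed = 1;
        do {
            T v;
            while ((v = queue.poll()) != null) downstream.accept(v); // serialized
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }
}

public class DrainDemo {
    static long demo() {
        AtomicLong sum = new AtomicLong();
        QueueDrainingSink<Integer> sink = new QueueDrainingSink<>(sum::addAndGet);
        Thread[] producers = new Thread[4];
        for (int t = 0; t < 4; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 1; i <= 100; i++) sink.emit(i);
            });
            producers[t].start();
        }
        try {
            for (Thread p : producers) p.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum.get(); // 4 producers * (1 + ... + 100) = 20200
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The drain loop is the classic work-in-progress pattern used throughout reactor-core internals; a real utility would additionally surface EmitResult failures and the queue remainder to a user-supplied handler.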

Option 5b: Heavyweight new multiproducer API

(explored in https://github.com/reactor/reactor-core/compare/sinksMultiproducer?expand=1)

We could devise an API that is close to Sinks.Many but focuses on solving the multiproducer case using an MPSC queue to accommodate a lot of contention, but WITH support for immediate feedback.

With a dedicated API, tryEmitNext can return something that represents the fact that failures don't necessarily apply to the T that was just passed to the sink. Instead, it represents the fact that the current thread was draining the queue and encountered a failure EmitResult on some arbitrary element currently in the queue.

By having this dedicated API we can return a pair of EmitResult and T, which gives the caller access to which value was left unprocessed. We can also expose the remainder of the queue, or decide to automatically clear it for terminated/cancelled cases...

When competing for the multiproducer sink, callers that DON'T end up draining the queue just receive a notification that their value has been submitted to a batch.

This better represents the work-stealing aspect of the approach to the user.

One drawback is that this approach assumes any caller can deal with an EmitResult.isFailure() for an arbitrary value that might have come from another caller, i.e. processing of emit failures is the same across all usages of such a multiproducer sink.

Option 6: Adding methods to Sinks.Many

The idea above could maybe be integrated into the existing API in the form of a tryEmitNext variant that returns a pair of EmitResult and <T>.

The challenge here is to avoid paying the cost of dealing with an MPSC queue in the case of Sinks.many().unsafe().
Most implementations could default to delegating to tryEmitNext(T value) and simply always return value as the <T> part of the pair. Only when explicitly enabling an MPSC-safe trait would we obtain something that can return EmitResult.SUBMITTED_TO_BATCH or a failure with a <T> != value...
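The shape of such a pair-returning variant, and its "delegate and echo the caller's own value" default, could look like the following sketch. None of these names (EmitOutcome, SUBMITTED_TO_BATCH, tryEmitNextVerbose) exist in Reactor today; they are purely illustrative.

```java
// Hypothetical shapes only -- illustrates how a pair-returning tryEmitNext
// variant could degrade gracefully for non-MPSC sinks.

enum EmitOutcomeKind {
    OK,                  // the caller's own value was emitted
    SUBMITTED_TO_BATCH,  // MPSC case: value enqueued, another thread is draining
    FAILED               // some value (possibly another caller's) failed to emit
}

// The pair of "EmitResult" and T from the proposal.
record EmitOutcome<T>(EmitOutcomeKind kind, T value) {}

interface ManyWithOutcome<T> {
    boolean tryEmitNextPlain(T value); // stands in for today's tryEmitNext

    // Default: delegate and always report the caller's own value back, so
    // non-MPSC sinks like Sinks.many().unsafe() pay no queueing cost.
    default EmitOutcome<T> tryEmitNextVerbose(T value) {
        return tryEmitNextPlain(value)
                ? new EmitOutcome<>(EmitOutcomeKind.OK, value)
                : new EmitOutcome<>(EmitOutcomeKind.FAILED, value);
    }
}

public class OutcomeDemo {
    static EmitOutcome<String> demo() {
        ManyWithOutcome<String> sink = v -> true; // trivial always-accepting sink
        return sink.tryEmitNextVerbose("hello");
    }

    public static void main(String[] args) {
        EmitOutcome<String> out = demo();
        System.out.println(out.kind() + " " + out.value());
    }
}
```

Only an explicitly MPSC-enabled implementation would override the default to return SUBMITTED_TO_BATCH, or a FAILED outcome carrying a value other than the caller's.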

@simonbasle simonbasle added type/enhancement A general enhancement status/need-decision This needs a decision from the team status/need-design This needs more in depth design work for/team-attention This issue needs team attention or action labels Dec 3, 2021
@simonbasle simonbasle added this to the 3.4.x Backlog milestone Dec 3, 2021
@jryan128

Any news on this? I'm finding it really hard to funnel messages from multiple WebSockets into a single Sink to process each message one by one.

@chemicL chemicL modified the milestones: 3.4.x Backlog, 3.6.x Backlog Mar 18, 2024
@deepak5127

Is there any update on this issue ?

@chemicL
Member

chemicL commented Dec 18, 2024

@jryan128 @deepak5127 there have been other priorities that we focused on so far, but this subject might require revisiting. Can you please elaborate on the proposal in the description and describe the user perspective and expectations? Several options are listed, each with different UX and design trade-offs. Were we to invest in this, we'd need more thorough descriptions of the use cases, the current API limitations, and the desired outcome, both in terms of user-friendliness and "correctness" with respect to ACK modes and notification of failures/success. Thanks for the input in case this is a needed feature.

@chemicL chemicL added for/user-attention This issue needs user attention (feedback, rework, etc...) and removed for/team-attention This issue needs team attention or action labels Dec 18, 2024
@jryan128

I think, for me, getting a callback with failed emissions is good enough. I can at least retry them later or log them.
