-
Notifications
You must be signed in to change notification settings - Fork 157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature request: RestartWithBackOffAndFallback for Flow & Sink #980
Comments
You can do this already due to this change #252 which has already landed. You can just make your own supervision decider, i.e. val decider: Supervision.Decider = { e =>
logger.error("Unhandled exception in stream", e)
Supervision.Stop
} or am I missing something? |
So, my initial problem was that I have a substream which has a fileIO in it, the fileIO was only a safety measure, so if we loose the db connection we don't loose (too much) data. Then one day the fileIO failed (we couldn't recreate it bcs it opened a file which failed due to hosting provider issues), and it brought down the whole stream, and we lost hours of data when we finally noticed that the stream is down... My idea was to lazily create a sink, if it fails I can recreate it, if it fails multiple times I can simply change it to a |
@tg44 So if by Sub Stream you mean What I mean by partially is that as is noted in the migration notes we don't support @He-Pin Aside from what I just said I don't think we need to add anything else for Pekko, it appears that @tg44 just wants support for automatic restarting of |
@mdedetrich I think what's @tg44 needs is :
Data Source X
|
\|/
Datas --> Process A ----> Process FileIO Sink (main, can retry with backoff restart)
|
| (only switch after the main sink die and restart up to the max limit)
|
|----> fallback Ignore Sink(logging) maybe.
And I think this is what he really want. |
@He-Pin But he is talking about Then you can just do val restartingDecider: Supervision.Decider = { e =>
logger.error("Unhandled exception in stream, restarting", e)
Supervision.Restart
}
source
.splitAfter(_ == somePredicate)
.withAttributes(ActorAttributes.supervisionStrategy(restartingDecider)) |
@mdedetrich But your proposal can not swith to an I see you just updated a pr, what about extends |
An out of box seems better, I think some kind of ZIO ZSink's |
My usecase looks like this;
I think we don't have a Sink which can handle the right side of the graph. It could be something like; def sinkWithFallback[T, Mat](factory: () => (exception: Option[Throwable]) => Sink[T, Mat]): Sink[T, Mat] And it should be easily implemented from (We need the factory to be like in () => {
var retries = 0
(exception: Option[Throwable]) => {
retries += 1
if(retries > 100) {
Sink.ignore
} else {
someProbablyFailingSink
}
}
} also, sorry if I'm not using proper scala syntax, I code in ts a lot recently :( ) |
@tg44 So just to confirm, buy sub stream you didn't mean Also is there a reason why you aren't using |
@mdedetrich Nah, when I started to work with akka streams we tend to call every "part of the stream after a fanout" as a substream, and For me the fileIO is kinda a log to a file, so I choose wireTap with an intention, I have buffers and logrotator and everything in between the wireTap and the actual fileIO, I can allow to not write everything to the fileLog, but I need to write everything to the database as fast as possible. |
Yes, I love your contritions @tg44 and I used it too. And I think what's you did is some kind of Isolate strong and weak dependencies to prevent strong dependencies from being affected by weak dependencies |
Okay so in the end this seems to be a request for an easier dsl for constructing a specific Now that I think of it, this honestly seems to be asking for a |
@mdedetrich That's true, but we can extends to |
Motivation:
I saw in akka/akka#30267 @nvollmar added this feature for actor, but I think it would be nice to have this in pekko-stream, where user can logging the error with a restart/resume, and currently ,we just simply ignore it.
I found the related case from @tg44 in https://discuss.lightbend.com/t/recover-from-sink-exceptions/10554
I think this should be very useful for real production usecase.
Result:
More handy control over the exceptions
The text was updated successfully, but these errors were encountered: