Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

W-16941297: Scatter Gather timeout exception #14192

Merged
merged 27 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5c2b698
handling timeout exception
anandkarandikar Jan 22, 2025
d1b7b3a
Attempt with ExecutorService to process connection closing in the bac…
anandkarandikar Jan 27, 2025
5806098
Remove changes from AbstractEventContext
anandkarandikar Jan 28, 2025
ec561bd
Make ChildEventContext accessible and handle timeout exception in Abs…
anandkarandikar Jan 28, 2025
89d854e
Switch to ioScheduler
anandkarandikar Jan 31, 2025
59dbd95
Change visibility to private for ChildEventContext
anandkarandikar Jan 31, 2025
fce0083
Make AbstractEventContext public
anandkarandikar Jan 31, 2025
7540dc8
Use AbstractEventContext instead of ChildEventContext
anandkarandikar Jan 31, 2025
f3d2532
Remove unused import
anandkarandikar Jan 31, 2025
b1b1390
Handling ClassCastException failure
anandkarandikar Feb 3, 2025
3a90f7d
Merge branch 'master' into fix/W-16941297
anandkarandikar Feb 6, 2025
e66cf94
Add delay to sayMagicWords
anandkarandikar Feb 6, 2025
abadb33
Add tests for Scatter Gather timeout and non timeout scenario
anandkarandikar Feb 7, 2025
835e844
Apply formatter
anandkarandikar Feb 7, 2025
eceee29
Merge branch 'master' into fix/W-16941297
anandkarandikar Feb 7, 2025
8c3e66f
Add assertPayloadIsIteratorProvider check
anandkarandikar Feb 7, 2025
7c2045a
Replace Thread.sleep with CountDownLatch
anandkarandikar Feb 7, 2025
b24c317
Fixing model and schema file for marvel extension
anandkarandikar Feb 7, 2025
82e2d38
Revert the version to @mule.runtime.version@
anandkarandikar Feb 7, 2025
80b3412
Using assertThrows
anandkarandikar Feb 7, 2025
069b02f
Merge branch 'master' into fix/W-16941297
anandkarandikar Feb 8, 2025
fa265ea
Fixing test to make sure it fails when the fix is not present
asanguinetti Feb 11, 2025
282b83b
Avoiding submit, completing the child contexts on timeout emission
asanguinetti Feb 12, 2025
1ade55f
Comment
asanguinetti Feb 12, 2025
eb7d5f3
Fixing nested routes and adding more tests
asanguinetti Feb 12, 2025
ec2489b
Improve tests
asanguinetti Feb 12, 2025
1e076e1
Kill switch
asanguinetti Feb 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
*
* @since 4.0
*/
abstract class AbstractEventContext implements SpanContextAware, BaseEventContext {
public abstract class AbstractEventContext implements SpanContextAware, BaseEventContext {
anandkarandikar marked this conversation as resolved.
Show resolved Hide resolved

private static final int STATE_READY = 0;
private static final int STATE_RESPONSE_RECEIVED = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import static org.mule.runtime.core.api.event.CoreEvent.builder;
import static org.mule.runtime.core.internal.exception.ErrorHandlerContextManager.ERROR_HANDLER_CONTEXT;
import static org.mule.runtime.core.internal.routing.ForkJoinStrategy.RoutingPair.of;
import static org.mule.runtime.core.privileged.processor.MessageProcessors.processWithChildContextDontComplete;

import static java.lang.Boolean.parseBoolean;
import static java.lang.Long.MAX_VALUE;
import static java.util.Optional.empty;
import static java.lang.System.getProperty;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -74,6 +74,10 @@
*/
public abstract class AbstractForkJoinStrategyFactory implements ForkJoinStrategyFactory {

// TODO W-17814249: Remove kill switch
private static final boolean COMPLETE_CHILDREN_ON_TIMEOUT =
parseBoolean(getProperty("mule.forkJoin.completeChildContextsOnTimeout", "true"));

public static final String TIMEOUT_EXCEPTION_DESCRIPTION = "Route Timeout";
public static final String TIMEOUT_EXCEPTION_DETAILED_DESCRIPTION_PREFIX = "Timeout while processing route/part:";
private final boolean mergeVariables;
Expand Down Expand Up @@ -171,30 +175,34 @@ private Function<RoutingPair, Publisher<Pair<CoreEvent, EventProcessingException
return pair -> {
ReactiveProcessor route = publisher -> from(publisher)
.transform(pair.getRoute());
return from(processWithChildContextDontComplete(pair.getEvent(),
applyProcessingStrategy(processingStrategy, route, maxConcurrency),
empty()))
.timeout(timeout,
onTimeout(processingStrategy, delayErrors, timeoutErrorType,
pair),
timeoutScheduler)
.map(coreEvent -> new Pair<CoreEvent, EventProcessingException>(
((DefaultEventBuilder) CoreEvent
.builder(coreEvent))
.removeInternalParameter(ERROR_HANDLER_CONTEXT)
.build(),
null))
.onErrorResume(MessagingException.class,
me -> getPublisher(delayErrors, me));
route = applyProcessingStrategy(processingStrategy, route, maxConcurrency);

// TODO W-17814249: Remove kill switch
RoutePairPublisherAssemblyHelper routePairPublisherAssemblyHelper = COMPLETE_CHILDREN_ON_TIMEOUT
? new DefaultRoutePairPublisherAssemblyHelper(pair.getEvent(), route)
: new LegacyRoutePairPublisherAssemblyHelper(pair.getEvent(), route);

return from(routePairPublisherAssemblyHelper.getPublisherOnChildContext())
.timeout(timeout,
routePairPublisherAssemblyHelper
.decorateTimeoutPublisher(onTimeout(processingStrategy, delayErrors, timeoutErrorType, pair)),
timeoutScheduler)
.map(this::eventToPair)
.onErrorResume(MessagingException.class, me -> getPublisher(delayErrors, me));
};
}

private Pair<CoreEvent, EventProcessingException> eventToPair(CoreEvent coreEvent) {
return new Pair<>(((DefaultEventBuilder) CoreEvent.builder(coreEvent))
.removeInternalParameter(ERROR_HANDLER_CONTEXT)
.build(), null);
}

private Publisher<Pair<CoreEvent, EventProcessingException>> getPublisher(boolean delayErrors, EventProcessingException me) {
Pair<CoreEvent, EventProcessingException> pair = new Pair<>(me.getEvent(), me);
return delayErrors ? just(pair) : error(me);
}


private Mono<CoreEvent> onTimeout(ProcessingStrategy processingStrategy, boolean delayErrors, ErrorType timeoutErrorType,
RoutingPair pair) {
return defer(() -> delayErrors ? just(createTimeoutErrorEvent(timeoutErrorType, pair))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.runtime.core.internal.routing.forkjoin;

import static org.mule.runtime.core.internal.event.EventQuickCopy.quickCopy;
import static org.mule.runtime.core.privileged.processor.MessageProcessors.newChildContext;
import static org.mule.runtime.core.privileged.processor.MessageProcessors.processWithChildContext;

import static java.util.Optional.empty;

import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.internal.event.AbstractEventContext;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.exception.MessagingException;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

/**
* Default implementation of {@link RoutePairPublisherAssemblyHelper} which takes care of completing the child contexts in case of
* timeouts.
*/
class DefaultRoutePairPublisherAssemblyHelper implements RoutePairPublisherAssemblyHelper {

private final Publisher<CoreEvent> publisherWithChildContext;
private final BaseEventContext childContext;

DefaultRoutePairPublisherAssemblyHelper(CoreEvent routeEvent, ReactiveProcessor chain) {
// This sequence is equivalent to processWithChildContextDontComplete (used in the legacy version), only this way we can keep
// track of the child context, which we will need later on.
this.childContext = newChildContext(routeEvent, empty());
this.publisherWithChildContext = processWithChildContext(routeEvent, chain, childContext);
}

@Override
public Publisher<CoreEvent> getPublisherOnChildContext() {
return publisherWithChildContext;
}

@Override
public Publisher<CoreEvent> decorateTimeoutPublisher(Publisher<CoreEvent> timeoutPublisher) {
// When the timeout happens, the subscription to the original publisher is cancelled, so the inner MessageProcessorChains
// never finishes and the child contexts are never completed, hence we have to complete them manually on timeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// never finishes and the child contexts are never completed, hence we have to complete them manually on timeout
// never finish and the child contexts are never completed, hence we have to complete them manually on timeout

return Mono.from(timeoutPublisher)
.doOnSuccess(completeRecursively(childContext, BaseEventContext::error));
}

private Consumer<CoreEvent> completeRecursively(BaseEventContext eventContext,
BiConsumer<BaseEventContext, MessagingException> forEachChild) {
return some -> {
// Tracks the child contexts first and then completes them, that way we are not retaining the read lock all the time.
Deque<BaseEventContext> allContexts = new ArrayDeque<>();
allContexts.push(eventContext);
((AbstractEventContext) eventContext).forEachChild(allContexts::push);

while (!allContexts.isEmpty()) {
BaseEventContext ctx = allContexts.pop();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be worth clarifying why we iterate in reverse order in this part

// Some context may already be terminated, as the children can propagate the termination up to their parents.
if (!ctx.isTerminated()) {
// It is important to swap the context so it has the right flow stack among other things.
forEachChild.accept(ctx, new MessagingException(quickCopy(ctx, some), some.getError().get().getCause()));
}
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.runtime.core.internal.routing.forkjoin;

import static org.mule.runtime.core.privileged.processor.MessageProcessors.processWithChildContextDontComplete;

import static java.util.Optional.empty;

import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;

import org.reactivestreams.Publisher;

/**
* Implementation of {@link RoutePairPublisherAssemblyHelper} which does not complete the child contexts on timeout.
* <p>
* Exists as a fallback for the kill switch COMPLETE_CHILDREN_ON_TIMEOUT.
*/
// TODO W-17814249: Remove once the kill switch is removed
class LegacyRoutePairPublisherAssemblyHelper implements RoutePairPublisherAssemblyHelper {

private final Publisher<CoreEvent> publisherWithChildContext;

LegacyRoutePairPublisherAssemblyHelper(CoreEvent routeEvent, ReactiveProcessor chain) {
this.publisherWithChildContext = processWithChildContextDontComplete(routeEvent, chain, empty());
}

@Override
public Publisher<CoreEvent> getPublisherOnChildContext() {
return publisherWithChildContext;
}

@Override
public Publisher<CoreEvent> decorateTimeoutPublisher(Publisher<CoreEvent> timeoutPublisher) {
// Adds nothing to the timeout publisher
return timeoutPublisher;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.runtime.core.internal.routing.forkjoin;

import org.mule.runtime.core.api.event.CoreEvent;

import org.reactivestreams.Publisher;

/**
* Helper class to assist in assembling the routing pair publisher.
*/
interface RoutePairPublisherAssemblyHelper {

Publisher<CoreEvent> getPublisherOnChildContext();

Publisher<CoreEvent> decorateTimeoutPublisher(Publisher<CoreEvent> timeoutPublisher);
}
Loading