
Flux getting into hang state on any java.lang.Error or its subclasses #3036

Closed
kushagraThapar opened this issue May 8, 2022 · 2 comments
Labels
status/invalid We don't feel this issue is valid, or the root cause was found outside of Reactor

Comments

@kushagraThapar

While a thread is getting data from a Flux, if it encounters any java.lang.Error or one of its subclasses, the Flux goes into a hang state, causing the whole application to hang.

This is a sample thread dump showing the hang:

"main" #1 prio=5 os_prio=31 tid=0x00007fb7d1808800 nid=0x1003 waiting on condition [0x000070000279e000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000076be96458> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:179)
	at com.example.common.FluxHangIssue.main(FluxHangIssue.java:42)

Expected Behavior

Flux should not get into a hang state; instead it should bubble up the error, or terminate / crash the process.

Actual Behavior

The Flux hangs indefinitely.

Steps to Reproduce

This shows how to simulate the hang - https://github.com/kushagraThapar/cosmos-java-sdk-testing/blob/master/src/main/java/com/example/common/FluxHangIssue.java

public static void main(String[] args) throws InterruptedException {
    Flux<Integer> integerFlux = Flux.range(0, 7).map(number -> {
        logger.info("Number is : {}", number);
        if (number > 5) {
            throw new OutOfMemoryError("Custom GC Failure");
        }
        return number;
    }).doOnError(ex -> {
        logger.error("Completed exceptionally", ex);
    }).doOnNext(next -> {
        logger.info("Next is : {}", next);
    }).doOnComplete(() -> {
        logger.info("Completed successfully");
    }).doFinally(signalType -> {
        logger.info("Finally signal is : {}", signalType);
    }).onErrorMap(throwable -> {
        logger.info("On error map", throwable);
        return throwable;
    }).onErrorContinue((throwable, object) -> {
        logger.error("on error continue : {}", object, throwable);
    }).onErrorStop().onErrorReturn(6).onErrorResume(throwable -> {
        logger.info("on error resume", throwable);
        return Mono.error(throwable);
    }).subscribeOn(Schedulers.boundedElastic());

    Iterator<Integer> integers = integerFlux.toIterable().iterator();
    while (integers.hasNext()) {
        logger.info("Next value is : {}", integers.next());
    }

    logger.info("Going to sleep now");

    Thread.sleep(5000);

    logger.info("I woke up");
}

Possible Solution

One solution that I think might work is to configure the timeout operator on such a Flux, to avoid the hang and let Reactor propagate a TimeoutException.
This method shows how the hang can be avoided - https://github.com/kushagraThapar/cosmos-java-sdk-testing/blob/master/src/main/java/com/example/common/FluxHangIssue.java#L59
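A minimal sketch of that workaround, assuming reactor-core is on the classpath (this is not the exact code from the linked file; Flux.never() stands in for a source whose worker thread died on a fatal Error without emitting a terminal signal):

```java
import java.time.Duration;
import java.util.Iterator;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FluxTimeoutWorkaround {
    public static void main(String[] args) {
        // Flux.never() simulates an upstream that went silent after a fatal
        // Error was rethrown on its worker thread. Without timeout(), the
        // blocking iterator below would park forever; with it, a
        // TimeoutException is propagated downstream after 2 seconds.
        Flux<Integer> flux = Flux.<Integer>never()
                .timeout(Duration.ofSeconds(2))
                .subscribeOn(Schedulers.boundedElastic());

        Iterator<Integer> it = flux.toIterable().iterator();
        try {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        } catch (RuntimeException e) {
            // toIterable() rethrows the checked TimeoutException wrapped in a
            // RuntimeException, so the main thread unblocks instead of hanging.
            System.out.println("Unblocked by: " + e.getCause());
        }
    }
}
```

The trade-off is that the timeout must be longer than any legitimate gap between upstream signals, or healthy-but-slow sources will be cancelled too.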

  • Reactor version(s) used: 3.4.14
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label May 8, 2022
@simonbasle simonbasle added status/invalid We don't feel this issue is valid, or the root cause was found outside of Reactor and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels May 10, 2022
@simonbasle
Contributor

Reactor doesn't attempt to recover from a VirtualMachineError, of which OutOfMemoryError is a subclass. When such Errors occur, there is no telling what state the JVM is in or whether all components of the application will be able to recover. Generally, they won't.

This is not the case for all Error subclasses, only the ones flagged as fatal by Exceptions.throwIfFatal(Throwable) and Exceptions.throwIfJvmFatal(Throwable) (including VirtualMachineError and its subclasses, LinkageError, etc.).
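A small sketch of that distinction, assuming reactor-core is on the classpath: Exceptions.throwIfFatal rethrows JVM-fatal errors immediately and returns normally for ordinary exceptions, which is why a fatal Error never travels down the reactive chain as an onError signal.

```java
import reactor.core.Exceptions;

public class FatalErrorCheck {
    public static void main(String[] args) {
        // A VirtualMachineError subclass (here OutOfMemoryError) is treated
        // as JVM-fatal: throwIfFatal rethrows it on the calling thread.
        try {
            Exceptions.throwIfFatal(new OutOfMemoryError("simulated"));
            System.out.println("not reached");
        } catch (OutOfMemoryError fatal) {
            System.out.println("fatal, rethrown: " + fatal.getMessage());
        }

        // An ordinary RuntimeException is not fatal, so the call returns
        // normally and operators are free to pass it downstream as onError.
        Exceptions.throwIfFatal(new IllegalStateException("recoverable"));
        System.out.println("IllegalStateException passed through");
    }
}
```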

@zhou-hao

zhou-hao commented Jun 2, 2023

@simonbasle
In my case, I cannot fully control the logic of some of the code. How can I detect a JVM-fatal Error in a Flux and then interrupt the Flux instead of hanging?

Is it okay to use it this way?

Hooks.onOperatorError((err, val) -> {
    if (Exceptions.isJvmFatal(err)) {
        return new JvmErrorException(err);
    }
    return err;
});
Hooks.onNextError((err, val) -> {
    if (Exceptions.isJvmFatal(err)) {
        return new JvmErrorException(err);
    }
    return err;
});
