Skip to content

Commit

Permalink
Issue ReactiveX#718: Removed blocking code in Spring Reactor RetryOpe…
Browse files Browse the repository at this point in the history
…rator. (ReactiveX#814)
  • Loading branch information
alex-pumpkin authored Jan 29, 2020
1 parent 2c7325d commit ba36c42
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 89 deletions.
2 changes: 2 additions & 0 deletions libraries.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ext {
validationApiVersion = '2.0.1.Final'
kotlinCoroutinesVersion = '1.3.2'
springBootOpenFeignVersion = '2.1.2.RELEASE'
blockhoundVersion = '1.0.1.RELEASE'

libraries = [
// compile
Expand All @@ -53,6 +54,7 @@ ext {
reactor_test: "io.projectreactor:reactor-test:${reactorVersion}",
reactive_streams_tck: "org.reactivestreams:reactive-streams-tck:${reactiveStreamsVersion}",
mock_clock: "com.statemachinesystems:mock-clock:1.0",
blockhound: "io.projectreactor.tools:blockhound:${blockhoundVersion}",

// Vert.x addon
vertx: "io.vertx:vertx-core:${vertxVersion}",
Expand Down
1 change: 1 addition & 0 deletions resilience4j-reactor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ dependencies {
testCompile(libraries.reactor_test)
testCompile(libraries.assertj)
testCompile(libraries.reactive_streams_tck)
testCompile(libraries.blockhound)
}
ext.moduleName = 'io.github.resilience4j.reactor'
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

import io.github.resilience4j.reactor.IllegalPublisherException;
import io.github.resilience4j.retry.Retry;
import io.vavr.control.Try;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.Consumer;
import java.time.Duration;
import java.util.function.UnaryOperator;

/**
Expand All @@ -48,97 +49,71 @@ public static <T> RetryOperator<T> of(Retry retry) {
return new RetryOperator<>(retry);
}

/**
* to handle checked exception handling in reactor Function java 8 doOnNext
*/
private static <T> Consumer<T> throwingConsumerWrapper(
ThrowingConsumer<T, Exception> throwingConsumer) {

return i -> {
try {
throwingConsumer.accept(i);
} catch (Exception ex) {
throw new RetryExceptionWrapper(ex);
}
};
}

@Override
public Publisher<T> apply(Publisher<T> publisher) {
if (publisher instanceof Mono) {
Context<T> context = new Context<>(retry.context());
Context<T> context = new Context<>(retry.asyncContext());
Mono<T> upstream = (Mono<T>) publisher;
return upstream.doOnNext(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(throwingConsumerWrapper(context::onError)))
return upstream.doOnNext(context::handleResult)
.retryWhen(errors -> errors.flatMap(context::handleErrors))
.doOnSuccess(t -> context.onComplete());
} else if (publisher instanceof Flux) {
Context<T> context = new Context<>(retry.context());
Context<T> context = new Context<>(retry.asyncContext());
Flux<T> upstream = (Flux<T>) publisher;
return upstream.doOnNext(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(throwingConsumerWrapper(context::onError)))
return upstream.doOnNext(context::handleResult)
.retryWhen(errors -> errors.flatMap(context::handleErrors))
.doOnComplete(context::onComplete);
} else {
throw new IllegalPublisherException(publisher);
}
}

/**
* @param <T> input
* @param <E> possible thrown exception
*/
@FunctionalInterface
public interface ThrowingConsumer<T, E extends Exception> {

void accept(T t) throws E;
}

private static class Context<T> {

private final Retry.Context<T> context;
private final Retry.AsyncContext<T> retryContext;

Context(Retry.Context<T> context) {
this.context = context;
Context(Retry.AsyncContext<T> retryContext) {
this.retryContext = retryContext;
}

void onComplete() {
this.context.onComplete();
this.retryContext.onComplete();
}

void throwExceptionToForceRetryOnResult(T value) {
if (context.onResult(value)) {
throw new RetryDueToResultException();
void handleResult(T result) {
long waitingDurationMillis = retryContext.onResult(result);
if (waitingDurationMillis != -1) {
throw new RetryDueToResultException(waitingDurationMillis);
}
}

void onError(Throwable throwable) throws Exception {
Mono<Long> handleErrors(Throwable throwable) {
if (throwable instanceof RetryDueToResultException) {
return;
long waitDurationMillis = ((RetryDueToResultException) throwable).waitDurationMillis;
return Mono.delay(Duration.ofMillis(waitDurationMillis));
}
// Filter Error to not retry on it
if (throwable instanceof Error) {
throw (Error) throwable;
}
try {
if (throwable instanceof RetryExceptionWrapper) {
context.onError(castToException(throwable.getCause()));
} else {
context.onError(castToException(throwable));
}

} catch (Throwable t) {
throw castToException(t);

long waitingDurationMillis = Try.of(() -> retryContext
.onError(throwable))
.get();

if (waitingDurationMillis == -1) {
Try.failure(throwable).get();
}
}

private Exception castToException(Throwable throwable) {
return throwable instanceof Exception ? (Exception) throwable
: new Exception(throwable);
return Mono.delay(Duration.ofMillis(waitingDurationMillis));
}

private static class RetryDueToResultException extends RuntimeException {
private final long waitDurationMillis;

RetryDueToResultException() {
RetryDueToResultException(long waitDurationMillis) {
super("retry due to retryOnResult predicate");
this.waitDurationMillis = waitDurationMillis;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import io.github.resilience4j.test.HelloWorldException;
import io.github.resilience4j.test.HelloWorldService;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import reactor.blockhound.BlockHound;
import reactor.blockhound.integration.ReactorIntegration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.io.IOException;
Expand All @@ -39,6 +43,11 @@ public class RetryOperatorTest {

private HelloWorldService helloWorldService;

@BeforeClass
public static void beforeClass() {
BlockHound.install(new ReactorIntegration());
}

@Before
public void setUp() {
helloWorldService = mock(HelloWorldService.class);
Expand All @@ -55,10 +64,16 @@ public void returnOnCompleteUsingMono() {
.willThrow(new HelloWorldException())
.willReturn("Hello world");

Mono.fromCallable(helloWorldService::returnHelloWorld).compose(retryOperator)
.block(Duration.ofMillis(100));
Mono.fromCallable(helloWorldService::returnHelloWorld).compose(retryOperator)
.block(Duration.ofMillis(100));
StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld).compose(retryOperator)
.subscribeOn(Schedulers.single()))
.expectNext("Hello world")
.expectComplete()
.verify(Duration.ofMillis(50));
StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld).compose(retryOperator)
.subscribeOn(Schedulers.single()))
.expectNext("Hello world")
.expectComplete()
.verify(Duration.ofMillis(50));

then(helloWorldService).should(times(4)).returnHelloWorld();
Retry.Metrics metrics = retry.getMetrics();
Expand All @@ -82,11 +97,6 @@ public void shouldNotRetryUsingMonoStackOverFlow() {
.expectSubscription()
.expectError(StackOverflowError.class)
.verify(Duration.ofMillis(50));

then(helloWorldService).should().returnHelloWorld();
Retry.Metrics metrics = retry.getMetrics();
assertThat(metrics.getNumberOfFailedCallsWithoutRetryAttempt()).isEqualTo(0);
assertThat(metrics.getNumberOfFailedCallsWithRetryAttempt()).isEqualTo(0);
}

@Test
Expand Down Expand Up @@ -119,15 +129,17 @@ public void returnOnErrorUsingMono() {
.willThrow(new HelloWorldException());

StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld)
.subscribeOn(Schedulers.single())
.compose(retryOperator))
.expectSubscription()
.expectError(RetryExceptionWrapper.class)
.expectError(HelloWorldException.class)
.verify(Duration.ofMillis(50));

StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld)
.subscribeOn(Schedulers.single())
.compose(retryOperator))
.expectSubscription()
.expectError(RetryExceptionWrapper.class)
.expectError(HelloWorldException.class)
.verify(Duration.ofMillis(50));

then(helloWorldService).should(times(6)).returnHelloWorld();
Expand All @@ -149,7 +161,7 @@ public void doNotRetryFromPredicateUsingMono() {
StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld)
.compose(RetryOperator.of(retry)))
.expectSubscription()
.expectError(RetryExceptionWrapper.class)
.expectError(HelloWorldException.class)
.verify(Duration.ofMillis(50));

then(helloWorldService).should().returnHelloWorld();
Expand All @@ -162,14 +174,15 @@ public void doNotRetryFromPredicateUsingMono() {
public void retryOnResultUsingMono() {
RetryConfig config = RetryConfig.<String>custom()
.retryOnResult("retry"::equals)
.waitDuration(Duration.ofMillis(50))
.waitDuration(Duration.ofMillis(10))
.maxAttempts(3).build();
Retry retry = Retry.of("testName", config);
given(helloWorldService.returnHelloWorld())
.willReturn("retry")
.willReturn("success");

StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld)
.subscribeOn(Schedulers.single())
.compose(RetryOperator.of(retry)))
.expectSubscription()
.expectNext("success")
Expand All @@ -185,13 +198,14 @@ public void retryOnResultUsingMono() {
public void retryOnResultFailAfterMaxAttemptsUsingMono() {
RetryConfig config = RetryConfig.<String>custom()
.retryOnResult("retry"::equals)
.waitDuration(Duration.ofMillis(50))
.waitDuration(Duration.ofMillis(10))
.maxAttempts(3).build();
Retry retry = Retry.of("testName", config);
given(helloWorldService.returnHelloWorld())
.willReturn("retry");

StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld)
.subscribeOn(Schedulers.single())
.compose(RetryOperator.of(retry)))
.expectSubscription()
.expectNextCount(1)
Expand All @@ -206,9 +220,11 @@ public void shouldFailWithExceptionFlux() {
Retry retry = Retry.of("testName", config);
RetryOperator<Object> retryOperator = RetryOperator.of(retry);

StepVerifier.create(Flux.error(new HelloWorldException()).compose(retryOperator))
StepVerifier.create(Flux.error(new HelloWorldException())
.subscribeOn(Schedulers.single())
.compose(retryOperator))
.expectSubscription()
.expectError(RetryExceptionWrapper.class)
.expectError(HelloWorldException.class)
.verify(Duration.ofMillis(50));

Retry.Metrics metrics = retry.getMetrics();
Expand All @@ -222,16 +238,18 @@ public void shouldFailWithExceptionFlux() {
public void retryOnResultUsingFlux() {
RetryConfig config = RetryConfig.<String>custom()
.retryOnResult("retry"::equals)
.waitDuration(Duration.ofMillis(50))
.waitDuration(Duration.ofMillis(10))
.maxAttempts(3).build();
Retry retry = Retry.of("testName", config);

StepVerifier.create(Flux.just("retry", "success")
.subscribeOn(Schedulers.single())
.compose(RetryOperator.of(retry)))
.expectSubscription()
.expectNext("retry")
.expectNext("success")
.expectComplete().verify(Duration.ofMillis(50));
.expectComplete()
.verify(Duration.ofMillis(100));

Retry.Metrics metrics = retry.getMetrics();
assertThat(metrics.getNumberOfFailedCallsWithoutRetryAttempt()).isEqualTo(0);
Expand All @@ -242,11 +260,12 @@ public void retryOnResultUsingFlux() {
public void retryOnResultFailAfterMaxAttemptsUsingFlux() {
RetryConfig config = RetryConfig.<String>custom()
.retryOnResult("retry"::equals)
.waitDuration(Duration.ofMillis(50))
.waitDuration(Duration.ofMillis(10))
.maxAttempts(3).build();
Retry retry = Retry.of("testName", config);

StepVerifier.create(Flux.just("retry")
.subscribeOn(Schedulers.single())
.compose(RetryOperator.of(retry)))
.expectSubscription()
.expectNextCount(1)
Expand All @@ -258,6 +277,6 @@ public void retryOnResultFailAfterMaxAttemptsUsingFlux() {
}

private RetryConfig retryConfig() {
return RetryConfig.custom().waitDuration(Duration.ofMillis(50)).build();
return RetryConfig.custom().waitDuration(Duration.ofMillis(10)).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static Retry of(String name, Supplier<RetryConfig> retryConfigSupplier) {
* @return a Retry with a custom Retry configuration.
*/
static Retry of(String name, Supplier<RetryConfig> retryConfigSupplier,
Map<String, String> tags) {
Map<String, String> tags) {
return new RetryImpl(name, retryConfigSupplier.get(), tags);
}

Expand Down Expand Up @@ -129,7 +129,7 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(
* @return a retryable function
*/
static <T> CheckedFunction0<T> decorateCheckedSupplier(Retry retry,
CheckedFunction0<T> supplier) {
CheckedFunction0<T> supplier) {
return () -> {
Retry.Context<T> context = retry.context();
do {
Expand Down Expand Up @@ -179,7 +179,7 @@ static CheckedRunnable decorateCheckedRunnable(Retry retry, CheckedRunnable runn
* @return a retryable function
*/
static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(Retry retry,
CheckedFunction1<T, R> function) {
CheckedFunction1<T, R> function) {
return (T t) -> {
Retry.Context<R> context = retry.context();
do {
Expand Down Expand Up @@ -232,7 +232,7 @@ static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier) {
* @return a retryable function
*/
static <E extends Exception, T> Supplier<Either<E, T>> decorateEitherSupplier(Retry retry,
Supplier<Either<E, T>> supplier) {
Supplier<Either<E, T>> supplier) {
return () -> {
Retry.Context<T> context = retry.context();
do {
Expand Down Expand Up @@ -483,7 +483,7 @@ default void executeRunnable(Runnable runnable) {
* @return the decorated CompletionStage.
*/
default <T> CompletionStage<T> executeCompletionStage(ScheduledExecutorService scheduler,
Supplier<CompletionStage<T>> supplier) {
Supplier<CompletionStage<T>> supplier) {
return decorateCompletionStage(this, scheduler, supplier).get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private void throwOrSleepAfterRuntimeException() {
}

private void waitIntervalAfterFailure(int currentNumOfAttempts,
@Nullable Throwable throwable) {
@Nullable Throwable throwable) {
// wait interval until the next attempt should start
long interval = intervalFunction.apply(numOfAttempts.get());
publishRetryEvent(
Expand Down
Loading

0 comments on commit ba36c42

Please sign in to comment.