Skip to content

Commit

Permalink
Added a CompletionStageUtils class
Browse files Browse the repository at this point in the history
* Added a CompletionStageUtils class which simplifies to recover from exceptions when using CompletableFutures.
* Added missing  Apache Headers.
* Added two default methods to ThreadPoolBulkhead
* Improved JavaDoc of ThreadPoolBulkhead.
  • Loading branch information
RobWin authored Jan 29, 2020
1 parent 2508cd0 commit 2c7325d
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,29 @@
public interface ThreadPoolBulkhead {

/**
* Returns a callable which is decorated by a bulkhead.
* Returns a supplier which submits a value-returning task for execution to the ThreadPoolBulkhead and
* returns a CompletionStage representing the pending results of the task.
*
* @param bulkhead the bulkhead
* @param callable the original Callable
* @param <T> the result type of callable
* @return a supplier which is decorated by a Bulkhead.
* @param <T> the result type of the callable
* @return a supplier which submits a value-returning task for execution and returns a CompletionStage representing the pending
* results of the task.
*/
static <T> Callable<CompletionStage<T>> decorateCallable(ThreadPoolBulkhead bulkhead,
static <T> Supplier<CompletionStage<T>> decorateCallable(ThreadPoolBulkhead bulkhead,
Callable<T> callable) {
return () -> bulkhead.submit(callable);
}

/**
* Returns a supplier which is decorated by a bulkhead.
* Returns a supplier which submits a value-returning task for execution to the ThreadPoolBulkhead
* and returns a CompletionStage representing the pending results of the task.
*
* @param bulkhead the bulkhead
* @param supplier the original supplier
* @param <T> the type of results supplied by this supplier
* @return a supplier which is decorated by a Bulkhead.
* @param supplier the original Supplier
* @param <T> the result type of the supplier
* @return a supplier which submits a value-returning task for execution and returns a CompletionStage representing the pending
* results of the task.
*/
static <T> Supplier<CompletionStage<T>> decorateSupplier(ThreadPoolBulkhead bulkhead,
Supplier<T> supplier) {
Expand Down Expand Up @@ -173,10 +177,38 @@ static ThreadPoolBulkhead of(String name,
ThreadPoolBulkheadEventPublisher getEventPublisher();

/**
* Decorates and executes the decorated Supplier.
* Returns a supplier which submits a value-returning task for execution to the ThreadPoolBulkhead and returns a CompletionStage representing the pending
* results of the task.
*
* @param supplier the original Supplier
* @param <T> the type of results supplied by this supplier
* @param <T> the result type of the supplier
* @return a supplier which submits a value-returning task for execution and returns a CompletionStage representing the pending
* results of the task.
*/
default <T> Supplier<CompletionStage<T>> decorateSupplier(Supplier<T> supplier) {
return decorateSupplier(this, supplier);
}

/**
* Returns a supplier which submits a value-returning task for execution to the ThreadPoolBulkhead and returns a CompletionStage representing the pending
* results of the task.
*
* @param callable the original Callable
* @param <T> the result type of the callable
* @return a supplier which submits a value-returning task for execution and returns a CompletionStage representing the pending
* results of the task.
*/
default <T> Supplier<CompletionStage<T>> decorateCallable(Callable<T> callable) {
return decorateCallable(this, callable);
}


/**
* Submits a value-returning task for execution to the ThreadPoolBulkhead and returns a CompletionStage representing the pending
* results of the task.
*
* @param supplier the value-returning task
* @param <T> the result type of the supplier
* @return the result of the decorated Supplier.
* @throws BulkheadFullException if the no permits
*/
Expand All @@ -185,15 +217,15 @@ default <T> CompletionStage<T> executeSupplier(Supplier<T> supplier) {
}

/**
* Decorates and executes the decorated Callable.
* Submits a value-returning task for execution to the ThreadPoolBulkhead and returns a CompletionStage representing the pending
* results of the task.
*
* @param callable the original Callable
* @param <T> the result type of callable
* @param callable the value-returning task
* @param <T> the result type of the callable
* @return the result of the decorated Callable.
* @throws Exception if unable to compute a result
*/
default <T> CompletionStage<T> executeCallable(Callable<T> callable) throws Exception {
return decorateCallable(this, callable).call();
default <T> CompletionStage<T> executeCallable(Callable<T> callable) {
return decorateCallable(this, callable).get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
*
* Copyright 2020: Robert Winkler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.core;

import java.util.concurrent.Callable;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
/*
*
* Copyright 2020: Robert Winkler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.core;

import io.github.resilience4j.core.lang.Nullable;

import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

public class ClassUtils {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
*
* Copyright 2020: Robert Winkler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.core;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class CompletionStageUtils {

private CompletionStageUtils() {
}

/**
* Returns a CompletionStage that is recovered from any exception.
*
* @param <T> return type of after
* @param exceptionHandler the function applied after callable has failed
* @return a CompletionStage that is recovered from any exception.
*/
public static <T> CompletionStage<T> recover(CompletionStage<T> completionStage, Function<Throwable, T> exceptionHandler){
CompletableFuture<T> promise = new CompletableFuture<>();
completionStage.whenComplete((result, throwable) -> {
if (throwable != null) {
try {
promise.complete(exceptionHandler.apply(throwable));
} catch (Exception fallbackException) {
promise.completeExceptionally(fallbackException);
}
} else {
promise.complete(result);
}
});
return promise;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
*
* Copyright 2020: Robert Winkler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.core;

import io.github.resilience4j.core.lang.Nullable;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
*
* Copyright 2020: Robert Winkler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.core;

import java.util.function.BiFunction;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
*
* Copyright 2020: Robert Winkler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.core.exception;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.github.resilience4j.core;

import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static io.github.resilience4j.core.CompletionStageUtils.recover;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class CompletionStageUtilsTest {

@Test
public void shouldReturnResult() throws Exception {
CompletableFuture<String> future = CompletableFuture.completedFuture("result");

String result = recover(future, (e) -> "fallback").toCompletableFuture()
.get(1, TimeUnit.SECONDS);

assertThat(result).isEqualTo("result");
}

@Test
public void shouldRecoverException() throws Exception {
CompletableFuture<String> future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException());

String result = recover(future, (e) -> "fallback").toCompletableFuture()
.get(1, TimeUnit.SECONDS);

assertThat(result).isEqualTo("fallback");
}

@Test
public void shouldReturnExceptionFromRecoveryMethod() {
CompletableFuture<String> future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("bla"));

RuntimeException exception = new RuntimeException("blub");

Function<Throwable, String> fallback = (e) -> {
throw exception;
};

assertThatThrownBy(() -> recover(future, fallback).toCompletableFuture()
.get(1, TimeUnit.SECONDS)).hasCause(exception);
}

}

0 comments on commit 2c7325d

Please sign in to comment.