From ae01b474101076a3af0545006ea599724981967c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Antonio=20Bre=C3=B1a=20Moral?= Date: Mon, 27 May 2024 20:47:45 +0000 Subject: [PATCH] Upgrade --- README.md | 18 +- .../java/info/jab/fp/async/CFExamples.java | 108 ++++++------ .../java/info/jab/fp/async/CoresDemo.java | 12 +- .../java/info/jab/fp/async/SimpleCurl.java | 50 ------ .../info/jab/fp/async/WebAddressService.java | 62 ------- .../info/jab/fp/async/CFCompositionTest.java | 110 ------------ .../info/jab/fp/async/CFExamplesTest.java | 83 +++++----- .../info/jab/fp/async/CFMonadLawsTest.java | 5 + .../java/info/jab/fp/async/CFTimeoutTest.java | 4 + .../fp/async/ParallelStreamProcesingTest.java | 156 ------------------ .../async => utils}/TestLoggerExtension.java | 2 +- 11 files changed, 116 insertions(+), 494 deletions(-) delete mode 100644 training/src/main/java/info/jab/fp/async/SimpleCurl.java delete mode 100644 training/src/main/java/info/jab/fp/async/WebAddressService.java delete mode 100644 training/src/test/java/info/jab/fp/async/CFCompositionTest.java delete mode 100644 training/src/test/java/info/jab/fp/async/ParallelStreamProcesingTest.java rename training/src/test/java/info/jab/{fp/async => utils}/TestLoggerExtension.java (95%) diff --git a/README.md b/README.md index 57a94b0..9c8ca81 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ A repository to review the main concepts about Functional Programming with Java. ```bash sdk env install ./mvnw clean test -DexcludedGroups=performance,endtoend -./mvnw clean test -DexcludedGroups=performance,endtoend -Dtest=CFBasicsTest -pl training +./mvnw clean test -DexcludedGroups=performance,endtoend -Dtest=CFExamplesTest -pl training ./mvnw clean test -Dgroups=performance ./mvnw clean test -Dgroups=endtoend @@ -24,14 +24,14 @@ sdk env install ## Functional programming features in Java -- Lambda Expressions -- Optionals -- Stream API -- CompletableFuture & Structural Concurrency -- Immutable Lists -- Sealed Classes -- Pattern Matching for Switch -- Records & Record Patterns +- [ ] Lambda Expressions +- [ ] Optionals +- [ ] Stream API +- [x] CompletableFuture & [ ] Structural Concurrency +- [ ] Immutable Lists +- [ ] Sealed Classes +- [ ] Pattern Matching for Switch +- [ ] Records & Record Patterns ## Functional programming timeline in Java diff --git a/training/src/main/java/info/jab/fp/async/CFExamples.java b/training/src/main/java/info/jab/fp/async/CFExamples.java index 794355c..a08972f 100644 --- a/training/src/main/java/info/jab/fp/async/CFExamples.java +++ b/training/src/main/java/info/jab/fp/async/CFExamples.java @@ -90,12 +90,6 @@ public Integer callingFourAsyncTasks() { .reduce(0 , (i1, i2) -> i1 + i2); } - CompletableFuture> asyncCallOptional(Integer param) { - CompletableFuture> cf1 = CompletableFuture - .supplyAsync(() -> Optional.of(this.compute.apply(param))); - return cf1; - } - CompletableFuture asyncCallFailed() { CompletableFuture cf1 = CompletableFuture .supplyAsync(() -> { @@ -104,35 +98,14 @@ CompletableFuture asyncCallFailed() { return cf1; } - CompletableFuture asyncCallFailedProtected() { - CompletableFuture cf1 = CompletableFuture - .supplyAsync(() -> { - throw new RuntimeException("Katakroker"); - }) - .handle((result, ex) -> { - return 100; - }); - return cf1; - } - - Supplier katakroker = () -> { - delay(1); - throw new RuntimeException("Katakroker"); - }; - - CompletableFuture> asyncCallFailedOptional() { - CompletableFuture> cf1 = CompletableFuture - .supplyAsync(() -> Optional.of(katakroker.get())); - return cf1; - } - - public Integer myForthCF() { + public Integer callingFourAsyncTasksAndOneFails() { CompletableFuture request1 = asyncCall(1); CompletableFuture request2 = asyncCallFailed(); CompletableFuture request3 = asyncCall(1); + CompletableFuture request4 = asyncCall(1); - List> futuresList = List.of(request1, request2, request3); + List> futuresList = List.of(request1, request2, request3, request4); return futuresList.stream() .filter(cf -> !cf.isCompletedExceptionally()) @@ -140,15 +113,7 @@ public Integer myForthCF() { .reduce(0 , (i1, i2) -> i1 + i2); } - /** - * If an exception occurs in a stage and we do not do anything - * to handle that exception then execution to the further stages is abandon. - * - * @return - */ - public Integer myFifthCF() { - - logger.info("Defining"); + public Integer callingFourAsyncTasksAndThreeFails() { CompletableFuture request1 = asyncCallFailed(); CompletableFuture request2 = asyncCallFailed(); @@ -157,28 +122,31 @@ public Integer myFifthCF() { List> futuresList = List.of(request1, request2, request3, request4); - logger.info("Running"); - - var futuresList2 = Stream.of(request1, request2, request3, request4); - - - return futuresList2 + return futuresList.stream() .filter(not(CompletableFuture::isCompletedExceptionally)) .map(CompletableFuture::join) .reduce(0 , (i1, i2) -> i1 + i2); } - public Integer mySixthCF() { + CompletableFuture asyncCallFailedWithDefaultResult() { + CompletableFuture cf1 = CompletableFuture + .supplyAsync(() -> { + throw new RuntimeException("Katakroker"); + }) + .handle((result, ex) -> { + return 100; + }); + return cf1; + } - logger.info("Defining"); + public Integer callingFourAsyncTasksWithDefaultValues() { CompletableFuture request1 = asyncCall(1); - CompletableFuture request2 = asyncCallFailed(); - CompletableFuture request3 = asyncCallFailedProtected(); - - List> futuresList = List.of(request1, request2, request3); + CompletableFuture request2 = asyncCall(1); + CompletableFuture request3 = asyncCallFailed(); + CompletableFuture request4 = asyncCallFailedWithDefaultResult(); - logger.info("Running"); + List> futuresList = List.of(request1, request2, request3, request4); return futuresList.stream() .filter(not(CompletableFuture::isCompletedExceptionally)) @@ -186,9 +154,24 @@ public Integer mySixthCF() { .reduce(0 , (i1, i2) -> i1 + i2); } - public Integer mySeventhCF() { + CompletableFuture> asyncCallOptional(Integer param) { + CompletableFuture> cf1 = CompletableFuture + .supplyAsync(() -> Optional.of(this.compute.apply(param))); + return cf1; + } + + Supplier katakroker = () -> { + delay(1); + throw new RuntimeException("Katakroker"); + }; - logger.info("Defining"); + CompletableFuture> asyncCallFailedOptional() { + CompletableFuture> cf1 = CompletableFuture + .supplyAsync(() -> Optional.of(katakroker.get())); + return cf1; + } + + public Integer callingFourAsyncTasksWithOptionals() { CompletableFuture> request1 = asyncCallOptional(1); CompletableFuture> request2 = asyncCallFailedOptional(); @@ -197,12 +180,19 @@ public Integer mySeventhCF() { List>> futuresList = List.of(request1, request2, request3, request4); - logger.info("Running"); - return futuresList.stream() - .filter(CompletableFuture::isCompletedExceptionally) + .map(cf -> cf.handle((result, ex) -> { + if (ex != null) { + // Log the exception if needed + System.out.println("Exception: " + ex.getMessage()); + return Optional.empty(); + } else { + return result; + } + })) .map(CompletableFuture::join) - .map(o -> o.get()) - .reduce(0 , (i1, i2) -> i1 + i2); + .filter(Optional::isPresent) + .map(Optional::get) + .reduce(0, Integer::sum); } } diff --git a/training/src/main/java/info/jab/fp/async/CoresDemo.java b/training/src/main/java/info/jab/fp/async/CoresDemo.java index 9d870d6..fa7719c 100644 --- a/training/src/main/java/info/jab/fp/async/CoresDemo.java +++ b/training/src/main/java/info/jab/fp/async/CoresDemo.java @@ -13,7 +13,7 @@ public class CoresDemo { - private static final Logger LOGGER = LoggerFactory.getLogger(CoresDemo.class); + private static final Logger logger = LoggerFactory.getLogger(CoresDemo.class); public static void main(String... args) { @@ -27,7 +27,7 @@ public static void main(String... args) { long stopTime = System.currentTimeMillis(); long elapsedTime = stopTime - startTime; - LOGGER.info("{}", elapsedTime); + logger.info("{}", elapsedTime); }; Supplier process = () -> { @@ -56,10 +56,8 @@ static class Client { private int x; - Logger LOGGER = LoggerFactory.getLogger(Client.class); - public Client(int x) { - LOGGER.info("new Instance: {}", x); + logger.info("new Instance: {}", x); this.x = x; } @@ -70,14 +68,14 @@ CompletableFuture runAsync() { .orTimeout(60, TimeUnit.SECONDS) .handle((response, ex) -> { if (!Objects.isNull(ex)) { - LOGGER.error(ex.getLocalizedMessage(), ex); + logger.error(ex.getLocalizedMessage(), ex); } return response; }); } private Integer longProcess() { - LOGGER.info("Running: {}", this.x); + logger.info("Running: {}", this.x); sleep(2); return 0; } diff --git a/training/src/main/java/info/jab/fp/async/SimpleCurl.java b/training/src/main/java/info/jab/fp/async/SimpleCurl.java deleted file mode 100644 index 56b57ff..0000000 --- a/training/src/main/java/info/jab/fp/async/SimpleCurl.java +++ /dev/null @@ -1,50 +0,0 @@ -package info.jab.fp.async; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.URL; -import java.net.URLConnection; -import java.nio.charset.Charset; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SimpleCurl { - - private static final Logger LOGGER = LoggerFactory.getLogger(SimpleCurl.class); - - public static String fetch(URL myURL) { - - LOGGER.info("Thread: {}", Thread.currentThread().getName()); - LOGGER.info("Requeted URL: {}", myURL); - - StringBuilder sb = new StringBuilder(); - - try { - - InputStreamReader in = null; - URLConnection urlConn = myURL.openConnection(); - if (urlConn != null) { - urlConn.setReadTimeout(5 * 1000); - } - if (urlConn != null && urlConn.getInputStream() != null) { - in = new InputStreamReader(urlConn.getInputStream(), Charset.defaultCharset()); - BufferedReader bufferedReader = new BufferedReader(in); - if (bufferedReader != null) { - int cp; - while ((cp = bufferedReader.read()) != -1) { - sb.append((char) cp); - } - bufferedReader.close(); - } - } - in.close(); - } catch (Exception e) { - LOGGER.error(e.getLocalizedMessage(), e); - throw new RuntimeException("Exception while calling URL: " + myURL, e); - } - - return sb.toString(); - } - -} diff --git a/training/src/main/java/info/jab/fp/async/WebAddressService.java b/training/src/main/java/info/jab/fp/async/WebAddressService.java deleted file mode 100644 index e5d5dae..0000000 --- a/training/src/main/java/info/jab/fp/async/WebAddressService.java +++ /dev/null @@ -1,62 +0,0 @@ -package info.jab.fp.async; - -/* -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.List; -import java.util.stream.Collectors; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; - - -import static io.vavr.API.$; -import static io.vavr.API.Case; -import static io.vavr.API.Match; -import static io.vavr.Patterns.$Left; -import static io.vavr.Patterns.$Right; -import io.vavr.control.Either; -import io.vavr.control.Try; - -public class WebAddressService { - - private List loadData() { - - return Try.of(() -> { - ObjectMapper objectMapper = new ObjectMapper(); - String readContent = new String(Files.readAllBytes(Paths.get(getClass().getClassLoader() - .getResource("data/webAddresses.json") - .toURI()))); - - List deserializedData = objectMapper.readValue(readContent, new TypeReference>() {}); - return deserializedData; - }).getOrElseThrow(ex -> { - //LOGGER.error(ex.getLocalizedMessage(), ex); - throw new IllegalArgumentException("It was impossible to load the data"); - }); - - } - - public List getRawData() { - return loadData(); - } - - public List search() { - - return loadData().stream() - .filter(this::isValid) - .peek(System.out::println) - .collect(Collectors.toList()); - } - - private boolean isValid(String address) { - - Either result = Try.of(() -> new URL(address)).toEither(); - return Match(result).of( - Case($Right($()), true), - Case($Left($()), false) - ); - } -} - */ diff --git a/training/src/test/java/info/jab/fp/async/CFCompositionTest.java b/training/src/test/java/info/jab/fp/async/CFCompositionTest.java deleted file mode 100644 index c5d3c35..0000000 --- a/training/src/test/java/info/jab/fp/async/CFCompositionTest.java +++ /dev/null @@ -1,110 +0,0 @@ -package info.jab.fp.async; - -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -import static org.assertj.core.api.BDDAssertions.then; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CFCompositionTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(CFCompositionTest.class); - - private Integer method1(Integer param) { - - LOGGER.info("Thread: {}", Thread.currentThread().getName()); - - delay(3); - - return 1 + param; - } - - private Integer method2(Integer param) { - - LOGGER.info("Thread: {}", Thread.currentThread().getName()); - - delay(1); - - return 1 + param; - } - - private void delay(int seconds) { - try { - Thread.sleep(seconds * 1000); - } catch (InterruptedException ex) { - } - } - - private CompletableFuture cf(Integer param) { - - return new CompletableFuture<>() - .supplyAsync(() -> 1 + param) - .handle((result, ex) -> { - if(!Objects.isNull(ex)) { - LOGGER.info("{}", 99); - return 99; - } - LOGGER.info("{}", result); - return result; - }); - } - - @Test - public void composeTest() { - - then(this.cf(1) - .thenCompose(cfResult -> cf(cfResult)) - .thenCompose(cfResult2 -> cf(cfResult2)) - .join()) - .isEqualTo(4); - } - - Function> cf1 = (param) -> { - - return new CompletableFuture<>() - .supplyAsync(() -> method1(param)) - .handle((result, ex) -> { - if(!Objects.isNull(ex)) { - LOGGER.info("{}", 99); - return 99; - } - LOGGER.info("{}", result); - return result; - }); - }; - - @Test - public void composeTest2() { - - then(cf1.apply(1) - .thenCompose(cfResult -> cf1.apply(cfResult)) - .thenCompose(cfResult2 -> cf1.apply(cfResult2)) - .join()) - .isEqualTo(4); - } - - @Test - public void composeTest3() { - - CompletableFuture cf2 = new CompletableFuture<>() - .supplyAsync(() -> 1) - .handle((result, ex) -> { - if(!Objects.isNull(ex)) { - LOGGER.info("{}", 99); - return 99; - } - LOGGER.info("{}", result); - return result; - }); - - then(cf1.apply(1) - .thenCompose(cfResult -> cf1.apply(cfResult)) - .thenCompose(cfResult2 -> cf2) - .join()) - .isEqualTo(1); - } - -} \ No newline at end of file diff --git a/training/src/test/java/info/jab/fp/async/CFExamplesTest.java b/training/src/test/java/info/jab/fp/async/CFExamplesTest.java index 60959c2..6329e3b 100644 --- a/training/src/test/java/info/jab/fp/async/CFExamplesTest.java +++ b/training/src/test/java/info/jab/fp/async/CFExamplesTest.java @@ -1,21 +1,17 @@ package info.jab.fp.async; import java.time.Duration; -import java.util.Optional; import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import static java.util.function.Predicate.not; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.assertj.core.api.BDDAssertions.then; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.equalTo; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import info.jab.utils.TestLoggerExtension; + @ExtendWith(TestLoggerExtension.class) public class CFExamplesTest { @@ -104,60 +100,67 @@ public void should_call_fourAsyncTasks() { } @Test - public void given_CF4_when_Call_then_returnExpectedValue() { + public void should_call_fourAsyncTasksAndOneFails() { - Callable demo = () -> cfExamples.myForthCF(); + //Given + var expectedComputationTime = (cfExamples.getDelay() * 4) + 1; + var expectedResult = 2 + 2 + 2; + + //When + Callable demo = () -> cfExamples.callingFourAsyncTasksAndOneFails(); + //Then await() - .atMost(Duration.ofSeconds(7)) - .until(demo, equalTo(4)); + .atMost(Duration.ofSeconds(expectedComputationTime)) + .until(demo, equalTo(expectedResult)); } - @Disabled @Test - public void given_CF5_when_Call_then_returnExpectedValue() throws Exception { + public void should_call_fourAsyncTasksAndThreeFails() { + + //Given + var expectedComputationTime = (cfExamples.getDelay() * 4) + 1; + var expectedResult = 2; + + //When + Callable demo = () -> cfExamples.callingFourAsyncTasksAndThreeFails(); - then(cfExamples.myFifthCF()).isEqualTo(2); + //Then + await() + .atMost(Duration.ofSeconds(expectedComputationTime)) + .until(demo, equalTo(expectedResult)); } @Test - public void given_CF6_when_Call_then_returnExpectedValue() { + public void should_call_fourAsyncTasksWithDefaultValues() { + //Given + var expectedComputationTime = (cfExamples.getDelay() * 4) + 1; + var expectedResult = 104; - Callable demo = () -> cfExamples.mySixthCF(); + //When + Callable demo = () -> cfExamples.callingFourAsyncTasksWithDefaultValues(); + //Then await() - .atMost(Duration.ofSeconds(7)) - .until(demo, equalTo(102)); + .atMost(Duration.ofSeconds(expectedComputationTime)) + .until(demo, equalTo(expectedResult)); } @Test - public void given_CF7_when_Call_then_returnExpectedValue() { + public void should_call_fourAsyncTasksWithOptionals() { - Callable demo = () -> cfExamples.mySeventhCF(); + //Given + var expectedComputationTime = (cfExamples.getDelay() * 4) + 1; + var expectedResult = 2; - await() - .atMost(Duration.ofSeconds(7)) - .until(demo, equalTo(0)); + //When + Callable demo = () -> cfExamples.callingFourAsyncTasksWithOptionals(); - //then(example.mySeventhCF()).isEqualTo(0); + //Then + await() + .atMost(Duration.ofSeconds(expectedComputationTime)) + .until(demo, equalTo(expectedResult)); } - @Disabled - @Test - public void testWithErrorHandledInStream() { - - CompletableFuture> future1 = CompletableFuture.supplyAsync(() -> Optional.of("1")); - CompletableFuture> futureEx = CompletableFuture.supplyAsync(() -> Optional.of((2/0)+"")); - CompletableFuture> future2 = CompletableFuture.supplyAsync(() -> Optional.of("2")); - CompletableFuture> future3 = CompletableFuture.supplyAsync(() -> Optional.of("3")); - String output = Stream.of(future1, futureEx, future2, future3) - //.filter(x -> !x.isCompletedExceptionally()) - .filter(not(CompletableFuture::isCompletedExceptionally)) - .map(CompletableFuture::join) - .map(x -> x.get()) - .collect(Collectors.joining(" ")); - - then(output).isEqualTo("1 2 3"); - } } \ No newline at end of file diff --git a/training/src/test/java/info/jab/fp/async/CFMonadLawsTest.java b/training/src/test/java/info/jab/fp/async/CFMonadLawsTest.java index 4fc6e89..a29171c 100644 --- a/training/src/test/java/info/jab/fp/async/CFMonadLawsTest.java +++ b/training/src/test/java/info/jab/fp/async/CFMonadLawsTest.java @@ -2,13 +2,18 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; + import org.junit.jupiter.api.Test; import static org.assertj.core.api.BDDAssertions.then; +import org.junit.jupiter.api.extension.ExtendWith; + +import info.jab.utils.TestLoggerExtension; /** * Original code from: https://gist.github.com/lestard/e28fb8a340737ffd9623 */ +@ExtendWith(TestLoggerExtension.class) public class CFMonadLawsTest { /** diff --git a/training/src/test/java/info/jab/fp/async/CFTimeoutTest.java b/training/src/test/java/info/jab/fp/async/CFTimeoutTest.java index bee72d0..71df691 100644 --- a/training/src/test/java/info/jab/fp/async/CFTimeoutTest.java +++ b/training/src/test/java/info/jab/fp/async/CFTimeoutTest.java @@ -13,9 +13,13 @@ import static org.assertj.core.api.BDDAssertions.then; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import info.jab.utils.TestLoggerExtension; + +@ExtendWith(TestLoggerExtension.class) public class CFTimeoutTest { private static final Logger LOGGER = LoggerFactory.getLogger(CFTimeoutTest.class); diff --git a/training/src/test/java/info/jab/fp/async/ParallelStreamProcesingTest.java b/training/src/test/java/info/jab/fp/async/ParallelStreamProcesingTest.java deleted file mode 100644 index 13e1519..0000000 --- a/training/src/test/java/info/jab/fp/async/ParallelStreamProcesingTest.java +++ /dev/null @@ -1,156 +0,0 @@ -package info.jab.fp.async; - -/* -import java.net.URL; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import static java.util.stream.Collectors.toList; - -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.vavr.Function1; -import io.vavr.Tuple; -import io.vavr.Tuple2; -import io.vavr.control.Either; -import io.vavr.control.Try; - -public class ParallelStreamProcesingTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(ParallelStreamProcesingTest.class); - - private List getValidAddressList() { - - //In another context, the object is instanced by DI - WebAddressService obj = new WebAddressService(); - - Function1> toURL = - address -> Try.of(() -> new URL(address)).toEither(); - - return obj.getRawData().stream() - .map(toURL) - .filter(Either::isRight) - .map(Either::get) - .collect(toList()); - } - - @Test - public void getValidAddressListTest() { - - assertThat(this.getValidAddressList().size()).isGreaterThan(0); - } - - //Example 1: Exist cases where the endpoint doesn´t open a connection and the performance is poor - @Disabled - @Test - public void fetchAddressInSequenceTest() { - - Consumer print = System.out::println; - Function1 fetch = Function1.of(SimpleCurl::fetch); - - this.getValidAddressList().stream() - .map(fetch) - .map(this::getTitle) - .forEach(print); - } - - //Example 2: In this example, we are blocking the thread and it is not a good practice. - @Test - public void fetchAddressAsyncTest() { - - Consumer print = System.out::println; - - this.getValidAddressList().stream() - .map(x -> curlAsync((URL) x)) - .filter(Either::isRight) - .map(Either::get) - .map(this::getTitle) - .forEach(System.out::println); - } - - private Either curlAsync(URL address) { - - ExecutorService executor = Executors.newFixedThreadPool(100); - CompletableFuture future = CompletableFuture.supplyAsync(() -> SimpleCurl.fetch(address), executor); - - return Try.of(() -> future.get(5, TimeUnit.SECONDS)).toEither(); - } - - private static ExecutorService executor = Executors.newFixedThreadPool(10); - - @Test - public void fetchAddressAsync3Test() { - - LOGGER.info("Thread: {}", Thread.currentThread().getName()); - - Consumer> print = System.out::println; - - List> result = this.getValidAddressList().stream() - .map(this::curlAsync4) - .map(CompletableFuture::join) - .peek(print) - .collect(toList()); - - assertThat(result.size()).isEqualTo(4); - } - - @Test - public void fetchAddressAsync4Test() throws Exception { - - LOGGER.info("Thread: {}", Thread.currentThread().getName()); - - Consumer> print = System.out::println; - - List>> futureRequests = this.getValidAddressList().stream() - .map(x -> curlAsync4(x)) - .collect(toList()); - - List> result2 = futureRequests.stream() - .map(CompletableFuture::join) - .peek(print) - .collect(toList()); - - assertThat(result2.size()).isEqualTo(4); - } - - private CompletableFuture> curlAsync4(URL address) { - - LOGGER.info("Thread: {}", Thread.currentThread().getName()); - CompletableFuture> future = CompletableFuture - .supplyAsync(() -> fetchWrapper(address), executor) - .exceptionally(ex -> { - LOGGER.error(ex.getLocalizedMessage(), ex); - return Tuple.of(address, "FETCH_BAD_RESULT"); - }) - .completeOnTimeout(Tuple.of(address, "FETCH_BAD_RESULT"),5, TimeUnit.SECONDS); - - return future; - } - - private Tuple2 fetchWrapper(URL address) { - return Tuple.of(address, getTitle(SimpleCurl.fetch(address))); - } - - private String getTitle(String html) { - - Pattern p = Pattern.compile("(.*?)"); - Matcher m = p.matcher(html); - - if (m.find()) { - return m.group(1); - }else { - return "No title"; - } - } - -} - */ diff --git a/training/src/test/java/info/jab/fp/async/TestLoggerExtension.java b/training/src/test/java/info/jab/utils/TestLoggerExtension.java similarity index 95% rename from training/src/test/java/info/jab/fp/async/TestLoggerExtension.java rename to training/src/test/java/info/jab/utils/TestLoggerExtension.java index c3b402e..04d0af5 100644 --- a/training/src/test/java/info/jab/fp/async/TestLoggerExtension.java +++ b/training/src/test/java/info/jab/utils/TestLoggerExtension.java @@ -1,4 +1,4 @@ -package info.jab.fp.async; +package info.jab.utils; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext;