From 36cfb1381579836c53da9952c15ff4c7cda717c3 Mon Sep 17 00:00:00 2001 From: alkum Date: Mon, 28 Mar 2022 22:13:42 +0200 Subject: [PATCH 1/2] Extend CompletableFutureUtils with boolean future utils and add tests Include util for boolean futures which has two completion conditions. --- .../common/util/CompletableFutureUtils.java | 47 ++++++- .../util/CompletableFutureUtilsTest.java | 116 ++++++++++++++++++ 2 files changed, 161 insertions(+), 2 deletions(-) create mode 100644 common/src/test/java/bisq/common/util/CompletableFutureUtilsTest.java diff --git a/common/src/main/java/bisq/common/util/CompletableFutureUtils.java b/common/src/main/java/bisq/common/util/CompletableFutureUtils.java index d17144a1bf..544763af35 100644 --- a/common/src/main/java/bisq/common/util/CompletableFutureUtils.java +++ b/common/src/main/java/bisq/common/util/CompletableFutureUtils.java @@ -48,12 +48,17 @@ public static CompletableFuture> allOf(CompletableFuture... list) ); } - public static CompletableFuture> anyOf(Collection> collection) { + public static CompletableFuture anyOfBooleanMatchingFilterOrAll(boolean filter, Collection> collection) { + //noinspection unchecked + return anyOfBooleanMatchingFilterOrAll(filter, collection.toArray(new CompletableFuture[0])); + } + + public static CompletableFuture anyOf(Collection> collection) { //noinspection unchecked return anyOf(collection.toArray(new CompletableFuture[0])); } - public static CompletableFuture> anyOf(Stream> collection) { + public static CompletableFuture anyOf(Stream> collection) { return anyOf(collection.collect(Collectors.toList())); } @@ -66,6 +71,44 @@ public static CompletableFuture anyOf(CompletableFuture... list) { .orElseThrow()); } + /** + * Completes on any one of the following conditions: + *
+ * a) ANY of the CompletableFutures completes with the given filter, or + *
+ * b) ALL CompletableFutures complete + *

+ * + * Useful in situations such as + *
+ * "complete when any boolean future in the list completes with true, else complete when all complete with false" + * + * @param filter + * @param list + * @return + * + * @see "https://stackoverflow.com/a/58999999" + */ + @SafeVarargs + public static CompletableFuture anyOfBooleanMatchingFilterOrAll( + boolean filter, CompletableFuture... list) { + CompletableFuture allWithFailFast = CompletableFuture + .allOf(list) + .thenApply(__ -> { + Stream.of(list) + .map(CompletableFuture::join); + return filter; + } + ); + Stream.of(list) + .forEach(f -> f.thenAccept(result -> { + if (result == filter) { + allWithFailFast.complete(result); + } + })); + return allWithFailFast; + } + // CompletableFuture.applyToEither has some undesired error handling behavior (if first fail result fails). // This method provides the expected behaviour that if one of the 2 futures completes we complete our // result future. If both fail the result fail as well. diff --git a/common/src/test/java/bisq/common/util/CompletableFutureUtilsTest.java b/common/src/test/java/bisq/common/util/CompletableFutureUtilsTest.java new file mode 100644 index 0000000000..8b83c97a62 --- /dev/null +++ b/common/src/test/java/bisq/common/util/CompletableFutureUtilsTest.java @@ -0,0 +1,116 @@ +package bisq.common.util; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +@Slf4j +public class CompletableFutureUtilsTest { + + @Test + public void testAllOf() throws ExecutionException, InterruptedException { + + CompletableFuture cfA = createCompletableFuture(1000, "A"); + CompletableFuture cfB = createCompletableFuture(2000, "B"); + CompletableFuture cfC = createCompletableFuture(3000, "C"); + + CompletableFutureUtils.allOf(cfA, cfB, cfC) + .thenApply(res -> { + log.info("CompletableFutureUtils.allOf(A, B, C) completed"); + return null; + }) + .get(); + } + + @Test + public void testAnyOf_allSucceed() throws ExecutionException, InterruptedException { + + CompletableFuture cfA = createCompletableFuture(1000, "A"); + CompletableFuture cfB = createCompletableFuture(2000, "B"); + CompletableFuture cfC = createCompletableFuture(3000, "C"); + + // Completes as soon as the fastest future in the args + CompletableFutureUtils.anyOf(cfA, cfB, cfC) + .thenApply(res -> { + log.info("CompletableFutureUtils.anyOf(A, B, C) completed"); + return null; + }) + .get(); + } + + @Test + public void testAnyOfBoolean() throws ExecutionException, InterruptedException { + + CompletableFuture cfA = createCompletableFutureBool(1000, "A", false); + CompletableFuture cfB = createCompletableFutureBool(2000, "B", true); + CompletableFuture cfC = createCompletableFutureBool(3000, "C", true); + + // Completes as soon as the fastest future in the args + // Has the value returned by the first future that completes + CompletableFutureUtils.anyOf(cfA, cfB, cfC) + .thenApply(res -> { + log.info("CompletableFutureUtils.anyOf(A, B, C) completed: {}", res); + return res; + }) + .get(); + } + + @Test + public void testAnyOfBoolean_filterVal() throws ExecutionException, InterruptedException { + boolean expectedValue = true; + + // Completes when C completes (C = first future that completes with true) + CompletableFutureUtils.anyOfBooleanMatchingFilterOrAll( + expectedValue, + createCompletableFutureBool(1000, "A", false), + createCompletableFutureBool(2000, "B", false), + createCompletableFutureBool(3000, "C", true) + ) + .thenApply(res -> { + log.info("CompletableFutureUtils.anyOfBooleanFiltered(A, B, C) completed: {}", res); + return res; + }) + .get(); + + // Completes when B completes (B = first future that completes with true) + CompletableFutureUtils.anyOfBooleanMatchingFilterOrAll( + expectedValue, + createCompletableFutureBool(1000, "A", false), + createCompletableFutureBool(2000, "B", true), + createCompletableFutureBool(3000, "C", true) + ) + .thenApply(res -> { + log.info("CompletableFutureUtils.anyOfBooleanFiltered(A, B, C) completed: {}", res); + return res; + }) + .get(); + } + + private CompletableFuture createCompletableFutureBool(long sleepMs, String msg, boolean val) { + return CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(sleepMs); + log.info("{} (waited {} ms: {})", msg, sleepMs, val); + return val; + } catch (InterruptedException e) { + log.error("Interrupted: " + e.getMessage(), e); + return false; + } + }); + } + + private CompletableFuture createCompletableFuture(long sleepMs, String msg) { + return CompletableFuture.runAsync(() -> { + try { + Thread.sleep(sleepMs); + log.info("{} (waited {} ms)", msg, sleepMs); + } catch (InterruptedException e) { + log.error("Interrupted: " + e.getMessage(), e); + } +// return null; + }); + } + +} From 1ff0caebb2f187ce500932a6dd8b330fb019f9e8 Mon Sep 17 00:00:00 2001 From: alkum Date: Mon, 28 Mar 2022 22:19:11 +0200 Subject: [PATCH 2/2] Extend PeerExchangeService with optimistic peer exchange Extend the peer exchange logic with ability to complete future when first peer exchange succeeds (optimistic scenario). Alternatively, wait until all peer connections have been tried. --- .../peergroup/exchange/PeerExchangeService.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java b/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java index 11902d2fd8..e411d7c3a3 100644 --- a/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java +++ b/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java @@ -77,12 +77,14 @@ private CompletableFuture doPeerExchange(List
candidates) { } log.info("Node {} starts peer exchange with: {}", node, StringUtils.truncate(candidates.stream() - .map(Address::toString) - .collect(Collectors.toList()).toString())); + .map(Address::toString).toList().toString())); List> allFutures = candidates.stream() .map(this::doPeerExchangeAsync) .collect(Collectors.toList()); - return CompletableFutureUtils.allOf(allFutures) + + // When ALL futures complete (successfully or not), + // then consider peer exchange complete and decide whether it should be re-done, in case of too few peers + CompletableFutureUtils.allOf(allFutures) .thenApply(resultList -> { int numSuccess = (int) resultList.stream().filter(e -> e).count(); log.info("Node {} completed peer exchange to {} candidates. {} requests successfully completed.", @@ -100,6 +102,14 @@ private CompletableFuture doPeerExchange(List
candidates) { } return null; }); + + // Complete when the first future completes (the first peer exchange succeeds), or when all futures fail + // This helps the initialization phase (and subsequent peer exchanges) complete as soon peers have been exchanged with at least one peer + return CompletableFutureUtils.anyOfBooleanMatchingFilterOrAll(true, allFutures) + .thenApply(result -> { + log.info("Node {} completed peer exchange to at least one candidate", node); + return null; + }); } private CompletableFuture doPeerExchangeAsync(Address peerAddress) {