Skip to content

Commit

Permalink
Merge pull request #171 from alkum/i2p-fix-client-needs-2-seeds-online
Browse files Browse the repository at this point in the history
Extend PeerExchangeService with optimistic peer exchange
  • Loading branch information
chimp1984 authored Mar 29, 2022
2 parents 6e4c572 + 1ff0cae commit 385aaea
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 5 deletions.
47 changes: 45 additions & 2 deletions common/src/main/java/bisq/common/util/CompletableFutureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,17 @@ public static <T> CompletableFuture<List<T>> allOf(CompletableFuture<T>... list)
);
}

public static <T> CompletableFuture<List<T>> anyOf(Collection<CompletableFuture<T>> collection) {
public static CompletableFuture<Boolean> anyOfBooleanMatchingFilterOrAll(boolean filter, Collection<CompletableFuture<Boolean>> collection) {
//noinspection unchecked
return anyOfBooleanMatchingFilterOrAll(filter, collection.toArray(new CompletableFuture[0]));
}

public static <T> CompletableFuture<T> anyOf(Collection<CompletableFuture<T>> collection) {
//noinspection unchecked
return anyOf(collection.toArray(new CompletableFuture[0]));
}

public static <T> CompletableFuture<List<T>> anyOf(Stream<CompletableFuture<T>> collection) {
public static <T> CompletableFuture<T> anyOf(Stream<CompletableFuture<T>> collection) {
return anyOf(collection.collect(Collectors.toList()));
}

Expand All @@ -66,6 +71,44 @@ public static <T> CompletableFuture<T> anyOf(CompletableFuture<T>... list) {
.orElseThrow());
}

/**
* Completes on any one of the following conditions:
* <br/>
* a) ANY of the CompletableFutures completes with the given filter, or
* <br/>
* b) ALL CompletableFutures complete
* <br/><br/>
*
* Useful in situations such as
* <br/>
* "complete when any boolean future in the list completes with true, else complete when all complete with false"
*
* @param filter
* @param list
* @return
*
* @see "https://stackoverflow.com/a/58999999"
*/
@SafeVarargs
public static CompletableFuture<Boolean> anyOfBooleanMatchingFilterOrAll(
boolean filter, CompletableFuture<Boolean>... list) {
CompletableFuture<Boolean> allWithFailFast = CompletableFuture
.allOf(list)
.thenApply(__ -> {
Stream.of(list)
.map(CompletableFuture::join);
return filter;
}
);
Stream.of(list)
.forEach(f -> f.thenAccept(result -> {
if (result == filter) {
allWithFailFast.complete(result);
}
}));
return allWithFailFast;
}

// CompletableFuture.applyToEither has some undesired error handling behavior (if first fail result fails).
// This method provides the expected behaviour that if one of the 2 futures completes we complete our
// result future. If both fail the result fail as well.
Expand Down
116 changes: 116 additions & 0 deletions common/src/test/java/bisq/common/util/CompletableFutureUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package bisq.common.util;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Slf4j
public class CompletableFutureUtilsTest {

@Test
public void testAllOf() throws ExecutionException, InterruptedException {

CompletableFuture<Void> cfA = createCompletableFuture(1000, "A");
CompletableFuture<Void> cfB = createCompletableFuture(2000, "B");
CompletableFuture<Void> cfC = createCompletableFuture(3000, "C");

CompletableFutureUtils.allOf(cfA, cfB, cfC)
.thenApply(res -> {
log.info("CompletableFutureUtils.allOf(A, B, C) completed");
return null;
})
.get();
}

@Test
public void testAnyOf_allSucceed() throws ExecutionException, InterruptedException {

CompletableFuture<Void> cfA = createCompletableFuture(1000, "A");
CompletableFuture<Void> cfB = createCompletableFuture(2000, "B");
CompletableFuture<Void> cfC = createCompletableFuture(3000, "C");

// Completes as soon as the fastest future in the args
CompletableFutureUtils.anyOf(cfA, cfB, cfC)
.thenApply(res -> {
log.info("CompletableFutureUtils.anyOf(A, B, C) completed");
return null;
})
.get();
}

@Test
public void testAnyOfBoolean() throws ExecutionException, InterruptedException {

CompletableFuture<Boolean> cfA = createCompletableFutureBool(1000, "A", false);
CompletableFuture<Boolean> cfB = createCompletableFutureBool(2000, "B", true);
CompletableFuture<Boolean> cfC = createCompletableFutureBool(3000, "C", true);

// Completes as soon as the fastest future in the args
// Has the value returned by the first future that completes
CompletableFutureUtils.anyOf(cfA, cfB, cfC)
.thenApply(res -> {
log.info("CompletableFutureUtils.anyOf(A, B, C) completed: {}", res);
return res;
})
.get();
}

@Test
public void testAnyOfBoolean_filterVal() throws ExecutionException, InterruptedException {
boolean expectedValue = true;

// Completes when C completes (C = first future that completes with true)
CompletableFutureUtils.anyOfBooleanMatchingFilterOrAll(
expectedValue,
createCompletableFutureBool(1000, "A", false),
createCompletableFutureBool(2000, "B", false),
createCompletableFutureBool(3000, "C", true)
)
.thenApply(res -> {
log.info("CompletableFutureUtils.anyOfBooleanFiltered(A, B, C) completed: {}", res);
return res;
})
.get();

// Completes when B completes (B = first future that completes with true)
CompletableFutureUtils.anyOfBooleanMatchingFilterOrAll(
expectedValue,
createCompletableFutureBool(1000, "A", false),
createCompletableFutureBool(2000, "B", true),
createCompletableFutureBool(3000, "C", true)
)
.thenApply(res -> {
log.info("CompletableFutureUtils.anyOfBooleanFiltered(A, B, C) completed: {}", res);
return res;
})
.get();
}

private CompletableFuture<Boolean> createCompletableFutureBool(long sleepMs, String msg, boolean val) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(sleepMs);
log.info("{} (waited {} ms: {})", msg, sleepMs, val);
return val;
} catch (InterruptedException e) {
log.error("Interrupted: " + e.getMessage(), e);
return false;
}
});
}

private CompletableFuture<Void> createCompletableFuture(long sleepMs, String msg) {
return CompletableFuture.runAsync(() -> {
try {
Thread.sleep(sleepMs);
log.info("{} (waited {} ms)", msg, sleepMs);
} catch (InterruptedException e) {
log.error("Interrupted: " + e.getMessage(), e);
}
// return null;
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,14 @@ private CompletableFuture<Void> doPeerExchange(List<Address> candidates) {
}
log.info("Node {} starts peer exchange with: {}", node,
StringUtils.truncate(candidates.stream()
.map(Address::toString)
.collect(Collectors.toList()).toString()));
.map(Address::toString).toList().toString()));
List<CompletableFuture<Boolean>> allFutures = candidates.stream()
.map(this::doPeerExchangeAsync)
.collect(Collectors.toList());
return CompletableFutureUtils.allOf(allFutures)

// When ALL futures complete (successfully or not),
// then consider peer exchange complete and decide whether it should be re-done, in case of too few peers
CompletableFutureUtils.allOf(allFutures)
.thenApply(resultList -> {
int numSuccess = (int) resultList.stream().filter(e -> e).count();
log.info("Node {} completed peer exchange to {} candidates. {} requests successfully completed.",
Expand All @@ -100,6 +102,14 @@ private CompletableFuture<Void> doPeerExchange(List<Address> candidates) {
}
return null;
});

// Complete when the first future completes (the first peer exchange succeeds), or when all futures fail
// This helps the initialization phase (and subsequent peer exchanges) complete as soon peers have been exchanged with at least one peer
return CompletableFutureUtils.anyOfBooleanMatchingFilterOrAll(true, allFutures)
.thenApply(result -> {
log.info("Node {} completed peer exchange to at least one candidate", node);
return null;
});
}

private CompletableFuture<Boolean> doPeerExchangeAsync(Address peerAddress) {
Expand Down

0 comments on commit 385aaea

Please sign in to comment.