Skip to content

Commit

Permalink
Extend PeerExchangeService with optimistic peer exchange
Browse files Browse the repository at this point in the history
Extend the peer exchange logic with ability to complete future when first peer exchange succeeds (optimistic scenario). Alternatively, wait until all peer connections have been tried.
  • Loading branch information
alkum committed Mar 28, 2022
1 parent 36cfb13 commit 1ff0cae
Showing 1 changed file with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,14 @@ private CompletableFuture<Void> doPeerExchange(List<Address> candidates) {
}
log.info("Node {} starts peer exchange with: {}", node,
StringUtils.truncate(candidates.stream()
.map(Address::toString)
.collect(Collectors.toList()).toString()));
.map(Address::toString).toList().toString()));
List<CompletableFuture<Boolean>> allFutures = candidates.stream()
.map(this::doPeerExchangeAsync)
.collect(Collectors.toList());
return CompletableFutureUtils.allOf(allFutures)

// When ALL futures complete (successfully or not),
// then consider peer exchange complete and decide whether it should be re-done, in case of too few peers
CompletableFutureUtils.allOf(allFutures)
.thenApply(resultList -> {
int numSuccess = (int) resultList.stream().filter(e -> e).count();
log.info("Node {} completed peer exchange to {} candidates. {} requests successfully completed.",
Expand All @@ -100,6 +102,14 @@ private CompletableFuture<Void> doPeerExchange(List<Address> candidates) {
}
return null;
});

// Complete when the first future completes (the first peer exchange succeeds), or when all futures fail
// This helps the initialization phase (and subsequent peer exchanges) complete as soon peers have been exchanged with at least one peer
return CompletableFutureUtils.anyOfBooleanMatchingFilterOrAll(true, allFutures)
.thenApply(result -> {
log.info("Node {} completed peer exchange to at least one candidate", node);
return null;
});
}

private CompletableFuture<Boolean> doPeerExchangeAsync(Address peerAddress) {
Expand Down

0 comments on commit 1ff0cae

Please sign in to comment.