Skip to content

Commit

Permalink
Merge pull request #1873 from HenrikJannsen/Improve-peer-management
Browse files Browse the repository at this point in the history
Improve peer management
  • Loading branch information
alvasw authored Mar 23, 2024
2 parents a994449 + 538b04a commit dc691db
Show file tree
Hide file tree
Showing 22 changed files with 266 additions and 128 deletions.
1 change: 1 addition & 0 deletions apps/desktop/desktop-app/src/main/resources/desktop.conf
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ application {

peerGroup {
minNumConnectedPeers=8
minNumOutboundConnectedPeers=3
maxNumConnectedPeers=12
minNumReportedPeers=1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ application {

peerGroup {
minNumConnectedPeers=8
minNumOutboundConnectedPeers=3
maxNumConnectedPeers=12
minNumReportedPeers=1
}
Expand Down
1 change: 1 addition & 0 deletions apps/rest-api-app/src/main/resources/rest_api.conf
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ application {

peerGroup {
minNumConnectedPeers=8
minNumOutboundConnectedPeers=3
maxNumConnectedPeers=12
minNumReportedPeers=1
}
Expand Down
3 changes: 2 additions & 1 deletion apps/seed-node-app/src/main/resources/seed_node.conf
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ application {
}

peerGroup {
minNumConnectedPeers=8
minNumConnectedPeers=12
minNumOutboundConnectedPeers=4
maxNumConnectedPeers=20
minNumReportedPeers=1
}
Expand Down
1 change: 1 addition & 0 deletions network/network/src/integrationTest/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ bisq {
# Apply to i2p and tor services.
peerGroupConfig {
minNumConnectedPeers=8
minNumOutboundConnectedPeers=3
maxNumConnectedPeers=12
minNumReportedPeers=1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.util.Comparator;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;
Expand All @@ -52,6 +54,14 @@
*/
@Slf4j
public abstract class Connection {
public static Comparator<Connection> comparingDateDescending() {
return comparingDate().reversed();
}

public static Comparator<Connection> comparingDate() {
return Comparator.comparingLong(Connection::getCreated);
}

protected interface Handler {
void handleNetworkMessage(EnvelopePayloadMessage envelopePayloadMessage,
AuthorizationToken authorizationToken,
Expand Down Expand Up @@ -168,9 +178,20 @@ public boolean isRunning() {
return !isStopped();
}

public long getCreated() {
return getConnectionMetrics().getCreated();
}

public Date getCreationDate() {
return getConnectionMetrics().getCreationDate();
}

public boolean createdBefore(long date) {
return getCreated() < date;
}
@Override
public String toString() {
return "'" + getClass().getSimpleName() + " [peerAddress=" + getPeersCapability().getAddress() +
return "'" + getClass().getSimpleName() + " [peerAddress=" + getPeerAddress() +
", keyId=" + getId() + "]'";
}

Expand Down Expand Up @@ -277,7 +298,7 @@ boolean isStopped() {
///////////////////////////////////////////////////////////////////////////////////////////////////

private String getThreadNameId() {
return StringUtils.truncate(getPeersCapability().getAddress().toString() + "-" + id.substring(0, 8));
return StringUtils.truncate(getPeerAddress().toString() + "-" + id.substring(0, 8));
}

private boolean isInputStreamActive() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void handleInboundConnection(SocketChannel socketChannel) {
peerAddress, myCapability.getAddress());

if (isAlreadyConnectedToPeer(peerAddress)) {
log.warn("Node {} have already an InboundConnection from {}. This can happen when a " + "handshake was in progress while we received a new connection from that address. " + "We will close the socket of that new connection and use the existing instead.", this, peerAddress);
log.warn("{} have already an InboundConnection from {}. This can happen when a " + "handshake was in progress while we received a new connection from that address. " + "We will close the socket of that new connection and use the existing instead.", this, peerAddress);
closeChannel(networkEnvelopeSocketChannel);
} else {
connectionByChannel.put(socketChannel, inboundConnection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class ConnectionMetrics {
private final List<Long> rrtList = new CopyOnWriteArrayList<>();

public ConnectionMetrics() {
created = new Date().getTime();
created = System.currentTimeMillis();
}

public Date getCreationDate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public CompletableFuture<BroadcastResult> doBroadcast(BroadcastMessage broadcast
allConnections.stream()
.limit(numBroadcasts)
.forEach(connection -> {
log.debug("Node {} broadcast to {}", node, connection.getPeerAddress());
log.debug("{} broadcast to {}", node, connection.getPeerAddress());
try {
node.send(broadcastMessage, connection);
numSuccess.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void onNetworkMessage(EnvelopePayloadMessage envelopePayloadMessage) {
connection.getConnectionMetrics().addRtt(System.currentTimeMillis() - ts);
future.complete(response.getInventory());
} else {
log.warn("Node {} received InventoryResponse from {} with invalid nonce {}. Request nonce was {}. Connection={}",
log.warn("{} received InventoryResponse from {} with invalid nonce {}. Request nonce was {}. Connection={}",
node, connection.getPeerAddress(), response.getRequestNonce(), nonce, connection.getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -150,7 +148,12 @@ private void requestInventory() {
.whenComplete((list, throwable) -> {
requestsPending.set(false);
if (throwable != null) {
log.error("requestFromPeers failed", throwable);
if (throwable instanceof CompletionException &&
throwable.getCause() instanceof CancellationException) {
log.debug("requestFromPeers failed", throwable);
} else {
log.error("requestFromPeers failed", throwable);
}
} else if (list == null) {
log.error("requestFromPeers completed with result list = null");
} else {
Expand Down Expand Up @@ -213,7 +216,11 @@ private List<CompletableFuture<Inventory>> requestFromPeers() {
});
}
if (throwable != null) {
log.warn("Inventory request failed.", throwable);
if (throwable instanceof CancellationException) {
log.debug("Inventory request failed.", throwable);
} else {
log.info("Inventory request failed.", throwable);
}
}
});
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,35 @@
import javax.annotation.Nonnull;
import java.util.Date;

/**
* We use only the address for EqualsAndHashCode as unique identifier as the other data can be
* updated and we do not consider those updates as a new peer instance.
*/
@Getter
@ToString
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public final class Peer implements NetworkProto, Comparable<Peer> {
@EqualsAndHashCode.Include
private final Address address;

private final Capability capability;
private final NetworkLoad networkLoad;
private final boolean isOutboundConnection;
private final long created;


public Peer(Capability capability, NetworkLoad networkLoad, boolean isOutboundConnection) {
this(capability, networkLoad, isOutboundConnection, System.currentTimeMillis());
}

public Peer(Capability capability, NetworkLoad networkLoad, boolean isOutboundConnection, long created) {
private Peer(Capability capability, NetworkLoad networkLoad, boolean isOutboundConnection, long created) {
this.capability = capability;
this.networkLoad = networkLoad;
this.isOutboundConnection = isOutboundConnection;
this.created = created;

address = capability.getAddress();

verify();
}

Expand Down
Loading

0 comments on commit dc691db

Please sign in to comment.