From fddede1d37d54691286b57396b076df6b205ec98 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 10:07:27 +0700 Subject: [PATCH 01/19] Increase minNumConnectedPeers to 12 for seed node --- apps/seed-node-app/src/main/resources/seed_node.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/seed-node-app/src/main/resources/seed_node.conf b/apps/seed-node-app/src/main/resources/seed_node.conf index d887feb6d8..d0d77a11b2 100644 --- a/apps/seed-node-app/src/main/resources/seed_node.conf +++ b/apps/seed-node-app/src/main/resources/seed_node.conf @@ -75,7 +75,7 @@ application { } peerGroup { - minNumConnectedPeers=8 + minNumConnectedPeers=12 maxNumConnectedPeers=20 minNumReportedPeers=1 } From 5cebdd204429b9cbf30be9a925224cae49d8101d Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 10:08:45 +0700 Subject: [PATCH 02/19] Add minNumOutboundConnectedPeers config item --- apps/desktop/desktop-app/src/main/resources/desktop.conf | 1 + apps/rest-api-app/src/main/resources/rest_api.conf | 1 + apps/seed-node-app/src/main/resources/seed_node.conf | 1 + network/network/src/integrationTest/resources/test.conf | 1 + .../network/p2p/services/peergroup/PeerGroupService.java | 9 ++++++--- 5 files changed, 10 insertions(+), 3 deletions(-) diff --git a/apps/desktop/desktop-app/src/main/resources/desktop.conf b/apps/desktop/desktop-app/src/main/resources/desktop.conf index f30201f52c..f7995658ea 100644 --- a/apps/desktop/desktop-app/src/main/resources/desktop.conf +++ b/apps/desktop/desktop-app/src/main/resources/desktop.conf @@ -94,6 +94,7 @@ application { peerGroup { minNumConnectedPeers=8 + minNumOutboundConnectedPeers=3 maxNumConnectedPeers=12 minNumReportedPeers=1 } diff --git a/apps/rest-api-app/src/main/resources/rest_api.conf b/apps/rest-api-app/src/main/resources/rest_api.conf index d16f172314..2f5372fb5c 100644 --- a/apps/rest-api-app/src/main/resources/rest_api.conf +++ b/apps/rest-api-app/src/main/resources/rest_api.conf @@ -94,6 +94,7 @@ application { peerGroup { minNumConnectedPeers=8 + minNumOutboundConnectedPeers=3 maxNumConnectedPeers=12 minNumReportedPeers=1 } diff --git a/apps/seed-node-app/src/main/resources/seed_node.conf b/apps/seed-node-app/src/main/resources/seed_node.conf index d0d77a11b2..8172725af0 100644 --- a/apps/seed-node-app/src/main/resources/seed_node.conf +++ b/apps/seed-node-app/src/main/resources/seed_node.conf @@ -76,6 +76,7 @@ application { peerGroup { minNumConnectedPeers=12 + minNumOutboundConnectedPeers=4 maxNumConnectedPeers=20 minNumReportedPeers=1 } diff --git a/network/network/src/integrationTest/resources/test.conf b/network/network/src/integrationTest/resources/test.conf index 30055ba5a2..08e893fae7 100644 --- a/network/network/src/integrationTest/resources/test.conf +++ b/network/network/src/integrationTest/resources/test.conf @@ -34,6 +34,7 @@ bisq { # Apply to i2p and tor services. peerGroupConfig { minNumConnectedPeers=8 + minNumOutboundConnectedPeers=3 maxNumConnectedPeers=12 minNumReportedPeers=1 } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java index ff0da854d9..5173736e1f 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java @@ -17,7 +17,6 @@ package bisq.network.p2p.services.peergroup; -import bisq.common.util.MathUtils; import bisq.network.common.Address; import bisq.network.common.TransportType; import bisq.network.p2p.node.Connection; @@ -44,17 +43,20 @@ public class PeerGroupService implements PersistenceClient { @Getter public static class Config { private final int minNumConnectedPeers; + private final int minNumOutboundConnectedPeers; private final int maxNumConnectedPeers; private final int minNumReportedPeers; public Config() { - this(8, 12, 1); + this(8, 3, 12, 1); } public Config(int minNumConnectedPeers, + int minNumOutboundConnectedPeers, int maxNumConnectedPeers, int minNumReportedPeers) { this.minNumConnectedPeers = minNumConnectedPeers; + this.minNumOutboundConnectedPeers = minNumOutboundConnectedPeers; this.maxNumConnectedPeers = maxNumConnectedPeers; this.minNumReportedPeers = minNumReportedPeers; } @@ -62,6 +64,7 @@ public Config(int minNumConnectedPeers, public static Config from(com.typesafe.config.Config typesafeConfig) { return new PeerGroupService.Config( typesafeConfig.getInt("minNumConnectedPeers"), + typesafeConfig.getInt("minNumOutboundConnectedPeers"), typesafeConfig.getInt("maxNumConnectedPeers"), typesafeConfig.getInt("minNumReportedPeers")); } @@ -136,7 +139,7 @@ public boolean isSeed(Connection connection) { } public int getMinOutboundConnections() { - return MathUtils.roundDoubleToInt(config.getMinNumConnectedPeers() * 0.4); + return config.getMinNumOutboundConnectedPeers(); } public int getMaxInboundConnections() { From c0595c4b430b6c45b861dd038379347038e69af8 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 10:10:29 +0700 Subject: [PATCH 03/19] Refactor: Remove "Node " from logs where node is logged as we include the string "Node " in the node.toString() method. --- .../p2p/node/InboundConnectionsManager.java | 2 +- .../services/data/broadcast/Broadcaster.java | 2 +- .../data/inventory/InventoryHandler.java | 2 +- .../services/peergroup/PeerGroupManager.java | 20 +++++++++---------- .../exchange/PeerExchangeRequestHandler.java | 8 ++++---- .../exchange/PeerExchangeService.java | 14 ++++++------- .../peergroup/keepalive/KeepAliveHandler.java | 6 +++--- .../peergroup/keepalive/KeepAliveService.java | 4 ++-- .../NetworkLoadExchangeHandler.java | 6 +++--- .../NetworkLoadExchangeService.java | 4 ++-- 10 files changed, 34 insertions(+), 34 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/node/InboundConnectionsManager.java b/network/network/src/main/java/bisq/network/p2p/node/InboundConnectionsManager.java index 00ceb9a66c..38084a78e5 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/InboundConnectionsManager.java +++ b/network/network/src/main/java/bisq/network/p2p/node/InboundConnectionsManager.java @@ -121,7 +121,7 @@ public void handleInboundConnection(SocketChannel socketChannel) { peerAddress, myCapability.getAddress()); if (isAlreadyConnectedToPeer(peerAddress)) { - log.warn("Node {} have already an InboundConnection from {}. This can happen when a " + "handshake was in progress while we received a new connection from that address. " + "We will close the socket of that new connection and use the existing instead.", this, peerAddress); + log.warn("{} have already an InboundConnection from {}. This can happen when a " + "handshake was in progress while we received a new connection from that address. " + "We will close the socket of that new connection and use the existing instead.", this, peerAddress); closeChannel(networkEnvelopeSocketChannel); } else { connectionByChannel.put(socketChannel, inboundConnection); diff --git a/network/network/src/main/java/bisq/network/p2p/services/data/broadcast/Broadcaster.java b/network/network/src/main/java/bisq/network/p2p/services/data/broadcast/Broadcaster.java index 1e4a031f09..ad6eaa93a3 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/data/broadcast/Broadcaster.java +++ b/network/network/src/main/java/bisq/network/p2p/services/data/broadcast/Broadcaster.java @@ -88,7 +88,7 @@ public CompletableFuture doBroadcast(BroadcastMessage broadcast allConnections.stream() .limit(numBroadcasts) .forEach(connection -> { - log.debug("Node {} broadcast to {}", node, connection.getPeerAddress()); + log.debug("{} broadcast to {}", node, connection.getPeerAddress()); try { node.send(broadcastMessage, connection); numSuccess.incrementAndGet(); diff --git a/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryHandler.java b/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryHandler.java index 4a638805c5..65e16ff976 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryHandler.java +++ b/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryHandler.java @@ -78,7 +78,7 @@ public void onNetworkMessage(EnvelopePayloadMessage envelopePayloadMessage) { connection.getConnectionMetrics().addRtt(System.currentTimeMillis() - ts); future.complete(response.getInventory()); } else { - log.warn("Node {} received InventoryResponse from {} with invalid nonce {}. Request nonce was {}. Connection={}", + log.warn("{} received InventoryResponse from {} with invalid nonce {}. Request nonce was {}. Connection={}", node, connection.getPeerAddress(), response.getRequestNonce(), nonce, connection.getId()); } } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java index bf2d4565a6..87493cd4d1 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java @@ -174,7 +174,7 @@ public void shutdown() { } private void doInitialize() { - log.info("Node {} called initialize", node); + log.info("{} called initialize", node); String nodeInfo = node.getNodeInfo(); State state = getState().get(); switch (state) { @@ -223,7 +223,7 @@ public void removeSeedNodeAddress(Address seedNodeAddress) { /////////////////////////////////////////////////////////////////////////////////////////////////// private void doHouseKeeping() { - log.debug("Node {} called runBlockingTasks", node); + log.debug("{} called runBlockingTasks", node); try { closeBanned(); maybeCloseDuplicateConnections(); @@ -244,7 +244,7 @@ private void doHouseKeeping() { } private void closeBanned() { - log.debug("Node {} called closeBanned", node); + log.debug("{} called closeBanned", node); node.getAllActiveConnections() .filter(Connection::isRunning) .filter(connection -> banList.isBanned(connection.getPeerAddress())) @@ -256,7 +256,7 @@ private void closeBanned() { * Remove duplicate connections (inbound connections which have an outbound connection with the same address) */ private void maybeCloseDuplicateConnections() { - log.debug("Node {} called maybeCloseDuplicateConnections", node); + log.debug("{} called maybeCloseDuplicateConnections", node); Set
outboundAddresses = node.getActiveOutboundConnections() .map(Connection::getPeerAddress) .collect(Collectors.toSet()); @@ -270,7 +270,7 @@ private void maybeCloseDuplicateConnections() { } private void maybeCloseConnectionsToSeeds() { - log.debug("Node {} called maybeCloseConnectionsToSeeds", node); + log.debug("{} called maybeCloseConnectionsToSeeds", node); Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); // reversed as we use skip node.getAllActiveConnections() .filter(this::mayDisconnect) @@ -284,7 +284,7 @@ private void maybeCloseConnectionsToSeeds() { } private void maybeCloseAgedConnections() { - log.debug("Node {} called maybeCloseAgedConnections", node); + log.debug("{} called maybeCloseAgedConnections", node); node.getAllActiveConnections() .filter(this::mayDisconnect) .filter(connection -> connection.getConnectionMetrics().getAge() > config.getMaxAge()) @@ -296,7 +296,7 @@ private void maybeCloseAgedConnections() { } private void maybeCloseExceedingInboundConnections() { - log.debug("Node {} called maybeCloseExceedingInboundConnections", node); + log.debug("{} called maybeCloseExceedingInboundConnections", node); Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); node.getActiveInboundConnections() .filter(this::mayDisconnect) @@ -309,7 +309,7 @@ private void maybeCloseExceedingInboundConnections() { } private void maybeCloseExceedingConnections() { - log.debug("Node {} called maybeCloseExceedingConnections", node); + log.debug("{} called maybeCloseExceedingConnections", node); Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); node.getAllActiveConnections() .filter(this::mayDisconnect) @@ -322,13 +322,13 @@ private void maybeCloseExceedingConnections() { } private void maybeCreateConnections() { - log.debug("Node {} called maybeCreateConnections", node); + log.debug("{} called maybeCreateConnections", node); int minNumConnectedPeers = peerGroupService.getMinNumConnectedPeers(); // We want to have at least 40% of our minNumConnectedPeers as outbound connections if (getMissingOutboundConnections() <= 0) { // We have enough outbound connections, lets check if we have sufficient connections in total if (node.getNumConnections() >= minNumConnectedPeers) { - log.debug("Node {} has sufficient connections", node); + log.debug("{} has sufficient connections", node); CompletableFuture.completedFuture(null); return; } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeRequestHandler.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeRequestHandler.java index 060b4589e8..a5822964f9 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeRequestHandler.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeRequestHandler.java @@ -48,7 +48,7 @@ class PeerExchangeRequestHandler implements Connection.Listener { } CompletableFuture> request(Set peersForPeerExchange) { - log.debug("Node {} send PeerExchangeRequest to {} with {} peers", + log.debug("{} send PeerExchangeRequest to {} with {} peers", node, connection.getPeerAddress(), peersForPeerExchange.size()); ts = System.currentTimeMillis(); try { @@ -69,15 +69,15 @@ public void onNetworkMessage(EnvelopePayloadMessage envelopePayloadMessage) { /* String addresses = StringUtils.truncate(response.peers().stream() .map(peer -> peer.getAddress().toString()) .collect(Collectors.toList()).toString()); - log.debug("Node {} received PeerExchangeResponse from {} with {}", + log.debug("{} received PeerExchangeResponse from {} with {}", node, connection.getPeerAddress(), addresses);*/ - log.info("Node {} received PeerExchangeResponse from {} with {} peers", + log.info("{} received PeerExchangeResponse from {} with {} peers", node, connection.getPeerAddress(), response.getPeers().size()); connection.getConnectionMetrics().addRtt(System.currentTimeMillis() - ts); removeListeners(); future.complete(new HashSet<>(response.getPeers())); } else { - log.warn("Node {} received a PeerExchangeResponse from {} with an invalid nonce. response.nonce()={}, nonce={}", + log.warn("{} received a PeerExchangeResponse from {} with an invalid nonce. response.nonce()={}, nonce={}", node, connection.getPeerAddress(), response.getNonce(), nonce); } } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java index 1fc8c49b52..237408c061 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java @@ -77,12 +77,12 @@ public void shutdown() { public void onMessage(EnvelopePayloadMessage envelopePayloadMessage, Connection connection, NetworkId networkId) { if (envelopePayloadMessage instanceof PeerExchangeRequest) { PeerExchangeRequest request = (PeerExchangeRequest) envelopePayloadMessage; - //log.debug("Node {} received PeerExchangeRequest with myPeers {}", node, request.peers()); + //log.debug("{} received PeerExchangeRequest with myPeers {}", node, request.peers()); Address peerAddress = connection.getPeerAddress(); List myPeers = new ArrayList<>(peerExchangeStrategy.getPeersForReporting(peerAddress)); peerExchangeStrategy.addReportedPeers(new HashSet<>(request.getPeers()), peerAddress); NETWORK_IO_POOL.submit(() -> node.send(new PeerExchangeResponse(request.getNonce(), myPeers), connection)); - log.debug("Node {} sent PeerExchangeResponse with my myPeers {}", node, myPeers); + log.debug("{} sent PeerExchangeResponse with my myPeers {}", node, myPeers); } } @@ -126,7 +126,7 @@ private CompletableFuture doPeerExchange(List
candidates) { "was not completed")); } - log.info("Node {} starts peer exchange with: {}", node, + log.info("{} starts peer exchange with: {}", node, StringUtils.truncate(candidates.stream() .map(Address::toString) .collect(Collectors.toList()) @@ -157,10 +157,10 @@ private CompletableFuture doPeerExchange(List
candidates) { } if (numFailures.get() + numSuccess.get() == candidates.size()) { - log.info("Node {} completed peer exchange to {} candidates. {} requests successfully completed.", + log.info("{} completed peer exchange to {} candidates. {} requests successfully completed.", node, candidates.size(), numSuccess); if (peerExchangeStrategy.shouldRedoInitialPeerExchange(numSuccess.get(), candidates.size())) { - log.info("Node {} repeats the initial peer exchange after {} sec as it has not reached sufficient connections " + + log.info("{} repeats the initial peer exchange after {} sec as it has not reached sufficient connections " + "or received sufficient peers", node, doInitialPeerExchangeDelaySec); scheduler.ifPresent(Scheduler::stop); scheduler = Optional.of(Scheduler.run(this::startInitialPeerExchange) @@ -204,7 +204,7 @@ private boolean doPeerExchange(Address peerAddress) { // We request and wait for response Set reportedPeers = handler.request(myPeers).join(); - log.info("Node {} completed peer exchange with {} and received {} reportedPeers.", + log.info("{} completed peer exchange with {} and received {} reportedPeers.", node, peerAddress, reportedPeers.size()); peerExchangeStrategy.addReportedPeers(reportedPeers, peerAddress); requestHandlerMap.remove(key); @@ -216,7 +216,7 @@ private boolean doPeerExchange(Address peerAddress) { requestHandlerMap.remove(key); } } - log.debug("Node {} failed to do a peer exchange with {}.", + log.debug("{} failed to do a peer exchange with {}.", node, peerAddress, throwable); return false; } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveHandler.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveHandler.java index 68012851a3..886751a3a2 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveHandler.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveHandler.java @@ -48,7 +48,7 @@ class KeepAliveHandler implements Connection.Listener { } CompletableFuture request() { - log.info("Node {} send Ping to {} with nonce {}. Connection={}", + log.info("{} send Ping to {} with nonce {}. Connection={}", node, connection.getPeerAddress(), nonce, connection.getId()); ts = System.currentTimeMillis(); supplyAsync(() -> node.send(new Ping(nonce), connection), NetworkService.NETWORK_IO_POOL) @@ -66,13 +66,13 @@ public void onNetworkMessage(EnvelopePayloadMessage envelopePayloadMessage) { if (envelopePayloadMessage instanceof Pong) { Pong pong = (Pong) envelopePayloadMessage; if (pong.getRequestNonce() == nonce) { - log.info("Node {} received Pong from {} with nonce {}. Connection={}", + log.info("{} received Pong from {} with nonce {}. Connection={}", node, connection.getPeerAddress(), pong.getRequestNonce(), connection.getId()); removeListeners(); connection.getConnectionMetrics().addRtt(System.currentTimeMillis() - ts); future.complete(null); } else { - log.warn("Node {} received Pong from {} with invalid nonce {}. Request nonce was {}. Connection={}", + log.warn("{} received Pong from {} with invalid nonce {}. Request nonce was {}. Connection={}", node, connection.getPeerAddress(), pong.getRequestNonce(), nonce, connection.getId()); } } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveService.java index 1329ec7edf..350ddc8de3 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveService.java @@ -101,9 +101,9 @@ public void sendPing(Connection connection) { public void onMessage(EnvelopePayloadMessage envelopePayloadMessage, Connection connection, NetworkId networkId) { if (envelopePayloadMessage instanceof Ping) { Ping ping = (Ping) envelopePayloadMessage; - log.debug("Node {} received Ping with nonce {} from {}", node, ping.getNonce(), connection.getPeerAddress()); + log.debug("{} received Ping with nonce {} from {}", node, ping.getNonce(), connection.getPeerAddress()); NetworkService.NETWORK_IO_POOL.submit(() -> node.send(new Pong(ping.getNonce()), connection)); - log.debug("Node {} sent Pong with nonce {} to {}. Connection={}", node, ping.getNonce(), connection.getPeerAddress(), connection.getId()); + log.debug("{} sent Pong with nonce {} to {}. Connection={}", node, ping.getNonce(), connection.getPeerAddress(), connection.getId()); } } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeHandler.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeHandler.java index e0e00dd0f3..d0c2cc543c 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeHandler.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeHandler.java @@ -50,7 +50,7 @@ class NetworkLoadExchangeHandler implements Connection.Listener { CompletableFuture request() { NetworkLoad myNetworkLoad = node.getNetworkLoadSnapshot().getCurrentNetworkLoad(); - log.info("Node {} send NetworkLoadRequest to {} with nonce {} and my networkLoad {}. Connection={}", + log.info("{} send NetworkLoadRequest to {} with nonce {} and my networkLoad {}. Connection={}", node, connection.getPeerAddress(), nonce, myNetworkLoad, connection.getId()); ts = System.currentTimeMillis(); supplyAsync(() -> node.send(new NetworkLoadExchangeRequest(nonce, myNetworkLoad), connection), NetworkService.NETWORK_IO_POOL) @@ -69,14 +69,14 @@ public void onNetworkMessage(EnvelopePayloadMessage envelopePayloadMessage) { NetworkLoadExchangeResponse response = (NetworkLoadExchangeResponse) envelopePayloadMessage; if (response.getRequestNonce() == nonce) { NetworkLoad peersNetworkLoad = response.getNetworkLoad(); - log.info("Node {} received NetworkLoadResponse from {} with nonce {} and peers networkLoad {}. Connection={}", + log.info("{} received NetworkLoadResponse from {} with nonce {} and peers networkLoad {}. Connection={}", node, connection.getPeerAddress(), response.getRequestNonce(), peersNetworkLoad, connection.getId()); removeListeners(); connection.getPeersNetworkLoadSnapshot().updateNetworkLoad(peersNetworkLoad); connection.getConnectionMetrics().addRtt(System.currentTimeMillis() - ts); future.complete(null); } else { - log.warn("Node {} received NetworkLoadResponse from {} with invalid nonce {}. Request nonce was {}. Connection={}", + log.warn("{} received NetworkLoadResponse from {} with invalid nonce {}. Request nonce was {}. Connection={}", node, connection.getPeerAddress(), response.getRequestNonce(), nonce, connection.getId()); } } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeService.java index cb9811eea4..f8007d9896 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeService.java @@ -94,7 +94,7 @@ public void onMessage(EnvelopePayloadMessage envelopePayloadMessage, Connection if (envelopePayloadMessage instanceof NetworkLoadExchangeRequest) { NetworkLoadExchangeRequest request = (NetworkLoadExchangeRequest) envelopePayloadMessage; NetworkLoad peersNetworkLoad = request.getNetworkLoad(); - log.info("Node {} received NetworkLoadRequest with nonce {} and peers networkLoad {} from {}", + log.info("{} received NetworkLoadRequest with nonce {} and peers networkLoad {} from {}", node, request.getNonce(), peersNetworkLoad, connection.getPeerAddress()); connection.getPeersNetworkLoadSnapshot().updateNetworkLoad(peersNetworkLoad); NetworkLoad myNetworkLoad = node.getNetworkLoadSnapshot().getCurrentNetworkLoad(); @@ -102,7 +102,7 @@ public void onMessage(EnvelopePayloadMessage envelopePayloadMessage, Connection myNetworkLoad); NetworkService.NETWORK_IO_POOL.submit(() -> node.send(response, connection)); - log.info("Node {} sent NetworkLoadResponse with nonce {} and my networkLoad {} to {}. Connection={}", + log.info("{} sent NetworkLoadResponse with nonce {} and my networkLoad {} to {}. Connection={}", node, request.getNonce(), myNetworkLoad, connection.getPeerAddress(), connection.getId()); } } From e1706bfffa5c200f00f53f75e18c659f323d7616 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 10:12:47 +0700 Subject: [PATCH 04/19] Add minNumOutboundConnectedPeers config item --- .../oracle-node-app/src/main/resources/oracle_node.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/oracle-node/oracle-node-app/src/main/resources/oracle_node.conf b/apps/oracle-node/oracle-node-app/src/main/resources/oracle_node.conf index f96df5a6f1..77a577600b 100644 --- a/apps/oracle-node/oracle-node-app/src/main/resources/oracle_node.conf +++ b/apps/oracle-node/oracle-node-app/src/main/resources/oracle_node.conf @@ -98,6 +98,7 @@ application { peerGroup { minNumConnectedPeers=8 + minNumOutboundConnectedPeers=3 maxNumConnectedPeers=12 minNumReportedPeers=1 } From b941860b555fd8f0a49f0e2db3af2f0ea972f9fa Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 10:13:26 +0700 Subject: [PATCH 05/19] Add util methods for Comparator and delegate methods to Connection --- .../bisq/network/p2p/node/Connection.java | 29 +++++++++++++++++-- .../node/network_load/ConnectionMetrics.java | 2 +- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/node/Connection.java b/network/network/src/main/java/bisq/network/p2p/node/Connection.java index e36d982d5f..c40db22ab3 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/Connection.java +++ b/network/network/src/main/java/bisq/network/p2p/node/Connection.java @@ -35,6 +35,8 @@ import java.io.EOFException; import java.io.IOException; import java.net.Socket; +import java.util.Comparator; +import java.util.Date; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Future; @@ -52,6 +54,14 @@ */ @Slf4j public abstract class Connection { + public static Comparator comparingDateDescending() { + return comparingDate().reversed(); + } + + public static Comparator comparingDate() { + return Comparator.comparingLong(Connection::getCreated); + } + protected interface Handler { void handleNetworkMessage(EnvelopePayloadMessage envelopePayloadMessage, AuthorizationToken authorizationToken, @@ -168,9 +178,24 @@ public boolean isRunning() { return !isStopped(); } + public Address getPeersAddress() { + return getPeersCapability().getAddress(); + } + + public long getCreated() { + return getConnectionMetrics().getCreated(); + } + + public Date getCreationDate() { + return getConnectionMetrics().getCreationDate(); + } + + public boolean createdBefore(long date) { + return getCreated() < date; + } @Override public String toString() { - return "'" + getClass().getSimpleName() + " [peerAddress=" + getPeersCapability().getAddress() + + return "'" + getClass().getSimpleName() + " [peerAddress=" + getPeersAddress() + ", keyId=" + getId() + "]'"; } @@ -277,7 +302,7 @@ boolean isStopped() { /////////////////////////////////////////////////////////////////////////////////////////////////// private String getThreadNameId() { - return StringUtils.truncate(getPeersCapability().getAddress().toString() + "-" + id.substring(0, 8)); + return StringUtils.truncate(getPeersAddress().toString() + "-" + id.substring(0, 8)); } private boolean isInputStreamActive() { diff --git a/network/network/src/main/java/bisq/network/p2p/node/network_load/ConnectionMetrics.java b/network/network/src/main/java/bisq/network/p2p/node/network_load/ConnectionMetrics.java index 3363e3860a..69357303ad 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/network_load/ConnectionMetrics.java +++ b/network/network/src/main/java/bisq/network/p2p/node/network_load/ConnectionMetrics.java @@ -46,7 +46,7 @@ public class ConnectionMetrics { private final List rrtList = new CopyOnWriteArrayList<>(); public ConnectionMetrics() { - created = new Date().getTime(); + created = System.currentTimeMillis(); } public Date getCreationDate() { From e09887ed235dddc635d560620ade914c15e208b4 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 10:14:54 +0700 Subject: [PATCH 06/19] Set log level for CancellationException to debug if inventory request failed --- .../services/data/inventory/InventoryRequestService.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryRequestService.java b/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryRequestService.java index d21a5dce46..0ffdc95a5f 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryRequestService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryRequestService.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -213,7 +214,11 @@ private List> requestFromPeers() { }); } if (throwable != null) { - log.warn("Inventory request failed.", throwable); + if (throwable instanceof CancellationException) { + log.debug("Inventory request failed.", throwable); + } else { + log.info("Inventory request failed.", throwable); + } } }); }) From 9e4acd1e8f4132879ce6451886b60499372f3a41 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 10:17:17 +0700 Subject: [PATCH 07/19] Refactor: Rename methods --- .../services/peergroup/PeerGroupManager.java | 18 +++++++++--------- .../exchange/PeerExchangeService.java | 4 ++-- .../exchange/PeerExchangeStrategy.java | 8 ++++---- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java index 87493cd4d1..486bb7af9b 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java @@ -261,7 +261,7 @@ private void maybeCloseDuplicateConnections() { .map(Connection::getPeerAddress) .collect(Collectors.toSet()); node.getActiveInboundConnections() - .filter(this::mayDisconnect) + .filter(this::allowDisconnect) .filter(inbound -> outboundAddresses.contains(inbound.getPeerAddress())) .peek(inbound -> log.info("{} -> {}: Send CloseConnectionMessage as we have an " + "outbound connection with the same address.", @@ -273,7 +273,7 @@ private void maybeCloseConnectionsToSeeds() { log.debug("{} called maybeCloseConnectionsToSeeds", node); Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); // reversed as we use skip node.getAllActiveConnections() - .filter(this::mayDisconnect) + .filter(this::allowDisconnect) .filter(peerGroupService::isSeed) .sorted(comparator) .skip(config.getMaxSeeds()) @@ -286,7 +286,7 @@ private void maybeCloseConnectionsToSeeds() { private void maybeCloseAgedConnections() { log.debug("{} called maybeCloseAgedConnections", node); node.getAllActiveConnections() - .filter(this::mayDisconnect) + .filter(this::allowDisconnect) .filter(connection -> connection.getConnectionMetrics().getAge() > config.getMaxAge()) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as the connection age " + "is too old.", @@ -299,7 +299,7 @@ private void maybeCloseExceedingInboundConnections() { log.debug("{} called maybeCloseExceedingInboundConnections", node); Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); node.getActiveInboundConnections() - .filter(this::mayDisconnect) + .filter(this::allowDisconnect) .sorted(comparator) .skip(peerGroupService.getMaxInboundConnections()) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as we have too many inbound connections.", @@ -312,7 +312,7 @@ private void maybeCloseExceedingConnections() { log.debug("{} called maybeCloseExceedingConnections", node); Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); node.getAllActiveConnections() - .filter(this::mayDisconnect) + .filter(this::allowDisconnect) .sorted(comparator) .skip(peerGroupService.getMaxNumConnectedPeers()) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as we have too many connections.", @@ -338,7 +338,7 @@ private void maybeCreateConnections() { // The calculation how many connections we need is done inside PeerExchangeService/PeerExchangeStrategy log.info("We have not sufficient connections and call peerExchangeService.doFurtherPeerExchange"); // It is an async call. We do not wait for the result. - peerExchangeService.startFurtherPeerExchange(); + peerExchangeService.extendPeerGroup(); } private void maybeRemoveReportedPeers() { @@ -390,11 +390,11 @@ private void setState(PeerGroupManager.State newState) { // Utils /////////////////////////////////////////////////////////////////////////////////////////////////// - private boolean mayDisconnect(Connection connection) { - return notBootstrapping(connection) && connection.isRunning(); + private boolean allowDisconnect(Connection connection) { + return isNotBootstrapping(connection) && connection.isRunning(); } - private boolean notBootstrapping(Connection connection) { + private boolean isNotBootstrapping(Connection connection) { return connection.getConnectionMetrics().getAge() > config.getBootstrapTime(); } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java index 237408c061..473eb9192b 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java @@ -103,8 +103,8 @@ public CompletableFuture startInitialPeerExchange() { return doPeerExchange(peerExchangeStrategy.getAddressesForInitialPeerExchange()).orTimeout(2, MINUTES); } - public void startFurtherPeerExchange() { - doPeerExchange(peerExchangeStrategy.getAddressesForFurtherPeerExchange()).orTimeout(2, MINUTES); + public void extendPeerGroup() { + doPeerExchange(peerExchangeStrategy.getAddressesForExtendingPeerGroup()).orTimeout(2, MINUTES); } /** diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java index 8370642f87..4afeaea9da 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java @@ -96,15 +96,15 @@ List
getAddressesForInitialPeerExchange() { // After bootstrap, we might want to add more connections and use the peer exchange protocol for that. // We do not want to use seed nodes or already existing connections in that case. - List
getAddressesForFurtherPeerExchange() { - List
candidates = getCandidates(getPriorityListForFurtherPeerExchange()); + List
getAddressesForExtendingPeerGroup() { + List
candidates = getCandidates(getPriorityListForExtendingPeerGroup()); if (candidates.isEmpty()) { // It can be that we don't have peers anymore which we have not already connected in the past. // We reset the usedAddresses and try again. It is likely that some peers have different peers to // send now. log.debug("We reset the usedAddresses and try again to connect to peers we tried in the past."); usedAddresses.clear(); - candidates = getCandidates(getPriorityListForFurtherPeerExchange()); + candidates = getCandidates(getPriorityListForExtendingPeerGroup()); } usedAddresses.addAll(candidates); return candidates; @@ -148,7 +148,7 @@ private List
getPriorityListForInitialPeerExchange() { return priorityList; } - private List
getPriorityListForFurtherPeerExchange() { + private List
getPriorityListForExtendingPeerGroup() { List
priorityList = new ArrayList<>(getReportedPeerAddresses()); priorityList.addAll(getPersistedAddresses()); return priorityList; From 8ff949e6eb75eb3c5335aa0268b2f2254079fed2 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 10:57:57 +0700 Subject: [PATCH 08/19] Add isNotOutDated filter --- .../peergroup/exchange/PeerExchangeStrategy.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java index 4afeaea9da..3548d7daaa 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java @@ -112,7 +112,8 @@ List
getAddressesForExtendingPeerGroup() { boolean shouldRedoInitialPeerExchange(int numSuccess, int numRequests) { int numFailed = numRequests - numSuccess; - return numFailed > numRequests / 2 || + int maxFailures = numRequests / 2; + return numFailed > maxFailures || peerGroupService.getAllConnectedPeers(node).count() < peerGroupService.getTargetNumConnectedPeers() || peerGroupService.getReportedPeers().size() < peerGroupService.getMinNumReportedPeers(); } @@ -172,6 +173,7 @@ private List
getReportedPeerAddresses() { private List
getPersistedAddresses() { return peerGroupService.getPersistedPeers().stream() .filter(this::isValidNonSeedPeer) + .filter(this::isNotOutDated) .sorted(Comparator.comparing(Peer::getDate)) .limit(config.getNumPersistedPeersAtBoostrap()) .map(Peer::getAddress) @@ -211,7 +213,7 @@ void addReportedPeers(Set reportedPeers, Address reporterAddress) { Set filtered = reportedPeers.stream() .filter(peer -> notSameAddress(reporterAddress, peer)) .filter(this::isValidNonSeedPeer) - .filter(this::isNotAged) + .filter(this::isNotOutDated) .sorted(Comparator.comparing(Peer::getDate).reversed()) .limit(REPORTED_PEERS_LIMIT) .collect(Collectors.toSet()); @@ -242,7 +244,7 @@ private boolean isValidNonSeedPeer(Peer peer) { return isValidNonSeedPeer(peer.getAddress()); } - private boolean isNotAged(Peer peer) { + private boolean isNotOutDated(Peer peer) { return peer.getAge() < MAX_AGE; } @@ -267,7 +269,7 @@ private Stream getAllConnectedPeers() { private Stream getReportedPeers() { return peerGroupService.getReportedPeers().stream() .filter(this::isValidNonSeedPeer) - .filter(this::isNotAged) + .filter(this::isNotOutDated) .sorted(Comparator.comparing(Peer::getDate).reversed()); } From a2001ee515de6b05d747620d0c31dc8b5d51aae1 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 11:05:15 +0700 Subject: [PATCH 09/19] Add missing reverse to sorting at getPersistedAddresses --- .../p2p/services/peergroup/exchange/PeerExchangeStrategy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java index 3548d7daaa..4f92ba9121 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java @@ -174,7 +174,7 @@ private List
getPersistedAddresses() { return peerGroupService.getPersistedPeers().stream() .filter(this::isValidNonSeedPeer) .filter(this::isNotOutDated) - .sorted(Comparator.comparing(Peer::getDate)) + .sorted(Comparator.comparing(Peer::getDate).reversed()) .limit(config.getNumPersistedPeersAtBoostrap()) .map(Peer::getAddress) .collect(Collectors.toList()); From ce5e63d9f36782687b89e08f89a18368525e2bae Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 11:06:42 +0700 Subject: [PATCH 10/19] Replace `sorted(Comparator.comparing(Peer::getDate).reversed())` with `sorted()` as we use descending order in `Peer.compareTo` --- .../services/peergroup/exchange/PeerExchangeStrategy.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java index 4f92ba9121..159b18146d 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java @@ -174,7 +174,7 @@ private List
getPersistedAddresses() { return peerGroupService.getPersistedPeers().stream() .filter(this::isValidNonSeedPeer) .filter(this::isNotOutDated) - .sorted(Comparator.comparing(Peer::getDate).reversed()) + .sorted() .limit(config.getNumPersistedPeersAtBoostrap()) .map(Peer::getAddress) .collect(Collectors.toList()); @@ -214,7 +214,7 @@ void addReportedPeers(Set reportedPeers, Address reporterAddress) { .filter(peer -> notSameAddress(reporterAddress, peer)) .filter(this::isValidNonSeedPeer) .filter(this::isNotOutDated) - .sorted(Comparator.comparing(Peer::getDate).reversed()) + .sorted() .limit(REPORTED_PEERS_LIMIT) .collect(Collectors.toSet()); peerGroupService.addReportedPeers(filtered); @@ -263,14 +263,14 @@ private boolean notSameAddress(Address address, Peer peer) { private Stream getAllConnectedPeers() { return peerGroupService.getAllConnectedPeers(node) .filter(this::isValidNonSeedPeer) - .sorted(Comparator.comparing(Peer::getDate).reversed()); + .sorted(); } private Stream getReportedPeers() { return peerGroupService.getReportedPeers().stream() .filter(this::isValidNonSeedPeer) .filter(this::isNotOutDated) - .sorted(Comparator.comparing(Peer::getDate).reversed()); + .sorted(); } private List
getShuffled(Collection
addresses) { From bedf06f73f175cc251c53e69c9ffccffeae643aa Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 11:11:45 +0700 Subject: [PATCH 11/19] Add handling of timeouts to startInitialPeerExchange and extendPeerGroup. We apply the exception to resultFuture and in case of startInitialPeerExchange we start the scheduler for repeating a call to startInitialPeerExchange. --- .../exchange/PeerExchangeService.java | 32 +++++++++++++++++-- .../exchange/PeerExchangeStrategy.java | 6 ++-- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java index 473eb9192b..777d0f2f19 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeService.java @@ -100,11 +100,37 @@ public void onDisconnect(Connection connection, CloseReason closeReason) { /////////////////////////////////////////////////////////////////////////////////////////////////// public CompletableFuture startInitialPeerExchange() { - return doPeerExchange(peerExchangeStrategy.getAddressesForInitialPeerExchange()).orTimeout(2, MINUTES); + List
candidates = peerExchangeStrategy.getAddressesForInitialPeerExchange(); + log.info("startInitialPeerExchange {}", candidates); + return doPeerExchange(candidates) + .orTimeout(2, MINUTES) + .whenComplete((nil, throwable) -> { + if (throwable != null) { + if (resultFuture != null) { + resultFuture.completeExceptionally(throwable); + + scheduler.ifPresent(Scheduler::stop); + scheduler = Optional.of(Scheduler.run(this::startInitialPeerExchange) + .after(doInitialPeerExchangeDelaySec, TimeUnit.SECONDS) + .name("PeerExchangeService.scheduler-" + StringUtils.truncate(node.toString(), 10))); + doInitialPeerExchangeDelaySec = Math.min(20, doInitialPeerExchangeDelaySec * 2); + } + } + }); } - public void extendPeerGroup() { - doPeerExchange(peerExchangeStrategy.getAddressesForExtendingPeerGroup()).orTimeout(2, MINUTES); + public CompletableFuture extendPeerGroup() { + List
candidates = peerExchangeStrategy.getAddressesForExtendingPeerGroup(); + log.info("extendPeerGroup {}", candidates); + return doPeerExchange(candidates) + .orTimeout(2, MINUTES) + .whenComplete((nil, throwable) -> { + if (throwable != null) { + if (resultFuture != null) { + resultFuture.completeExceptionally(throwable); + } + } + }); } /** diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java index 159b18146d..b3b83531fd 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java @@ -210,15 +210,15 @@ Set getPeersForReporting(Address requesterAddress) { } void addReportedPeers(Set reportedPeers, Address reporterAddress) { - Set filtered = reportedPeers.stream() + Set peers = reportedPeers.stream() .filter(peer -> notSameAddress(reporterAddress, peer)) .filter(this::isValidNonSeedPeer) .filter(this::isNotOutDated) .sorted() .limit(REPORTED_PEERS_LIMIT) .collect(Collectors.toSet()); - peerGroupService.addReportedPeers(filtered); - peerGroupService.addPersistedPeers(filtered); + peerGroupService.addReportedPeers(peers); + peerGroupService.addPersistedPeers(peers); } From bd09b0ce1e16612d5b04bd146fb7b8c2564e35dd Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 11:16:29 +0700 Subject: [PATCH 12/19] Add Node.Listener and call maybeCreateConnections at onDisconnect. This should ensure that we quickly get out optimal peer group size in case we lose connections. We add a 2 sec. delay to avoid that we get called too frequently in case of batch disconnects. --- .../services/peergroup/PeerGroupManager.java | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java index 486bb7af9b..3bbd19454e 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java @@ -20,6 +20,8 @@ import bisq.common.timer.Scheduler; import bisq.network.NetworkService; import bisq.network.common.Address; +import bisq.network.identity.NetworkId; +import bisq.network.p2p.message.EnvelopePayloadMessage; import bisq.network.p2p.node.CloseReason; import bisq.network.p2p.node.Connection; import bisq.network.p2p.node.Node; @@ -46,7 +48,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; @Slf4j -public class PeerGroupManager { +public class PeerGroupManager implements Node.Listener { public enum State { NEW, STARTING, @@ -123,7 +125,7 @@ public static Config from(PeerGroupService.Config peerGroupConfig, private final KeepAliveService keepAliveService; private final NetworkLoadExchangeService networkLoadExchangeService; private Optional scheduler = Optional.empty(); - + private Optional maybeCreateConnectionsScheduler = Optional.empty(); @Getter public AtomicReference state = new AtomicReference<>(PeerGroupManager.State.NEW); @@ -161,18 +163,40 @@ public PeerGroupManager(Node node, public void initialize() { // blocking + node.addListener(this); Failsafe.with(retryPolicy).run(this::doInitialize); } public void shutdown() { setState(State.STOPPING); + node.removeListener(this); peerExchangeService.shutdown(); keepAliveService.shutdown(); networkLoadExchangeService.shutdown(); scheduler.ifPresent(Scheduler::stop); + maybeCreateConnectionsScheduler.ifPresent(Scheduler::stop); setState(State.TERMINATED); } + + /////////////////////////////////////////////////////////////////////////////////////////////////// + // Node.Listener + /////////////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onMessage(EnvelopePayloadMessage envelopePayloadMessage, Connection connection, NetworkId networkId) { + } + + @Override + public void onConnection(Connection connection) { + } + + @Override + public void onDisconnect(Connection connection, CloseReason closeReason) { + maybeCreateConnectionsScheduler.ifPresent(Scheduler::stop); + maybeCreateConnectionsScheduler = Optional.of(Scheduler.run(this::maybeCreateConnections).after(2000)); + } + private void doInitialize() { log.info("{} called initialize", node); String nodeInfo = node.getNodeInfo(); @@ -236,6 +260,8 @@ private void doHouseKeeping() { Thread.sleep(100); maybeCloseExceedingConnections(); Thread.sleep(100); + maybeCreateConnectionsScheduler.ifPresent(Scheduler::stop); + maybeCreateConnectionsScheduler = Optional.empty(); maybeCreateConnections(); maybeRemoveReportedPeers(); maybeRemovePersistedPeers(); From c712a798230f39e7a10bd4126f3ab96c0a9db577 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 11:20:57 +0700 Subject: [PATCH 13/19] Replace `peerGroupService.getConnectionAgeComparator().reversed()` with `Connection.comparingDateDescending()` --- .../network/p2p/services/peergroup/PeerGroupManager.java | 9 +++------ .../network/p2p/services/peergroup/PeerGroupService.java | 5 ----- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java index 3bbd19454e..1be2ea346d 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java @@ -297,11 +297,10 @@ private void maybeCloseDuplicateConnections() { private void maybeCloseConnectionsToSeeds() { log.debug("{} called maybeCloseConnectionsToSeeds", node); - Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); // reversed as we use skip node.getAllActiveConnections() .filter(this::allowDisconnect) .filter(peerGroupService::isSeed) - .sorted(comparator) + .sorted(Connection.comparingDateDescending()) // As we use skip we sort by descending creationDate so that we close the oldest connections .skip(config.getMaxSeeds()) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as we have too " + "many connections to seeds.", @@ -323,10 +322,9 @@ private void maybeCloseAgedConnections() { private void maybeCloseExceedingInboundConnections() { log.debug("{} called maybeCloseExceedingInboundConnections", node); - Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); node.getActiveInboundConnections() .filter(this::allowDisconnect) - .sorted(comparator) + .sorted(Connection.comparingDateDescending()) // As we use skip we sort by descending creationDate so that we close the oldest connections .skip(peerGroupService.getMaxInboundConnections()) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as we have too many inbound connections.", node, connection.getPeersCapability().getAddress())) @@ -336,10 +334,9 @@ private void maybeCloseExceedingInboundConnections() { private void maybeCloseExceedingConnections() { log.debug("{} called maybeCloseExceedingConnections", node); - Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); node.getAllActiveConnections() .filter(this::allowDisconnect) - .sorted(comparator) + .sorted(Connection.comparingDateDescending()) // As we use skip we sort by descending creationDate so that we close the oldest connections .skip(peerGroupService.getMaxNumConnectedPeers()) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as we have too many connections.", node, connection.getPeersCapability().getAddress())) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java index 5173736e1f..377b6844fe 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java @@ -29,7 +29,6 @@ import lombok.extern.slf4j.Slf4j; import java.util.Collection; -import java.util.Comparator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.stream.Stream; @@ -146,10 +145,6 @@ public int getMaxInboundConnections() { return config.getMaxNumConnectedPeers() - getMinOutboundConnections(); } - public Comparator getConnectionAgeComparator() { - return Comparator.comparing(connection -> connection.getConnectionMetrics().getCreationDate()); - } - /////////////////////////////////////////////////////////////////////////////////////////////////// // Peers From 994ea59df9b2a48240f45f2f1b5b8975389993b8 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 11:50:37 +0700 Subject: [PATCH 14/19] Refactor: Replace `connection.getPeersCapability().getAddress()` with `connection.getAddress()` --- .../network/p2p/services/peergroup/PeerGroupManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java index 1be2ea346d..cfce6c07ff 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java @@ -304,7 +304,7 @@ private void maybeCloseConnectionsToSeeds() { .skip(config.getMaxSeeds()) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as we have too " + "many connections to seeds.", - node, connection.getPeersCapability().getAddress())) + node, connection.getPeerAddress())) .forEach(connection -> node.closeConnectionGracefully(connection, CloseReason.TOO_MANY_CONNECTIONS_TO_SEEDS)); } @@ -315,7 +315,7 @@ private void maybeCloseAgedConnections() { .filter(connection -> connection.getConnectionMetrics().getAge() > config.getMaxAge()) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as the connection age " + "is too old.", - node, connection.getPeersCapability().getAddress())) + node, connection.getPeerAddress())) .forEach(connection -> node.closeConnectionGracefully(connection, CloseReason.AGED_CONNECTION)); } @@ -327,7 +327,7 @@ private void maybeCloseExceedingInboundConnections() { .sorted(Connection.comparingDateDescending()) // As we use skip we sort by descending creationDate so that we close the oldest connections .skip(peerGroupService.getMaxInboundConnections()) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as we have too many inbound connections.", - node, connection.getPeersCapability().getAddress())) + node, connection.getPeerAddress())) .forEach(connection -> node.closeConnectionGracefully(connection, CloseReason.TOO_MANY_INBOUND_CONNECTIONS)); } @@ -339,7 +339,7 @@ private void maybeCloseExceedingConnections() { .sorted(Connection.comparingDateDescending()) // As we use skip we sort by descending creationDate so that we close the oldest connections .skip(peerGroupService.getMaxNumConnectedPeers()) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as we have too many connections.", - node, connection.getPeersCapability().getAddress())) + node, connection.getPeerAddress())) .forEach(connection -> node.closeConnectionGracefully(connection, CloseReason.TOO_MANY_CONNECTIONS)); } From 2c65a9c347db9ecd1d4aac68dfaabfda7234b8cd Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 12:35:16 +0700 Subject: [PATCH 15/19] Fix bug with not updated peers. We used the EqualsAndHashCode.Include at Capability. When we called add/addAll we only add items when the Capability would be different, which is normally not the case if just the created date changed. Thus we did not update the age of the peers. We make it now more explicit to set the address as the identity field and use a hashmap in the service to manage updates. We only update peers if the one to get added is newer. --- .../network/p2p/services/peergroup/Peer.java | 11 ++- .../services/peergroup/PeerGroupService.java | 68 ++++++++++++++++--- .../services/peergroup/PeerGroupStore.java | 33 +++++---- 3 files changed, 86 insertions(+), 26 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/Peer.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/Peer.java index dd5a5821d1..74d7e5f5fa 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/Peer.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/Peer.java @@ -29,26 +29,35 @@ import javax.annotation.Nonnull; import java.util.Date; +/** + * We use only the address for EqualsAndHashCode as unique identifier as the other data can be + * updated and we do not consider those updates as a new peer instance. + */ @Getter @ToString @EqualsAndHashCode(onlyExplicitlyIncluded = true) public final class Peer implements NetworkProto, Comparable { @EqualsAndHashCode.Include + private final Address address; + private final Capability capability; private final NetworkLoad networkLoad; private final boolean isOutboundConnection; private final long created; + public Peer(Capability capability, NetworkLoad networkLoad, boolean isOutboundConnection) { this(capability, networkLoad, isOutboundConnection, System.currentTimeMillis()); } - public Peer(Capability capability, NetworkLoad networkLoad, boolean isOutboundConnection, long created) { + private Peer(Capability capability, NetworkLoad networkLoad, boolean isOutboundConnection, long created) { this.capability = capability; this.networkLoad = networkLoad; this.isOutboundConnection = isOutboundConnection; this.created = created; + address = capability.getAddress(); + verify(); } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java index 377b6844fe..a79e108a20 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java @@ -29,8 +29,11 @@ import lombok.extern.slf4j.Slf4j; import java.util.Collection; +import java.util.HashSet; +import java.util.Map; import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; /** @@ -77,8 +80,7 @@ public static Config from(com.typesafe.config.Config typesafeConfig) { @Getter private final Set
seedNodeAddresses; private final BanList banList; - @Getter - private final Set reportedPeers = new CopyOnWriteArraySet<>(); + private final Map reportedPeersByAddress = new ConcurrentHashMap<>(); public PeerGroupService(PersistenceService persistenceService, TransportType transportType, @@ -99,20 +101,54 @@ public PeerGroupService(PersistenceService persistenceService, // Persisted peers /////////////////////////////////////////////////////////////////////////////////////////////////// + public Map getPersistedPeersByAddress() { + return persistableStore.getPersistedPeersByAddress(); + } + public Set getPersistedPeers() { - return persistableStore.getPersistedPeers(); + return new HashSet<>(getPersistedPeersByAddress().values()); } - public void addPersistedPeers(Set peers) { - if (getPersistedPeers().addAll(peers)) { + public boolean addPersistedPeer(Peer peer) { + boolean wasAdded = doAddPersistedPeer(peer); + if (wasAdded) { persist(); } + return wasAdded; } - public void removePersistedPeers(Collection peers) { - if (getPersistedPeers().removeAll(peers)) { + private boolean doAddPersistedPeer(Peer peer) { + return doAddPeer(peer, getPersistedPeersByAddress()); + } + + private boolean doAddPeer(Peer peerToAdd, Map map) { + Address address = peerToAdd.getAddress(); + if (map.containsKey(address)) { + if (peerToAdd.getCreated() > map.get(address).getCreated()) { + map.put(address, peerToAdd); + return true; + } else { + return false; + } + } else { + map.put(address, peerToAdd); + return true; + } + } + + public boolean addPersistedPeers(Set peers) { + AtomicBoolean wasAdded = new AtomicBoolean(); + peers.forEach(peer -> wasAdded.set(doAddPersistedPeer(peer) || wasAdded.get())); + if (wasAdded.get()) { persist(); } + return wasAdded.get(); + } + + public void removePersistedPeers(Collection peers) { + Map persistedPeersById = getPersistedPeersByAddress(); + peers.forEach(peer -> persistedPeersById.remove(peer.getAddress())); + persist(); } @@ -120,12 +156,22 @@ public void removePersistedPeers(Collection peers) { // Reported peers /////////////////////////////////////////////////////////////////////////////////////////////////// - public void addReportedPeers(Set peers) { - reportedPeers.addAll(peers); + public Set getReportedPeers() { + return new HashSet<>(reportedPeersByAddress.values()); + } + + private boolean addReportedPeer(Peer peer) { + return doAddPeer(peer, reportedPeersByAddress); + } + + public boolean addReportedPeers(Set peers) { + AtomicBoolean wasAdded = new AtomicBoolean(); + peers.forEach(peer -> wasAdded.set(addReportedPeer(peer) || wasAdded.get())); + return wasAdded.get(); } public void removeReportedPeers(Collection peers) { - reportedPeers.removeAll(peers); + peers.forEach(peer -> reportedPeersByAddress.remove(peer.getAddress())); } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupStore.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupStore.java index 1b27ba068e..4acd80e7eb 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupStore.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupStore.java @@ -19,39 +19,40 @@ import bisq.common.proto.ProtoResolver; import bisq.common.proto.UnresolvableProtobufMessageException; +import bisq.network.common.Address; import bisq.persistence.PersistableStore; import com.google.protobuf.InvalidProtocolBufferException; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -@Getter @Slf4j public final class PeerGroupStore implements PersistableStore { - private final Set persistedPeers = new CopyOnWriteArraySet<>(); + private final Map persistedPeersByAddress = new ConcurrentHashMap<>(); public PeerGroupStore() { } - private PeerGroupStore(Set persistedPeers) { - this.persistedPeers.addAll(persistedPeers); + private PeerGroupStore(Map persistedPeersByAddress) { + this.persistedPeersByAddress.putAll(persistedPeersByAddress); } @Override public bisq.network.protobuf.PeerGroupStore toProto() { - return bisq.network.protobuf.PeerGroupStore.newBuilder().addAllPersistedPeers(persistedPeers.stream() + return bisq.network.protobuf.PeerGroupStore.newBuilder().addAllPersistedPeers(persistedPeersByAddress.values().stream() .map(Peer::toProto) .collect(Collectors.toSet())) .build(); } public static PeerGroupStore fromProto(bisq.network.protobuf.PeerGroupStore proto) { - return new PeerGroupStore(proto.getPersistedPeersList().stream() - .map(Peer::fromProto).collect(Collectors.toSet())); + Map persistedPeersById = proto.getPersistedPeersList().stream() + .map(Peer::fromProto) + .collect(Collectors.toMap(Peer::getAddress, e -> e)); + return new PeerGroupStore(persistedPeersById); } @Override @@ -67,12 +68,16 @@ public ProtoResolver> getResolver() { @Override public PeerGroupStore getClone() { - return new PeerGroupStore(new HashSet<>(persistedPeers)); + return new PeerGroupStore(new HashMap<>(persistedPeersByAddress)); } @Override public void applyPersisted(PeerGroupStore persisted) { - persistedPeers.clear(); - persistedPeers.addAll(persisted.getPersistedPeers()); + persistedPeersByAddress.clear(); + persistedPeersByAddress.putAll(persisted.getPersistedPeersByAddress()); + } + + Map getPersistedPeersByAddress() { + return persistedPeersByAddress; } } \ No newline at end of file From a6f8e5ed0216f1dd743a3311ce369d3e1d574093 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 12:52:31 +0700 Subject: [PATCH 16/19] Cleanup. Replace getPeersAddress with getPeerAddress. Remove getPeersAddress --- .../src/main/java/bisq/network/p2p/node/Connection.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/node/Connection.java b/network/network/src/main/java/bisq/network/p2p/node/Connection.java index c40db22ab3..03d496f870 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/Connection.java +++ b/network/network/src/main/java/bisq/network/p2p/node/Connection.java @@ -178,10 +178,6 @@ public boolean isRunning() { return !isStopped(); } - public Address getPeersAddress() { - return getPeersCapability().getAddress(); - } - public long getCreated() { return getConnectionMetrics().getCreated(); } @@ -195,7 +191,7 @@ public boolean createdBefore(long date) { } @Override public String toString() { - return "'" + getClass().getSimpleName() + " [peerAddress=" + getPeersAddress() + + return "'" + getClass().getSimpleName() + " [peerAddress=" + getPeerAddress() + ", keyId=" + getId() + "]'"; } @@ -302,7 +298,7 @@ boolean isStopped() { /////////////////////////////////////////////////////////////////////////////////////////////////// private String getThreadNameId() { - return StringUtils.truncate(getPeersAddress().toString() + "-" + id.substring(0, 8)); + return StringUtils.truncate(getPeerAddress().toString() + "-" + id.substring(0, 8)); } private boolean isInputStreamActive() { From 97225e6aecb4f0863de68daa78a55f5acfbeafe4 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 12:53:40 +0700 Subject: [PATCH 17/19] Use connection.createdBefore(maxAgeDate) instead of connection.getConnectionMetrics().getAge() > config.getMaxAge() --- .../bisq/network/p2p/services/peergroup/PeerGroupManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java index cfce6c07ff..8b52a955ee 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java @@ -310,9 +310,10 @@ private void maybeCloseConnectionsToSeeds() { private void maybeCloseAgedConnections() { log.debug("{} called maybeCloseAgedConnections", node); + long maxAgeDate = System.currentTimeMillis() - config.getMaxAge(); node.getAllActiveConnections() .filter(this::allowDisconnect) - .filter(connection -> connection.getConnectionMetrics().getAge() > config.getMaxAge()) + .filter(connection -> connection.createdBefore(maxAgeDate)) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as the connection age " + "is too old.", node, connection.getPeerAddress())) From 99aa3248653052a5a9af5dabc76cce00085ff9ee Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 20 Mar 2024 13:14:22 +0700 Subject: [PATCH 18/19] Cleanup, Renaming --- .../services/peergroup/PeerGroupManager.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java index 8b52a955ee..e9bf97dbcb 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java @@ -318,7 +318,6 @@ private void maybeCloseAgedConnections() { "is too old.", node, connection.getPeerAddress())) .forEach(connection -> node.closeConnectionGracefully(connection, CloseReason.AGED_CONNECTION)); - } private void maybeCloseExceedingInboundConnections() { @@ -330,7 +329,6 @@ private void maybeCloseExceedingInboundConnections() { .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as we have too many inbound connections.", node, connection.getPeerAddress())) .forEach(connection -> node.closeConnectionGracefully(connection, CloseReason.TOO_MANY_INBOUND_CONNECTIONS)); - } private void maybeCloseExceedingConnections() { @@ -342,13 +340,11 @@ private void maybeCloseExceedingConnections() { .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as we have too many connections.", node, connection.getPeerAddress())) .forEach(connection -> node.closeConnectionGracefully(connection, CloseReason.TOO_MANY_CONNECTIONS)); - } private void maybeCreateConnections() { log.debug("{} called maybeCreateConnections", node); int minNumConnectedPeers = peerGroupService.getMinNumConnectedPeers(); - // We want to have at least 40% of our minNumConnectedPeers as outbound connections if (getMissingOutboundConnections() <= 0) { // We have enough outbound connections, lets check if we have sufficient connections in total if (node.getNumConnections() >= minNumConnectedPeers) { @@ -370,9 +366,9 @@ private void maybeRemoveReportedPeers() { int exceeding = reportedPeers.size() - config.getMaxReported(); if (exceeding > 0) { reportedPeers.sort(Comparator.comparing(Peer::getDate)); - List candidates = reportedPeers.subList(0, Math.min(exceeding, reportedPeers.size())); - log.info("Remove {} reported peers: {}", candidates.size(), candidates); - peerGroupService.removeReportedPeers(candidates); + List outDated = reportedPeers.subList(0, Math.min(exceeding, reportedPeers.size())); + log.info("Remove {} reported peers: {}", outDated.size(), outDated); + peerGroupService.removeReportedPeers(outDated); } } @@ -381,9 +377,9 @@ private void maybeRemovePersistedPeers() { int exceeding = persistedPeers.size() - config.getMaxPersisted(); if (exceeding > 0) { persistedPeers.sort(Comparator.comparing(Peer::getDate)); - List candidates = persistedPeers.subList(0, Math.min(exceeding, persistedPeers.size())); - log.info("Remove {} persisted peers: {}", candidates.size(), candidates); - peerGroupService.removePersistedPeers(candidates); + List outDated = persistedPeers.subList(0, Math.min(exceeding, persistedPeers.size())); + log.info("Remove {} persisted peers: {}", outDated.size(), outDated); + peerGroupService.removePersistedPeers(outDated); } } @@ -425,5 +421,4 @@ private boolean isNotBootstrapping(Connection connection) { private int getMissingOutboundConnections() { return peerGroupService.getMinOutboundConnections() - (int) node.getActiveOutboundConnections().count(); } - } \ No newline at end of file From 538b04aefc496149b7485801b6e4391b9e52692b Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Thu, 21 Mar 2024 10:34:32 +0700 Subject: [PATCH 19/19] Improve logs --- .../data/inventory/InventoryRequestService.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryRequestService.java b/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryRequestService.java index 0ffdc95a5f..57784359b5 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryRequestService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryRequestService.java @@ -40,10 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -151,7 +148,12 @@ private void requestInventory() { .whenComplete((list, throwable) -> { requestsPending.set(false); if (throwable != null) { - log.error("requestFromPeers failed", throwable); + if (throwable instanceof CompletionException && + throwable.getCause() instanceof CancellationException) { + log.debug("requestFromPeers failed", throwable); + } else { + log.error("requestFromPeers failed", throwable); + } } else if (list == null) { log.error("requestFromPeers completed with result list = null"); } else {