From 4ced6a097e027a90948119ad56faec7966a6275f Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Thu, 18 Apr 2024 17:10:11 +1000 Subject: [PATCH 01/18] peers don't do their own disconnects Signed-off-by: Sally MacFarlane --- .../org/hyperledger/besu/ethereum/eth/manager/EthPeer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java index 4ea10ef96e5..d654d3a2d50 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java @@ -215,7 +215,7 @@ public void recordRequestTimeout(final int requestCode) { .addArgument(this::getLoggableId) .log(); LOG.trace("Timed out while waiting for response from peer {}", this); - reputation.recordRequestTimeout(requestCode).ifPresent(this::disconnect); + reputation.recordRequestTimeout(requestCode); } public void recordUselessResponse(final String requestType) { @@ -224,7 +224,7 @@ public void recordUselessResponse(final String requestType) { .addArgument(requestType) .addArgument(this::getLoggableId) .log(); - reputation.recordUselessResponse(System.currentTimeMillis()).ifPresent(this::disconnect); + reputation.recordUselessResponse(System.currentTimeMillis()); } public void recordUsefulResponse() { From 2d4badf0e5fb883ac4fec91d44e53180d2b05957 Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Thu, 18 Apr 2024 17:30:27 +1000 Subject: [PATCH 02/18] only disconnect if at capacity; use peer reputation for peer comparator Signed-off-by: Sally MacFarlane --- .../MergeBesuControllerBuilder.java | 1 - .../besu/ethereum/eth/manager/EthPeers.java | 69 ++++++++++++++----- .../eth/manager/EthProtocolManager.java | 9 +++ .../eth/sync/fastsync/SyncTargetManager.java | 1 - 4 files changed, 62 insertions(+), 18 deletions(-) diff --git a/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java index 623aa2918f5..d7220d8a071 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java @@ -106,7 +106,6 @@ protected EthProtocolManager createEthProtocolManager( .getTerminalTotalDifficulty() .map(Difficulty::of) .orElseThrow()); - ethPeers.setBestChainComparator(mergeBestPeerComparator); mergeContext.observeNewIsPostMergeState(mergeBestPeerComparator); Optional filterToUse = Optional.of(new MergePeerFilter()); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index 2a4469220e0..05d0d54e808 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -115,7 +115,7 @@ public EthPeers( this.clock = clock; this.permissioningProviders = permissioningProviders; this.maxMessageSize = maxMessageSize; - this.bestPeerComparator = HEAVIEST_CHAIN; + this.bestPeerComparator = MOST_USEFUL_PEER; this.localNodeId = localNodeId; this.peerUpperBound = peerUpperBound; this.maxRemotelyInitiatedConnections = maxRemotelyInitiatedConnections; @@ -392,21 +392,58 @@ public boolean shouldConnect(final Peer peer, final boolean inbound) { return true; } - public void disconnectWorstUselessPeer() { - streamAvailablePeers() - .sorted(getBestChainComparator()) - .findFirst() - .ifPresent( - peer -> { - LOG.atDebug() - .setMessage( - "disconnecting peer {}. Waiting for better peers. Current {} of max {}") - .addArgument(peer::getLoggableId) - .addArgument(this::peerCount) - .addArgument(this::getMaxPeers) - .log(); - peer.disconnect(DisconnectMessage.DisconnectReason.USELESS_PEER_BY_CHAIN_COMPARATOR); - }); + public void disconnectWorstUselessPeerIfAtCapacity() { + if (peerCount() >= getMaxPeers()) { + streamAvailablePeers() + .filter(p -> !canExceedPeerLimits(p.getId())) + .min(getBestChainComparator()) + .ifPresent( + peer -> { + LOG.atDebug() + .setMessage( + "disconnecting peer {}. Waiting for better peers. Current {} of max {}") + .addArgument(peer::getLoggableId) + .addArgument(this::peerCount) + .addArgument(this::getMaxPeers) + .log(); + peer.disconnect( + DisconnectMessage.DisconnectReason.USELESS_PEER_BY_CHAIN_COMPARATOR); + }); + } + } + + public void disconnectWorstUselessPeerIfAtCapacityIncludingConnectingPeer( + final EthPeer connectingPeer) { + if (peerCount() >= getMaxPeers()) { + streamAvailablePeers() + .filter(p -> !canExceedPeerLimits(p.getId())) + .min(getBestChainComparator()) + .ifPresent( + worstCurrentlyConnectedPeer -> { + if (getBestChainComparator().compare(worstCurrentlyConnectedPeer, connectingPeer) + < 0) { + + LOG.atDebug() + .setMessage( + "disconnecting current peer {}. Waiting for better peers. Current {} of max {}") + .addArgument(worstCurrentlyConnectedPeer::getLoggableId) + .addArgument(this::peerCount) + .addArgument(this::getMaxPeers) + .log(); + worstCurrentlyConnectedPeer.disconnect( + DisconnectMessage.DisconnectReason.USELESS_PEER_BY_CHAIN_COMPARATOR); + } else { + LOG.atDebug() + .setMessage( + "disconnecting connecting peer {}. Waiting for better peers. Current {} of max {}") + .addArgument(connectingPeer::getLoggableId) + .addArgument(this::peerCount) + .addArgument(this::getMaxPeers) + .log(); + connectingPeer.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS); + } + }); + } } @FunctionalInterface diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index df1ee33d480..767b3af683b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -46,6 +46,7 @@ import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import java.math.BigInteger; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -63,6 +64,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { private static final Logger LOG = LoggerFactory.getLogger(EthProtocolManager.class); + private static final Duration DEFAULT_ETH_PEERS_REFRESH_TIMEOUT = Duration.ofSeconds(10); private final EthScheduler scheduler; private final CountDownLatch shutdown; @@ -125,6 +127,13 @@ public EthProtocolManager( transactionPool, ethMessages, ethereumWireProtocolConfiguration); + scheduleEthPeersEvictionCheck(ethPeers); + } + + private void scheduleEthPeersEvictionCheck(final EthPeers ethPeers) { + // schedule a periodic check on ethPeers to evict worst peer if at capacity + this.scheduler.scheduleFutureTask( + () -> ethPeers.disconnectWorstUselessPeerIfAtCapacity(), DEFAULT_ETH_PEERS_REFRESH_TIMEOUT); } @VisibleForTesting diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/SyncTargetManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/SyncTargetManager.java index b23297c8e17..06e5fc1e618 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/SyncTargetManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/SyncTargetManager.java @@ -101,7 +101,6 @@ protected CompletableFuture> selectBestAvailableSyncTarget() { pivotBlockHeader.getNumber(), ethPeers.peerCount(), ethPeers.getMaxPeers()); - ethPeers.disconnectWorstUselessPeer(); return completedFuture(Optional.empty()); } else { return confirmPivotBlockHeader(bestPeer); From 632a3e047c819f84b9a119ed0505ac082de8991f Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Thu, 18 Apr 2024 19:40:16 +1000 Subject: [PATCH 03/18] delay the decision about whether to connect if we have full peers, until we actually compare Signed-off-by: Sally MacFarlane --- .../besu/ethereum/eth/manager/EthPeers.java | 91 ++++++++++++------- .../eth/manager/EthProtocolManager.java | 3 +- 2 files changed, 58 insertions(+), 36 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index 05d0d54e808..f213fa96b81 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -363,12 +363,8 @@ public Stream getAllConnections() { public boolean shouldConnect(final Peer peer, final boolean inbound) { final Bytes id = peer.getId(); - if (peerCount() >= peerUpperBound && !canExceedPeerLimits(id)) { - LOG.atTrace() - .setMessage("not connecting to peer {} - too many peers") - .addArgument(peer.getLoggableId()) - .log(); - return false; + if (canExceedPeerLimits(id)) { + return true; } final EthPeer ethPeer = completeConnections.get(id); if (ethPeer != null && !ethPeer.isDisconnected()) { @@ -420,6 +416,12 @@ public void disconnectWorstUselessPeerIfAtCapacityIncludingConnectingPeer( .min(getBestChainComparator()) .ifPresent( worstCurrentlyConnectedPeer -> { + // TODO remove this debug log + LOG.atDebug() + .setMessage("comparing worstCurrentPeer {} with connectingPeer {}") + .addArgument(worstCurrentlyConnectedPeer) + .addArgument(connectingPeer) + .log(); if (getBestChainComparator().compare(worstCurrentlyConnectedPeer, connectingPeer) < 0) { @@ -585,40 +587,59 @@ private boolean addPeerToEthPeers(final EthPeer peer) { } final PeerConnection connection = peer.getConnection(); final Bytes id = peer.getId(); - if (!randomPeerPriority) { - // Disconnect if too many peers - if (!canExceedPeerLimits(id) && peerCount() >= peerUpperBound) { - LOG.trace( - "Too many peers. Disconnect connection: {}, max connections {}", - connection, - peerUpperBound); - connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS); - return false; - } - // Disconnect if too many remotely-initiated connections - if (connection.inboundInitiated() - && !canExceedPeerLimits(id) - && remoteConnectionLimitReached()) { - LOG.trace( - "Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}", - connection, - maxRemotelyInitiatedConnections); - connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS); - return false; - } - final boolean added = (completeConnections.putIfAbsent(id, peer) == null); - if (added) { - LOG.trace("Added peer {} with connection {} to completeConnections", id, connection); - } else { - LOG.trace("Did not add peer {} with connection {} to completeConnections", id, connection); - } - return added; - } else { + if (randomPeerPriority || canExceedPeerLimits(id)) { + // either the incoming peer is a static peer, or // randomPeerPriority! Add the peer and if there are too many connections fix it completeConnections.putIfAbsent(id, peer); enforceRemoteConnectionLimits(); enforceConnectionLimits(); return completeConnections.containsKey(id); } + // otherwise, we need to compare the incoming peer to existing peers + + // Disconnect if too many peers + if (peerCount() >= peerUpperBound) { + // If too many peers, disconnect one (could be the connecting peer if it does not compare + // favourably + disconnectWorstUselessPeerIfAtCapacityIncludingConnectingPeer(peer); + } + // Disconnect if too many remotely-initiated connections + if (connection.inboundInitiated() && remoteConnectionLimitReached()) { + LOG.trace( + "Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}", + connection, + maxRemotelyInitiatedConnections); + disconnectWorstIncomingUselessPeer(); + connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS); + return false; + } + final boolean added = (completeConnections.putIfAbsent(id, peer) == null); + if (added) { + LOG.trace("Added peer {} with connection {} to completeConnections", id, connection); + } else { + LOG.trace("Did not add peer {} with connection {} to completeConnections", id, connection); + } + return added; + } + + public void disconnectWorstIncomingUselessPeer() { + if (peerCount() >= getMaxPeers()) { + streamAvailablePeers() + .filter(p -> p.getConnection().inboundInitiated()) + .filter(p -> !canExceedPeerLimits(p.getId())) + .min(getBestChainComparator()) + .ifPresent( + peer -> { + LOG.atDebug() + .setMessage( + "disconnecting peer {}. Waiting for better peers. Current {} of max {}") + .addArgument(peer::getLoggableId) + .addArgument(this::peerCount) + .addArgument(this::getMaxPeers) + .log(); + peer.disconnect( + DisconnectMessage.DisconnectReason.USELESS_PEER_BY_CHAIN_COMPARATOR); + }); + } } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index 767b3af683b..8d9212ccfc5 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -131,7 +131,8 @@ public EthProtocolManager( } private void scheduleEthPeersEvictionCheck(final EthPeers ethPeers) { - // schedule a periodic check on ethPeers to evict worst peer if at capacity + LOG.info( + "scheduling periodic check for max eth peers every {}", DEFAULT_ETH_PEERS_REFRESH_TIMEOUT); this.scheduler.scheduleFutureTask( () -> ethPeers.disconnectWorstUselessPeerIfAtCapacity(), DEFAULT_ETH_PEERS_REFRESH_TIMEOUT); } From 3dccfaf858c5904014d29b3200c2b4a4a493db7a Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Fri, 19 Apr 2024 04:43:52 +1000 Subject: [PATCH 04/18] delay the tru decision Signed-off-by: Sally MacFarlane --- .../org/hyperledger/besu/ethereum/eth/manager/EthPeers.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index f213fa96b81..fe56e34fe71 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -363,9 +363,6 @@ public Stream getAllConnections() { public boolean shouldConnect(final Peer peer, final boolean inbound) { final Bytes id = peer.getId(); - if (canExceedPeerLimits(id)) { - return true; - } final EthPeer ethPeer = completeConnections.get(id); if (ethPeer != null && !ethPeer.isDisconnected()) { LOG.atTrace() From bb695b56b15f1f97649d05299c694a80fb8546cc Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Fri, 19 Apr 2024 04:49:04 +1000 Subject: [PATCH 05/18] delay the tru decision Signed-off-by: Sally MacFarlane --- .../besu/ethereum/eth/manager/EthProtocolManager.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index 8d9212ccfc5..89336e6e317 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -415,9 +415,7 @@ public boolean shouldConnect(final Peer peer, final boolean incoming) { .setMessage("ForkId OK or not available for peer {}") .addArgument(peer::getLoggableId) .log(); - if (ethPeers.shouldConnect(peer, incoming)) { - return true; - } + return (ethPeers.shouldConnect(peer, incoming)); } else { LOG.atDebug() .setMessage("ForkId check failed for peer {} our fork id {} theirs {}") @@ -427,7 +425,6 @@ public boolean shouldConnect(final Peer peer, final boolean incoming) { .log(); return false; } - return false; } @Override From b275d52082f7bdcc264ff5e6cc8caa59ce738680 Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Fri, 19 Apr 2024 09:20:09 +1000 Subject: [PATCH 06/18] remove scheduled task Signed-off-by: Sally MacFarlane --- .../besu/ethereum/eth/manager/EthPeers.java | 3 ++- .../besu/ethereum/eth/manager/EthProtocolManager.java | 11 ----------- .../task/AbstractRetryingSwitchingPeerTask.java | 1 + .../besu/ethereum/eth/manager/EthPeersTest.java | 4 ---- 4 files changed, 3 insertions(+), 16 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index fe56e34fe71..a88e9f8ae48 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -400,7 +400,8 @@ public void disconnectWorstUselessPeerIfAtCapacity() { .addArgument(this::getMaxPeers) .log(); peer.disconnect( - DisconnectMessage.DisconnectReason.USELESS_PEER_BY_CHAIN_COMPARATOR); + DisconnectMessage.DisconnectReason + .USELESS_PEER_BY_REPUTATION); // TODO is this too granular }); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index 89336e6e317..a37491de5cf 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -46,7 +46,6 @@ import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import java.math.BigInteger; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -64,8 +63,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { private static final Logger LOG = LoggerFactory.getLogger(EthProtocolManager.class); - private static final Duration DEFAULT_ETH_PEERS_REFRESH_TIMEOUT = Duration.ofSeconds(10); - private final EthScheduler scheduler; private final CountDownLatch shutdown; private final AtomicBoolean stopped = new AtomicBoolean(false); @@ -127,14 +124,6 @@ public EthProtocolManager( transactionPool, ethMessages, ethereumWireProtocolConfiguration); - scheduleEthPeersEvictionCheck(ethPeers); - } - - private void scheduleEthPeersEvictionCheck(final EthPeers ethPeers) { - LOG.info( - "scheduling periodic check for max eth peers every {}", DEFAULT_ETH_PEERS_REFRESH_TIMEOUT); - this.scheduler.scheduleFutureTask( - () -> ethPeers.disconnectWorstUselessPeerIfAtCapacity(), DEFAULT_ETH_PEERS_REFRESH_TIMEOUT); } @VisibleForTesting diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java index 9411fac081a..e2140a2c2d5 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java @@ -132,6 +132,7 @@ private Stream remainingPeersToTry() { } private void refreshPeers() { + // TODO this duplicates EthPeers.disconnectWorst final EthPeers peers = getEthContext().getEthPeers(); // If we are at max connections, then refresh peers disconnecting one of the failed peers, // or the least useful diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java index 59c02941428..57d9000eb16 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java @@ -75,10 +75,6 @@ public void comparesPeersWithHeightAndTd() { assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerA)).isGreaterThan(0); assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerA)).isEqualTo(0); assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerB)).isEqualTo(0); - - assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerB); - assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate()) - .contains(peerB); } @Test From de4db902345dbc7353dc958835ff30e84cf2a5fd Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Fri, 19 Apr 2024 11:08:37 +1000 Subject: [PATCH 07/18] use simple comparator for ethPeers in tests Signed-off-by: Sally MacFarlane --- .../besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java | 4 ++++ .../eth/sync/fullsync/FullSyncChainDownloaderTest.java | 3 +++ 2 files changed, 7 insertions(+) diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java index e060bab6fbe..ee0999afbbf 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java @@ -100,6 +100,10 @@ public void setUp(final DataStorageFormat storageFormat) { EthProtocolConfiguration.defaultConfig()); ethContext = ethProtocolManager.ethContext(); ethPeers = ethContext.getEthPeers(); + + // for tests use the heaviest chain comparator + ethPeers.setBestChainComparator(EthPeers.HEAVIEST_CHAIN); + syncState = new SyncState(blockchain, ethPeers); metricsSystem = new NoOpMetricsSystem(); fastSyncActions = diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java index f44c8e2da34..f7c3c3fa425 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java @@ -30,6 +30,7 @@ import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; @@ -102,6 +103,8 @@ public void setupTest(final DataStorageFormat storageFormat) { localBlockchainSetup.getTransactionPool(), EthProtocolConfiguration.defaultConfig()); ethContext = ethProtocolManager.ethContext(); + // for tests use the heaviest chain comparator + ethContext.getEthPeers().setBestChainComparator(EthPeers.HEAVIEST_CHAIN); syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); } From d9e840f83d77445b2b27ae23a8c778ca1907e591 Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Fri, 19 Apr 2024 12:00:57 +1000 Subject: [PATCH 08/18] removed unused method Signed-off-by: Sally MacFarlane --- .../besu/ethereum/eth/manager/EthPeers.java | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index a88e9f8ae48..12e8d1cbe49 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -385,27 +385,6 @@ public boolean shouldConnect(final Peer peer, final boolean inbound) { return true; } - public void disconnectWorstUselessPeerIfAtCapacity() { - if (peerCount() >= getMaxPeers()) { - streamAvailablePeers() - .filter(p -> !canExceedPeerLimits(p.getId())) - .min(getBestChainComparator()) - .ifPresent( - peer -> { - LOG.atDebug() - .setMessage( - "disconnecting peer {}. Waiting for better peers. Current {} of max {}") - .addArgument(peer::getLoggableId) - .addArgument(this::peerCount) - .addArgument(this::getMaxPeers) - .log(); - peer.disconnect( - DisconnectMessage.DisconnectReason - .USELESS_PEER_BY_REPUTATION); // TODO is this too granular - }); - } - } - public void disconnectWorstUselessPeerIfAtCapacityIncludingConnectingPeer( final EthPeer connectingPeer) { if (peerCount() >= getMaxPeers()) { From ad927c386c14d14c1782ae6d2b81ec959f1b8b2d Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Fri, 19 Apr 2024 12:48:11 +1000 Subject: [PATCH 09/18] fix some tests Signed-off-by: Sally MacFarlane --- .../besu/ethereum/eth/manager/EthPeers.java | 2 +- .../ethereum/eth/manager/EthPeersTest.java | 1 - .../AbstractBlockPropagationManagerTest.java | 47 ++++++++++--------- .../backwardsync/BackwardSyncAlgSpec.java | 2 + 4 files changed, 27 insertions(+), 25 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index 12e8d1cbe49..9c34e9904de 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -335,7 +335,7 @@ public Optional bestPeerMatchingCriteria(final Predicate match } public void setBestChainComparator(final Comparator comparator) { - LOG.info("Updating the default best peer comparator"); + LOG.info("Updating the default best peer comparator to {}",comparator); bestPeerComparator = comparator; } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java index 57d9000eb16..8ae8657d560 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java @@ -100,7 +100,6 @@ public void comparesPeersWithTdAndNoHeight() { assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerA)).isEqualTo(0); assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerB)).isEqualTo(0); - assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerA); assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate()) .isEmpty(); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java index a91b19c8e9e..9181db32c8c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java @@ -114,7 +114,12 @@ protected void setup(final DataStorageFormat dataStorageFormat) { blockchainUtil.getTransactionPool(), EthProtocolConfiguration.defaultConfig()); syncConfig = SynchronizerConfiguration.builder().blockPropagationRange(-3, 5).build(); - syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers()); + + // for tests use simple peer comparator + final EthPeers ethPeers = ethProtocolManager.ethContext().getEthPeers(); + ethPeers.setBestChainComparator(EthPeers.HEAVIEST_CHAIN); + + syncState = new SyncState(blockchain, ethPeers); blockBroadcaster = mock(BlockBroadcaster.class); blockPropagationManager = new BlockPropagationManager( @@ -619,17 +624,7 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() { .thenReturn(new CompletableFuture<>()); final EthContext ethContext = new EthContext( - new EthPeers( - "eth", - () -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader()), - TestClock.fixed(), - metricsSystem, - EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE, - Collections.emptyList(), - Bytes.random(64), - 25, - 25, - false), + setupEthPeersWithHeaviestChainComparator(), new EthMessages(), ethScheduler); final BlockPropagationManager blockPropagationManager = @@ -757,17 +752,7 @@ public Object answer(final InvocationOnMock invocation) throws Throwable { final EthContext ethContext = new EthContext( - new EthPeers( - "eth", - () -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader()), - TestClock.fixed(), - metricsSystem, - EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE, - Collections.emptyList(), - Bytes.random(64), - 25, - 25, - false), + setupEthPeersWithHeaviestChainComparator(), new EthMessages(), ethScheduler); final BlockPropagationManager blockPropagationManager = @@ -998,4 +983,20 @@ public void shouldRequestBlockFromOtherPeersIfFirstPeerFails() { private BlockHeader blockHeader(final long number) { return new BlockHeaderTestFixture().number(number).buildHeader(); } + + private EthPeers setupEthPeersWithHeaviestChainComparator() { + final EthPeers ethPeers = new EthPeers( + "eth", + () -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader()), + TestClock.fixed(), + metricsSystem, + EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE, + Collections.emptyList(), + Bytes.random(64), + 25, + 25, + false); + ethPeers.setBestChainComparator(EthPeers.HEAVIEST_CHAIN); + return ethPeers; + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java index 438433c5181..558f1e14a0c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java @@ -32,6 +32,7 @@ import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.TransactionReceipt; +import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.plugin.services.BesuEvents; import java.nio.charset.StandardCharsets; @@ -179,6 +180,7 @@ public void shouldAwokeWhenConditionReachedAndReady() throws Exception { when(context.getSyncState().subscribeTTDReached(any())).thenReturn(88L); when(context.getSyncState().subscribeCompletionReached(any())).thenReturn(99L); when(context.getEthContext().getEthPeers().peerCount()).thenReturn(1); + when(context.getEthContext().getEthPeers().getBestChainComparator()).thenReturn(EthPeers.HEAVIEST_CHAIN); final CompletableFuture voidCompletableFuture = algorithm.waitForReady(); Thread.sleep(50); From f808a7268da4368fcd1136ca890b2706be92b449 Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Fri, 19 Apr 2024 12:51:41 +1000 Subject: [PATCH 10/18] formatting Signed-off-by: Sally MacFarlane --- .../besu/ethereum/eth/manager/EthPeers.java | 2 +- .../sync/AbstractBlockPropagationManagerTest.java | 13 ++++--------- .../eth/sync/backwardsync/BackwardSyncAlgSpec.java | 3 ++- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index 9c34e9904de..fa3a9c98b64 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -335,7 +335,7 @@ public Optional bestPeerMatchingCriteria(final Predicate match } public void setBestChainComparator(final Comparator comparator) { - LOG.info("Updating the default best peer comparator to {}",comparator); + LOG.info("Updating the default best peer comparator to {}", comparator); bestPeerComparator = comparator; } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java index 9181db32c8c..81e9a85d835 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java @@ -623,10 +623,7 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() { when(ethScheduler.scheduleSyncWorkerTask(any(Supplier.class))) .thenReturn(new CompletableFuture<>()); final EthContext ethContext = - new EthContext( - setupEthPeersWithHeaviestChainComparator(), - new EthMessages(), - ethScheduler); + new EthContext(setupEthPeersWithHeaviestChainComparator(), new EthMessages(), ethScheduler); final BlockPropagationManager blockPropagationManager = new BlockPropagationManager( syncConfig, @@ -751,10 +748,7 @@ public Object answer(final InvocationOnMock invocation) throws Throwable { }); final EthContext ethContext = - new EthContext( - setupEthPeersWithHeaviestChainComparator(), - new EthMessages(), - ethScheduler); + new EthContext(setupEthPeersWithHeaviestChainComparator(), new EthMessages(), ethScheduler); final BlockPropagationManager blockPropagationManager = new BlockPropagationManager( syncConfig, @@ -985,7 +979,8 @@ private BlockHeader blockHeader(final long number) { } private EthPeers setupEthPeersWithHeaviestChainComparator() { - final EthPeers ethPeers = new EthPeers( + final EthPeers ethPeers = + new EthPeers( "eth", () -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader()), TestClock.fixed(), diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java index 558f1e14a0c..594769c4f18 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java @@ -180,7 +180,8 @@ public void shouldAwokeWhenConditionReachedAndReady() throws Exception { when(context.getSyncState().subscribeTTDReached(any())).thenReturn(88L); when(context.getSyncState().subscribeCompletionReached(any())).thenReturn(99L); when(context.getEthContext().getEthPeers().peerCount()).thenReturn(1); - when(context.getEthContext().getEthPeers().getBestChainComparator()).thenReturn(EthPeers.HEAVIEST_CHAIN); + when(context.getEthContext().getEthPeers().getBestChainComparator()) + .thenReturn(EthPeers.HEAVIEST_CHAIN); final CompletableFuture voidCompletableFuture = algorithm.waitForReady(); Thread.sleep(50); From f623e72b98288876aaf5b9fe53b5155a3394c0bd Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Fri, 19 Apr 2024 12:52:08 +1000 Subject: [PATCH 11/18] changelog Signed-off-by: Sally MacFarlane --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 045968989cc..87182856755 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ - Expose bad block events via the BesuEvents plugin API [#6848](https://github.com/hyperledger/besu/pull/6848) - Add RPC errors metric [#6919](https://github.com/hyperledger/besu/pull/6919/) - Add `rlp decode` subcommand to decode IBFT/QBFT extraData to validator list [#6895](https://github.com/hyperledger/besu/pull/6895) +- Peering - refactor disconnect logic to improve peer acquisition and retention [#6968](https://github.com/hyperledger/besu/pull/6968) ### Bug fixes - Fix txpool dump/restore race condition [#6665](https://github.com/hyperledger/besu/pull/6665) From 6afebd3f13586f486bf22c5d807de25231da3ea2 Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Fri, 19 Apr 2024 17:40:28 +1000 Subject: [PATCH 12/18] fixed block propagation tests Signed-off-by: Sally MacFarlane --- .../eth/sync/AbstractBlockPropagationManagerTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java index 81e9a85d835..6ffd88f5520 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java @@ -58,6 +58,7 @@ import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.storage.DataStorageFormat; @@ -562,6 +563,8 @@ public void purgesOldBlocks() { blockPropagationManager.start(); final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); + final RespondingEthPeer secondPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 2); final NewBlockMessage blockAnnouncementMsg = NewBlockMessage.create(blockToPurge, Difficulty.ZERO); @@ -573,6 +576,7 @@ public void purgesOldBlocks() { // Check that we pushed our block into the pending collection assertThat(blockchain.contains(blockToPurge.getHash())).isFalse(); assertThat(pendingBlocksManager.contains(blockToPurge.getHash())).isTrue(); + secondPeer.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS); // Import blocks until we bury the target block far enough to be cleaned up for (int i = 0; i < oldBlocksToImport; i++) { @@ -946,6 +950,10 @@ public void shouldRequestBlockFromOtherPeersIfFirstPeerFails() { final RespondingEthPeer firstPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); + // second peer responds + final RespondingEthPeer secondPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 2); + final NewBlockHashesMessage nextAnnouncement = NewBlockHashesMessage.create( Collections.singletonList( @@ -958,9 +966,6 @@ public void shouldRequestBlockFromOtherPeersIfFirstPeerFails() { assertThat(blockchain.contains(nextBlock.getHash())).isFalse(); - // second peer responds - final RespondingEthPeer secondPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); final Responder goodResponder = RespondingEthPeer.blockchainResponder(getFullBlockchain()); secondPeer.respondWhile(goodResponder, secondPeer::hasOutstandingRequests); From ab1809cd6fed53c948fbf88e10e50a95ac0947cd Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Mon, 22 Apr 2024 18:12:45 +1000 Subject: [PATCH 13/18] allow some trailing peers Signed-off-by: Sally MacFarlane --- .../besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java | 2 +- .../besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java index 7b64f885c29..b976b00eaa0 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java @@ -163,7 +163,7 @@ public void deleteFastSyncState() { protected FastSyncState updateMaxTrailingPeers(final FastSyncState state) { if (state.getPivotBlockNumber().isPresent()) { trailingPeerRequirements = - Optional.of(new TrailingPeerRequirements(state.getPivotBlockNumber().getAsLong(), 0)); + Optional.of(new TrailingPeerRequirements(state.getPivotBlockNumber().getAsLong(), 5)); } else { trailingPeerRequirements = Optional.empty(); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java index ae252a39f47..69030f484d3 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java @@ -564,7 +564,7 @@ public void shouldNotAllowPeersBeforePivotBlockOnceSelected( downloader.start(); Assertions.assertThat(downloader.calculateTrailingPeerRequirements()) - .contains(new TrailingPeerRequirements(50, 0)); + .contains(new TrailingPeerRequirements(50, 5)); } @ParameterizedTest From 5fa7599530413af0c5213a51c4fad132586c7245 Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Thu, 16 May 2024 14:55:52 +1000 Subject: [PATCH 14/18] simplify the logic around enforcing connection limits Signed-off-by: Sally MacFarlane --- .../besu/ethereum/eth/manager/EthPeers.java | 47 +++++++------------ 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index fa3a9c98b64..10a28a0efd4 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -526,24 +526,10 @@ private void enforceConnectionLimits() { }); } - private boolean remoteConnectionLimitReached() { - return shouldLimitRemoteConnections() - && countUntrustedRemotelyInitiatedConnections() >= maxRemotelyInitiatedConnections; - } - private boolean shouldLimitRemoteConnections() { return maxRemotelyInitiatedConnections < peerUpperBound; } - private long countUntrustedRemotelyInitiatedConnections() { - return completeConnections.values().stream() - .map(ep -> ep.getConnection()) - .filter(c -> c.inboundInitiated()) - .filter(c -> !c.isDisconnected()) - .filter(conn -> !canExceedPeerLimits(conn.getPeer().getId())) - .count(); - } - private void onCacheRemoval( final RemovalNotification removalNotification) { if (removalNotification.wasEvicted()) { @@ -567,10 +553,7 @@ private boolean addPeerToEthPeers(final EthPeer peer) { if (randomPeerPriority || canExceedPeerLimits(id)) { // either the incoming peer is a static peer, or // randomPeerPriority! Add the peer and if there are too many connections fix it - completeConnections.putIfAbsent(id, peer); - enforceRemoteConnectionLimits(); - enforceConnectionLimits(); - return completeConnections.containsKey(id); + return addPeerAndEnforceConnectionLimits(peer, id, connection); } // otherwise, we need to compare the incoming peer to existing peers @@ -580,17 +563,23 @@ private boolean addPeerToEthPeers(final EthPeer peer) { // favourably disconnectWorstUselessPeerIfAtCapacityIncludingConnectingPeer(peer); } - // Disconnect if too many remotely-initiated connections - if (connection.inboundInitiated() && remoteConnectionLimitReached()) { - LOG.trace( - "Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}", - connection, - maxRemotelyInitiatedConnections); - disconnectWorstIncomingUselessPeer(); - connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS); - return false; - } - final boolean added = (completeConnections.putIfAbsent(id, peer) == null); + return addPeerAndEnforceConnectionLimits(peer, id, connection); + } + + /** + * Returns true if this specific peer was added, after connection limits were checked + * + * @param peer the peer to add + * @param id the peer's id + * @param connection the peer's connection + * @return whether the peer remains added after connection limits are enforced + */ + private boolean addPeerAndEnforceConnectionLimits( + final EthPeer peer, final Bytes id, final PeerConnection connection) { + completeConnections.putIfAbsent(id, peer); + enforceRemoteConnectionLimits(); + enforceConnectionLimits(); + final boolean added = completeConnections.containsKey(id); if (added) { LOG.trace("Added peer {} with connection {} to completeConnections", id, connection); } else { From 3420cc3afd05355654860a7954d30979c1947d96 Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Wed, 12 Jun 2024 15:25:56 +1000 Subject: [PATCH 15/18] review feedback Signed-off-by: Sally MacFarlane --- .../org/hyperledger/besu/ethereum/eth/manager/EthPeer.java | 4 ++-- .../eth/manager/task/AbstractRetryingSwitchingPeerTask.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java index c9f1ba97620..41db20bb07f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java @@ -215,7 +215,7 @@ public void recordRequestTimeout(final int requestCode) { .addArgument(this::getLoggableId) .log(); LOG.trace("Timed out while waiting for response from peer {}", this); - reputation.recordRequestTimeout(requestCode, this); + reputation.recordRequestTimeout(requestCode, this).ifPresent(this::disconnect); } public void recordUselessResponse(final String requestType) { @@ -224,7 +224,7 @@ public void recordUselessResponse(final String requestType) { .addArgument(requestType) .addArgument(this::getLoggableId) .log(); - reputation.recordUselessResponse(System.currentTimeMillis(), this); + reputation.recordUselessResponse(System.currentTimeMillis(), this).ifPresent(this::disconnect); } public void recordUsefulResponse() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java index 203a15aefe9..8a9e51ee501 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java @@ -138,7 +138,7 @@ private void refreshPeers() { // or the least useful if (peers.peerCount() >= peers.getMaxPeers()) { - failedPeers.stream().filter(peer -> !peer.isDisconnected()).findAny().stream() + failedPeers.stream() .min(EthPeers.MOST_USEFUL_PEER) .or(() -> peers.streamAvailablePeers().min(EthPeers.MOST_USEFUL_PEER)) .ifPresent( From cd6a448524423b2b39035e5d1198ce53fb670215 Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Mon, 24 Jun 2024 15:07:27 +1000 Subject: [PATCH 16/18] fixed method rename lost in merge Signed-off-by: Sally MacFarlane --- .../org/hyperledger/besu/ethereum/eth/manager/EthPeers.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index a5eb11e07e5..b4406eb3639 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -604,7 +604,7 @@ public void disconnectWorstIncomingUselessPeer() { streamAvailablePeers() .filter(p -> p.getConnection().inboundInitiated()) .filter(p -> !canExceedPeerLimits(p.getId())) - .min(getBestChainComparator()) + .min(getBestPeerComparator()) .ifPresent( peer -> { LOG.atDebug() From be1b2d72e37fc546cc9a18ea4e747d38a627651c Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Mon, 24 Jun 2024 15:40:09 +1000 Subject: [PATCH 17/18] fixed method rename lost in merge Signed-off-by: Sally MacFarlane --- .../eth/sync/AbstractBlockPropagationManagerTest.java | 4 ++-- .../ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java | 2 +- .../besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java | 2 +- .../eth/sync/fullsync/FullSyncChainDownloaderTest.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java index 6ffd88f5520..fbd6a32fc69 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java @@ -118,7 +118,7 @@ protected void setup(final DataStorageFormat dataStorageFormat) { // for tests use simple peer comparator final EthPeers ethPeers = ethProtocolManager.ethContext().getEthPeers(); - ethPeers.setBestChainComparator(EthPeers.HEAVIEST_CHAIN); + ethPeers.setBestPeerComparator(EthPeers.HEAVIEST_CHAIN); syncState = new SyncState(blockchain, ethPeers); blockBroadcaster = mock(BlockBroadcaster.class); @@ -996,7 +996,7 @@ private EthPeers setupEthPeersWithHeaviestChainComparator() { 25, 25, false); - ethPeers.setBestChainComparator(EthPeers.HEAVIEST_CHAIN); + ethPeers.setBestPeerComparator(EthPeers.HEAVIEST_CHAIN); return ethPeers; } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java index a6adfa487e5..aabd371223a 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java @@ -177,7 +177,7 @@ public void shouldAwokeWhenConditionReachedAndReady() throws Exception { when(context.getSyncState().subscribeTTDReached(any())).thenReturn(88L); when(context.getSyncState().subscribeCompletionReached(any())).thenReturn(99L); when(context.getEthContext().getEthPeers().peerCount()).thenReturn(1); - when(context.getEthContext().getEthPeers().getBestChainComparator()) + when(context.getEthContext().getEthPeers().getBestPeerComparator()) .thenReturn(EthPeers.HEAVIEST_CHAIN); final CompletableFuture voidCompletableFuture = algorithm.waitForReady(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java index ee0999afbbf..89663caaeab 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java @@ -102,7 +102,7 @@ public void setUp(final DataStorageFormat storageFormat) { ethPeers = ethContext.getEthPeers(); // for tests use the heaviest chain comparator - ethPeers.setBestChainComparator(EthPeers.HEAVIEST_CHAIN); + ethPeers.setBestPeerComparator(EthPeers.HEAVIEST_CHAIN); syncState = new SyncState(blockchain, ethPeers); metricsSystem = new NoOpMetricsSystem(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java index f7c3c3fa425..0bfa10b324b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java @@ -104,7 +104,7 @@ public void setupTest(final DataStorageFormat storageFormat) { EthProtocolConfiguration.defaultConfig()); ethContext = ethProtocolManager.ethContext(); // for tests use the heaviest chain comparator - ethContext.getEthPeers().setBestChainComparator(EthPeers.HEAVIEST_CHAIN); + ethContext.getEthPeers().setBestPeerComparator(EthPeers.HEAVIEST_CHAIN); syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); } From 785990698941631cdbadd60f3f21946eaa3aab1e Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Mon, 24 Jun 2024 15:50:43 +1000 Subject: [PATCH 18/18] formatting Signed-off-by: Sally MacFarlane --- .../besu/ethereum/eth/manager/EthPeers.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index b4406eb3639..224d1a46df5 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -584,17 +584,17 @@ private boolean addPeerAndEnforceConnectionLimits( enforceConnectionLimits(); final boolean added = completeConnections.containsKey(id); if (added) { - LOG.atTrace() - .setMessage("Added peer {} with connection {} to completeConnections") - .addArgument(id) - .addArgument(connection) - .log(); + LOG.atTrace() + .setMessage("Added peer {} with connection {} to completeConnections") + .addArgument(id) + .addArgument(connection) + .log(); } else { - LOG.atTrace() - .setMessage("Did not add peer {} with connection {} to completeConnections") - .addArgument(id) - .addArgument(connection) - .log(); + LOG.atTrace() + .setMessage("Did not add peer {} with connection {} to completeConnections") + .addArgument(id) + .addArgument(connection) + .log(); } return added; }