Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peering - disconnects refactor #6968

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4ced6a0
peers don't do their own disconnects
macfarla Apr 18, 2024
2d4badf
only disconnect if at capacity; use peer reputation for peer comparator
macfarla Apr 18, 2024
632a3e0
delay the decision about whether to connect if we have full peers, un…
macfarla Apr 18, 2024
3dccfaf
delay the tru decision
macfarla Apr 18, 2024
bb695b5
delay the tru decision
macfarla Apr 18, 2024
b275d52
remove scheduled task
macfarla Apr 18, 2024
de4db90
use simple comparator for ethPeers in tests
macfarla Apr 19, 2024
d9e840f
removed unused method
macfarla Apr 19, 2024
ad927c3
fix some tests
macfarla Apr 19, 2024
f808a72
formatting
macfarla Apr 19, 2024
f623e72
changelog
macfarla Apr 19, 2024
6afebd3
fixed block propagation tests
macfarla Apr 19, 2024
40405a3
merge
macfarla Apr 21, 2024
ab1809c
allow some trailing peers
macfarla Apr 22, 2024
60aa98d
Merge branch 'main' of github.com:hyperledger/besu into useless-disco…
macfarla Apr 22, 2024
e02c1ab
merge
macfarla May 15, 2024
5fa7599
simplify the logic around enforcing connection limits
macfarla May 16, 2024
9857dce
Merge branch 'main' of github.com:hyperledger/besu into useless-disco…
macfarla May 16, 2024
7d29573
Merge branch 'main' into useless-disconnects
macfarla May 22, 2024
a2d8181
Merge branch 'main' into useless-disconnects
macfarla May 29, 2024
d40cfaf
Merge branch 'main' into useless-disconnects
macfarla May 29, 2024
2871bea
Merge branch 'main' into useless-disconnects
macfarla Jun 3, 2024
0f82c92
merge
macfarla Jun 12, 2024
3420cc3
review feedback
macfarla Jun 12, 2024
fdc4099
Merge branch 'useless-disconnects' of github.com:macfarla/besu into u…
macfarla Jun 12, 2024
428c5d6
merge
macfarla Jun 24, 2024
cd6a448
fixed method rename lost in merge
macfarla Jun 24, 2024
be1b2d7
fixed method rename lost in merge
macfarla Jun 24, 2024
7859906
formatting
macfarla Jun 24, 2024
1dc6f15
Merge branch 'main' of github.com:hyperledger/besu into useless-disco…
macfarla Jun 24, 2024
6c66577
merge
macfarla Jun 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- Default bonsai to use full-flat db and code-storage-by-code-hash [#6984](https://github.com/hyperledger/besu/pull/6894)
- New RPC methods miner_setExtraData and miner_getExtraData [#7078](https://github.com/hyperledger/besu/pull/7078)
- Disconnect peers that have multiple discovery ports since they give us bad neighbours [#7089](https://github.com/hyperledger/besu/pull/7089)
- Peering - refactor disconnect logic to improve peer acquisition and retention [#6968](https://github.com/hyperledger/besu/pull/6968)

### Known Issues
- [Frequency: occasional < 10%] Chain download halt. Only affects new syncs (new nodes syncing from scratch). Symptom: Block import halts, despite having a full set of peers and world state downloading finishing. Generally restarting besu will resolve the issue. We are tracking this in [#6884](https://github.com/hyperledger/besu/pull/6884)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ protected EthProtocolManager createEthProtocolManager(
var mergeBestPeerComparator =
new TransitionBestPeerComparator(
genesisConfigOptions.getTerminalTotalDifficulty().map(Difficulty::of).orElseThrow());
ethPeers.setBestChainComparator(mergeBestPeerComparator);
mergeContext.observeNewIsPostMergeState(mergeBestPeerComparator);

Optional<MergePeerFilter> filterToUse = Optional.of(new MergePeerFilter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void recordRequestTimeout(final int requestCode) {
.addArgument(this::getLoggableId)
.log();
LOG.trace("Timed out while waiting for response from peer {}", this);
reputation.recordRequestTimeout(requestCode, this).ifPresent(this::disconnect);
reputation.recordRequestTimeout(requestCode, this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should disconnect these, because otherwise we might waist time by using these peers for requests, because they are likely to timeout as well.

}

public void recordUselessResponse(final String requestType) {
Expand All @@ -224,7 +224,7 @@ public void recordUselessResponse(final String requestType) {
.addArgument(requestType)
.addArgument(this::getLoggableId)
.log();
reputation.recordUselessResponse(System.currentTimeMillis(), this).ifPresent(this::disconnect);
reputation.recordUselessResponse(System.currentTimeMillis(), this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s.a.

}

public void recordUsefulResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public EthPeers(
this.clock = clock;
this.permissioningProviders = permissioningProviders;
this.maxMessageSize = maxMessageSize;
this.bestPeerComparator = HEAVIEST_CHAIN;
this.bestPeerComparator = MOST_USEFUL_PEER;
this.localNodeId = localNodeId;
this.peerUpperBound = peerUpperBound;
this.maxRemotelyInitiatedConnections = maxRemotelyInitiatedConnections;
Expand Down Expand Up @@ -335,7 +335,7 @@ public Optional<EthPeer> bestPeerMatchingCriteria(final Predicate<EthPeer> match
}

public void setBestChainComparator(final Comparator<EthPeer> comparator) {
LOG.info("Updating the default best peer comparator");
LOG.info("Updating the default best peer comparator to {}", comparator);
bestPeerComparator = comparator;
}

Expand Down Expand Up @@ -363,13 +363,6 @@ public Stream<PeerConnection> getAllConnections() {

public boolean shouldConnect(final Peer peer, final boolean inbound) {
final Bytes id = peer.getId();
if (peerCount() >= peerUpperBound && !canExceedPeerLimits(id)) {
LOG.atTrace()
.setMessage("not connecting to peer {} - too many peers")
.addArgument(peer.getLoggableId())
.log();
return false;
}
final EthPeer ethPeer = completeConnections.get(id);
if (ethPeer != null && !ethPeer.isDisconnected()) {
LOG.atTrace()
Expand All @@ -392,21 +385,44 @@ public boolean shouldConnect(final Peer peer, final boolean inbound) {
return true;
}

public void disconnectWorstUselessPeer() {
streamAvailablePeers()
.sorted(getBestChainComparator())
.findFirst()
.ifPresent(
peer -> {
LOG.atDebug()
.setMessage(
"disconnecting peer {}. Waiting for better peers. Current {} of max {}")
.addArgument(peer::getLoggableId)
.addArgument(this::peerCount)
.addArgument(this::getMaxPeers)
.log();
peer.disconnect(DisconnectMessage.DisconnectReason.USELESS_PEER_BY_CHAIN_COMPARATOR);
});
public void disconnectWorstUselessPeerIfAtCapacityIncludingConnectingPeer(
final EthPeer connectingPeer) {
if (peerCount() >= getMaxPeers()) {
streamAvailablePeers()
.filter(p -> !canExceedPeerLimits(p.getId()))
.min(getBestChainComparator())
.ifPresent(
worstCurrentlyConnectedPeer -> {
// TODO remove this debug log
LOG.atDebug()
.setMessage("comparing worstCurrentPeer {} with connectingPeer {}")
.addArgument(worstCurrentlyConnectedPeer)
.addArgument(connectingPeer)
.log();
if (getBestChainComparator().compare(worstCurrentlyConnectedPeer, connectingPeer)
< 0) {

LOG.atDebug()
.setMessage(
"disconnecting current peer {}. Waiting for better peers. Current {} of max {}")
.addArgument(worstCurrentlyConnectedPeer::getLoggableId)
.addArgument(this::peerCount)
.addArgument(this::getMaxPeers)
.log();
worstCurrentlyConnectedPeer.disconnect(
DisconnectMessage.DisconnectReason.USELESS_PEER_BY_CHAIN_COMPARATOR);
} else {
LOG.atDebug()
.setMessage(
"disconnecting connecting peer {}. Waiting for better peers. Current {} of max {}")
.addArgument(connectingPeer::getLoggableId)
.addArgument(this::peerCount)
.addArgument(this::getMaxPeers)
.log();
connectingPeer.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
}
});
}
}

@FunctionalInterface
Expand Down Expand Up @@ -510,24 +526,10 @@ private void enforceConnectionLimits() {
});
}

private boolean remoteConnectionLimitReached() {
return shouldLimitRemoteConnections()
&& countUntrustedRemotelyInitiatedConnections() >= maxRemotelyInitiatedConnections;
}

private boolean shouldLimitRemoteConnections() {
return maxRemotelyInitiatedConnections < peerUpperBound;
}

private long countUntrustedRemotelyInitiatedConnections() {
return completeConnections.values().stream()
.map(ep -> ep.getConnection())
.filter(c -> c.inboundInitiated())
.filter(c -> !c.isDisconnected())
.filter(conn -> !canExceedPeerLimits(conn.getPeer().getId()))
.count();
}

private void onCacheRemoval(
final RemovalNotification<PeerConnection, EthPeer> removalNotification) {
if (removalNotification.wasEvicted()) {
Expand All @@ -548,40 +550,62 @@ private boolean addPeerToEthPeers(final EthPeer peer) {
}
final PeerConnection connection = peer.getConnection();
final Bytes id = peer.getId();
if (!randomPeerPriority) {
// Disconnect if too many peers
if (!canExceedPeerLimits(id) && peerCount() >= peerUpperBound) {
LOG.trace(
"Too many peers. Disconnect connection: {}, max connections {}",
connection,
peerUpperBound);
connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
return false;
}
// Disconnect if too many remotely-initiated connections
if (connection.inboundInitiated()
&& !canExceedPeerLimits(id)
&& remoteConnectionLimitReached()) {
LOG.trace(
"Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}",
connection,
maxRemotelyInitiatedConnections);
connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
return false;
}
final boolean added = (completeConnections.putIfAbsent(id, peer) == null);
if (added) {
LOG.trace("Added peer {} with connection {} to completeConnections", id, connection);
} else {
LOG.trace("Did not add peer {} with connection {} to completeConnections", id, connection);
}
return added;
} else {
if (randomPeerPriority || canExceedPeerLimits(id)) {
// either the incoming peer is a static peer, or
// randomPeerPriority! Add the peer and if there are too many connections fix it
completeConnections.putIfAbsent(id, peer);
enforceRemoteConnectionLimits();
enforceConnectionLimits();
return completeConnections.containsKey(id);
return addPeerAndEnforceConnectionLimits(peer, id, connection);
}
// otherwise, we need to compare the incoming peer to existing peers

// Disconnect if too many peers
if (peerCount() >= peerUpperBound) {
// If too many peers, disconnect one (could be the connecting peer if it does not compare
// favourably
disconnectWorstUselessPeerIfAtCapacityIncludingConnectingPeer(peer);
}
return addPeerAndEnforceConnectionLimits(peer, id, connection);
}

/**
* Returns true if this specific peer was added, after connection limits were checked
*
* @param peer the peer to add
* @param id the peer's id
* @param connection the peer's connection
* @return whether the peer remains added after connection limits are enforced
*/
private boolean addPeerAndEnforceConnectionLimits(
final EthPeer peer, final Bytes id, final PeerConnection connection) {
completeConnections.putIfAbsent(id, peer);
enforceRemoteConnectionLimits();
enforceConnectionLimits();
final boolean added = completeConnections.containsKey(id);
if (added) {
LOG.trace("Added peer {} with connection {} to completeConnections", id, connection);
} else {
LOG.trace("Did not add peer {} with connection {} to completeConnections", id, connection);
}
return added;
}

public void disconnectWorstIncomingUselessPeer() {
if (peerCount() >= getMaxPeers()) {
streamAvailablePeers()
.filter(p -> p.getConnection().inboundInitiated())
.filter(p -> !canExceedPeerLimits(p.getId()))
.min(getBestChainComparator())
.ifPresent(
peer -> {
LOG.atDebug()
.setMessage(
"disconnecting peer {}. Waiting for better peers. Current {} of max {}")
.addArgument(peer::getLoggableId)
.addArgument(this::peerCount)
.addArgument(this::getMaxPeers)
.log();
peer.disconnect(
DisconnectMessage.DisconnectReason.USELESS_PEER_BY_CHAIN_COMPARATOR);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@

public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
private static final Logger LOG = LoggerFactory.getLogger(EthProtocolManager.class);

private final EthScheduler scheduler;
private final CountDownLatch shutdown;
private final AtomicBoolean stopped = new AtomicBoolean(false);
Expand Down Expand Up @@ -405,9 +404,7 @@ public boolean shouldConnect(final Peer peer, final boolean incoming) {
.setMessage("ForkId OK or not available for peer {}")
.addArgument(peer::getLoggableId)
.log();
if (ethPeers.shouldConnect(peer, incoming)) {
return true;
}
return (ethPeers.shouldConnect(peer, incoming));
} else {
LOG.atDebug()
.setMessage("ForkId check failed for peer {} our fork id {} theirs {}")
Expand All @@ -417,7 +414,6 @@ public boolean shouldConnect(final Peer peer, final boolean incoming) {
.log();
return false;
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ private Stream<EthPeer> remainingPeersToTry() {
}

private void refreshPeers() {
// TODO this duplicates EthPeers.disconnectWorst
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at line 141, I think we could just not filter on !is.disconnected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

final EthPeers peers = getEthContext().getEthPeers();
// If we are at max connections, then refresh peers disconnecting one of the failed peers,
// or the least useful
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void deleteFastSyncState() {
protected FastSyncState updateMaxTrailingPeers(final FastSyncState state) {
if (state.getPivotBlockNumber().isPresent()) {
trailingPeerRequirements =
Optional.of(new TrailingPeerRequirements(state.getPivotBlockNumber().getAsLong(), 0));
Optional.of(new TrailingPeerRequirements(state.getPivotBlockNumber().getAsLong(), 5));
} else {
trailingPeerRequirements = Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ protected CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget() {
pivotBlockHeader.getNumber(),
ethPeers.peerCount(),
ethPeers.getMaxPeers());
ethPeers.disconnectWorstUselessPeer();
return completedFuture(Optional.empty());
} else {
return confirmPivotBlockHeader(bestPeer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ public void comparesPeersWithHeightAndTd() {
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerA)).isGreaterThan(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);

assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerB);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate())
.contains(peerB);
}

@Test
Expand All @@ -104,7 +100,6 @@ public void comparesPeersWithTdAndNoHeight() {
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);

assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerA);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate())
.isEmpty();
}
Expand Down
Loading
Loading