Skip to content

Commit

Permalink
Refactor packet classes for dependency injection (hyperledger#8271)
Browse files Browse the repository at this point in the history
* Apply expiration checks in PacketData classes

Signed-off-by: Matilda Clerke <[email protected]>

* Fix broken unit tests

Signed-off-by: Matilda Clerke <[email protected]>

* spotless

Signed-off-by: Matilda Clerke <[email protected]>

* Fix failing test

Signed-off-by: Matilda Clerke <[email protected]>

* Refactor to use clock to validate expiry

Signed-off-by: Matilda Clerke <[email protected]>

* Refactor Packet and PacketData

Signed-off-by: Matilda Clerke <[email protected]>

* Move java.inject version to platform/build.gradle

Signed-off-by: Matilda Clerke <[email protected]>

* Update copyright notices on Packet and PacketData classes

Signed-off-by: Matilda Clerke <[email protected]>

* Update PeerDiscoveryControllerTest to avoid use of Thread.sleep

Signed-off-by: Matilda Clerke <[email protected]>

* Update copyright noticed on PeerDiscoveryControllerTest and MockPacketDataFactory

Signed-off-by: Matilda Clerke <[email protected]>

* Remove old PacketTest

Signed-off-by: Matilda Clerke <[email protected]>

* Reorder platform/build.gradle api platform dependencies to alphabetical order

Signed-off-by: Matilda Clerke <[email protected]>

---------

Signed-off-by: Matilda Clerke <[email protected]>
  • Loading branch information
Matilda-Clerke authored Feb 13, 2025
1 parent fa19459 commit 90015fc
Show file tree
Hide file tree
Showing 100 changed files with 5,194 additions and 2,410 deletions.
3 changes: 3 additions & 0 deletions ethereum/p2p/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ dependencies {
implementation project(':util')

implementation 'com.google.guava:guava'
annotationProcessor 'com.google.dagger:dagger-compiler'
implementation 'com.google.dagger:dagger'
implementation 'dnsjava:dnsjava'
implementation 'io.netty:netty-transport-native-unix-common'
implementation 'io.vertx:vertx-core'
implementation 'javax.inject:javax.inject'

implementation 'io.tmio:tuweni-bytes'
implementation 'io.tmio:tuweni-crypto'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.hyperledger.besu.ethereum.chain.VariablesStorage;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerRequirement;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PingPacketData;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.TimerUtil;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.packet.Packet;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.packet.ping.PingPacketData;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURLImpl;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.TimerUtil;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.VertxTimerUtil;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.packet.DaggerPacketPackage;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.packet.Packet;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.packet.PacketDeserializer;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.packet.PacketPackage;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.packet.PacketSerializer;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
Expand Down Expand Up @@ -65,7 +69,10 @@ public class VertxPeerDiscoveryAgent extends PeerDiscoveryAgent {
/* The vert.x UDP socket. */
private DatagramSocket socket;

public VertxPeerDiscoveryAgent(
private final PacketSerializer packetSerializer;
private final PacketDeserializer packetDeserializer;

VertxPeerDiscoveryAgent(
final Vertx vertx,
final NodeKey nodeKey,
final DiscoveryConfiguration config,
Expand All @@ -75,7 +82,9 @@ public VertxPeerDiscoveryAgent(
final StorageProvider storageProvider,
final ForkIdManager forkIdManager,
final RlpxAgent rlpxAgent,
final PeerTable peerTable) {
final PeerTable peerTable,
final PacketSerializer packetSerializer,
final PacketDeserializer packetDeserializer) {
super(
nodeKey,
config,
Expand All @@ -88,6 +97,8 @@ public VertxPeerDiscoveryAgent(
peerTable);
checkArgument(vertx != null, "vertx instance cannot be null");
this.vertx = vertx;
this.packetSerializer = packetSerializer;
this.packetDeserializer = packetDeserializer;

metricsSystem.createIntegerGauge(
BesuMetricCategory.NETWORK,
Expand All @@ -96,6 +107,33 @@ public VertxPeerDiscoveryAgent(
pendingTaskCounter(vertx.nettyEventLoopGroup()));
}

public static VertxPeerDiscoveryAgent create(
final Vertx vertx,
final NodeKey nodeKey,
final DiscoveryConfiguration config,
final PeerPermissions peerPermissions,
final NatService natService,
final MetricsSystem metricsSystem,
final StorageProvider storageProvider,
final ForkIdManager forkIdManager,
final RlpxAgent rlpxAgent,
final PeerTable peerTable) {
PacketPackage packetPackage = DaggerPacketPackage.create();
return new VertxPeerDiscoveryAgent(
vertx,
nodeKey,
config,
peerPermissions,
natService,
metricsSystem,
storageProvider,
forkIdManager,
rlpxAgent,
peerTable,
packetPackage.packetSerializer(),
packetPackage.packetDeserializer());
}

private IntSupplier pendingTaskCounter(final EventLoopGroup eventLoopGroup) {
return () ->
StreamSupport.stream(eventLoopGroup.spliterator(), false)
Expand Down Expand Up @@ -172,7 +210,7 @@ protected CompletableFuture<Void> sendOutgoingPacket(
new RuntimeException("Discovery socket already closed, because Besu is closing down"));
} else {
socket.send(
packet.encode(),
packetSerializer.encode(packet),
peer.getEndpoint().getUdpPort(),
peer.getEnodeURL().getIpAsString(),
ar -> {
Expand Down Expand Up @@ -217,7 +255,7 @@ protected void handleOutgoingPacketError(
.setMessage("Peer {} is unreachable, native error code {}, packet: {}, stacktrace: {}")
.addArgument(peer)
.addArgument(nativeErr::expectedErr)
.addArgument(() -> wrapBuffer(packet.encode()))
.addArgument(() -> wrapBuffer(packetSerializer.encode(packet)))
.addArgument(err)
.log();
} else {
Expand All @@ -226,15 +264,15 @@ protected void handleOutgoingPacketError(
"Sending to peer {} failed, native error code {}, packet: {}, stacktrace: {}")
.addArgument(peer)
.addArgument(nativeErr.expectedErr())
.addArgument(wrapBuffer(packet.encode()))
.addArgument(wrapBuffer(packetSerializer.encode(packet)))
.addArgument(err)
.log();
}
} else if (err instanceof SocketException && err.getMessage().contains("unreachable")) {
LOG.atDebug()
.setMessage("Peer {} is unreachable, packet: {}")
.addArgument(peer)
.addArgument(() -> wrapBuffer(packet.encode()))
.addArgument(() -> wrapBuffer(packetSerializer.encode(packet)))
.addArgument(err)
.log();
} else if (err instanceof SocketException
Expand All @@ -251,14 +289,14 @@ protected void handleOutgoingPacketError(
LOG.atTrace()
.setMessage("Sending to peer {} failed, packet: {}, stacktrace: {}")
.addArgument(peer)
.addArgument(() -> wrapBuffer(packet.encode()))
.addArgument(() -> wrapBuffer(packetSerializer.encode(packet)))
.addArgument(err)
.log();
} else {
LOG.warn(
"Sending to peer {} failed, packet: {}, stacktrace: {}",
peer,
wrapBuffer(packet.encode()),
wrapBuffer(packetSerializer.encode(packet)),
err);
}
}
Expand Down Expand Up @@ -290,7 +328,7 @@ private void handlePacket(final DatagramPacket datagram) {
vertx.<Packet>executeBlocking(
future -> {
try {
future.complete(Packet.decode(datagram.data()));
future.complete(packetDeserializer.decode(datagram.data()));
} catch (final Throwable t) {
future.fail(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.p2p.discovery.internal;

import org.hyperledger.besu.ethereum.p2p.discovery.internal.packet.Packet;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 90015fc

Please sign in to comment.