Skip to content

Commit

Permalink
Fix up use of SetupP2P.Disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Aug 20, 2024
1 parent 9c1d20c commit 571cbec
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 79 deletions.
1 change: 0 additions & 1 deletion op-chain-ops/script/console2_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -1511,4 +1511,3 @@ func (c *ConsolePrecompile) Log_59cfcbe3(p0 *big.Int, p1 *big.Int, p2 *big.Int,
func (c *ConsolePrecompile) Log_193fb800(p0 *big.Int, p1 *big.Int, p2 *big.Int, p3 *big.Int) {
c.log("p0", p0, "p1", p1, "p2", p2, "p3", p3)
}

8 changes: 5 additions & 3 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-node/rollup/sequencing"

"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p/core/peer"

Expand Down Expand Up @@ -448,10 +450,9 @@ func (n *OpNode) initHeartbeat(cfg *Config) {
return
}
var peerID string
if cfg.P2P.Disabled() {
if !n.p2pEnabled() {
peerID = "disabled"
} else {
// Is there a check for p2p enabled missing here? Is it implied that p2p is enabled if there's a heartbeat?
peerID = n.P2P().Host().ID().String()
}

Expand Down Expand Up @@ -650,9 +651,10 @@ func (n *OpNode) Stop(ctx context.Context) error {
}

// Stop sequencer and report last hash. l2Driver can be nil if we're cleaning up a failed init.
if n.l2Driver != nil && n.cfg.Driver.SequencerEnabled {
if n.l2Driver != nil {
latestHead, err := n.l2Driver.StopSequencer(ctx)
switch {
case errors.Is(err, sequencing.ErrSequencerNotEnabled):
case errors.Is(err, driver.ErrSequencerAlreadyStopped):
n.log.Info("stopping node: sequencer already stopped", "latestHead", latestHead)
case err == nil:
Expand Down
1 change: 1 addition & 0 deletions op-node/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type HostMetrics interface {
// SetupP2P provides a host and discovery service for usage in the rollup node.
type SetupP2P interface {
Check() error
// Looks like this was started to prevent partially inited p2p.
Disabled() bool
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error)
Expand Down
172 changes: 97 additions & 75 deletions op-node/p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,23 @@ type NodeP2P struct {

// NewNodeP2P creates a new p2p node, and returns a reference to it. If the p2p is disabled, it returns nil.
// If metrics are configured, a bandwidth monitor will be spawned in a goroutine.
func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer, elSyncEnabled bool) (*NodeP2P, error) {
func NewNodeP2P(
resourcesCtx context.Context,
rollupCfg *rollup.Config,
log log.Logger,
setup SetupP2P,
gossipIn GossipIn,
l2Chain L2Chain,
runCfg GossipRuntimeConfig,
metrics metrics.Metricer,
elSyncEnabled bool,
) (*NodeP2P, error) {
if setup == nil {
return nil, errors.New("p2p node cannot be created without setup")
}
if setup.Disabled() {
return nil, errors.New("SetupP2P.Disabled is true")
}
var n NodeP2P
if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, l2Chain, runCfg, metrics, elSyncEnabled); err != nil {
closeErr := n.Close()
Expand All @@ -65,12 +78,24 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.
return nil, err
}
if n.host == nil {
return nil, nil
// See prior comment about n.host optionality:
// TODO(CLI-4016): host is not optional, NodeP2P as a whole is.
panic("host is not optional if p2p is enabled")
}
return &n, nil
}

func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer, elSyncEnabled bool) error {
func (n *NodeP2P) init(
resourcesCtx context.Context,
rollupCfg *rollup.Config,
log log.Logger,
setup SetupP2P,
gossipIn GossipIn,
l2Chain L2Chain,
runCfg GossipRuntimeConfig,
metrics metrics.Metricer,
elSyncEnabled bool,
) error {
bwc := p2pmetrics.NewBandwidthCounter()

n.log = log
Expand All @@ -85,86 +110,83 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
return fmt.Errorf("failed to start p2p host: %w", err)
}

// TODO(CLI-4016): host is not optional, NodeP2P as a whole is. This if statement is wrong
if n.host != nil {
// Enable extra features, if any. During testing we don't setup the most advanced host all the time.
if extra, ok := n.host.(ExtraHostFeatures); ok {
n.gater = extra.ConnectionGater()
n.connMgr = extra.ConnectionManager()
}
eps, ok := n.host.Peerstore().(store.ExtendedPeerstore)
if !ok {
return fmt.Errorf("cannot init without extended peerstore: %w", err)
}
n.store = eps
scoreParams := setup.PeerScoringParams()
// Enable extra features, if any. During testing we don't setup the most advanced host all the time.
if extra, ok := n.host.(ExtraHostFeatures); ok {
n.gater = extra.ConnectionGater()
n.connMgr = extra.ConnectionManager()
}
eps, ok := n.host.Peerstore().(store.ExtendedPeerstore)
if !ok {
return fmt.Errorf("cannot init without extended peerstore: %w", err)
}
n.store = eps
scoreParams := setup.PeerScoringParams()

if scoreParams != nil {
n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers)
} else {
n.appScorer = &NoopApplicationScorer{}
}
// Activate the P2P req-resp sync if enabled by feature-flag.
if setup.ReqRespSyncEnabled() && !elSyncEnabled {
n.syncCl = NewSyncClient(log, rollupCfg, n.host, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer)
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(nw network.Network, conn network.Conn) {
n.syncCl.AddPeer(conn.RemotePeer())
},
DisconnectedF: func(nw network.Network, conn network.Conn) {
// only when no connection is available, we can remove the peer
if nw.Connectedness(conn.RemotePeer()) == network.NotConnected {
n.syncCl.RemovePeer(conn.RemotePeer())
}
},
})
n.syncCl.Start()
// the host may already be connected to peers, add them all to the sync client
for _, peerID := range n.host.Network().Peers() {
n.syncCl.AddPeer(peerID)
}
if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy
n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)
// register the sync protocol with libp2p host
payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
}
if scoreParams != nil {
n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers)
} else {
n.appScorer = &NoopApplicationScorer{}
}
// Activate the P2P req-resp sync if enabled by feature-flag.
if setup.ReqRespSyncEnabled() && !elSyncEnabled {
n.syncCl = NewSyncClient(log, rollupCfg, n.host, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer)
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(nw network.Network, conn network.Conn) {
n.syncCl.AddPeer(conn.RemotePeer())
},
DisconnectedF: func(nw network.Network, conn network.Conn) {
// only when no connection is available, we can remove the peer
if nw.Connectedness(conn.RemotePeer()) == network.NotConnected {
n.syncCl.RemovePeer(conn.RemotePeer())
}
},
})
n.syncCl.Start()
// the host may already be connected to peers, add them all to the sync client
for _, peerID := range n.host.Network().Peers() {
n.syncCl.AddPeer(peerID)
}
n.scorer = NewScorer(rollupCfg, eps, metrics, n.appScorer, log)
// notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)
if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err)
if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy
n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)
// register the sync protocol with libp2p host
payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
}
n.gsOut, err = JoinGossip(n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
if err != nil {
return fmt.Errorf("failed to join blocks gossip topic: %w", err)
}
log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().String())
}
n.scorer = NewScorer(rollupCfg, eps, metrics, n.appScorer, log)
// notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)
if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err)
}
n.gsOut, err = JoinGossip(n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
if err != nil {
return fmt.Errorf("failed to join blocks gossip topic: %w", err)
}
log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().String())

tcpPort, err := FindActiveTCPPort(n.host)
if err != nil {
log.Warn("failed to find what TCP port p2p is binded to", "err", err)
}
tcpPort, err := FindActiveTCPPort(n.host)
if err != nil {
log.Warn("failed to find what TCP port p2p is binded to", "err", err)
}

// All nil if disabled.
n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"), rollupCfg, tcpPort)
if err != nil {
return fmt.Errorf("failed to start discv5: %w", err)
}
// All nil if disabled.
n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"), rollupCfg, tcpPort)
if err != nil {
return fmt.Errorf("failed to start discv5: %w", err)
}

if metrics != nil {
go metrics.RecordBandwidth(resourcesCtx, bwc)
}
if metrics != nil {
go metrics.RecordBandwidth(resourcesCtx, bwc)
}

if setup.BanPeers() {
n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration())
n.peerMonitor.Start()
}
n.appScorer.start()
if setup.BanPeers() {
n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration())
n.peerMonitor.Start()
}
n.appScorer.start()
return nil
}

Expand Down

0 comments on commit 571cbec

Please sign in to comment.