diff --git a/op-chain-ops/script/console2_gen.go b/op-chain-ops/script/console2_gen.go index 9430a9441ccec..4a7cc9a7aa7bf 100644 --- a/op-chain-ops/script/console2_gen.go +++ b/op-chain-ops/script/console2_gen.go @@ -1511,4 +1511,3 @@ func (c *ConsolePrecompile) Log_59cfcbe3(p0 *big.Int, p1 *big.Int, p2 *big.Int, func (c *ConsolePrecompile) Log_193fb800(p0 *big.Int, p1 *big.Int, p2 *big.Int, p3 *big.Int) { c.log("p0", p0, "p1", p1, "p2", p2, "p3", p3) } - diff --git a/op-node/node/node.go b/op-node/node/node.go index e31ad00c7f498..910f6cc5838c4 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -8,6 +8,8 @@ import ( "sync/atomic" "time" + "github.com/ethereum-optimism/optimism/op-node/rollup/sequencing" + "github.com/hashicorp/go-multierror" "github.com/libp2p/go-libp2p/core/peer" @@ -448,7 +450,7 @@ func (n *OpNode) initHeartbeat(cfg *Config) { return } var peerID string - if cfg.P2P.Disabled() { + if !n.p2pEnabled() { peerID = "disabled" } else { // Is there a check for p2p enabled missing here? Is it implied that p2p is enabled if there's a heartbeat? @@ -650,9 +652,10 @@ func (n *OpNode) Stop(ctx context.Context) error { } // Stop sequencer and report last hash. l2Driver can be nil if we're cleaning up a failed init. - if n.l2Driver != nil && n.cfg.Driver.SequencerEnabled { + if n.l2Driver != nil { latestHead, err := n.l2Driver.StopSequencer(ctx) switch { + case errors.Is(err, sequencing.ErrSequencerNotEnabled): case errors.Is(err, driver.ErrSequencerAlreadyStopped): n.log.Info("stopping node: sequencer already stopped", "latestHead", latestHead) case err == nil: diff --git a/op-node/p2p/config.go b/op-node/p2p/config.go index 94b75a95de263..ee21ba20fc395 100644 --- a/op-node/p2p/config.go +++ b/op-node/p2p/config.go @@ -48,6 +48,7 @@ type HostMetrics interface { // SetupP2P provides a host and discovery service for usage in the rollup node. type SetupP2P interface { Check() error + // Looks like this was started to prevent partially inited p2p. Disabled() bool // Host creates a libp2p host service. Returns nil, nil if p2p is disabled. Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error) diff --git a/op-node/p2p/node.go b/op-node/p2p/node.go index 4c88556ddd9c6..70f7dbc67c0bb 100644 --- a/op-node/p2p/node.go +++ b/op-node/p2p/node.go @@ -52,10 +52,23 @@ type NodeP2P struct { // NewNodeP2P creates a new p2p node, and returns a reference to it. If the p2p is disabled, it returns nil. // If metrics are configured, a bandwidth monitor will be spawned in a goroutine. -func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer, elSyncEnabled bool) (*NodeP2P, error) { +func NewNodeP2P( + resourcesCtx context.Context, + rollupCfg *rollup.Config, + log log.Logger, + setup SetupP2P, + gossipIn GossipIn, + l2Chain L2Chain, + runCfg GossipRuntimeConfig, + metrics metrics.Metricer, + elSyncEnabled bool, +) (*NodeP2P, error) { if setup == nil { return nil, errors.New("p2p node cannot be created without setup") } + if setup.Disabled() { + return nil, errors.New("SetupP2P.Disabled is true") + } var n NodeP2P if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, l2Chain, runCfg, metrics, elSyncEnabled); err != nil { closeErr := n.Close() @@ -65,12 +78,24 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log. return nil, err } if n.host == nil { - return nil, nil + // See prior comment about n.host optionality: + // TODO(CLI-4016): host is not optional, NodeP2P as a whole is. + panic("host is not optional if p2p is enabled") } return &n, nil } -func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer, elSyncEnabled bool) error { +func (n *NodeP2P) init( + resourcesCtx context.Context, + rollupCfg *rollup.Config, + log log.Logger, + setup SetupP2P, + gossipIn GossipIn, + l2Chain L2Chain, + runCfg GossipRuntimeConfig, + metrics metrics.Metricer, + elSyncEnabled bool, +) error { bwc := p2pmetrics.NewBandwidthCounter() n.log = log @@ -85,86 +110,83 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l return fmt.Errorf("failed to start p2p host: %w", err) } - // TODO(CLI-4016): host is not optional, NodeP2P as a whole is. This if statement is wrong - if n.host != nil { - // Enable extra features, if any. During testing we don't setup the most advanced host all the time. - if extra, ok := n.host.(ExtraHostFeatures); ok { - n.gater = extra.ConnectionGater() - n.connMgr = extra.ConnectionManager() - } - eps, ok := n.host.Peerstore().(store.ExtendedPeerstore) - if !ok { - return fmt.Errorf("cannot init without extended peerstore: %w", err) - } - n.store = eps - scoreParams := setup.PeerScoringParams() + // Enable extra features, if any. During testing we don't setup the most advanced host all the time. + if extra, ok := n.host.(ExtraHostFeatures); ok { + n.gater = extra.ConnectionGater() + n.connMgr = extra.ConnectionManager() + } + eps, ok := n.host.Peerstore().(store.ExtendedPeerstore) + if !ok { + return fmt.Errorf("cannot init without extended peerstore: %w", err) + } + n.store = eps + scoreParams := setup.PeerScoringParams() - if scoreParams != nil { - n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers) - } else { - n.appScorer = &NoopApplicationScorer{} - } - // Activate the P2P req-resp sync if enabled by feature-flag. - if setup.ReqRespSyncEnabled() && !elSyncEnabled { - n.syncCl = NewSyncClient(log, rollupCfg, n.host, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer) - n.host.Network().Notify(&network.NotifyBundle{ - ConnectedF: func(nw network.Network, conn network.Conn) { - n.syncCl.AddPeer(conn.RemotePeer()) - }, - DisconnectedF: func(nw network.Network, conn network.Conn) { - // only when no connection is available, we can remove the peer - if nw.Connectedness(conn.RemotePeer()) == network.NotConnected { - n.syncCl.RemovePeer(conn.RemotePeer()) - } - }, - }) - n.syncCl.Start() - // the host may already be connected to peers, add them all to the sync client - for _, peerID := range n.host.Network().Peers() { - n.syncCl.AddPeer(peerID) - } - if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy - n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics) - // register the sync protocol with libp2p host - payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest) - n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber) - } + if scoreParams != nil { + n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers) + } else { + n.appScorer = &NoopApplicationScorer{} + } + // Activate the P2P req-resp sync if enabled by feature-flag. + if setup.ReqRespSyncEnabled() && !elSyncEnabled { + n.syncCl = NewSyncClient(log, rollupCfg, n.host, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer) + n.host.Network().Notify(&network.NotifyBundle{ + ConnectedF: func(nw network.Network, conn network.Conn) { + n.syncCl.AddPeer(conn.RemotePeer()) + }, + DisconnectedF: func(nw network.Network, conn network.Conn) { + // only when no connection is available, we can remove the peer + if nw.Connectedness(conn.RemotePeer()) == network.NotConnected { + n.syncCl.RemovePeer(conn.RemotePeer()) + } + }, + }) + n.syncCl.Start() + // the host may already be connected to peers, add them all to the sync client + for _, peerID := range n.host.Network().Peers() { + n.syncCl.AddPeer(peerID) } - n.scorer = NewScorer(rollupCfg, eps, metrics, n.appScorer, log) - // notify of any new connections/streams/etc. - n.host.Network().Notify(NewNetworkNotifier(log, metrics)) - // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled. - n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log) - if err != nil { - return fmt.Errorf("failed to start gossipsub router: %w", err) + if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy + n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics) + // register the sync protocol with libp2p host + payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest) + n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber) } - n.gsOut, err = JoinGossip(n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn) - if err != nil { - return fmt.Errorf("failed to join blocks gossip topic: %w", err) - } - log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().String()) + } + n.scorer = NewScorer(rollupCfg, eps, metrics, n.appScorer, log) + // notify of any new connections/streams/etc. + n.host.Network().Notify(NewNetworkNotifier(log, metrics)) + // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled. + n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log) + if err != nil { + return fmt.Errorf("failed to start gossipsub router: %w", err) + } + n.gsOut, err = JoinGossip(n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn) + if err != nil { + return fmt.Errorf("failed to join blocks gossip topic: %w", err) + } + log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().String()) - tcpPort, err := FindActiveTCPPort(n.host) - if err != nil { - log.Warn("failed to find what TCP port p2p is binded to", "err", err) - } + tcpPort, err := FindActiveTCPPort(n.host) + if err != nil { + log.Warn("failed to find what TCP port p2p is binded to", "err", err) + } - // All nil if disabled. - n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"), rollupCfg, tcpPort) - if err != nil { - return fmt.Errorf("failed to start discv5: %w", err) - } + // All nil if disabled. + n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"), rollupCfg, tcpPort) + if err != nil { + return fmt.Errorf("failed to start discv5: %w", err) + } - if metrics != nil { - go metrics.RecordBandwidth(resourcesCtx, bwc) - } + if metrics != nil { + go metrics.RecordBandwidth(resourcesCtx, bwc) + } - if setup.BanPeers() { - n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration()) - n.peerMonitor.Start() - } - n.appScorer.start() + if setup.BanPeers() { + n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration()) + n.peerMonitor.Start() } + n.appScorer.start() return nil }