Skip to content

Commit

Permalink
Conductor and sequencer p2p refactoring (#11455)
Browse files Browse the repository at this point in the history
* Shutdown sequencer before stopping p2p

* Check p2p isn't also disabled

Co-authored-by: Sebastian Stammler <[email protected]>

* Remove missed time.Sleep

* Fix up use of SetupP2P.Disabled

* Revert error check after RPC boundary

* Add comment about context for StopSequencer

* Add Config.p2pEnabled

* op-node: Make Config.P2PEnabled public

---------

Co-authored-by: Sebastian Stammler <[email protected]>
  • Loading branch information
anacrolix and sebastianst authored Aug 29, 2024
1 parent e53a86a commit c7b91ab
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 102 deletions.
1 change: 0 additions & 1 deletion op-chain-ops/script/console2_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -1511,4 +1511,3 @@ func (c *ConsolePrecompile) Log_59cfcbe3(p0 *big.Int, p1 *big.Int, p2 *big.Int,
func (c *ConsolePrecompile) Log_193fb800(p0 *big.Int, p1 *big.Int, p2 *big.Int, p3 *big.Int) {
c.log(p0, p1, p2, p3)
}

36 changes: 27 additions & 9 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,9 +647,12 @@ func (oc *OpConductor) action() {

oc.log.Debug("exiting action with status and error", "status", status, "err", err)
if err != nil {
oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status)
time.Sleep(oc.retryBackoff())
oc.queueAction()
select {
case <-oc.shutdownCtx.Done():
case <-time.After(oc.retryBackoff()):
oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status)
oc.queueAction()
}
return
}

Expand Down Expand Up @@ -683,18 +686,33 @@ func (oc *OpConductor) transferLeader() error {
}

func (oc *OpConductor) stopSequencer() error {
oc.log.Info("stopping sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())

_, err := oc.ctrl.StopSequencer(context.Background())
if err != nil {
oc.log.Info(
"stopping sequencer",
"server", oc.cons.ServerID(),
"leader", oc.leader.Load(),
"healthy", oc.healthy.Load(),
"active", oc.seqActive.Load())

// Quoting (@zhwrd): StopSequencer is called after conductor loses leadership. In the event that
// the StopSequencer call fails, it actually has little real consequences because the sequencer
can't produce a block and gossip / commit it to the raft log (requires leadership). Once
// conductor comes back up it will check its leader and sequencer state and attempt to stop the
// sequencer again. So it is "okay" to fail to stop a sequencer, the state will eventually be
// rectified and we won't have two active sequencers that are actually producing blocks.
//
// To that end we allow to cancel the StopSequencer call if we're shutting down.
latestHead, err := oc.ctrl.StopSequencer(oc.shutdownCtx)
if err == nil {
// None of the consensus state should have changed here so don't log it again.
oc.log.Info("stopped sequencer", "latestHead", latestHead)
} else {
if strings.Contains(err.Error(), driver.ErrSequencerAlreadyStopped.Error()) {
oc.log.Warn("sequencer already stopped.", "err", err)
oc.log.Warn("sequencer already stopped", "err", err)
} else {
return errors.Wrap(err, "failed to stop sequencer")
}
}
oc.metrics.RecordStopSequencer(err == nil)

oc.seqActive.Store(false)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (sys *System) Close() {
}

for name, node := range sys.RollupNodes {
if err := node.Stop(postCtx); err != nil && !errors.Is(err, rollupNode.ErrAlreadyClosed) {
if err := node.Stop(postCtx); err != nil && !errors.Is(err, rollupNode.ErrAlreadyClosed) && !errors.Is(err, postCtx.Err()) {
combinedErr = errors.Join(combinedErr, fmt.Errorf("stop rollup node %v: %w", name, err))
}
}
Expand Down
4 changes: 4 additions & 0 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,7 @@ func (cfg *Config) Check() error {
}
return nil
}

func (cfg *Config) P2PEnabled() bool {
return cfg.P2P != nil && !cfg.P2P.Disabled()
}
62 changes: 46 additions & 16 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-node/rollup/sequencing"

"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p/core/peer"

Expand Down Expand Up @@ -40,6 +42,8 @@ type closableSafeDB interface {
}

type OpNode struct {
// Retain the config to test for active features rather than test for runtime state.
cfg *Config
log log.Logger
appVersion string
metrics *metrics.Metrics
Expand Down Expand Up @@ -93,6 +97,7 @@ func New(ctx context.Context, cfg *Config, log log.Logger, appVersion string, m
}

n := &OpNode{
cfg: cfg,
log: log,
appVersion: appVersion,
metrics: m,
Expand Down Expand Up @@ -134,7 +139,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error {
if err := n.initP2PSigner(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the P2P signer: %w", err)
}
if err := n.initP2P(ctx, cfg); err != nil {
if err := n.initP2P(cfg); err != nil {
return fmt.Errorf("failed to init the P2P stack: %w", err)
}
// Only expose the server at the end, ensuring all RPC backend components are initialized.
Expand Down Expand Up @@ -407,7 +412,7 @@ func (n *OpNode) initRPCServer(cfg *Config) error {
if err != nil {
return err
}
if n.p2pNode != nil {
if n.p2pEnabled() {
server.EnableP2P(p2p.NewP2PAPIBackend(n.p2pNode, n.log, n.metrics))
}
if cfg.RPC.EnableAdmin {
Expand Down Expand Up @@ -454,30 +459,35 @@ func (n *OpNode) initPProf(cfg *Config) error {
return nil
}

func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
if cfg.P2P != nil {
func (n *OpNode) p2pEnabled() bool {
return n.cfg.P2PEnabled()
}

func (n *OpNode) initP2P(cfg *Config) (err error) {
if n.p2pNode != nil {
panic("p2p node already initialized")
}
if n.p2pEnabled() {
// TODO(protocol-quest/97): Use EL Sync instead of CL Alt sync for fetching missing blocks in the payload queue.
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
if err != nil || p2pNode == nil {
return err
n.p2pNode, err = p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
if err != nil {
return
}
n.p2pNode = p2pNode
if n.p2pNode.Dv5Udp() != nil {
go n.p2pNode.DiscoveryProcess(n.resourcesCtx, n.log, &cfg.Rollup, cfg.P2P.TargetPeers())
}
}
return nil
}

func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) (err error) {
// the p2p signer setup is optional
if cfg.P2PSigner == nil {
return nil
return
}
// p2pSigner may still be nil, the signer setup may not create any signer, the signer is optional
var err error
n.p2pSigner, err = cfg.P2PSigner.SetupSigner(ctx)
return err
return
}

func (n *OpNode) Start(ctx context.Context) error {
Expand Down Expand Up @@ -533,7 +543,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa
n.tracer.OnPublishL2Payload(ctx, envelope)

// publish to p2p, if we are running p2p at all
if n.p2pNode != nil {
if n.p2pEnabled() {
payload := envelope.ExecutionPayload
if n.p2pSigner == nil {
return fmt.Errorf("node has no p2p signer, payload %s cannot be published", payload.ID())
Expand All @@ -547,7 +557,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa

func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *eth.ExecutionPayloadEnvelope) error {
// ignore if it's from ourselves
if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
if n.p2pEnabled() && from == n.p2pNode.Host().ID() {
return nil
}

Expand All @@ -568,9 +578,13 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *
}

func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() {
if n.p2pEnabled() && n.p2pNode.AltSyncEnabled() {
if unixTimeStale(start.Time, 12*time.Hour) {
n.log.Debug("ignoring request to sync L2 range, timestamp is too old for p2p", "start", start, "end", end, "start_time", start.Time)
n.log.Debug(
"ignoring request to sync L2 range, timestamp is too old for p2p",
"start", start,
"end", end,
"start_time", start.Time)
return nil
}
return n.p2pNode.RequestL2Range(ctx, start, end)
Expand Down Expand Up @@ -606,10 +620,26 @@ func (n *OpNode) Stop(ctx context.Context) error {
result = multierror.Append(result, fmt.Errorf("failed to close RPC server: %w", err))
}
}

// Stop sequencer and report last hash. l2Driver can be nil if we're cleaning up a failed init.
if n.l2Driver != nil {
latestHead, err := n.l2Driver.StopSequencer(ctx)
switch {
case errors.Is(err, sequencing.ErrSequencerNotEnabled):
case errors.Is(err, driver.ErrSequencerAlreadyStopped):
n.log.Info("stopping node: sequencer already stopped", "latestHead", latestHead)
case err == nil:
n.log.Info("stopped sequencer", "latestHead", latestHead)
default:
result = multierror.Append(result, fmt.Errorf("error stopping sequencer: %w", err))
}
}
if n.p2pNode != nil {
if err := n.p2pNode.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %w", err))
}
// Prevent further use of p2p.
n.p2pNode = nil
}
if n.p2pSigner != nil {
if err := n.p2pSigner.Close(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions op-node/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type HostMetrics interface {
// SetupP2P provides a host and discovery service for usage in the rollup node.
type SetupP2P interface {
Check() error
// Disabled reports whether p2p is disabled; this appears intended to prevent use of a partially initialized p2p stack.
Disabled() bool
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error)
Expand Down
Loading

0 comments on commit c7b91ab

Please sign in to comment.