
Commit

Shutdown sequencer before stopping p2p
anacrolix committed Aug 19, 2024
1 parent 698633c commit 48045a6
Showing 2 changed files with 68 additions and 26 deletions.
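In brief: op-node's Stop now stops the sequencer (logging its latest head) before tearing down the p2p stack, op-conductor's stopSequencer uses the shutdown context instead of context.Background(), and the conductor's retry loop can be interrupted by shutdown. A minimal, hypothetical sketch of the ordering idea — fakeSequencer, fakeP2P, and stop are stand-ins, not the real op-node types:

    package main

    import (
        "context"
        "fmt"
    )

    type fakeSequencer struct{}

    func (fakeSequencer) StopSequencer(ctx context.Context) (latestHead string, err error) {
        return "0xabc123", nil // pretend unsafe head hash
    }

    type fakeP2P struct{}

    func (fakeP2P) Close() error { return nil }

    // stop tears down in dependency order: the sequencer goes first,
    // while the p2p stack is still up, so its final head can still be
    // reported and gossiped; only then is p2p closed.
    func stop(ctx context.Context, seq fakeSequencer, p2p fakeP2P) {
        if head, err := seq.StopSequencer(ctx); err != nil {
            fmt.Println("error stopping sequencer:", err)
        } else {
            fmt.Println("stopped sequencer, latestHead:", head)
        }
        if err := p2p.Close(); err != nil {
            fmt.Println("failed to close p2p node:", err)
        }
    }

    func main() {
        stop(context.Background(), fakeSequencer{}, fakeP2P{})
    }

Reversing the order would tear down the network layer while the sequencer still has a head worth reporting, which is exactly what the old TODO in Stop left unresolved.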
33 changes: 23 additions & 10 deletions op-conductor/conductor/service.go
@@ -647,9 +647,13 @@ func (oc *OpConductor) action() {
 
     oc.log.Debug("exiting action with status and error", "status", status, "err", err)
     if err != nil {
-        oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status)
-        time.Sleep(oc.retryBackoff())
-        oc.queueAction()
+        select {
+        case <-oc.shutdownCtx.Done():
+        case <-time.After(oc.retryBackoff()):
+            oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status)
+            time.Sleep(oc.retryBackoff())
+            oc.queueAction()
+        }
         return
     }
 
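The retry path above replaces an unconditional time.Sleep with a select that races the backoff timer against shutdownCtx, so a pending retry no longer delays shutdown. The same pattern in isolation (a generic Go sketch, not conductor code; sleepOrShutdown is a made-up helper):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // sleepOrShutdown waits for the backoff duration unless ctx is
    // cancelled first, returning false if the wait was cut short.
    func sleepOrShutdown(ctx context.Context, backoff time.Duration) bool {
        select {
        case <-ctx.Done():
            return false
        case <-time.After(backoff):
            return true
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
        defer cancel()
        if !sleepOrShutdown(ctx, time.Second) {
            fmt.Println("shutdown won the race; skipping retry")
        }
    }

In hot loops, a time.Timer that gets Stop'd is preferable, since time.After keeps its timer alive until it fires even after cancellation.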
@@ -683,18 +687,27 @@ func (oc *OpConductor) transferLeader() error {
 }
 
 func (oc *OpConductor) stopSequencer() error {
-    oc.log.Info("stopping sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())
-
-    _, err := oc.ctrl.StopSequencer(context.Background())
-    if err != nil {
-        if strings.Contains(err.Error(), driver.ErrSequencerAlreadyStopped.Error()) {
-            oc.log.Warn("sequencer already stopped.", "err", err)
+    oc.log.Info(
+        "stopping sequencer",
+        "server", oc.cons.ServerID(),
+        "leader", oc.leader.Load(),
+        "healthy", oc.healthy.Load(),
+        "active", oc.seqActive.Load())
+
+    // Getting stuck stopping a sequencer can't be good. Is it okay to fail to stop a sequencer on
+    // shutdown? From what I can tell it is.
+    latestHead, err := oc.ctrl.StopSequencer(oc.shutdownCtx)
+    if err == nil {
+        // None of the consensus state should have changed here so don't log it again.
+        oc.log.Info("stopped sequencer", "latestHead", latestHead)
+    } else {
+        if errors.Is(err, driver.ErrSequencerAlreadyStopped) {
+            oc.log.Warn("sequencer already stopped", "err", err)
         } else {
             return errors.Wrap(err, "failed to stop sequencer")
         }
     }
     oc.metrics.RecordStopSequencer(err == nil)
 
     oc.seqActive.Store(false)
     return nil
 }
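Note the error-matching change in stopSequencer: strings.Contains(err.Error(), ...) becomes errors.Is against the driver.ErrSequencerAlreadyStopped sentinel, which keeps matching when the error is wrapped (assuming the sentinel is reconstructed on this side of the RPC boundary). A standalone illustration with a hypothetical sentinel, not the driver package:

    package main

    import (
        "errors"
        "fmt"
    )

    // A sentinel error, analogous to driver.ErrSequencerAlreadyStopped.
    var ErrAlreadyStopped = errors.New("sequencer already stopped")

    func stopSequencer() error {
        // Wrapping with %w preserves the sentinel in the error chain.
        return fmt.Errorf("rpc call failed: %w", ErrAlreadyStopped)
    }

    func main() {
        err := stopSequencer()
        // errors.Is walks the chain; matching on err.Error() substrings
        // is brittle if the message text ever changes.
        if errors.Is(err, ErrAlreadyStopped) {
            fmt.Println("already stopped:", err)
        }
    }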
61 changes: 45 additions & 16 deletions op-node/node/node.go
@@ -42,6 +42,8 @@ type closableSafeDB interface {
 }
 
 type OpNode struct {
+    // Retain the config to test for active features rather than test for runtime state.
+    cfg        *Config
     log        log.Logger
     appVersion string
     metrics    *metrics.Metrics
@@ -95,6 +97,7 @@ func New(ctx context.Context, cfg *Config, log log.Logger, appVersion string, m
     }
 
     n := &OpNode{
+        cfg:        cfg,
         log:        log,
         appVersion: appVersion,
         metrics:    m,
@@ -136,7 +139,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error {
     if err := n.initP2PSigner(ctx, cfg); err != nil {
         return fmt.Errorf("failed to init the P2P signer: %w", err)
     }
-    if err := n.initP2P(ctx, cfg); err != nil {
+    if err := n.initP2P(cfg); err != nil {
         return fmt.Errorf("failed to init the P2P stack: %w", err)
     }
     // Only expose the server at the end, ensuring all RPC backend components are initialized.
@@ -410,7 +413,7 @@ func (n *OpNode) initRPCServer(cfg *Config) error {
     if err != nil {
         return err
     }
-    if n.p2pNode != nil {
+    if n.p2pEnabled() {
         server.EnableP2P(p2p.NewP2PAPIBackend(n.p2pNode, n.log, n.metrics))
     }
     if cfg.RPC.EnableAdmin {
@@ -448,6 +451,7 @@ func (n *OpNode) initHeartbeat(cfg *Config) {
     if cfg.P2P.Disabled() {
         peerID = "disabled"
     } else {
+        // Is there a check for p2p enabled missing here? Is it implied that p2p is enabled if there's a heartbeat?
         peerID = n.P2P().Host().ID().String()
     }
 
@@ -483,30 +487,35 @@ func (n *OpNode) initPProf(cfg *Config) error {
     return nil
 }
 
-func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
-    if cfg.P2P != nil {
+func (n *OpNode) p2pEnabled() bool {
+    return n.cfg.P2P != nil
+}
+
+func (n *OpNode) initP2P(cfg *Config) (err error) {
+    if n.p2pNode != nil {
+        panic("p2p node already initialized")
+    }
+    if n.p2pEnabled() {
         // TODO(protocol-quest/97): Use EL Sync instead of CL Alt sync for fetching missing blocks in the payload queue.
-        p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
-        if err != nil || p2pNode == nil {
-            return err
+        n.p2pNode, err = p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
+        if err != nil {
+            return
         }
-        n.p2pNode = p2pNode
         if n.p2pNode.Dv5Udp() != nil {
             go n.p2pNode.DiscoveryProcess(n.resourcesCtx, n.log, &cfg.Rollup, cfg.P2P.TargetPeers())
         }
     }
     return nil
 }
 
-func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
+func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) (err error) {
     // the p2p signer setup is optional
     if cfg.P2PSigner == nil {
-        return nil
+        return
     }
     // p2pSigner may still be nil, the signer setup may not create any signer, the signer is optional
-    var err error
     n.p2pSigner, err = cfg.P2PSigner.SetupSigner(ctx)
-    return err
+    return
 }
 
 func (n *OpNode) Start(ctx context.Context) error {
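The new p2pEnabled helper tests configuration (n.cfg.P2P != nil) rather than runtime state (n.p2pNode != nil). That distinction matters once Stop nils out p2pNode: "is the feature on?" and "is the component currently alive?" become different questions. A toy sketch of the split, with made-up config and node types:

    package main

    import "fmt"

    type config struct {
        P2P *struct{} // nil means p2p is disabled
    }

    type node struct {
        cfg     *config
        p2pNode *struct{} // runtime state: nil until init, nil again after Stop
    }

    // p2pEnabled tests configuration, not runtime state: it stays true
    // even after Stop has torn the node down, unlike p2pNode != nil.
    func (n *node) p2pEnabled() bool { return n.cfg.P2P != nil }

    func (n *node) initP2P() {
        if n.p2pNode != nil {
            panic("p2p node already initialized") // init must run exactly once
        }
        if n.p2pEnabled() {
            n.p2pNode = &struct{}{}
        }
    }

    func main() {
        n := &node{cfg: &config{P2P: &struct{}{}}}
        n.initP2P()
        fmt.Println("enabled:", n.p2pEnabled(), "alive:", n.p2pNode != nil)
        n.p2pNode = nil // what Stop does in this commit
        fmt.Println("enabled:", n.p2pEnabled(), "alive:", n.p2pNode != nil)
    }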
@@ -562,7 +571,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa
     n.tracer.OnPublishL2Payload(ctx, envelope)
 
     // publish to p2p, if we are running p2p at all
-    if n.p2pNode != nil {
+    if n.p2pEnabled() {
         payload := envelope.ExecutionPayload
         if n.p2pSigner == nil {
             return fmt.Errorf("node has no p2p signer, payload %s cannot be published", payload.ID())
@@ -576,7 +585,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa
 
 func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *eth.ExecutionPayloadEnvelope) error {
     // ignore if it's from ourselves
-    if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
+    if n.p2pEnabled() && from == n.p2pNode.Host().ID() {
         return nil
     }
 
@@ -597,9 +606,13 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *
 }
 
 func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
-    if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() {
+    if n.p2pEnabled() && n.p2pNode.AltSyncEnabled() {
         if unixTimeStale(start.Time, 12*time.Hour) {
-            n.log.Debug("ignoring request to sync L2 range, timestamp is too old for p2p", "start", start, "end", end, "start_time", start.Time)
+            n.log.Debug(
+                "ignoring request to sync L2 range, timestamp is too old for p2p",
+                "start", start,
+                "end", end,
+                "start_time", start.Time)
             return nil
         }
         return n.p2pNode.RequestL2Range(ctx, start, end)
@@ -635,10 +648,26 @@ func (n *OpNode) Stop(ctx context.Context) error {
             result = multierror.Append(result, fmt.Errorf("failed to close RPC server: %w", err))
         }
     }
-    // TODO: stop sequencer and report last hash
+
+    // Check in case we're cleaning up a failed init.
+    if n.l2Driver != nil && n.cfg.Driver.SequencerEnabled {
+        latestHead, err := n.l2Driver.StopSequencer(ctx)
+        switch {
+        case errors.Is(err, driver.ErrSequencerAlreadyStopped):
+            n.log.Info("stopping node: sequencer already stopped", "latestHead", latestHead)
+        case err == nil:
+            n.log.Info("stopped sequencer", "latestHead", latestHead)
+        default:
+            result = multierror.Append(result, fmt.Errorf("error stopping sequencer: %w", err))
+        }
+    }
     if n.p2pNode != nil {
         if err := n.p2pNode.Close(); err != nil {
             result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %w", err))
         }
+        // Mayhem
+        n.p2pNode = nil
     }
     if n.p2pSigner != nil {
         if err := n.p2pSigner.Close(); err != nil {
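Throughout Stop, failures are appended to a single multierror so one failing component doesn't short-circuit the rest of teardown. A dependency-free sketch of the same accumulate-and-continue pattern, using the standard library's errors.Join (Go 1.20+) in place of hashicorp/go-multierror:

    package main

    import (
        "errors"
        "fmt"
    )

    func closeA() error { return nil }
    func closeB() error { return errors.New("b: close failed") }
    func closeC() error { return errors.New("c: close failed") }

    // stop mirrors the Stop pattern in the diff: try every component,
    // accumulate failures, and return them as one error at the end.
    func stop() error {
        var result error // errors.Join of all-nil inputs stays nil
        for _, closeFn := range []func() error{closeA, closeB, closeC} {
            if err := closeFn(); err != nil {
                result = errors.Join(result, err)
            }
        }
        return result
    }

    func main() {
        if err := stop(); err != nil {
            fmt.Println("shutdown finished with errors:", err)
        }
    }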
