diff --git a/internal/docker/nodes/.env b/internal/docker/nodes/.env new file mode 100644 index 000000000..e69de29bb diff --git a/prover/prover.go b/prover/prover.go index 3f7617382..3d692c64c 100644 --- a/prover/prover.go +++ b/prover/prover.go @@ -18,7 +18,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/leveldb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli/v2" @@ -70,17 +69,8 @@ type Prover struct { proofSubmitters []proofSubmitter.Submitter proofContester proofSubmitter.Contester - // Subscriptions - blockProposedCh chan *bindings.TaikoL1ClientBlockProposed - blockProposedSub event.Subscription - transitionProvedCh chan *bindings.TaikoL1ClientTransitionProved - transitionProvedSub event.Subscription - transitionContestedCh chan *bindings.TaikoL1ClientTransitionContested - transitionContestedSub event.Subscription - blockVerifiedCh chan *bindings.TaikoL1ClientBlockVerified - blockVerifiedSub event.Subscription - proofWindowExpiredCh chan *bindings.TaikoL1ClientBlockProposed - proveNotify chan struct{} + proofWindowExpiredCh chan *bindings.TaikoL1ClientBlockProposed + proveNotify chan struct{} // Proof related proofGenerationCh chan *proofProducer.ProofWithHeader @@ -144,10 +134,6 @@ func InitFromConfig(ctx context.Context, p *Prover, cfg *Config) (err error) { p.proverAddress = crypto.PubkeyToAddress(p.cfg.L1ProverPrivKey.PublicKey) chBufferSize := p.protocolConfigs.BlockMaxProposals - p.blockProposedCh = make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize) - p.blockVerifiedCh = make(chan *bindings.TaikoL1ClientBlockVerified, chBufferSize) - p.transitionProvedCh = make(chan *bindings.TaikoL1ClientTransitionProved, chBufferSize) - p.transitionContestedCh = make(chan *bindings.TaikoL1ClientTransitionContested, chBufferSize) p.proofGenerationCh = make(chan *proofProducer.ProofWithHeader, chBufferSize) p.proofWindowExpiredCh = make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize) p.proveNotify = make(chan struct{}, 1) @@ -407,9 +393,6 @@ func (p *Prover) Start() error { } } - p.wg.Add(1) - p.initSubscription() - go func() { if err := p.srv.Start(fmt.Sprintf(":%v", p.cfg.HTTPServerPort)); !errors.Is(err, http.ErrServerClosed) { log.Crit("Failed to start http server", "error", err) @@ -421,6 +404,7 @@ func (p *Prover) Start() error { go p.heartbeatInterval(p.ctx) } + p.wg.Add(1) go p.eventLoop() return nil @@ -440,6 +424,8 @@ func (p *Prover) eventLoop() { default: } } + // Call reqProving() right away to catch up with the latest state. + reqProving() // If there is too many (TaikoData.Config.blockMaxProposals) pending blocks in TaikoL1 contract, there will be no new // BlockProposed temporarily, so except the BlockProposed subscription, we need another trigger to start @@ -447,8 +433,22 @@ func (p *Prover) eventLoop() { forceProvingTicker := time.NewTicker(15 * time.Second) defer forceProvingTicker.Stop() - // Call reqProving() right away to catch up with the latest state. - reqProving() + chBufferSize := p.protocolConfigs.BlockMaxProposals + blockProposedCh := make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize) + blockVerifiedCh := make(chan *bindings.TaikoL1ClientBlockVerified, chBufferSize) + transitionProvedCh := make(chan *bindings.TaikoL1ClientTransitionProved, chBufferSize) + transitionContestedCh := make(chan *bindings.TaikoL1ClientTransitionContested, chBufferSize) + // Subscriptions + blockProposedSub := rpc.SubscribeBlockProposed(p.rpc.TaikoL1, blockProposedCh) + blockVerifiedSub := rpc.SubscribeBlockVerified(p.rpc.TaikoL1, blockVerifiedCh) + transitionProvedSub := rpc.SubscribeTransitionProved(p.rpc.TaikoL1, transitionProvedCh) + transitionContestedSub := rpc.SubscribeTransitionContested(p.rpc.TaikoL1, transitionContestedCh) + defer func() { + blockProposedSub.Unsubscribe() + blockVerifiedSub.Unsubscribe() + transitionProvedSub.Unsubscribe() + transitionContestedSub.Unsubscribe() + }() for { select { @@ -460,15 +460,15 @@ func (p *Prover) eventLoop() { if err := p.proveOp(); err != nil { log.Error("Prove new blocks error", "error", err) } - case e := <-p.blockVerifiedCh: + case e := <-blockVerifiedCh: if err := p.onBlockVerified(p.ctx, e); err != nil { log.Error("Handle BlockVerified event error", "error", err) } - case e := <-p.transitionProvedCh: + case e := <-transitionProvedCh: if err := p.onTransitionProved(p.ctx, e); err != nil { log.Error("Handle TransitionProved event error", "error", err) } - case e := <-p.transitionContestedCh: + case e := <-transitionContestedCh: if err := p.onTransitionContested(p.ctx, e); err != nil { log.Error("Handle TransitionContested event error", "error", err) } @@ -476,7 +476,7 @@ func (p *Prover) eventLoop() { if err := p.onProvingWindowExpired(p.ctx, e); err != nil { log.Error("Handle provingWindow expired event error", "error", err) } - case <-p.blockProposedCh: + case <-blockProposedCh: reqProving() case <-forceProvingTicker.C: reqProving() @@ -486,8 +486,6 @@ func (p *Prover) eventLoop() { // Close closes the prover instance. func (p *Prover) Close(ctx context.Context) { - p.closeSubscription() - if p.guardianProverSender != nil { if err := p.guardianProverSender.Close(); err != nil { log.Error("failed to close database connection", "error", err) @@ -1073,22 +1071,6 @@ func (p *Prover) isBlockVerified(id *big.Int) (bool, error) { return id.Uint64() <= stateVars.B.LastVerifiedBlockId, nil } -// initSubscription initializes all subscriptions in current prover instance. -func (p *Prover) initSubscription() { - p.blockProposedSub = rpc.SubscribeBlockProposed(p.rpc.TaikoL1, p.blockProposedCh) - p.blockVerifiedSub = rpc.SubscribeBlockVerified(p.rpc.TaikoL1, p.blockVerifiedCh) - p.transitionProvedSub = rpc.SubscribeTransitionProved(p.rpc.TaikoL1, p.transitionProvedCh) - p.transitionContestedSub = rpc.SubscribeTransitionContested(p.rpc.TaikoL1, p.transitionContestedCh) -} - -// closeSubscription closes all subscriptions. -func (p *Prover) closeSubscription() { - p.blockVerifiedSub.Unsubscribe() - p.blockProposedSub.Unsubscribe() - p.transitionProvedSub.Unsubscribe() - p.transitionContestedSub.Unsubscribe() -} - // isValidProof checks if the given proof is a valid one, comparing to current L2 node canonical chain. func (p *Prover) isValidProof( ctx context.Context, diff --git a/prover/prover_test.go b/prover/prover_test.go index 64177edad..909744017 100644 --- a/prover/prover_test.go +++ b/prover/prover_test.go @@ -389,11 +389,6 @@ func (s *ProverTestSuite) TestProveOp() { s.Equal(header.ParentHash, common.BytesToHash(event.Tran.ParentHash[:])) } -func (s *ProverTestSuite) TestStartSubscription() { - s.NotPanics(s.p.initSubscription) - s.NotPanics(s.p.closeSubscription) -} - func (s *ProverTestSuite) TestSetApprovalAmount() { opts, err := bind.NewKeyedTransactorWithChainID(s.p.proverPrivateKey, s.p.rpc.L1ChainID) s.Nil(err)