Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

feat(prover): move sub event logic into event function. #513

Merged
merged 7 commits into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added internal/docker/nodes/.env
Empty file.
68 changes: 25 additions & 43 deletions prover/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/leveldb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"

Expand Down Expand Up @@ -70,17 +69,8 @@ type Prover struct {
proofSubmitters []proofSubmitter.Submitter
proofContester proofSubmitter.Contester

// Subscriptions
blockProposedCh chan *bindings.TaikoL1ClientBlockProposed
blockProposedSub event.Subscription
transitionProvedCh chan *bindings.TaikoL1ClientTransitionProved
transitionProvedSub event.Subscription
transitionContestedCh chan *bindings.TaikoL1ClientTransitionContested
transitionContestedSub event.Subscription
blockVerifiedCh chan *bindings.TaikoL1ClientBlockVerified
blockVerifiedSub event.Subscription
proofWindowExpiredCh chan *bindings.TaikoL1ClientBlockProposed
proveNotify chan struct{}
proofWindowExpiredCh chan *bindings.TaikoL1ClientBlockProposed
proveNotify chan struct{}

// Proof related
proofGenerationCh chan *proofProducer.ProofWithHeader
Expand Down Expand Up @@ -144,10 +134,6 @@ func InitFromConfig(ctx context.Context, p *Prover, cfg *Config) (err error) {
p.proverAddress = crypto.PubkeyToAddress(p.cfg.L1ProverPrivKey.PublicKey)

chBufferSize := p.protocolConfigs.BlockMaxProposals
p.blockProposedCh = make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize)
p.blockVerifiedCh = make(chan *bindings.TaikoL1ClientBlockVerified, chBufferSize)
p.transitionProvedCh = make(chan *bindings.TaikoL1ClientTransitionProved, chBufferSize)
p.transitionContestedCh = make(chan *bindings.TaikoL1ClientTransitionContested, chBufferSize)
p.proofGenerationCh = make(chan *proofProducer.ProofWithHeader, chBufferSize)
p.proofWindowExpiredCh = make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize)
p.proveNotify = make(chan struct{}, 1)
Expand Down Expand Up @@ -407,9 +393,6 @@ func (p *Prover) Start() error {
}
}

p.wg.Add(1)
p.initSubscription()

go func() {
if err := p.srv.Start(fmt.Sprintf(":%v", p.cfg.HTTPServerPort)); !errors.Is(err, http.ErrServerClosed) {
log.Crit("Failed to start http server", "error", err)
Expand All @@ -421,6 +404,7 @@ func (p *Prover) Start() error {
go p.heartbeatInterval(p.ctx)
}

p.wg.Add(1)
go p.eventLoop()

return nil
Expand All @@ -440,15 +424,31 @@ func (p *Prover) eventLoop() {
default:
}
}
// Call reqProving() right away to catch up with the latest state.
reqProving()

// If there is too many (TaikoData.Config.blockMaxProposals) pending blocks in TaikoL1 contract, there will be no new
// BlockProposed temporarily, so except the BlockProposed subscription, we need another trigger to start
// fetching the proposed blocks.
forceProvingTicker := time.NewTicker(15 * time.Second)
defer forceProvingTicker.Stop()

// Call reqProving() right away to catch up with the latest state.
reqProving()
chBufferSize := p.protocolConfigs.BlockMaxProposals
blockProposedCh := make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize)
blockVerifiedCh := make(chan *bindings.TaikoL1ClientBlockVerified, chBufferSize)
transitionProvedCh := make(chan *bindings.TaikoL1ClientTransitionProved, chBufferSize)
transitionContestedCh := make(chan *bindings.TaikoL1ClientTransitionContested, chBufferSize)
// Subscriptions
blockProposedSub := rpc.SubscribeBlockProposed(p.rpc.TaikoL1, blockProposedCh)
blockVerifiedSub := rpc.SubscribeBlockVerified(p.rpc.TaikoL1, blockVerifiedCh)
transitionProvedSub := rpc.SubscribeTransitionProved(p.rpc.TaikoL1, transitionProvedCh)
transitionContestedSub := rpc.SubscribeTransitionContested(p.rpc.TaikoL1, transitionContestedCh)
defer func() {
blockProposedSub.Unsubscribe()
blockVerifiedSub.Unsubscribe()
transitionProvedSub.Unsubscribe()
transitionContestedSub.Unsubscribe()
}()

for {
select {
Expand All @@ -460,23 +460,23 @@ func (p *Prover) eventLoop() {
if err := p.proveOp(); err != nil {
log.Error("Prove new blocks error", "error", err)
}
case e := <-p.blockVerifiedCh:
case e := <-blockVerifiedCh:
if err := p.onBlockVerified(p.ctx, e); err != nil {
log.Error("Handle BlockVerified event error", "error", err)
}
case e := <-p.transitionProvedCh:
case e := <-transitionProvedCh:
if err := p.onTransitionProved(p.ctx, e); err != nil {
log.Error("Handle TransitionProved event error", "error", err)
}
case e := <-p.transitionContestedCh:
case e := <-transitionContestedCh:
if err := p.onTransitionContested(p.ctx, e); err != nil {
log.Error("Handle TransitionContested event error", "error", err)
}
case e := <-p.proofWindowExpiredCh:
if err := p.onProvingWindowExpired(p.ctx, e); err != nil {
log.Error("Handle provingWindow expired event error", "error", err)
}
case <-p.blockProposedCh:
case <-blockProposedCh:
reqProving()
case <-forceProvingTicker.C:
reqProving()
Expand All @@ -486,8 +486,6 @@ func (p *Prover) eventLoop() {

// Close closes the prover instance.
func (p *Prover) Close(ctx context.Context) {
p.closeSubscription()

if p.guardianProverSender != nil {
if err := p.guardianProverSender.Close(); err != nil {
log.Error("failed to close database connection", "error", err)
Expand Down Expand Up @@ -1073,22 +1071,6 @@ func (p *Prover) isBlockVerified(id *big.Int) (bool, error) {
return id.Uint64() <= stateVars.B.LastVerifiedBlockId, nil
}

// initSubscription initializes all subscriptions in current prover instance.
func (p *Prover) initSubscription() {
p.blockProposedSub = rpc.SubscribeBlockProposed(p.rpc.TaikoL1, p.blockProposedCh)
p.blockVerifiedSub = rpc.SubscribeBlockVerified(p.rpc.TaikoL1, p.blockVerifiedCh)
p.transitionProvedSub = rpc.SubscribeTransitionProved(p.rpc.TaikoL1, p.transitionProvedCh)
p.transitionContestedSub = rpc.SubscribeTransitionContested(p.rpc.TaikoL1, p.transitionContestedCh)
}

// closeSubscription closes all subscriptions.
func (p *Prover) closeSubscription() {
p.blockVerifiedSub.Unsubscribe()
p.blockProposedSub.Unsubscribe()
p.transitionProvedSub.Unsubscribe()
p.transitionContestedSub.Unsubscribe()
}

// isValidProof checks if the given proof is a valid one, comparing to current L2 node canonical chain.
func (p *Prover) isValidProof(
ctx context.Context,
Expand Down
5 changes: 0 additions & 5 deletions prover/prover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,6 @@ func (s *ProverTestSuite) TestProveOp() {
s.Equal(header.ParentHash, common.BytesToHash(event.Tran.ParentHash[:]))
}

func (s *ProverTestSuite) TestStartSubscription() {
s.NotPanics(s.p.initSubscription)
s.NotPanics(s.p.closeSubscription)
}

func (s *ProverTestSuite) TestSetApprovalAmount() {
opts, err := bind.NewKeyedTransactorWithChainID(s.p.proverPrivateKey, s.p.rpc.L1ChainID)
s.Nil(err)
Expand Down