diff --git a/packages/eventindexer/cmd/flags/indexer.go b/packages/eventindexer/cmd/flags/indexer.go index 88824cdbe36..b41d929dd0a 100644 --- a/packages/eventindexer/cmd/flags/indexer.go +++ b/packages/eventindexer/cmd/flags/indexer.go @@ -74,6 +74,13 @@ var ( Category: indexerCategory, EnvVars: []string{"INDEX_ERC20S"}, } + OntakeForkHeight = &cli.Uint64Flag{ + Name: "ontakeForkHeight", + Usage: "Block number ontake fork height happened", + Value: 21134698, + Category: indexerCategory, + EnvVars: []string{"ONTAKE_FORK_HEIGHT"}, + } ) var IndexerFlags = MergeFlags(CommonFlags, []cli.Flag{ @@ -87,4 +94,5 @@ var IndexerFlags = MergeFlags(CommonFlags, []cli.Flag{ SyncMode, IndexNFTs, IndexERC20s, + OntakeForkHeight, }) diff --git a/packages/eventindexer/indexer/config.go b/packages/eventindexer/indexer/config.go index f0ac08b1983..122c45c59de 100644 --- a/packages/eventindexer/indexer/config.go +++ b/packages/eventindexer/indexer/config.go @@ -31,6 +31,7 @@ type Config struct { IndexNFTs bool IndexERC20s bool Layer string + OntakeForkHeight uint64 OpenDBFunc func() (db.DB, error) } @@ -55,6 +56,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) { IndexNFTs: c.Bool(flags.IndexNFTs.Name), IndexERC20s: c.Bool(flags.IndexERC20s.Name), Layer: c.String(flags.Layer.Name), + OntakeForkHeight: c.Uint64(flags.OntakeForkHeight.Name), OpenDBFunc: func() (db.DB, error) { return db.OpenDBConnection(db.DBConnectionOpts{ Name: c.String(flags.DatabaseUsername.Name), diff --git a/packages/eventindexer/indexer/filter.go b/packages/eventindexer/indexer/filter.go index 9d6a2b67b98..6d457aed17a 100644 --- a/packages/eventindexer/indexer/filter.go +++ b/packages/eventindexer/indexer/filter.go @@ -123,9 +123,112 @@ func filterFunc( return nil } +func filterFuncOntake( + ctx context.Context, + chainID *big.Int, + i *Indexer, + filterOpts *bind.FilterOpts, +) error { + wg, ctx := errgroup.WithContext(ctx) + + if i.taikol1 != nil { + wg.Go(func() error { + transitionProvedEvents, err := i.taikol1.FilterTransitionProvedV2(filterOpts, nil) + if err != nil { + return errors.Wrap(err, "i.taikol1.FilterTransitionProved") + } + + err = i.saveTransitionProvedEventsV2(ctx, chainID, transitionProvedEvents) + if err != nil { + return errors.Wrap(err, "i.saveTransitionProvedEvents") + } + + return nil + }) + + wg.Go(func() error { + transitionContestedEvents, err := i.taikol1.FilterTransitionContestedV2(filterOpts, nil) + if err != nil { + return errors.Wrap(err, "i.taikol1.FilterTransitionContested") + } + + err = i.saveTransitionContestedEventsV2(ctx, chainID, transitionContestedEvents) + if err != nil { + return errors.Wrap(err, "i.saveTransitionContestedEvents") + } + + return nil + }) + + wg.Go(func() error { + blockProposedEvents, err := i.taikol1.FilterBlockProposedV2(filterOpts, nil) + if err != nil { + return errors.Wrap(err, "i.taikol1.FilterBlockProposed") + } + + err = i.saveBlockProposedEventsV2(ctx, chainID, blockProposedEvents) + if err != nil { + return errors.Wrap(err, "i.saveBlockProposedEvents") + } + + return nil + }) + + wg.Go(func() error { + blockVerifiedEvents, err := i.taikol1.FilterBlockVerifiedV2(filterOpts, nil, nil) + if err != nil { + return errors.Wrap(err, "i.taikol1.FilterBlockVerified") + } + + err = i.saveBlockVerifiedEventsV2(ctx, chainID, blockVerifiedEvents) + if err != nil { + return errors.Wrap(err, "i.saveBlockVerifiedEvents") + } + + return nil + }) + } + + if i.bridge != nil { + wg.Go(func() error { + messagesSent, err := i.bridge.FilterMessageSent(filterOpts, nil) + if err != nil { + return errors.Wrap(err, "i.bridge.FilterMessageSent") + } + + err = i.saveMessageSentEvents(ctx, chainID, messagesSent) + if err != nil { + return errors.Wrap(err, "i.saveMessageSentEvents") + } + + return nil + }) + } + + wg.Go(func() error { + if err := i.indexRawBlockData(ctx, chainID, filterOpts.Start, *filterOpts.End); err != nil { + return errors.Wrap(err, "i.indexRawBlockData") + } + + return nil + }) + + err := wg.Wait() + + if err != nil { + if errors.Is(err, context.Canceled) { + slog.Error("filter context cancelled") + return err + } + + return err + } + + return nil +} + func (i *Indexer) filter( ctx context.Context, - filter FilterFunc, ) error { endBlockID, err := i.ethClient.BlockNumber(ctx) if err != nil { @@ -138,14 +241,35 @@ func (i *Indexer) filter( "batchSize", i.blockBatchSize, ) + if i.latestIndexedBlockNumber >= i.ontakeForkHeight { + i.isPostOntakeForkHeightReached = true + } + for j := i.latestIndexedBlockNumber + 1; j <= endBlockID; j += i.blockBatchSize { - end := i.latestIndexedBlockNumber + i.blockBatchSize + end := j + i.blockBatchSize - 1 + // if the end of the batch is greater than the latest block number, set end // to the latest block number if end > endBlockID { end = endBlockID } + if !i.isPostOntakeForkHeightReached && i.taikol1 != nil && i.ontakeForkHeight > i.latestIndexedBlockNumber && i.ontakeForkHeight < end { + slog.Info("ontake fork height reached", "height", i.ontakeForkHeight) + + i.isPostOntakeForkHeightReached = true + + end = i.ontakeForkHeight - 1 + + slog.Info("setting end block ID to ontakeForkHeight - 1", + "latestIndexedBlockNumber", + i.latestIndexedBlockNumber, + "ontakeForkHeight", i.ontakeForkHeight, + "endBlockID", end, + "isPostOntakeForkHeightReached", i.isPostOntakeForkHeightReached, + ) + } + slog.Info("block batch", "start", j, "end", end) filterOpts := &bind.FilterOpts{ @@ -154,6 +278,14 @@ func (i *Indexer) filter( Context: ctx, } + var filter FilterFunc + + if i.isPostOntakeForkHeightReached { + filter = filterFuncOntake + } else { + filter = filterFunc + } + if err := filter(ctx, new(big.Int).SetUint64(i.srcChainID), i, filterOpts); err != nil { return errors.Wrap(err, "filter") } diff --git a/packages/eventindexer/indexer/indexer.go b/packages/eventindexer/indexer/indexer.go index 6093865f3bf..fe4a9562e26 100644 --- a/packages/eventindexer/indexer/indexer.go +++ b/packages/eventindexer/indexer/indexer.go @@ -68,6 +68,9 @@ type Indexer struct { contractToMetadata map[common.Address]*eventindexer.ERC20Metadata contractToMetadataMutex *sync.Mutex + + ontakeForkHeight uint64 + isPostOntakeForkHeightReached bool } func (i *Indexer) Start() error { @@ -97,7 +100,7 @@ func (i *Indexer) eventLoop(ctx context.Context) { slog.Info("event loop context done") return case <-t.C: - if err := i.filter(ctx, filterFunc); err != nil { + if err := i.filter(ctx); err != nil { slog.Error("error filtering", "error", err) } } @@ -204,6 +207,7 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) error { i.layer = cfg.Layer i.contractToMetadata = make(map[common.Address]*eventindexer.ERC20Metadata, 0) i.contractToMetadataMutex = &sync.Mutex{} + i.ontakeForkHeight = cfg.OntakeForkHeight return nil } diff --git a/packages/eventindexer/indexer/save_block_proposed_event.go b/packages/eventindexer/indexer/save_block_proposed_event.go index e8dc2669847..86585a0c963 100644 --- a/packages/eventindexer/indexer/save_block_proposed_event.go +++ b/packages/eventindexer/indexer/save_block_proposed_event.go @@ -103,3 +103,89 @@ func (i *Indexer) saveBlockProposedEvent( return nil } + +func (i *Indexer) saveBlockProposedEventsV2( + ctx context.Context, + chainID *big.Int, + events *taikol1.TaikoL1BlockProposedV2Iterator, +) error { + if !events.Next() || events.Event == nil { + slog.Info("no blockProposedV2 events") + return nil + } + + wg, ctx := errgroup.WithContext(ctx) + + for { + event := events.Event + + wg.Go(func() error { + tx, _, err := i.ethClient.TransactionByHash(ctx, event.Raw.TxHash) + if err != nil { + return errors.Wrap(err, "i.ethClient.TransactionByHash") + } + + sender, err := i.ethClient.TransactionSender(ctx, tx, event.Raw.BlockHash, event.Raw.TxIndex) + if err != nil { + return errors.Wrap(err, "i.ethClient.TransactionSender") + } + + if err := i.saveBlockProposedEventV2(ctx, chainID, event, sender); err != nil { + eventindexer.BlockProposedEventsProcessedError.Inc() + + return errors.Wrap(err, "i.saveBlockProposedEvent") + } + + return nil + }) + + if !events.Next() { + break + } + } + + if err := wg.Wait(); err != nil { + return err + } + + return nil +} + +func (i *Indexer) saveBlockProposedEventV2( + ctx context.Context, + chainID *big.Int, + event *taikol1.TaikoL1BlockProposedV2, + sender common.Address, +) error { + slog.Info("blockProposed", "proposer", sender.Hex()) + + marshaled, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "json.Marshal(event)") + } + + blockID := event.BlockId.Int64() + + block, err := i.ethClient.BlockByNumber(ctx, new(big.Int).SetUint64(event.Raw.BlockNumber)) + if err != nil { + return errors.Wrap(err, "i.ethClient.BlockByNumber") + } + + _, err = i.eventRepo.Save(ctx, eventindexer.SaveEventOpts{ + Name: eventindexer.EventNameBlockProposed, + Data: string(marshaled), + ChainID: chainID, + Event: eventindexer.EventNameBlockProposed, + Address: sender.Hex(), + BlockID: &blockID, + TransactedAt: time.Unix(int64(block.Time()), 0).UTC(), + EmittedBlockID: event.Raw.BlockNumber, + }) + if err != nil { + return errors.Wrap(err, "i.eventRepo.Save") + } + + eventindexer.BlockProposedEventsProcessed.Inc() + + return nil +} diff --git a/packages/eventindexer/indexer/save_block_verified_event.go b/packages/eventindexer/indexer/save_block_verified_event.go index 3f9386cf369..1edcb560c9c 100644 --- a/packages/eventindexer/indexer/save_block_verified_event.go +++ b/packages/eventindexer/indexer/save_block_verified_event.go @@ -88,3 +88,78 @@ func (i *Indexer) saveBlockVerifiedEvent( return nil } + +func (i *Indexer) saveBlockVerifiedEventsV2( + ctx context.Context, + chainID *big.Int, + events *taikol1.TaikoL1BlockVerifiedV2Iterator, +) error { + if !events.Next() || events.Event == nil { + slog.Info("no BlockVerified events") + return nil + } + + wg, ctx := errgroup.WithContext(ctx) + + for { + event := events.Event + + wg.Go(func() error { + if err := i.saveBlockVerifiedEventV2(ctx, chainID, event); err != nil { + eventindexer.BlockVerifiedEventsProcessedError.Inc() + + return errors.Wrap(err, "i.saveBlockVerifiedEvent") + } + + return nil + }) + + if !events.Next() { + break + } + } + + if err := wg.Wait(); err != nil { + return err + } + + return nil +} + +func (i *Indexer) saveBlockVerifiedEventV2( + ctx context.Context, + chainID *big.Int, + event *taikol1.TaikoL1BlockVerifiedV2, +) error { + slog.Info("new blockVerified event", "blockID", event.BlockId.Int64()) + + marshaled, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "json.Marshal(event)") + } + + blockID := event.BlockId.Int64() + + block, err := i.ethClient.BlockByNumber(ctx, new(big.Int).SetUint64(event.Raw.BlockNumber)) + if err != nil { + return errors.Wrap(err, "i.ethClient.BlockByNumber") + } + + _, err = i.eventRepo.Save(ctx, eventindexer.SaveEventOpts{ + Name: eventindexer.EventNameBlockVerified, + Data: string(marshaled), + ChainID: chainID, + Event: eventindexer.EventNameBlockVerified, + Address: "", + BlockID: &blockID, + TransactedAt: time.Unix(int64(block.Time()), 0), + EmittedBlockID: event.Raw.BlockNumber, + }) + if err != nil { + return errors.Wrap(err, "i.eventRepo.Save") + } + + eventindexer.BlockVerifiedEventsProcessed.Inc() + + return nil +} diff --git a/packages/eventindexer/indexer/save_transition_contested_event.go b/packages/eventindexer/indexer/save_transition_contested_event.go index fe2499e564e..a36c72c8ffc 100644 --- a/packages/eventindexer/indexer/save_transition_contested_event.go +++ b/packages/eventindexer/indexer/save_transition_contested_event.go @@ -81,3 +81,72 @@ func (i *Indexer) saveTransitionContestedEvent( return nil } + +func (i *Indexer) saveTransitionContestedEventsV2( + ctx context.Context, + chainID *big.Int, + events *taikol1.TaikoL1TransitionContestedV2Iterator, +) error { + if !events.Next() || events.Event == nil { + slog.Info("no transitionContested events") + return nil + } + + for { + event := events.Event + + if err := i.saveTransitionContestedEventV2(ctx, chainID, event); err != nil { + eventindexer.TransitionContestedEventsProcessedError.Inc() + + return errors.Wrap(err, "i.saveBlockProvenEvent") + } + + if !events.Next() { + return nil + } + } +} + +func (i *Indexer) saveTransitionContestedEventV2( + ctx context.Context, + chainID *big.Int, + event *taikol1.TaikoL1TransitionContestedV2, +) error { + slog.Info("transitionContested event found", + "blockID", event.BlockId.Int64(), + "contestBond", event.ContestBond.String(), + "contester", event.Contester.Hex(), + "tier", event.Tier, + ) + + marshaled, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "json.Marshal(event)") + } + + blockID := event.BlockId.Int64() + + block, err := i.ethClient.BlockByNumber(ctx, new(big.Int).SetUint64(event.Raw.BlockNumber)) + if err != nil { + return errors.Wrap(err, "i.ethClient.BlockByNumber") + } + + _, err = i.eventRepo.Save(ctx, eventindexer.SaveEventOpts{ + Name: eventindexer.EventNameTransitionContested, + Data: string(marshaled), + ChainID: chainID, + Event: eventindexer.EventNameTransitionContested, + Address: event.Contester.Hex(), + BlockID: &blockID, + TransactedAt: time.Unix(int64(block.Time()), 0), + Tier: &event.Tier, + EmittedBlockID: event.Raw.BlockNumber, + }) + if err != nil { + return errors.Wrap(err, "i.eventRepo.Save") + } + + eventindexer.TransitionContestedEventsProcessed.Inc() + + return nil +} diff --git a/packages/eventindexer/indexer/save_transition_proved_event.go b/packages/eventindexer/indexer/save_transition_proved_event.go index 00efd2559d7..54f26caaa35 100644 --- a/packages/eventindexer/indexer/save_transition_proved_event.go +++ b/packages/eventindexer/indexer/save_transition_proved_event.go @@ -91,3 +91,81 @@ func (i *Indexer) saveTransitionProvedEvent( return nil } + +func (i *Indexer) saveTransitionProvedEventsV2( + ctx context.Context, + chainID *big.Int, + events *taikol1.TaikoL1TransitionProvedV2Iterator, +) error { + if !events.Next() || events.Event == nil { + slog.Info("no transitionProved events") + return nil + } + + wg, ctx := errgroup.WithContext(ctx) + + for { + event := events.Event + + wg.Go(func() error { + if err := i.saveTransitionProvedEventV2(ctx, chainID, event); err != nil { + eventindexer.TransitionProvedEventsProcessedError.Inc() + + return errors.Wrap(err, "i.saveBlockProvenEvent") + } + + return nil + }) + + if !events.Next() { + break + } + } + + if err := wg.Wait(); err != nil { + return err + } + + return nil +} + +func (i *Indexer) saveTransitionProvedEventV2( + ctx context.Context, + chainID *big.Int, + event *taikol1.TaikoL1TransitionProvedV2, +) error { + slog.Info("transitionProved event found", + "blockID", event.BlockId.Int64(), + "prover", event.Prover.Hex()) + + marshaled, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "json.Marshal(event)") + } + + blockID := event.BlockId.Int64() + + block, err := i.ethClient.BlockByNumber(ctx, new(big.Int).SetUint64(event.Raw.BlockNumber)) + if err != nil { + return errors.Wrap(err, "i.ethClient.BlockByNumber") + } + + _, err = i.eventRepo.Save(ctx, eventindexer.SaveEventOpts{ + Name: eventindexer.EventNameTransitionProved, + Data: string(marshaled), + ChainID: chainID, + Event: eventindexer.EventNameTransitionProved, + Address: event.Prover.Hex(), + BlockID: &blockID, + TransactedAt: time.Unix(int64(block.Time()), 0), + Tier: &event.Tier, + EmittedBlockID: event.Raw.BlockNumber, + }) + if err != nil { + return errors.Wrap(err, "i.eventRepo.Save") + } + + eventindexer.TransitionProvedEventsProcessed.Inc() + + return nil +}