diff --git a/packages/relayer/cmd/flags/indexer.go b/packages/relayer/cmd/flags/indexer.go index 45eb322c047..f4eb72e590a 100644 --- a/packages/relayer/cmd/flags/indexer.go +++ b/packages/relayer/cmd/flags/indexer.go @@ -69,12 +69,23 @@ var ( Category: indexerCategory, EnvVars: []string{"SRC_TAIKO_ADDRESS"}, } - NumLatestBlocksToIgnoreWhenCrawling = &cli.Uint64Flag{ - Name: "numLatestBlocksToIgnoreWhenCrawling", - Usage: "Number of blocks to ignore when crawling chain, should be higher for L2-L1 indexing due to delay", - Value: 1000, + NumLatestBlocksEndWhenCrawling = &cli.Uint64Flag{ + Name: "numLatestBlocksEndWhenCrawling", + Usage: `Number of blocks to ignore from the end when crawling chain, + should be higher for L2-L1 indexing due to delay + `, + Value: 300, + Category: indexerCategory, + EnvVars: []string{"NUM_LATEST_BLOCKS_END_WHEN_CRAWLING"}, + } + NumLatestBlocksStartWhenCrawling = &cli.Uint64Flag{ + Name: "numLatestBlocksStartWhenCrawling", + Usage: `Number of latest blocks to index from the start when crawling chain. + The default value is to cover past 7 days. + `, + Value: 50400, Category: indexerCategory, - EnvVars: []string{"NUM_LATEST_BLOCKS_TO_IGNORE_WHEN_CRAWLING"}, + EnvVars: []string{"NUM_LATEST_BLOCKS_START_WHEN_CRAWLING"}, } EventName = &cli.StringFlag{ Name: "event", @@ -101,7 +112,8 @@ var IndexerFlags = MergeFlags(CommonFlags, QueueFlags, []cli.Flag{ SubscriptionBackoff, SyncMode, WatchMode, - NumLatestBlocksToIgnoreWhenCrawling, + NumLatestBlocksEndWhenCrawling, + NumLatestBlocksStartWhenCrawling, EventName, TargetBlockNumber, }) diff --git a/packages/relayer/indexer/config.go b/packages/relayer/indexer/config.go index 39172cea167..4f997ae1280 100644 --- a/packages/relayer/indexer/config.go +++ b/packages/relayer/indexer/config.go @@ -36,53 +36,55 @@ type Config struct { QueueHost string QueuePort uint64 // rpc configs - SrcRPCUrl string - DestRPCUrl string - ETHClientTimeout uint64 - BlockBatchSize uint64 - NumGoroutines uint64 - SubscriptionBackoff uint64 - SyncMode SyncMode - WatchMode WatchMode - NumLatestBlocksToIgnoreWhenCrawling uint64 - EventName string - TargetBlockNumber *uint64 - BackOffRetryInterval time.Duration - BackOffMaxRetries uint64 - OpenQueueFunc func() (queue.Queue, error) - OpenDBFunc func() (DB, error) + SrcRPCUrl string + DestRPCUrl string + ETHClientTimeout uint64 + BlockBatchSize uint64 + NumGoroutines uint64 + SubscriptionBackoff uint64 + SyncMode SyncMode + WatchMode WatchMode + NumLatestBlocksEndWhenCrawling uint64 + NumLatestBlocksStartWhenCrawling uint64 + EventName string + TargetBlockNumber *uint64 + BackOffRetryInterval time.Duration + BackOffMaxRetries uint64 + OpenQueueFunc func() (queue.Queue, error) + OpenDBFunc func() (DB, error) } // NewConfigFromCliContext creates a new config instance from command line flags. func NewConfigFromCliContext(c *cli.Context) (*Config, error) { return &Config{ - SrcBridgeAddress: common.HexToAddress(c.String(flags.SrcBridgeAddress.Name)), - SrcTaikoAddress: common.HexToAddress(c.String(flags.SrcTaikoAddress.Name)), - SrcSignalServiceAddress: common.HexToAddress(c.String(flags.SrcSignalServiceAddress.Name)), - DestBridgeAddress: common.HexToAddress(c.String(flags.DestBridgeAddress.Name)), - DatabaseUsername: c.String(flags.DatabaseUsername.Name), - DatabasePassword: c.String(flags.DatabasePassword.Name), - DatabaseName: c.String(flags.DatabaseName.Name), - DatabaseHost: c.String(flags.DatabaseHost.Name), - DatabaseMaxIdleConns: c.Uint64(flags.DatabaseMaxIdleConns.Name), - DatabaseMaxOpenConns: c.Uint64(flags.DatabaseMaxOpenConns.Name), - DatabaseMaxConnLifetime: c.Uint64(flags.DatabaseConnMaxLifetime.Name), - QueueUsername: c.String(flags.QueueUsername.Name), - QueuePassword: c.String(flags.QueuePassword.Name), - QueuePort: c.Uint64(flags.QueuePort.Name), - QueueHost: c.String(flags.QueueHost.Name), - SrcRPCUrl: c.String(flags.SrcRPCUrl.Name), - DestRPCUrl: c.String(flags.DestRPCUrl.Name), - BlockBatchSize: c.Uint64(flags.BlockBatchSize.Name), - NumGoroutines: c.Uint64(flags.MaxNumGoroutines.Name), - SubscriptionBackoff: c.Uint64(flags.SubscriptionBackoff.Name), - WatchMode: WatchMode(c.String(flags.WatchMode.Name)), - SyncMode: SyncMode(c.String(flags.SyncMode.Name)), - ETHClientTimeout: c.Uint64(flags.ETHClientTimeout.Name), - NumLatestBlocksToIgnoreWhenCrawling: c.Uint64(flags.NumLatestBlocksToIgnoreWhenCrawling.Name), - EventName: c.String(flags.EventName.Name), - BackOffMaxRetries: c.Uint64(flags.BackOffMaxRetrys.Name), - BackOffRetryInterval: c.Duration(flags.BackOffRetryInterval.Name), + SrcBridgeAddress: common.HexToAddress(c.String(flags.SrcBridgeAddress.Name)), + SrcTaikoAddress: common.HexToAddress(c.String(flags.SrcTaikoAddress.Name)), + SrcSignalServiceAddress: common.HexToAddress(c.String(flags.SrcSignalServiceAddress.Name)), + DestBridgeAddress: common.HexToAddress(c.String(flags.DestBridgeAddress.Name)), + DatabaseUsername: c.String(flags.DatabaseUsername.Name), + DatabasePassword: c.String(flags.DatabasePassword.Name), + DatabaseName: c.String(flags.DatabaseName.Name), + DatabaseHost: c.String(flags.DatabaseHost.Name), + DatabaseMaxIdleConns: c.Uint64(flags.DatabaseMaxIdleConns.Name), + DatabaseMaxOpenConns: c.Uint64(flags.DatabaseMaxOpenConns.Name), + DatabaseMaxConnLifetime: c.Uint64(flags.DatabaseConnMaxLifetime.Name), + QueueUsername: c.String(flags.QueueUsername.Name), + QueuePassword: c.String(flags.QueuePassword.Name), + QueuePort: c.Uint64(flags.QueuePort.Name), + QueueHost: c.String(flags.QueueHost.Name), + SrcRPCUrl: c.String(flags.SrcRPCUrl.Name), + DestRPCUrl: c.String(flags.DestRPCUrl.Name), + BlockBatchSize: c.Uint64(flags.BlockBatchSize.Name), + NumGoroutines: c.Uint64(flags.MaxNumGoroutines.Name), + SubscriptionBackoff: c.Uint64(flags.SubscriptionBackoff.Name), + WatchMode: WatchMode(c.String(flags.WatchMode.Name)), + SyncMode: SyncMode(c.String(flags.SyncMode.Name)), + ETHClientTimeout: c.Uint64(flags.ETHClientTimeout.Name), + NumLatestBlocksEndWhenCrawling: c.Uint64(flags.NumLatestBlocksEndWhenCrawling.Name), + NumLatestBlocksStartWhenCrawling: c.Uint64(flags.NumLatestBlocksStartWhenCrawling.Name), + EventName: c.String(flags.EventName.Name), + BackOffMaxRetries: c.Uint64(flags.BackOffMaxRetrys.Name), + BackOffRetryInterval: c.Duration(flags.BackOffRetryInterval.Name), TargetBlockNumber: func() *uint64 { if c.IsSet(flags.TargetBlockNumber.Name) { value := c.Uint64(flags.TargetBlockNumber.Name) diff --git a/packages/relayer/indexer/handle_chain_data_synced_event.go b/packages/relayer/indexer/handle_chain_data_synced_event.go index ecd307cff0c..86901e92f4f 100644 --- a/packages/relayer/indexer/handle_chain_data_synced_event.go +++ b/packages/relayer/indexer/handle_chain_data_synced_event.go @@ -3,7 +3,6 @@ package indexer import ( "context" "encoding/json" - "math/big" "log/slog" @@ -16,7 +15,6 @@ import ( // handleChainDataSyncedEvent handles an individual ChainDataSynced event func (i *Indexer) handleChainDataSyncedEvent( ctx context.Context, - chainID *big.Int, event *signalservice.SignalServiceChainDataSynced, waitForConfirmations bool, ) error { @@ -74,7 +72,11 @@ func (i *Indexer) handleChainDataSyncedEvent( return errors.Wrap(err, "i.eventRepo.Save") } - slog.Info("chainDataSynced event saved") + slog.Info("chainDataSynced event saved", + "srcChainId", i.srcChainId, + "destChainId", i.destChainId, + "SyncedChainID", event.ChainId, + ) relayer.ChainDataSyncedEventsIndexed.Inc() diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index dc88e75c7b5..dc6683351a2 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -116,7 +116,8 @@ type Indexer struct { wg *sync.WaitGroup - numLatestBlocksToIgnoreWhenCrawling uint64 + numLatestBlocksEndWhenCrawling uint64 + numLatestBlocksStartWhenCrawling uint64 targetBlockNumber *uint64 @@ -233,7 +234,8 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) { i.ethClientTimeout = time.Duration(cfg.ETHClientTimeout) * time.Second - i.numLatestBlocksToIgnoreWhenCrawling = cfg.NumLatestBlocksToIgnoreWhenCrawling + i.numLatestBlocksEndWhenCrawling = cfg.NumLatestBlocksEndWhenCrawling + i.numLatestBlocksStartWhenCrawling = cfg.NumLatestBlocksStartWhenCrawling i.targetBlockNumber = cfg.TargetBlockNumber @@ -274,7 +276,7 @@ func (i *Indexer) Start() error { } // set the initial processing block, which will vary by sync mode. - if err := i.setInitialIndexingBlockByMode(i.ctx, i.syncMode, i.srcChainId); err != nil { + if err := i.setInitialIndexingBlockByMode(i.syncMode, i.srcChainId); err != nil { return errors.Wrap(err, "i.setInitialIndexingBlockByMode") } @@ -298,10 +300,6 @@ func (i *Indexer) eventLoop(ctx context.Context, startBlockID uint64) { var d time.Duration = 10 * time.Second - if i.watchMode == CrawlPastBlocks { - d = 10 * time.Minute - } - t := time.NewTicker(d) defer t.Stop() @@ -333,6 +331,15 @@ func (i *Indexer) filter(ctx context.Context) error { // ignore latest N blocks if we are crawling past blocks, they are probably in queue already // and are not "missed", have just not been processed. if i.watchMode == CrawlPastBlocks { + if i.numLatestBlocksEndWhenCrawling > i.numLatestBlocksStartWhenCrawling { + slog.Error("Invalid configuration", + "numLatestBlocksEndWhenCrawling", i.numLatestBlocksEndWhenCrawling, + "numLatestBlocksStartWhenCrawling", i.numLatestBlocksStartWhenCrawling, + ) + + return errors.New("numLatestBlocksStartWhenCrawling must be greater than numLatestBlocksEndWhenCrawling") + } + // if targetBlockNumber is not nil, we are just going to process a singular block. if i.targetBlockNumber != nil { slog.Info("targetBlockNumber is set", "targetBlockNumber", *i.targetBlockNumber) @@ -342,15 +349,19 @@ func (i *Indexer) filter(ctx context.Context) error { endBlockID = i.latestIndexedBlockNumber + 1 } else { // set the initial processing block back to either 0 or the genesis block again. - if err := i.setInitialIndexingBlockByMode(i.ctx, i.syncMode, i.srcChainId); err != nil { + if err := i.setInitialIndexingBlockByMode(i.syncMode, i.srcChainId); err != nil { return errors.Wrap(err, "i.setInitialIndexingBlockByMode") } - if endBlockID > i.numLatestBlocksToIgnoreWhenCrawling { + if i.latestIndexedBlockNumber < endBlockID-i.numLatestBlocksStartWhenCrawling { + i.latestIndexedBlockNumber = endBlockID - i.numLatestBlocksStartWhenCrawling + } + + if endBlockID > i.numLatestBlocksEndWhenCrawling { // otherwise, we need to set the endBlockID as the greater of the two: // either the endBlockID minus the number of latest blocks to ignore, // or endBlockID. - endBlockID -= i.numLatestBlocksToIgnoreWhenCrawling + endBlockID -= i.numLatestBlocksEndWhenCrawling } } } @@ -594,7 +605,7 @@ func (i *Indexer) indexChainDataSyncedEvents(ctx context.Context, event := chainDataSyncedEvents.Event group.Go(func() error { - err := i.handleChainDataSyncedEvent(ctx, i.srcChainId, event, true) + err := i.handleChainDataSyncedEvent(ctx, event, true) if err != nil { relayer.MessageStatusChangedEventsIndexingErrors.Inc() diff --git a/packages/relayer/indexer/set_initial_Indexing_block_by_mode.go b/packages/relayer/indexer/set_initial_Indexing_block_by_mode.go index 49b295deb4a..f03cdb0ea39 100644 --- a/packages/relayer/indexer/set_initial_Indexing_block_by_mode.go +++ b/packages/relayer/indexer/set_initial_Indexing_block_by_mode.go @@ -1,7 +1,6 @@ package indexer import ( - "context" "math/big" "github.com/pkg/errors" @@ -11,7 +10,6 @@ import ( // setInitialIndexingBlockByMode takes in a SyncMode and determines how we should // start our indexing func (i *Indexer) setInitialIndexingBlockByMode( - ctx context.Context, mode SyncMode, chainID *big.Int, ) error { diff --git a/packages/relayer/indexer/set_initial_indexing_block_by_mode_test.go b/packages/relayer/indexer/set_initial_indexing_block_by_mode_test.go index 9806c5a7457..50fc51834f3 100644 --- a/packages/relayer/indexer/set_initial_indexing_block_by_mode_test.go +++ b/packages/relayer/indexer/set_initial_indexing_block_by_mode_test.go @@ -1,7 +1,6 @@ package indexer import ( - "context" "math/big" "testing" @@ -51,7 +50,6 @@ func Test_setInitialIndexingBlockByMode(t *testing.T) { t.Run(tt.name, func(t *testing.T) { svc, _ := newTestService(tt.mode, FilterAndSubscribe) err := svc.setInitialIndexingBlockByMode( - context.Background(), tt.mode, tt.chainID, )