From 656cbe34bf554943696c490ff29fb4ff5c427606 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 21 Apr 2021 15:55:35 -0700 Subject: [PATCH 01/29] l2geth: add Backend enums and config parsing --- l2geth/cmd/geth/main.go | 2 +- l2geth/cmd/geth/usage.go | 1 + l2geth/cmd/utils/flags.go | 15 ++++++++++++++ l2geth/rollup/config.go | 2 ++ l2geth/rollup/types.go | 42 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 61 insertions(+), 1 deletion(-) diff --git a/l2geth/cmd/geth/main.go b/l2geth/cmd/geth/main.go index 54facda951dd..e4fcfef7a917 100644 --- a/l2geth/cmd/geth/main.go +++ b/l2geth/cmd/geth/main.go @@ -156,7 +156,6 @@ var ( utils.Eth1ETHGatewayAddressFlag, utils.Eth1ChainIdFlag, utils.RollupClientHttpFlag, - // Enable verifier mode utils.RollupEnableVerifierFlag, utils.RollupAddressManagerOwnerAddressFlag, utils.RollupTimstampRefreshFlag, @@ -166,6 +165,7 @@ var ( utils.RollupMaxCalldataSizeFlag, utils.RollupDataPriceFlag, utils.RollupExecutionPriceFlag, + utils.RollupBackendFlag, } rpcFlags = []cli.Flag{ diff --git a/l2geth/cmd/geth/usage.go b/l2geth/cmd/geth/usage.go index abfb30f346bc..158c1ba67d8c 100644 --- a/l2geth/cmd/geth/usage.go +++ b/l2geth/cmd/geth/usage.go @@ -80,6 +80,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.RollupMaxCalldataSizeFlag, utils.RollupDataPriceFlag, utils.RollupExecutionPriceFlag, + utils.RollupBackendFlag, }, }, { diff --git a/l2geth/cmd/utils/flags.go b/l2geth/cmd/utils/flags.go index c9364e6b9de1..de88994d2a99 100644 --- a/l2geth/cmd/utils/flags.go +++ b/l2geth/cmd/utils/flags.go @@ -849,6 +849,12 @@ var ( Value: time.Minute * 3, EnvVar: "ROLLUP_TIMESTAMP_REFRESH", } + RollupBackendFlag = cli.StringFlag{ + Name: "rollup.backend", + Usage: "Sync backend for verifiers (\"l1\" or \"l2\")", + Value: "l1", + EnvVar: "ROLLUP_BACKEND", + } // Flag to enable verifier mode RollupEnableVerifierFlag = cli.BoolFlag{ Name: "rollup.verifier", @@ -1170,6 +1176,15 @@ func setRollup(ctx *cli.Context, cfg *rollup.Config) { if ctx.GlobalIsSet(RollupExecutionPriceFlag.Name) { cfg.ExecutionPrice = GlobalBig(ctx, RollupExecutionPriceFlag.Name) } + if ctx.GlobalIsSet(RollupBackendFlag.Name) { + val := ctx.GlobalString(RollupBackendFlag.Name) + backend, err := rollup.NewBackend(val) + if err != nil { + log.Error("Configured with unknown sync backend, defaulting to l1", "backend", val) + backend, _ = rollup.NewBackend("l1") + } + cfg.Backend = backend + } } // setLes configures the les server and ultra light client settings from the command line flags. diff --git a/l2geth/rollup/config.go b/l2geth/rollup/config.go index 572626a804d8..08e29db5f616 100644 --- a/l2geth/rollup/config.go +++ b/l2geth/rollup/config.go @@ -37,4 +37,6 @@ type Config struct { DataPrice *big.Int // The gas price to use for L2 congestion costs ExecutionPrice *big.Int + // Represents the source of the transactions that is being synced + Backend Backend } diff --git a/l2geth/rollup/types.go b/l2geth/rollup/types.go index b3dbdbbce99f..6ce283eb973b 100644 --- a/l2geth/rollup/types.go +++ b/l2geth/rollup/types.go @@ -2,11 +2,53 @@ package rollup import ( "bytes" + "fmt" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ) +// Backend represents the type of transactions that are being synced. +// The different types have different security models. +type Backend uint + +// String implements the Stringer interface +func (s Backend) String() string { + switch s { + case BackendL1: + return "l1" + case BackendL2: + return "l2" + default: + return "" + } +} + +// NewBackend creates a Backend from a human readable string +func NewBackend(typ string) (Backend, error) { + switch typ { + case "l1": + return BackendL1, nil + case "l2": + return BackendL2, nil + default: + return 0, fmt.Errorf("Unknown Backend: %s", typ) + } +} + +const ( + // BackendL1 Backend involves syncing transactions that have been batched to + // Layer One. Once the transactions have been batched to L1, they cannot be + // removed assuming that they are not reorganized out of the chain. + BackendL1 Backend = iota + // BackendL2 Backend involves syncing transactions from the sequencer, + // meaning that the transactions may have not been batched to Layer One yet. + // This gives higher latency access to the sequencer data but no guarantees + // around the transactions as they have not been submitted via a batch to + // L1. + BackendL2 +) + func isCtcTxEqual(a, b *types.Transaction) bool { if a.To() == nil && b.To() != nil { if !bytes.Equal(b.To().Bytes(), common.Address{}.Bytes()) { From 97fd394d83309b17e98f2c3440add0c50dee15c1 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 21 Apr 2021 15:55:52 -0700 Subject: [PATCH 02/29] l2geth: move OVMContext to types file --- l2geth/rollup/sync_service.go | 7 ------- l2geth/rollup/types.go | 7 +++++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index d9b31237bb12..47d7f6a68a66 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -23,13 +23,6 @@ import ( "github.com/ethereum/go-ethereum/eth/gasprice" ) -// OVMContext represents the blocknumber and timestamp -// that exist during L2 execution -type OVMContext struct { - blockNumber uint64 - timestamp uint64 -} - // SyncService implements the verifier functionality as well as the reorg // protection for the sequencer. type SyncService struct { diff --git a/l2geth/rollup/types.go b/l2geth/rollup/types.go index 6ce283eb973b..571a3d902ece 100644 --- a/l2geth/rollup/types.go +++ b/l2geth/rollup/types.go @@ -8,6 +8,13 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) +// OVMContext represents the blocknumber and timestamp +// that exist during L2 execution +type OVMContext struct { + blockNumber uint64 + timestamp uint64 +} + // Backend represents the type of transactions that are being synced. // The different types have different security models. type Backend uint From 60f3a2edf14ab517ededd7e121d5701d8f1a6d89 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 21 Apr 2021 17:05:48 -0700 Subject: [PATCH 03/29] l2geth: implement syncservice spec --- l2geth/eth/api_backend.go | 2 +- l2geth/rollup/client.go | 21 +- l2geth/rollup/sync_service.go | 671 +++++++++++++++++++--------------- 3 files changed, 391 insertions(+), 303 deletions(-) diff --git a/l2geth/eth/api_backend.go b/l2geth/eth/api_backend.go index 1f1b0ae6ac84..e97923ed89e7 100644 --- a/l2geth/eth/api_backend.go +++ b/l2geth/eth/api_backend.go @@ -321,7 +321,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) return fmt.Errorf("Calldata cannot be larger than %d, sent %d", b.MaxCallDataSize, len(signedTx.Data())) } } - return b.eth.syncService.ApplyTransaction(signedTx) + return b.eth.syncService.ValidateAndApplySequencerTransaction(signedTx) } // OVM Disabled return b.eth.txPool.AddLocal(signedTx) diff --git a/l2geth/rollup/client.go b/l2geth/rollup/client.go index 0e55550e46b0..d44fbf2c8ede 100644 --- a/l2geth/rollup/client.go +++ b/l2geth/rollup/client.go @@ -116,14 +116,14 @@ type decoded struct { type RollupClient interface { GetEnqueue(index uint64) (*types.Transaction, error) GetLatestEnqueue() (*types.Transaction, error) - GetTransaction(uint64) (*types.Transaction, error) - GetLatestTransaction() (*types.Transaction, error) + GetTransaction(uint64, Backend) (*types.Transaction, error) + GetLatestTransaction(Backend) (*types.Transaction, error) GetEthContext(uint64) (*EthContext, error) GetLatestEthContext() (*EthContext, error) GetLastConfirmedEnqueue() (*types.Transaction, error) GetLatestTransactionBatch() (*Batch, []*types.Transaction, error) GetTransactionBatch(uint64) (*Batch, []*types.Transaction, error) - SyncStatus() (*SyncStatus, error) + SyncStatus(Backend) (*SyncStatus, error) GetL1GasPrice() (*big.Int, error) } @@ -373,12 +373,15 @@ func batchedTransactionToTransaction(res *transaction, signer *types.OVMSigner) } // GetTransaction will get a transaction by Canonical Transaction Chain index -func (c *Client) GetTransaction(index uint64) (*types.Transaction, error) { +func (c *Client) GetTransaction(index uint64, backend Backend) (*types.Transaction, error) { str := strconv.FormatUint(index, 10) response, err := c.client.R(). SetPathParams(map[string]string{ "index": str, }). + SetQueryParams(map[string]string{ + "backend": backend.String(), + }). SetResult(&TransactionResponse{}). Get("/transaction/index/{index}") @@ -394,9 +397,12 @@ func (c *Client) GetTransaction(index uint64) (*types.Transaction, error) { // GetLatestTransaction will get the latest transaction, meaning the transaction // with the greatest Canonical Transaction Chain index -func (c *Client) GetLatestTransaction() (*types.Transaction, error) { +func (c *Client) GetLatestTransaction(backend Backend) (*types.Transaction, error) { response, err := c.client.R(). SetResult(&TransactionResponse{}). + SetQueryParams(map[string]string{ + "backend": backend.String(), + }). Get("/transaction/latest") if err != nil { @@ -483,9 +489,12 @@ func (c *Client) GetLastConfirmedEnqueue() (*types.Transaction, error) { } // SyncStatus will query the remote server to determine if it is still syncing -func (c *Client) SyncStatus() (*SyncStatus, error) { +func (c *Client) SyncStatus(backend Backend) (*SyncStatus, error) { response, err := c.client.R(). SetResult(&SyncStatus{}). + SetQueryParams(map[string]string{ + "backend": backend.String(), + }). Get("/eth/syncing") if err != nil { diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 47d7f6a68a66..60daa9454b1e 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -23,8 +23,9 @@ import ( "github.com/ethereum/go-ethereum/eth/gasprice" ) -// SyncService implements the verifier functionality as well as the reorg -// protection for the sequencer. +// SyncService implements the main functionality around pulling in transactions +// and executing them. It can be configured to run in both sequencer mode and in +// verifier mode. type SyncService struct { ctx context.Context cancel context.CancelFunc @@ -33,6 +34,7 @@ type SyncService struct { scope event.SubscriptionScope txFeed event.Feed txLock sync.Mutex + loopLock sync.Mutex enable bool eth1ChainId uint64 bc *core.BlockChain @@ -40,10 +42,13 @@ type SyncService struct { RollupGpo *gasprice.RollupOracle client RollupClient syncing atomic.Value + chainHeadSub event.Subscription OVMContext OVMContext confirmationDepth uint64 pollInterval time.Duration timestampRefreshThreshold time.Duration + chainHeadCh chan core.ChainHeadEvent + backend Backend } // NewSyncService returns an initialized sync service @@ -56,9 +61,9 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co _ = cancel // satisfy govet if cfg.IsVerifier { - log.Info("Running in verifier mode") + log.Info("Running in verifier mode", "sync-type", cfg.Backend.String()) } else { - log.Info("Running in sequencer mode") + log.Info("Running in sequencer mode", "sync-type", cfg.Backend.String()) } pollInterval := cfg.PollInterval @@ -79,7 +84,7 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co } // Initialize the rollup client client := NewClient(cfg.RollupClientHttp, chainID) - log.Info("Configured rollup client", "url", cfg.RollupClientHttp, "chain-id", chainID.Uint64(), "ctc-deploy-height", cfg.CanonicalTransactionChainDeployHeight) + log.Info("Configured rollup client", "url", cfg.RollupClientHttp, "chain-id", chainID.Uint64()) service := SyncService{ ctx: ctx, cancel: cancel, @@ -89,13 +94,23 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co syncing: atomic.Value{}, bc: bc, txpool: txpool, + chainHeadCh: make(chan core.ChainHeadEvent, 1), eth1ChainId: cfg.Eth1ChainId, client: client, db: db, pollInterval: pollInterval, timestampRefreshThreshold: timestampRefreshThreshold, + backend: cfg.Backend, } + // The chainHeadSub is used to synchronize the SyncService with the chain. + // As the SyncService processes transactions, it waits until the transaction + // is added to the chain. This synchronization is required for handling + // reorgs and also favors safety over liveliness. If a transaction breaks + // things downstream, it is expected that this channel will halt ingestion + // of additional transactions by the SyncService. + service.chainHeadSub = service.bc.SubscribeChainHeadEvent(service.chainHeadCh) + // Initial sync service setup if it is enabled. This code depends on // a remote server that indexes the layer one contracts. Place this // code behind this if statement so that this can run without the @@ -108,9 +123,9 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co return nil, fmt.Errorf("Rollup client unable to connect: %w", err) } - // Ensure that the remote is still not syncing + // Wait until the remote service is done syncing for { - status, err := service.client.SyncStatus() + status, err := service.client.SyncStatus(service.backend) if err != nil { log.Error("Cannot get sync status") continue @@ -125,30 +140,26 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co // Initialize the latest L1 data here to make sure that // it happens before the RPC endpoints open up // Only do it if the sync service is enabled so that this - // can be ran without needing to have a configured client. + // can be ran without needing to have a configured RollupClient. err = service.initializeLatestL1(cfg.CanonicalTransactionChainDeployHeight) if err != nil { return nil, fmt.Errorf("Cannot initialize latest L1 data: %w", err) } + // Log the OVMContext information on startup bn := service.GetLatestL1BlockNumber() ts := service.GetLatestL1Timestamp() log.Info("Initialized Latest L1 Info", "blocknumber", bn, "timestamp", ts) - var i, q string index := service.GetLatestIndex() queueIndex := service.GetLatestEnqueueIndex() - if index == nil { - i = "" - } else { - i = strconv.FormatUint(*index, 10) - } - if queueIndex == nil { - q = "" - } else { - q = strconv.FormatUint(*queueIndex, 10) + verifiedIndex := service.GetLatestVerifiedIndex() + block := service.bc.CurrentBlock() + if block == nil { + block = types.NewBlock(&types.Header{}, nil, nil, nil) } - log.Info("Initialized Eth Context", "index", i, "queue-index", q) + header := block.Header() + log.Info("Initial Rollup State", "state", header.Root.Hex(), "index", stringify(index), "queue-index", stringify(queueIndex), "verified-index", verifiedIndex) // The sequencer needs to sync to the tip at start up // By setting the sync status to true, it will prevent RPC calls. @@ -157,10 +168,11 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co service.setSyncStatus(true) } } - return &service, nil } +// ensureClient checks to make sure that the remote transaction source is +// available. It will return an error if it cannot connect via HTTP func (s *SyncService) ensureClient() error { _, err := s.client.GetLatestEthContext() if err != nil { @@ -169,31 +181,29 @@ func (s *SyncService) ensureClient() error { return nil } -// Start initializes the service, connecting to Ethereum1 and starting the -// subservices required for the operation of the SyncService. -// txs through syncservice go to mempool.locals -// txs through rpc go to mempool.remote +// Start initializes the service func (s *SyncService) Start() error { if !s.enable { + log.Info("Running without syncing enabled") return nil } log.Info("Initializing Sync Service", "eth1-chainid", s.eth1ChainId) - // When a sequencer, be sure to sync to the tip of the ctc before allowing - // user transactions. - if !s.verifier { - err := s.syncTransactionsToTip() + if s.verifier { + go s.VerifierLoop() + } else { + // The sequencer must sync the transactions to the tip and the + // pending queue transactions on start before setting sync status + // to false and opening up the RPC to accept transactions. + err := s.syncTransactionsToTip(s.backend) if err != nil { return fmt.Errorf("Cannot sync transactions to the tip: %w", err) } - // TODO: This should also sync the enqueue'd transactions that have not - // been synced yet + err = s.syncQueueToTip() + if err != nil { + log.Error("Sequencer cannot sync queue", "msg", err) + } s.setSyncStatus(false) - } - - if s.verifier { - go s.VerifierLoop() - } else { go s.SequencerLoop() } return nil @@ -210,6 +220,7 @@ func (s *SyncService) initializeLatestL1(ctcDeployHeight *big.Int) error { if ctcDeployHeight == nil { return errors.New("Must configure with canonical transaction chain deploy height") } + log.Info("Initializing initial OVM Context", "ctc-deploy-height", ctcDeployHeight.Uint64()) context, err := s.client.GetEthContext(ctcDeployHeight.Uint64()) if err != nil { return fmt.Errorf("Cannot fetch ctc deploy block at height %d: %w", ctcDeployHeight.Uint64(), err) @@ -223,7 +234,7 @@ func (s *SyncService) initializeLatestL1(ctcDeployHeight *big.Int) error { block = s.bc.CurrentBlock() idx := block.Number().Uint64() if idx > *index { - // This is recoverable with a reorg + // This is recoverable with a reorg but should never happen return fmt.Errorf("Current block height greater than index") } s.SetLatestIndex(&idx) @@ -237,24 +248,21 @@ func (s *SyncService) initializeLatestL1(ctcDeployHeight *big.Int) error { s.SetLatestL1Timestamp(tx.L1Timestamp()) s.SetLatestL1BlockNumber(tx.L1BlockNumber().Uint64()) } - // Only the sequencer cares about latest queue index - if !s.verifier { - queueIndex := s.GetLatestEnqueueIndex() - if queueIndex == nil { - enqueue, err := s.client.GetLastConfirmedEnqueue() - // There are no enqueues yet - if errors.Is(err, errElementNotFound) { - return nil - } - // Other unexpected error - if err != nil { - return fmt.Errorf("Cannot fetch last confirmed queue tx: %w", err) - } - // No error, the queue element was found - queueIndex = enqueue.GetMeta().QueueIndex + queueIndex := s.GetLatestEnqueueIndex() + if queueIndex == nil { + enqueue, err := s.client.GetLastConfirmedEnqueue() + // There are no enqueues yet + if errors.Is(err, errElementNotFound) { + return nil + } + // Other unexpected error + if err != nil { + return fmt.Errorf("Cannot fetch last confirmed queue tx: %w", err) } - s.SetLatestEnqueueIndex(queueIndex) + // No error, the queue element was found + queueIndex = enqueue.GetMeta().QueueIndex } + s.SetLatestEnqueueIndex(queueIndex) return nil } @@ -281,6 +289,8 @@ func (s *SyncService) IsSyncing() bool { // started by this service. func (s *SyncService) Stop() error { s.scope.Close() + s.chainHeadSub.Unsubscribe() + close(s.chainHeadCh) if s.cancel != nil { defer s.cancel() @@ -288,6 +298,7 @@ func (s *SyncService) Stop() error { return nil } +// VerifierLoop is the main loop for Verifier mode func (s *SyncService) VerifierLoop() { log.Info("Starting Verifier Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold) for { @@ -301,43 +312,26 @@ func (s *SyncService) VerifierLoop() { } } +// verify is the main logic for the Verifier. The verifier logic is different +// depending on the Backend func (s *SyncService) verify() error { - // The verifier polls for ctc transactions. - // the ctc transactions are extending the chain. - latest, err := s.client.GetLatestTransaction() - if errors.Is(err, errElementNotFound) { - log.Debug("latest transaction not found") - return nil - } - if err != nil { - return err - } - - var start uint64 - if s.GetLatestIndex() == nil { - start = 0 - } else { - start = *s.GetLatestIndex() + 1 - } - end := *latest.GetMeta().Index - log.Info("Polling transactions", "start", start, "end", end) - for i := start; i <= end; i++ { - tx, err := s.client.GetTransaction(i) + switch s.backend { + case BackendL1: + err := s.syncTransactionBatchesToTip() if err != nil { - return fmt.Errorf("cannot get tx in loop: %w", err) + log.Error("Verifier cannot sync transaction batches to tip", "msg", err) } - - log.Debug("Applying transaction", "index", i) - err = s.maybeApplyTransaction(tx) + case BackendL2: + err := s.syncTransactionsToTip(BackendL2) if err != nil { - return fmt.Errorf("could not apply transaction: %w", err) + log.Error("Verifier cannot sync transactions with BackendL2", "msg", err) } - s.SetLatestIndex(&i) } - return nil } +// SequencerLoop is the polling loop that runs in sequencer mode. It sequences +// transactions and then updates the EthContext. func (s *SyncService) SequencerLoop() { log.Info("Starting Sequencer Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold) for { @@ -345,92 +339,32 @@ func (s *SyncService) SequencerLoop() { log.Error("Cannot update L1 gas price", "msg", err) } s.txLock.Lock() - err := s.sequence() - if err != nil { + if err := s.sequence(); err != nil { log.Error("Could not sequence", "error", err) } s.txLock.Unlock() - - if s.updateContext() != nil { + if err := s.updateContext(); err != nil { log.Error("Could not update execution context", "error", err) } - time.Sleep(s.pollInterval) } } +// sequence is the main logic for the Sequencer. It will sync any `enqueue` +// transactions it has yet to sync and then pull in transaction batches to +// compare against the transactions it has in its local state. The sequencer +// should reorg based on the transaction batches that are posted because +// L1 is the source of truth. The sequencer concurrently accepts user +// transactions via the RPC. func (s *SyncService) sequence() error { - // Only the sequencer needs to poll for enqueue transactions - // and then can choose when to apply them. We choose to apply - // transactions such that it makes for efficient batch submitting. - // Place as many L1ToL2 transactions in the same context as possible - // by executing them one after another. - latest, err := s.client.GetLatestEnqueue() - if errors.Is(err, errElementNotFound) { - log.Debug("No enqueue transactions found") - return nil - } + err := s.syncQueueToTip() if err != nil { - return fmt.Errorf("cannot fetch latest enqueue: %w", err) - } - // Compare the remote latest queue index to the local latest - // queue index. If the remote latest queue index is greater - // than the local latest queue index, be sure to ingest more - // enqueued transactions - var start uint64 - if s.GetLatestEnqueueIndex() == nil { - start = 0 - } else { - start = *s.GetLatestEnqueueIndex() + 1 + log.Error("Sequencer cannot sync queue", "msg", err) } - end := *latest.GetMeta().QueueIndex - - log.Info("Polling enqueued transactions", "start", start, "end", end) - for i := start; i <= end; i++ { - enqueue, err := s.client.GetEnqueue(i) - if err != nil { - return fmt.Errorf("Cannot get enqueue in loop %d: %w", i, err) - } - - if enqueue == nil { - log.Debug("No enqueue transaction found") - return nil - } - - // This should never happen - if enqueue.L1BlockNumber() == nil { - return fmt.Errorf("No blocknumber for enqueue idx %d, timestamp %d, blocknumber %d", i, enqueue.L1Timestamp(), enqueue.L1BlockNumber()) - } - - // Update the timestamp and blocknumber based on the enqueued - // transactions - if enqueue.L1Timestamp() > s.GetLatestL1Timestamp() { - ts := enqueue.L1Timestamp() - bn := enqueue.L1BlockNumber().Uint64() - s.SetLatestL1Timestamp(ts) - s.SetLatestL1BlockNumber(bn) - log.Info("Updated Eth Context from enqueue", "index", i, "timestamp", ts, "blocknumber", bn) - } - - log.Debug("Applying enqueue transaction", "index", i) - err = s.applyTransaction(enqueue) - if err != nil { - return fmt.Errorf("could not apply transaction: %w", err) - } - - s.SetLatestEnqueueIndex(enqueue.GetMeta().QueueIndex) - if enqueue.GetMeta().Index == nil { - latest := s.GetLatestIndex() - index := uint64(0) - if latest != nil { - index = *latest + 1 - } - s.SetLatestIndex(&index) - } else { - s.SetLatestIndex(enqueue.GetMeta().Index) - } + err = s.syncTransactionBatchesToTip() + if err != nil { + log.Error("Sequencer cannot sync transaction batches", "msg", err) } - return nil } @@ -440,7 +374,7 @@ func (s *SyncService) sequence() error { func (s *SyncService) updateL1GasPrice() error { l1GasPrice, err := s.client.GetL1GasPrice() if err != nil { - return err + return fmt.Errorf("cannot fetch L1 gas price: %w", err) } s.RollupGpo.SetDataPrice(l1GasPrice) log.Info("Adjusted L1 Gas Price", "gasprice", l1GasPrice) @@ -454,7 +388,6 @@ func (s *SyncService) updateContext() error { if err != nil { return err } - current := time.Unix(int64(s.GetLatestL1Timestamp()), 0) next := time.Unix(int64(context.Timestamp), 0) if next.Sub(current) > s.timestampRefreshThreshold { @@ -462,81 +395,9 @@ func (s *SyncService) updateContext() error { s.SetLatestL1BlockNumber(context.BlockNumber) s.SetLatestL1Timestamp(context.Timestamp) } - return nil } -// This function must sync all the way to the tip -// TODO: it should then sync all of the enqueue transactions -func (s *SyncService) syncTransactionsToTip() error { - // Then set up a while loop that only breaks when the latest - // transaction does not change through two runs of the loop. - // The latest transaction can change during the timeframe of - // all of the transactions being sync'd. - for { - // This function must be sure to sync all the way to the tip. - // First query the latest transaction - latest, err := s.client.GetLatestTransaction() - if errors.Is(err, errElementNotFound) { - log.Info("No transactions to sync") - return nil - } - if err != nil { - log.Error("Cannot get latest transaction", "msg", err) - time.Sleep(time.Second * 2) - continue - } - tipHeight := latest.GetMeta().Index - index := rawdb.ReadHeadIndex(s.db) - start := uint64(0) - if index != nil { - start = *index + 1 - } - - log.Info("Syncing transactions to tip", "start", start, "end", *tipHeight) - for i := start; i <= *tipHeight; i++ { - tx, err := s.client.GetTransaction(i) - if err != nil { - log.Error("Cannot get transaction", "index", i, "msg", err) - time.Sleep(time.Second * 2) - continue - } - // The transaction does not yet exist in the ctc - if tx == nil { - index := latest.GetMeta().Index - if index == nil { - return fmt.Errorf("Unexpected nil index") - } - return fmt.Errorf("Transaction %d not found when %d is latest", i, *index) - } - err = s.maybeApplyTransaction(tx) - if err != nil { - return fmt.Errorf("Cannot apply transaction: %w", err) - } - if err != nil { - log.Error("Cannot ingest transaction", "index", i) - } - s.SetLatestIndex(tx.GetMeta().Index) - if types.QueueOrigin(tx.QueueOrigin().Uint64()) == types.QueueOriginL1ToL2 { - queueIndex := tx.GetMeta().QueueIndex - s.SetLatestEnqueueIndex(queueIndex) - } - } - // Be sure to check that no transactions came in while - // the above loop was running - post, err := s.client.GetLatestTransaction() - if err != nil { - return fmt.Errorf("Cannot get latest transaction: %w", err) - } - // These transactions should always have an index since they - // are already in the ctc. - if *latest.GetMeta().Index == *post.GetMeta().Index { - log.Info("Done syncing transactions to tip") - return nil - } - } -} - // Methods for safely accessing and storing the latest // L1 blocknumber and timestamp. These are held in memory. @@ -625,101 +486,149 @@ func (s *SyncService) SetLatestVerifiedIndex(index *uint64) { } } -// reorganize will reorganize to directly to the index passed in. -// The caller must handle the offset relative to the ctc. -func (s *SyncService) reorganize(index uint64) error { - if index == 0 { - return nil - } - err := s.bc.SetHead(index) - if err != nil { - return fmt.Errorf("Cannot reorganize in syncservice: %w", err) - } - - // TODO: make sure no off by one error here - s.SetLatestIndex(&index) - - // When in sequencer mode, be sure to roll back the latest queue - // index as well. - if !s.verifier { - enqueue, err := s.client.GetLastConfirmedEnqueue() - if err != nil { - return fmt.Errorf("cannot reorganize: %w", err) - } - s.SetLatestEnqueueIndex(enqueue.GetMeta().QueueIndex) +// applyTransaction is a higher level API for applying a transaction +func (s *SyncService) applyTransaction(tx *types.Transaction) error { + if tx.GetMeta().Index != nil { + return s.applyIndexedTransaction(tx) } - log.Info("Reorganizing", "height", index) - return nil + return s.applyTransactionToTip(tx) } -// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and -// starts sending event to the given channel. -func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { - return s.scope.Track(s.txFeed.Subscribe(ch)) +// applyIndexedTransaction applys a transaction that has an index. This means +// that the source of the transaction was either a L1 batch or from the +// sequencer. +func (s *SyncService) applyIndexedTransaction(tx *types.Transaction) error { + if tx == nil { + return errors.New("Transaction is nil in applyIndexedTransaction") + } + index := tx.GetMeta().Index + if index == nil { + return errors.New("No index found in applyIndexedTransaction") + } + log.Trace("Applying indexed transaction", "index", *index) + next := s.GetNextIndex() + if *index == next { + return s.applyTransactionToTip(tx) + } + if *index < next { + return s.applyHistoricalTransaction(tx) + } + return fmt.Errorf("Received tx at index %d when looking for %d", *index, next) } -// maybeApplyTransaction will potentially apply the transaction after first -// inspecting the local database. This is mean to prevent transactions from -// being replayed. -func (s *SyncService) maybeApplyTransaction(tx *types.Transaction) error { +// applyHistoricalTransaction will compare a historical transaction against what +// is locally indexed. This will trigger a reorg in the future +func (s *SyncService) applyHistoricalTransaction(tx *types.Transaction) error { if tx == nil { - return fmt.Errorf("nil transaction passed to maybeApplyTransaction") + return errors.New("Transaction is nil in applyHistoricalTransaction") } - - log.Debug("Maybe applying transaction", "hash", tx.Hash().Hex()) index := tx.GetMeta().Index if index == nil { - return fmt.Errorf("nil index in maybeApplyTransaction") + return errors.New("No index is found in applyHistoricalTransaction") } - // Handle off by one + // Handle the off by one block := s.bc.GetBlockByNumber(*index + 1) - - // The transaction has yet to be played, so it is safe to apply if block == nil { - err := s.applyTransaction(tx) - if err != nil { - return fmt.Errorf("Maybe apply transaction failed on index %d: %w", *index, err) - } - return nil + return fmt.Errorf("Block %d is not found", *index+1) } - // There is already a transaction at that index, so check - // for its equality. txs := block.Transactions() if len(txs) != 1 { - log.Info("block", "txs", len(txs), "number", block.Number().Uint64()) - return fmt.Errorf("More than 1 transaction in block") + return fmt.Errorf("More than one transaction found in block %d", *index+1) } - if isCtcTxEqual(tx, txs[0]) { - log.Info("Matching transaction found", "index", *index) + if !isCtcTxEqual(tx, txs[0]) { + log.Error("Mismatched transaction", "index", index) } else { - log.Warn("Non matching transaction found", "index", *index) + log.Debug("Batched transaction matches", "index", index, "hash", tx.Hash().Hex()) } return nil } -// Lower level API used to apply a transaction, must only be used with -// transactions that came from L1. -func (s *SyncService) applyTransaction(tx *types.Transaction) error { +// applyTransactionToTip will do sanity checks on the transaction before +// applying it to the tip. It blocks until the transaction has been included in +// the chain. +func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { + if tx == nil { + return errors.New("nil transaction passed to applyTransactionToTip") + } + log.Trace("Applying transaction to tip") + if tx.L1Timestamp() == 0 { + ts := s.GetLatestL1Timestamp() + bn := s.GetLatestL1BlockNumber() + tx.SetL1Timestamp(ts) + tx.SetL1BlockNumber(bn) + } else if tx.L1Timestamp() > s.GetLatestL1Timestamp() { + ts := tx.L1Timestamp() + bn := tx.L1BlockNumber() + s.SetLatestL1Timestamp(ts) + s.SetLatestL1BlockNumber(bn.Uint64()) + } else if tx.L1Timestamp() < s.GetLatestL1Timestamp() { + // TODO: this should force a reorg + log.Error("Timestamp monotonicity violation", "hash", tx.Hash().Hex()) + } + + if tx.GetMeta().Index == nil { + index := s.GetLatestIndex() + if index == nil { + tx.SetIndex(0) + } else { + tx.SetIndex(*index + 1) + } + } + s.SetLatestIndex(tx.GetMeta().Index) + if tx.GetMeta().QueueIndex != nil { + s.SetLatestEnqueueIndex(tx.GetMeta().QueueIndex) + } + + // This is a temporary fix for a bug in the SequencerEntrypoint. It will + // be removed when the custom batch serialization is removed in favor of + // batching RLP encoded transactions. tx = fixType(tx) + txs := types.Transactions{tx} s.txFeed.Send(core.NewTxsEvent{Txs: txs}) + // Block until the transaction has been added to the chain + log.Debug("Waiting for transaction to be added to chain", "hash", tx.Hash().Hex()) + <-s.chainHeadCh + + return nil +} + +// applyBatchedTransaction applies transactions that were batched to layer one. +// The sequencer checks for batches over time to make sure that it does not +// deviate from the L1 state and this is the main method of transaction +// ingestion for the verifier. +func (s *SyncService) applyBatchedTransaction(tx *types.Transaction) error { + if tx == nil { + return errors.New("nil transaction passed into applyBatchedTransaction") + } + index := tx.GetMeta().Index + if index == nil { + return errors.New("No index found on transaction") + } + log.Trace("Applying batched transaction", "index", *index) + err := s.applyIndexedTransaction(tx) + if err != nil { + return fmt.Errorf("Cannot apply batched transaction: %w", err) + } + s.SetLatestVerifiedIndex(index) return nil } // Higher level API for applying transactions. Should only be called for // queue origin sequencer transactions, as the contracts on L1 manage the same // validity checks that are done here. -func (s *SyncService) ApplyTransaction(tx *types.Transaction) error { +func (s *SyncService) ValidateAndApplySequencerTransaction(tx *types.Transaction) error { + if s.verifier { + return errors.New("Verifier does not accept transactions out of band") + } if tx == nil { - return fmt.Errorf("nil transaction passed to ApplyTransaction") + return errors.New("nil transaction passed to ValidateAndApplySequencerTransaction") } - log.Debug("Sending transaction to sync service", "hash", tx.Hash().Hex()) s.txLock.Lock() defer s.txLock.Unlock() - if s.verifier { - return errors.New("Verifier does not accept transactions out of band") - } + log.Trace("Sequencer transaction validation", "hash", tx.Hash().Hex()) + qo := tx.QueueOrigin() if qo == nil { return errors.New("invalid transaction with no queue origin") @@ -732,14 +641,7 @@ func (s *SyncService) ApplyTransaction(tx *types.Transaction) error { return fmt.Errorf("invalid transaction: %w", err) } - if tx.L1Timestamp() == 0 { - ts := s.GetLatestL1Timestamp() - bn := s.GetLatestL1BlockNumber() - tx.SetL1Timestamp(ts) - tx.SetL1BlockNumber(bn) - } - - // Set the raw transaction data in the meta + // Set the raw transaction data in the meta. txRaw, err := getRawTransaction(tx) if err != nil { return fmt.Errorf("invalid transaction: %w", err) @@ -756,10 +658,176 @@ func (s *SyncService) ApplyTransaction(tx *types.Transaction) error { txRaw, ) tx.SetTransactionMeta(newMeta) - return s.applyTransaction(tx) } +// syncTransactionsToTip will sync all of the transactions to the tip. +// The Backend determines the source of the transactions. +func (s *SyncService) syncTransactionsToTip(backend Backend) error { + s.loopLock.Lock() + defer s.loopLock.Unlock() + + for { + latest, err := s.client.GetLatestTransaction(backend) + if err != nil { + return fmt.Errorf("Cannot get latest transaction: %w", err) + } + if latest == nil { + log.Info("No transactions to sync") + return nil + } + latestIndex := latest.GetMeta().Index + if latestIndex == nil { + return errors.New("Latest index is nil") + } + nextIndex := s.GetNextIndex() + log.Info("Syncing transactions to tip", "start", *latestIndex, "end", nextIndex) + + for i := nextIndex; i <= *latestIndex; i++ { + tx, err := s.client.GetTransaction(i, backend) + if err != nil { + log.Error("Cannot get latest transaction", "msg", err) + time.Sleep(time.Second * 2) + continue + } + if tx == nil { + return fmt.Errorf("Transaction %d is nil", i) + } + err = s.applyTransaction(tx) + if err != nil { + return fmt.Errorf("Cannot apply transaction: %w", err) + } + } + + post, err := s.client.GetLatestTransaction(backend) + if err != nil { + return fmt.Errorf("Cannot get latest transaction: %w", err) + } + postLatestIndex := post.GetMeta().Index + if postLatestIndex == nil { + return errors.New("Latest index is nil") + } + if *postLatestIndex == *latestIndex { + return nil + } + } +} + +// syncTransactionBatchesToTip will sync all of the transaction batches to the +// tip +func (s *SyncService) syncTransactionBatchesToTip() error { + s.loopLock.Lock() + defer s.loopLock.Unlock() + log.Debug("Syncing transaction batches to tip") + + for { + latest, _, err := s.client.GetLatestTransactionBatch() + if err != nil { + return fmt.Errorf("Cannot get latest transaction batch: %w", err) + } + if latest == nil { + log.Info("No transaction batches to sync") + return nil + } + latestIndex := latest.Index + nextIndex := s.GetNextVerifiedIndex() + + for i := nextIndex; i <= latestIndex; i++ { + log.Debug("Fetching transaction batch", "index", i) + _, txs, err := s.client.GetTransactionBatch(i) + if err != nil { + return fmt.Errorf("Cannot get transaction batch: %w", err) + } + for _, tx := range txs { + s.applyBatchedTransaction(tx) + } + } + post, _, err := s.client.GetLatestTransactionBatch() + if err != nil { + return fmt.Errorf("Cannot get latest transaction batch: %w", err) + } + if post.Index == latest.Index { + return nil + } + } +} + +// syncQueueToTip will sync the `enqueue` transactions to the tip +// from the last known `enqueue` transaction +func (s *SyncService) syncQueueToTip() error { + s.loopLock.Lock() + defer s.loopLock.Unlock() + + for { + latest, err := s.client.GetLatestEnqueue() + if err != nil { + return fmt.Errorf("Cannot get latest enqueue transaction: %w", err) + } + if latest == nil { + log.Info("No enqueue transactions to sync") + return nil + } + latestIndex := latest.GetMeta().QueueIndex + if latestIndex == nil { + return errors.New("Latest queue transaction has no queue index") + } + nextIndex := s.GetNextEnqueueIndex() + log.Info("Syncing enqueue transactions to tip", "start", *latestIndex, "end", nextIndex) + + for i := nextIndex; i <= *latestIndex; i++ { + tx, err := s.client.GetEnqueue(i) + if err != nil { + return fmt.Errorf("Canot get enqueue transaction; %w", err) + } + if tx == nil { + return fmt.Errorf("Cannot get queue tx at index %d", i) + } + err = s.applyTransaction(tx) + if err != nil { + return fmt.Errorf("Cannot apply transaction: %w", err) + } + } + post, err := s.client.GetLatestEnqueue() + if err != nil { + return fmt.Errorf("Cannot get latest transaction: %w", err) + } + postLatestIndex := post.GetMeta().QueueIndex + if postLatestIndex == nil { + return errors.New("Latest queue index is nil") + } + if *latestIndex == *postLatestIndex { + return nil + } + } +} + +// updateEthContext will update the OVM execution context's +// timestamp and blocknumber if enough time has passed since +// it was last updated. This is a sequencer only function. +func (s *SyncService) updateEthContext() error { + context, err := s.client.GetLatestEthContext() + if err != nil { + return fmt.Errorf("Cannot get eth context: %w", err) + } + current := time.Unix(int64(s.GetLatestL1Timestamp()), 0) + next := time.Unix(int64(context.Timestamp), 0) + if next.Sub(current) > s.timestampRefreshThreshold { + log.Info("Updating Eth Context", "timetamp", context.Timestamp, "blocknumber", context.BlockNumber) + s.SetLatestL1BlockNumber(context.BlockNumber) + s.SetLatestL1Timestamp(context.Timestamp) + } + return nil +} + +// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and +// starts sending event to the given channel. +func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { + return s.scope.Track(s.txFeed.Subscribe(ch)) +} + +// getRawTransaction will return the raw serialization of the transaction. This +// function will be deprecated in the near future when the batch serialization +// is RLP encoded transactions. func getRawTransaction(tx *types.Transaction) ([]byte, error) { if tx == nil { return nil, errors.New("Cannot process nil transaction") @@ -807,6 +875,8 @@ func getRawTransaction(tx *types.Transaction) ([]byte, error) { return data.Bytes(), nil } +// fillBytes is taken from a newer version of the golang standard library +// will panic if the provided biginteger does not fit in the provided `size` number of bytes func fillBytes(x *big.Int, size int) []byte { b := x.Bytes() switch { @@ -821,6 +891,8 @@ func fillBytes(x *big.Int, size int) []byte { } } +// getSignatureType is a patch to fix a bug in the contracts. Will be deprecated +// with the move to RLP encoded transactions in batches. func getSignatureType(tx *types.Transaction) uint8 { if tx.SignatureHashType() == 0 { return 0 @@ -849,3 +921,10 @@ func fixType(tx *types.Transaction) *types.Transaction { tx.SetTransactionMeta(fixed) return tx } + +func stringify(i *uint64) string { + if i == nil { + return "" + } + return strconv.FormatUint(*i, 10) +} From d8359ce63574b7f5c4b49ac08576010a20dfa3ff Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 21 Apr 2021 18:01:47 -0700 Subject: [PATCH 04/29] l2geth: fix error handling for get tx batch --- l2geth/rollup/client.go | 4 ++-- l2geth/rollup/sync_service.go | 28 +++++++++++++++------------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/l2geth/rollup/client.go b/l2geth/rollup/client.go index d44fbf2c8ede..26241f4e821c 100644 --- a/l2geth/rollup/client.go +++ b/l2geth/rollup/client.go @@ -548,8 +548,8 @@ func (c *Client) GetTransactionBatch(index uint64) (*Batch, []*types.Transaction // parseTransactionBatchResponse will turn a TransactionBatchResponse into a // Batch and its corresponding types.Transactions func parseTransactionBatchResponse(txBatch *TransactionBatchResponse, signer *types.OVMSigner) (*Batch, []*types.Transaction, error) { - if txBatch == nil { - return nil, nil, nil + if txBatch == nil || txBatch.Batch == nil { + return nil, nil, errElementNotFound } batch := txBatch.Batch txs := make([]*types.Transaction, len(txBatch.Transactions)) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 60daa9454b1e..223cece4b330 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -669,13 +669,13 @@ func (s *SyncService) syncTransactionsToTip(backend Backend) error { for { latest, err := s.client.GetLatestTransaction(backend) - if err != nil { - return fmt.Errorf("Cannot get latest transaction: %w", err) - } - if latest == nil { + if errors.Is(err, errElementNotFound) { log.Info("No transactions to sync") return nil } + if err != nil { + return fmt.Errorf("Cannot get latest transaction: %w", err) + } latestIndex := latest.GetMeta().Index if latestIndex == nil { return errors.New("Latest index is nil") @@ -722,13 +722,13 @@ func (s *SyncService) syncTransactionBatchesToTip() error { for { latest, _, err := s.client.GetLatestTransactionBatch() - if err != nil { - return fmt.Errorf("Cannot get latest transaction batch: %w", err) - } - if latest == nil { + if errors.Is(err, errElementNotFound) { log.Info("No transaction batches to sync") return nil } + if err != nil { + return fmt.Errorf("Cannot get latest transaction batch: %w", err) + } latestIndex := latest.Index nextIndex := s.GetNextVerifiedIndex() @@ -739,7 +739,9 @@ func (s *SyncService) syncTransactionBatchesToTip() error { return fmt.Errorf("Cannot get transaction batch: %w", err) } for _, tx := range txs { - s.applyBatchedTransaction(tx) + if err := s.applyBatchedTransaction(tx); err != nil { + return fmt.Errorf("cannot apply batched transaction: %w", err) + } } } post, _, err := s.client.GetLatestTransactionBatch() @@ -760,13 +762,13 @@ func (s *SyncService) syncQueueToTip() error { for { latest, err := s.client.GetLatestEnqueue() - if err != nil { - return fmt.Errorf("Cannot get latest enqueue transaction: %w", err) - } - if latest == nil { + if errors.Is(err, errElementNotFound) { log.Info("No enqueue transactions to sync") return nil } + if err != nil { + return fmt.Errorf("Cannot get latest enqueue transaction: %w", err) + } latestIndex := latest.GetMeta().QueueIndex if latestIndex == nil { return errors.New("Latest queue transaction has no queue index") From 87742d949f6f51ff90a63ac37066c23479b556bd Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 21 Apr 2021 19:05:24 -0700 Subject: [PATCH 05/29] l2geth: update tests to compile and pass --- l2geth/rollup/sync_service_test.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/l2geth/rollup/sync_service_test.go b/l2geth/rollup/sync_service_test.go index f5e2ea175bfa..29cc9e493c5b 100644 --- a/l2geth/rollup/sync_service_test.go +++ b/l2geth/rollup/sync_service_test.go @@ -111,7 +111,7 @@ func TestSyncServiceTransactionEnqueued(t *testing.T) { // The queue index of the L1 to L2 transaction queueIndex := uint64(0) // The index in the ctc - index := uint64(5) + index := uint64(0) tx := types.NewTransaction(0, target, big.NewInt(0), gasLimit, big.NewInt(0), data) txMeta := types.NewTransactionMeta( @@ -133,14 +133,16 @@ func TestSyncServiceTransactionEnqueued(t *testing.T) { }) // Run an iteration of the eloop - err = service.sequence() - if err != nil { - t.Fatal("sequencing failed", err) - } - + err = nil + go func() { + err = service.syncQueueToTip() + }() // Wait for the tx to be confirmed into the chain and then // make sure it is the transactions that was set up with in the mockclient event := <-txCh + if err != nil { + t.Fatal("sequencing failed", err) + } if len(event.Txs) != 1 { t.Fatal("Unexpected number of transactions") } @@ -217,12 +219,15 @@ func TestSyncServiceSync(t *testing.T) { }, }) - err = service.verify() + err = nil + go func() { + err = service.syncTransactionsToTip(BackendL2) + }() + event := <-txCh if err != nil { t.Fatal("verification failed", err) } - event := <-txCh if len(event.Txs) != 1 { t.Fatal("Unexpected number of transactions") } @@ -320,6 +325,7 @@ func newTestSyncService(isVerifier bool) (*SyncService, chan core.NewTxsEvent, e // Set as an empty string as this is a dummy value anyways. // The client needs to be mocked with a mockClient RollupClientHttp: "", + Backend: BackendL2, } service, err := NewSyncService(context.Background(), cfg, txPool, chain, db) @@ -395,7 +401,7 @@ func (m *mockClient) GetLatestEnqueue() (*types.Transaction, error) { return m.getEnqueue[len(m.getEnqueue)-1], nil } -func (m *mockClient) GetTransaction(index uint64) (*types.Transaction, error) { +func (m *mockClient) GetTransaction(index uint64, backend Backend) (*types.Transaction, error) { if m.getTransactionCallCount < len(m.getTransaction) { tx := m.getTransaction[m.getTransactionCallCount] m.getTransactionCallCount++ @@ -404,7 +410,7 @@ func (m *mockClient) GetTransaction(index uint64) (*types.Transaction, error) { return nil, errors.New("") } -func (m *mockClient) GetLatestTransaction() (*types.Transaction, error) { +func (m *mockClient) GetLatestTransaction(backend Backend) (*types.Transaction, error) { if len(m.getTransaction) == 0 { return nil, errors.New("") } @@ -425,7 +431,7 @@ func (m *mockClient) GetLatestEthContext() (*EthContext, error) { } func (m *mockClient) GetLastConfirmedEnqueue() (*types.Transaction, error) { - return nil, nil + return nil, errElementNotFound } func (m *mockClient) GetLatestTransactionBatch() (*Batch, []*types.Transaction, error) { @@ -436,7 +442,7 @@ func (m *mockClient) GetTransactionBatch(index uint64) (*Batch, []*types.Transac return nil, nil, nil } -func (m *mockClient) SyncStatus() (*SyncStatus, error) { +func (m *mockClient) SyncStatus(backend Backend) (*SyncStatus, error) { return &SyncStatus{ Syncing: false, }, nil From 82aea429912d8188c6975bdb173e90b48e6926be Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 21 Apr 2021 21:55:32 -0700 Subject: [PATCH 06/29] l2geth: add sync range functions --- l2geth/rollup/sync_service.go | 119 +++++++++++++++++++++------------- 1 file changed, 75 insertions(+), 44 deletions(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 223cece4b330..7161cdd4665b 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -680,23 +680,14 @@ func (s *SyncService) syncTransactionsToTip(backend Backend) error { if latestIndex == nil { return errors.New("Latest index is nil") } - nextIndex := s.GetNextIndex() - log.Info("Syncing transactions to tip", "start", *latestIndex, "end", nextIndex) - for i := nextIndex; i <= *latestIndex; i++ { - tx, err := s.client.GetTransaction(i, backend) - if err != nil { - log.Error("Cannot get latest transaction", "msg", err) - time.Sleep(time.Second * 2) - continue - } - if tx == nil { - return fmt.Errorf("Transaction %d is nil", i) - } - err = s.applyTransaction(tx) - if err != nil { - return fmt.Errorf("Cannot apply transaction: %w", err) - } + nextIndex := s.GetNextIndex() + if nextIndex-1 == *latestIndex { + log.Info("No new transactions to sync", "tip", *latestIndex, "backend", backend.String()) + return nil + } + if err := s.syncTransactionRange(nextIndex, *latestIndex, backend); err != nil { + return err } post, err := s.client.GetLatestTransaction(backend) @@ -707,12 +698,29 @@ func (s *SyncService) syncTransactionsToTip(backend Backend) error { if postLatestIndex == nil { return errors.New("Latest index is nil") } + if *postLatestIndex == *latestIndex { return nil } } } +// syncTransactionRange will sync a range of transactions from +// start to end (inclusive) from a specific Backend +func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error { + log.Info("Syncing transactions to tip", "start", start, "end", end) + for i := start; i <= end; i++ { + tx, err := s.client.GetTransaction(i, backend) + if err != nil { + return fmt.Errorf("cannot fetch transaction %d: %w", i, err) + } + if err = s.applyTransaction(tx); err != nil { + return fmt.Errorf("Cannot apply transaction: %w", err) + } + } + return nil +} + // syncTransactionBatchesToTip will sync all of the transaction batches to the // tip func (s *SyncService) syncTransactionBatchesToTip() error { @@ -729,21 +737,16 @@ func (s *SyncService) syncTransactionBatchesToTip() error { if err != nil { return fmt.Errorf("Cannot get latest transaction batch: %w", err) } - latestIndex := latest.Index - nextIndex := s.GetNextVerifiedIndex() - for i := nextIndex; i <= latestIndex; i++ { - log.Debug("Fetching transaction batch", "index", i) - _, txs, err := s.client.GetTransactionBatch(i) - if err != nil { - return fmt.Errorf("Cannot get transaction batch: %w", err) - } - for _, tx := range txs { - if err := s.applyBatchedTransaction(tx); err != nil { - return fmt.Errorf("cannot apply batched transaction: %w", err) - } - } + nextIndex := s.GetNextVerifiedIndex() + if nextIndex-1 == latest.Index { + log.Info("No new batches to sync", "tip", latest.Index) + return nil + } + if err := s.syncTransactionBatchRange(nextIndex, latest.Index); err != nil { + return err } + post, _, err := s.client.GetLatestTransactionBatch() if err != nil { return fmt.Errorf("Cannot get latest transaction batch: %w", err) @@ -754,6 +757,24 @@ func (s *SyncService) syncTransactionBatchesToTip() error { } } +// syncTransactionBatchRange will sync a range of batched transactions from +// start to end (inclusive) +func (s *SyncService) syncTransactionBatchRange(start, end uint64) error { + for i := start; i <= end; i++ { + log.Debug("Fetching transaction batch", "index", i) + _, txs, err := s.client.GetTransactionBatch(i) + if err != nil { + return fmt.Errorf("Cannot get transaction batch: %w", err) + } + for _, tx := range txs { + if err := s.applyBatchedTransaction(tx); err != nil { + return fmt.Errorf("cannot apply batched transaction: %w", err) + } + } + } + return nil +} + // syncQueueToTip will sync the `enqueue` transactions to the tip // from the last known `enqueue` transaction func (s *SyncService) syncQueueToTip() error { @@ -763,7 +784,7 @@ func (s *SyncService) syncQueueToTip() error { for { latest, err := s.client.GetLatestEnqueue() if errors.Is(err, errElementNotFound) { - log.Info("No enqueue transactions to sync") + log.Info("No queue transactions to sync") return nil } if err != nil { @@ -773,22 +794,16 @@ func (s *SyncService) syncQueueToTip() error { if latestIndex == nil { return errors.New("Latest queue transaction has no queue index") } - nextIndex := s.GetNextEnqueueIndex() - log.Info("Syncing enqueue transactions to tip", "start", *latestIndex, "end", nextIndex) - for i := nextIndex; i <= *latestIndex; i++ { - tx, err := s.client.GetEnqueue(i) - if err != nil { - return fmt.Errorf("Canot get enqueue transaction; %w", err) - } - if tx == nil { - return fmt.Errorf("Cannot get queue tx at index %d", i) - } - err = s.applyTransaction(tx) - if err != nil { - return fmt.Errorf("Cannot apply transaction: %w", err) - } + nextIndex := s.GetNextEnqueueIndex() + if nextIndex-1 == *latestIndex { + log.Info("No new queue transactions to sync", "tip", *latestIndex) + return nil } + if err := s.syncQueueTransactionRange(nextIndex, *latestIndex); err != nil { + return err + } + post, err := s.client.GetLatestEnqueue() if err != nil { return fmt.Errorf("Cannot get latest transaction: %w", err) @@ -803,6 +818,22 @@ func (s *SyncService) syncQueueToTip() error { } } +// syncQueueTransactionRange will apply a range of queue transactions from +// start to end (inclusive) +func (s *SyncService) syncQueueTransactionRange(start, end uint64) error { + log.Info("Syncing enqueue transactions to tip", "start", start, "end", end) + for i := start; i <= end; i++ { + tx, err := s.client.GetEnqueue(i) + if err != nil { + return fmt.Errorf("Canot get enqueue transaction; %w", err) + } + if err := s.applyTransaction(tx); err != nil { + return fmt.Errorf("Cannot apply transaction: %w", err) + } + } + return nil +} + // updateEthContext will update the OVM execution context's // timestamp and blocknumber if enough time has passed since // it was last updated. This is a sequencer only function. From c0c3561a32e0aced1451f6fc1077ef4cadb6f5e9 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Thu, 22 Apr 2021 14:46:33 -0700 Subject: [PATCH 07/29] l2geth: add batch index indexing --- l2geth/core/rawdb/rollup_indexes.go | 21 +++++++++++++++++++++ l2geth/core/rawdb/schema.go | 2 ++ l2geth/rollup/sync_service.go | 21 +++++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/l2geth/core/rawdb/rollup_indexes.go b/l2geth/core/rawdb/rollup_indexes.go index 91b1b6385463..be604becc004 100644 --- a/l2geth/core/rawdb/rollup_indexes.go +++ b/l2geth/core/rawdb/rollup_indexes.go @@ -69,3 +69,24 @@ func WriteHeadVerifiedIndex(db ethdb.KeyValueWriter, index uint64) { log.Crit("Failed to store verifier index", "err", err) } } + +// ReadHeadBatchIndex will read the known tip of the processed batches +func ReadHeadBatchIndex(db ethdb.KeyValueReader) *uint64 { + data, _ := db.Get(headBatchKey) + if len(data) == 0 { + return nil + } + ret := new(big.Int).SetBytes(data).Uint64() + return &ret +} + +// WriteHeadBatchIndex will write the known tip of the processed batches +func WriteHeadBatchIndex(db ethdb.KeyValueWriter, index uint64) { + value := new(big.Int).SetUint64(index).Bytes() + if index == 0 { + value = []byte{0} + } + if err := db.Put(headBatchKey, value); err != nil { + log.Crit("Failed to store verifier index", "err", err) + } +} diff --git a/l2geth/core/rawdb/schema.go b/l2geth/core/rawdb/schema.go index d0f951e2bdc8..5563af68fba3 100644 --- a/l2geth/core/rawdb/schema.go +++ b/l2geth/core/rawdb/schema.go @@ -62,6 +62,8 @@ var ( headQueueIndexKey = []byte("LastQueueIndex") // headVerifiedIndexKey tracks the latest verified index headVerifiedIndexKey = []byte("LastVerifiedIndex") + // headBatchKey tracks the latest processed batch + headBatchKey = []byte("LastBatch") preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 7161cdd4665b..72e26ef57e1b 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -486,6 +486,27 @@ func (s *SyncService) SetLatestVerifiedIndex(index *uint64) { } } +// GetLatestBatchIndex reads the last processed transaction batch +func (s *SyncService) GetLatestBatchIndex() *uint64 { + return rawdb.ReadHeadBatchIndex(s.db) +} + +// GetNextBatchIndex reads the index of the next transaction batch to process +func (s *SyncService) GetNextBatchIndex() uint64 { + index := s.GetLatestBatchIndex() + if index == nil { + return 0 + } + return *index + 1 +} + +// SetLatestBatchIndex writes the last index of the transaction batch that was processed +func (s *SyncService) SetLatestBatchIndex(index *uint64) { + if index != nil { + rawdb.WriteHeadBatchIndex(s.db, *index) + } +} + // applyTransaction is a higher level API for applying a transaction func (s *SyncService) applyTransaction(tx *types.Transaction) error { if tx.GetMeta().Index != nil { From 6aba901871cba37ef02240680318c6b276aa01a4 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Thu, 22 Apr 2021 14:48:05 -0700 Subject: [PATCH 08/29] l2geth: update syncservice hot path logging --- l2geth/rollup/sync_service.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 72e26ef57e1b..37a1a93a6485 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -559,7 +559,7 @@ func (s *SyncService) applyHistoricalTransaction(tx *types.Transaction) error { if !isCtcTxEqual(tx, txs[0]) { log.Error("Mismatched transaction", "index", index) } else { - log.Debug("Batched transaction matches", "index", index, "hash", tx.Hash().Hex()) + log.Debug("Historical transaction matches", "index", *index, "hash", tx.Hash().Hex()) } return nil } @@ -571,7 +571,6 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { if tx == nil { return errors.New("nil transaction passed to applyTransactionToTip") } - log.Trace("Applying transaction to tip") if tx.L1Timestamp() == 0 { ts := s.GetLatestL1Timestamp() bn := s.GetLatestL1BlockNumber() @@ -599,6 +598,8 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { if tx.GetMeta().QueueIndex != nil { s.SetLatestEnqueueIndex(tx.GetMeta().QueueIndex) } + // The index was set above so it is safe to dereference + log.Trace("Applying transaction to tip", "index", *tx.GetMeta().Index) // This is a temporary fix for a bug in the SequencerEntrypoint. It will // be removed when the custom batch serialization is removed in favor of From c9027662cef88d9871982713da9fe497f35f3dba Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Thu, 22 Apr 2021 14:48:34 -0700 Subject: [PATCH 09/29] l2geth: use indexed batch index --- l2geth/rollup/sync_service.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 37a1a93a6485..7476801e440f 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -760,7 +760,7 @@ func (s *SyncService) syncTransactionBatchesToTip() error { return fmt.Errorf("Cannot get latest transaction batch: %w", err) } - nextIndex := s.GetNextVerifiedIndex() + nextIndex := s.GetNextBatchIndex() if nextIndex-1 == latest.Index { log.Info("No new batches to sync", "tip", latest.Index) return nil @@ -782,6 +782,7 @@ func (s *SyncService) syncTransactionBatchesToTip() error { // syncTransactionBatchRange will sync a range of batched transactions from // start to end (inclusive) func (s *SyncService) syncTransactionBatchRange(start, end uint64) error { + log.Debug("Syncing transaction batch range", "start", start, "end", end) for i := start; i <= end; i++ { log.Debug("Fetching transaction batch", "index", i) _, txs, err := s.client.GetTransactionBatch(i) @@ -793,6 +794,7 @@ func (s *SyncService) syncTransactionBatchRange(start, end uint64) error { return fmt.Errorf("cannot apply batched transaction: %w", err) } } + s.SetLatestBatchIndex(&i) } return nil } From 570b79a2409ad9a8ad998ac6aa564e016674826f Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Thu, 22 Apr 2021 14:52:05 -0700 Subject: [PATCH 10/29] chore: add changeset --- .changeset/nervous-bobcats-grow.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/nervous-bobcats-grow.md diff --git a/.changeset/nervous-bobcats-grow.md b/.changeset/nervous-bobcats-grow.md new file mode 100644 index 000000000000..fc963b069701 --- /dev/null +++ b/.changeset/nervous-bobcats-grow.md @@ -0,0 +1,5 @@ +--- +"@eth-optimism/l2geth": patch +--- + +Refactor the SyncService to more closely implement the specification. This includes using query params to select the backend from the DTL, trailing syncing of batches for the sequencer, syncing by batches as the verifier as well as unified code paths for transaction ingestion to prevent double ingestion or missed ingestion From 964e05ea05879b675cf5e03e154db13b5beef8b2 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 12 May 2021 14:23:24 -0700 Subject: [PATCH 11/29] l2geth: sync spec refactor (#822) * l2geth: more in depth usage string * l2geth: add standard client getters for index * l2geth: refactor sync service into shared codepaths * l2geth: clean up tests * l2geth: better logging and error handling * test: improve test coverage around timestamps * l2geth: improve docstring * l2geth: rename variable --- l2geth/cmd/utils/flags.go | 2 +- l2geth/rollup/client.go | 40 ++++ l2geth/rollup/sync_service.go | 313 ++++++++++++++++------------- l2geth/rollup/sync_service_test.go | 216 ++++++++++++++++++-- 4 files changed, 420 insertions(+), 151 deletions(-) diff --git a/l2geth/cmd/utils/flags.go b/l2geth/cmd/utils/flags.go index de88994d2a99..8a293c39156c 100644 --- a/l2geth/cmd/utils/flags.go +++ b/l2geth/cmd/utils/flags.go @@ -851,7 +851,7 @@ var ( } RollupBackendFlag = cli.StringFlag{ Name: "rollup.backend", - Usage: "Sync backend for verifiers (\"l1\" or \"l2\")", + Usage: "Sync backend for verifiers (\"l1\" or \"l2\"), defaults to l1", Value: "l1", EnvVar: "ROLLUP_BACKEND", } diff --git a/l2geth/rollup/client.go b/l2geth/rollup/client.go index 26241f4e821c..d16c485a5cec 100644 --- a/l2geth/rollup/client.go +++ b/l2geth/rollup/client.go @@ -116,12 +116,15 @@ type decoded struct { type RollupClient interface { GetEnqueue(index uint64) (*types.Transaction, error) GetLatestEnqueue() (*types.Transaction, error) + GetLatestEnqueueIndex() (*uint64, error) GetTransaction(uint64, Backend) (*types.Transaction, error) GetLatestTransaction(Backend) (*types.Transaction, error) + GetLatestTransactionIndex(Backend) (*uint64, error) GetEthContext(uint64) (*EthContext, error) GetLatestEthContext() (*EthContext, error) GetLastConfirmedEnqueue() (*types.Transaction, error) GetLatestTransactionBatch() (*Batch, []*types.Transaction, error) + GetLatestTransactionBatchIndex() (*uint64, error) GetTransactionBatch(uint64) (*Batch, []*types.Transaction, error) SyncStatus(Backend) (*SyncStatus, error) GetL1GasPrice() (*big.Int, error) @@ -269,6 +272,43 @@ func (c *Client) GetLatestEnqueue() (*types.Transaction, error) { return tx, nil } +// GetLatestEnqueueIndex returns the latest `enqueue()` index +func (c *Client) GetLatestEnqueueIndex() (*uint64, error) { + tx, err := c.GetLatestEnqueue() + if err != nil { + return nil, err + } + index := tx.GetMeta().QueueIndex + if index == nil { + return nil, errors.New("Latest queue index is nil") + } + return index, nil +} + +// GetLatestTransactionIndex returns the latest CTC index that has been batch +// submitted or not, depending on the backend +func (c *Client) GetLatestTransactionIndex(backend Backend) (*uint64, error) { + tx, err := c.GetLatestTransaction(backend) + if err != nil { + return nil, err + } + index := tx.GetMeta().Index + if index == nil { + return nil, errors.New("Latest index is nil") + } + return index, nil +} + +// GetLatestTransactionBatchIndex returns the latest transaction batch index +func (c *Client) GetLatestTransactionBatchIndex() (*uint64, error) { + batch, _, err := c.GetLatestTransactionBatch() + if err != nil { + return nil, err + } + index := batch.Index + return &index, nil +} + // batchedTransactionToTransaction converts a transaction into a // types.Transaction that can be consumed by the SyncService func batchedTransactionToTransaction(res *transaction, signer *types.OVMSigner) (*types.Transaction, error) { diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 7476801e440f..b6f1e4579ed6 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -195,12 +195,10 @@ func (s *SyncService) Start() error { // The sequencer must sync the transactions to the tip and the // pending queue transactions on start before setting sync status // to false and opening up the RPC to accept transactions. - err := s.syncTransactionsToTip(s.backend) - if err != nil { - return fmt.Errorf("Cannot sync transactions to the tip: %w", err) + if err := s.syncTransactionsToTip(); err != nil { + return fmt.Errorf("Sequencer cannot sync transactions to tip: %w", err) } - err = s.syncQueueToTip() - if err != nil { + if err := s.syncQueueToTip(); err != nil { log.Error("Sequencer cannot sync queue", "msg", err) } s.setSyncStatus(false) @@ -317,14 +315,12 @@ func (s *SyncService) VerifierLoop() { func (s *SyncService) verify() error { switch s.backend { case BackendL1: - err := s.syncTransactionBatchesToTip() - if err != nil { - log.Error("Verifier cannot sync transaction batches to tip", "msg", err) + if err := s.syncBatchesToTip(); err != nil { + return fmt.Errorf("Verifier cannot sync transaction batches to tip: %w", err) } case BackendL2: - err := s.syncTransactionsToTip(BackendL2) - if err != nil { - log.Error("Verifier cannot sync transactions with BackendL2", "msg", err) + if err := s.syncTransactionsToTip(); err != nil { + return fmt.Errorf("Verifier cannot sync transactions with BackendL2: %w", err) } } return nil @@ -357,13 +353,38 @@ func (s *SyncService) SequencerLoop() { // L1 is the source of truth. The sequencer concurrently accepts user // transactions via the RPC. func (s *SyncService) sequence() error { - err := s.syncQueueToTip() - if err != nil { - log.Error("Sequencer cannot sync queue", "msg", err) + if err := s.syncQueueToTip(); err != nil { + return fmt.Errorf("Sequencer cannot sequence queue: %w", err) } - err = s.syncTransactionBatchesToTip() - if err != nil { - log.Error("Sequencer cannot sync transaction batches", "msg", err) + if err := s.syncBatchesToTip(); err != nil { + return fmt.Errorf("Sequencer cannot sync transaction batches: %w", err) + } + return nil +} + +func (s *SyncService) syncQueueToTip() error { + if err := s.syncToTip(s.syncQueue, s.client.GetLatestEnqueueIndex); err != nil { + return fmt.Errorf("Cannot sync queue to tip: %w", err) + } + return nil +} + +func (s *SyncService) syncBatchesToTip() error { + if err := s.syncToTip(s.syncBatches, s.client.GetLatestTransactionBatchIndex); err != nil { + return fmt.Errorf("Cannot sync transaction batches to tip: %w", err) + } + return nil +} + +func (s *SyncService) syncTransactionsToTip() error { + sync := func() (*uint64, error) { + return s.syncTransactions(s.backend) + } + check := func() (*uint64, error) { + return s.client.GetLatestTransactionIndex(s.backend) + } + if err := s.syncToTip(sync, check); err != nil { + return fmt.Errorf("Verifier cannot sync transactions with backend %s: %w", s.backend.String(), err) } return nil } @@ -566,23 +587,36 @@ func (s *SyncService) applyHistoricalTransaction(tx *types.Transaction) error { // applyTransactionToTip will do sanity checks on the transaction before // applying it to the tip. It blocks until the transaction has been included in -// the chain. +// the chain. It is assumed that validation around the index has already +// happened. func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { if tx == nil { return errors.New("nil transaction passed to applyTransactionToTip") } + // If there is no OVM timestamp assigned to the transaction, then assign a + // timestamp and blocknumber to it. This should only be the case for queue + // origin sequencer transactions that come in via RPC. The L1 to L2 + // transactions that come in via `enqueue` should have a timestamp set based + // on the L1 block that it was included in. + // Note that Ethereum Layer one consensus rules dictate that the timestamp + // must be strictly increasing between blocks, so no need to check both the + // timestamp and the blocknumber. if tx.L1Timestamp() == 0 { ts := s.GetLatestL1Timestamp() bn := s.GetLatestL1BlockNumber() tx.SetL1Timestamp(ts) tx.SetL1BlockNumber(bn) } else if tx.L1Timestamp() > s.GetLatestL1Timestamp() { + // If the timestamp of the transaction is greater than the sync + // service's locally maintained timestamp, update the timestamp and + // blocknumber to equal that of the transaction's. This should happend + // with `enqueue` transactions. ts := tx.L1Timestamp() bn := tx.L1BlockNumber() s.SetLatestL1Timestamp(ts) s.SetLatestL1BlockNumber(bn.Uint64()) + log.Debug("Updating OVM context based on new transaction", "timestamp", ts, "blocknumber", bn.Uint64()) } else if tx.L1Timestamp() < s.GetLatestL1Timestamp() { - // TODO: this should force a reorg log.Error("Timestamp monotonicity violation", "hash", tx.Hash().Hex()) } @@ -599,7 +633,7 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { s.SetLatestEnqueueIndex(tx.GetMeta().QueueIndex) } // The index was set above so it is safe to dereference - log.Trace("Applying transaction to tip", "index", *tx.GetMeta().Index) + log.Debug("Applying transaction to tip", "index", *tx.GetMeta().Index, "hash", tx.Hash().Hex()) // This is a temporary fix for a bug in the SequencerEntrypoint. It will // be removed when the custom batch serialization is removed in favor of @@ -609,7 +643,7 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { txs := types.Transactions{tx} s.txFeed.Send(core.NewTxsEvent{Txs: txs}) // Block until the transaction has been added to the chain - log.Debug("Waiting for transaction to be added to chain", "hash", tx.Hash().Hex()) + log.Trace("Waiting for transaction to be added to chain", "hash", tx.Hash().Hex()) <-s.chainHeadCh return nil @@ -683,100 +717,110 @@ func (s *SyncService) ValidateAndApplySequencerTransaction(tx *types.Transaction return s.applyTransaction(tx) } -// syncTransactionsToTip will sync all of the transactions to the tip. -// The Backend determines the source of the transactions. -func (s *SyncService) syncTransactionsToTip(backend Backend) error { +// syncer represents a function that can sync remote items and then returns the +// index that it synced to as well as an error if it encountered one. It has +// side effects on the state and its functionality depends on the current state +type syncer func() (*uint64, error) + +// rangeSyncer represents a function that syncs a range of items between its two +// arguments (inclusive) +type rangeSyncer func(uint64, uint64) error + +// nextGetter is a type that represents a function that will return the next +// index +type nextGetter func() uint64 + +// indexGetter is a type that represents a function that returns an index and an +// error if there is a problem fetching the index. The different types of +// indices are canonical transaction chain indices, queue indices and batch +// indices. It does not induce side effects on state +type indexGetter func() (*uint64, error) + +// isAtTip is a function that will determine if the local chain is at the tip +// of the remote datasource +func (s *SyncService) isAtTip(index *uint64, get indexGetter) (bool, error) { + latest, err := get() + if errors.Is(err, errElementNotFound) { + if index == nil { + return true, nil + } + return false, nil + } + if err != nil { + return false, err + } + // There are no known enqueue transactions locally or remotely + if latest == nil && index == nil { + return true, nil + } + // Only one of the transactions are nil due to the check above so they + // cannot be equal + if latest == nil || index == nil { + return false, nil + } + // The indices are equal + if *latest == *index { + return true, nil + } + // The indices are not equal + return false, nil +} + +// syncToTip is a function that can be used to sync to the tip of an ordered +// list of things. It is used to sync transactions, enqueue elements and batches +func (s *SyncService) syncToTip(sync syncer, getTip indexGetter) error { s.loopLock.Lock() defer s.loopLock.Unlock() for { - latest, err := s.client.GetLatestTransaction(backend) + index, err := sync() if errors.Is(err, errElementNotFound) { - log.Info("No transactions to sync") return nil } if err != nil { - return fmt.Errorf("Cannot get latest transaction: %w", err) - } - latestIndex := latest.GetMeta().Index - if latestIndex == nil { - return errors.New("Latest index is nil") - } - - nextIndex := s.GetNextIndex() - if nextIndex-1 == *latestIndex { - log.Info("No new transactions to sync", "tip", *latestIndex, "backend", backend.String()) - return nil - } - if err := s.syncTransactionRange(nextIndex, *latestIndex, backend); err != nil { return err } - - post, err := s.client.GetLatestTransaction(backend) + isAtTip, err := s.isAtTip(index, getTip) if err != nil { - return fmt.Errorf("Cannot get latest transaction: %w", err) - } - postLatestIndex := post.GetMeta().Index - if postLatestIndex == nil { - return errors.New("Latest index is nil") + return err } - - if *postLatestIndex == *latestIndex { + if isAtTip { return nil } } } -// syncTransactionRange will sync a range of transactions from -// start to end (inclusive) from a specific Backend -func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error { - log.Info("Syncing transactions to tip", "start", start, "end", end) - for i := start; i <= end; i++ { - tx, err := s.client.GetTransaction(i, backend) - if err != nil { - return fmt.Errorf("cannot fetch transaction %d: %w", i, err) - } - if err = s.applyTransaction(tx); err != nil { - return fmt.Errorf("Cannot apply transaction: %w", err) - } +// sync will sync a range of items +func (s *SyncService) sync(getLatest indexGetter, getNext nextGetter, syncer rangeSyncer) (*uint64, error) { + latestIndex, err := getLatest() + if errors.Is(err, errElementNotFound) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("Cannot sync: %w", err) + } + if latestIndex == nil { + return nil, errors.New("Latest index is not defined") } - return nil -} - -// syncTransactionBatchesToTip will sync all of the transaction batches to the -// tip -func (s *SyncService) syncTransactionBatchesToTip() error { - s.loopLock.Lock() - defer s.loopLock.Unlock() - log.Debug("Syncing transaction batches to tip") - - for { - latest, _, err := s.client.GetLatestTransactionBatch() - if errors.Is(err, errElementNotFound) { - log.Info("No transaction batches to sync") - return nil - } - if err != nil { - return fmt.Errorf("Cannot get latest transaction batch: %w", err) - } - nextIndex := s.GetNextBatchIndex() - if nextIndex-1 == latest.Index { - log.Info("No new batches to sync", "tip", latest.Index) - return nil - } - if err := s.syncTransactionBatchRange(nextIndex, latest.Index); err != nil { - return err - } + nextIndex := getNext() + if nextIndex == *latestIndex+1 { + return latestIndex, nil + } + if err := syncer(nextIndex, *latestIndex); err != nil { + return nil, err + } + return latestIndex, nil +} - post, _, err := s.client.GetLatestTransactionBatch() - if err != nil { - return fmt.Errorf("Cannot get latest transaction batch: %w", err) - } - if post.Index == latest.Index { - return nil - } +// syncBatches will sync a range of batches from the current known tip to the +// remote tip. +func (s *SyncService) syncBatches() (*uint64, error) { + index, err := s.sync(s.client.GetLatestTransactionBatchIndex, s.GetNextBatchIndex, s.syncTransactionBatchRange) + if err != nil { + return nil, fmt.Errorf("Cannot sync batches: %w", err) } + return index, nil } // syncTransactionBatchRange will sync a range of batched transactions from @@ -799,53 +843,20 @@ func (s *SyncService) syncTransactionBatchRange(start, end uint64) error { return nil } -// syncQueueToTip will sync the `enqueue` transactions to the tip -// from the last known `enqueue` transaction -func (s *SyncService) syncQueueToTip() error { - s.loopLock.Lock() - defer s.loopLock.Unlock() - - for { - latest, err := s.client.GetLatestEnqueue() - if errors.Is(err, errElementNotFound) { - log.Info("No queue transactions to sync") - return nil - } - if err != nil { - return fmt.Errorf("Cannot get latest enqueue transaction: %w", err) - } - latestIndex := latest.GetMeta().QueueIndex - if latestIndex == nil { - return errors.New("Latest queue transaction has no queue index") - } - - nextIndex := s.GetNextEnqueueIndex() - if nextIndex-1 == *latestIndex { - log.Info("No new queue transactions to sync", "tip", *latestIndex) - return nil - } - if err := s.syncQueueTransactionRange(nextIndex, *latestIndex); err != nil { - return err - } - - post, err := s.client.GetLatestEnqueue() - if err != nil { - return fmt.Errorf("Cannot get latest transaction: %w", err) - } - postLatestIndex := post.GetMeta().QueueIndex - if postLatestIndex == nil { - return errors.New("Latest queue index is nil") - } - if *latestIndex == *postLatestIndex { - return nil - } +// syncQueue will sync from the local tip to the known tip of the remote +// enqueue transaction feed. +func (s *SyncService) syncQueue() (*uint64, error) { + index, err := s.sync(s.client.GetLatestEnqueueIndex, s.GetNextEnqueueIndex, s.syncQueueTransactionRange) + if err != nil { + return nil, fmt.Errorf("Cannot sync queue: %w", err) } + return index, nil } // syncQueueTransactionRange will apply a range of queue transactions from // start to end (inclusive) func (s *SyncService) syncQueueTransactionRange(start, end uint64) error { - log.Info("Syncing enqueue transactions to tip", "start", start, "end", end) + log.Info("Syncing enqueue transactions range", "start", start, "end", end) for i := start; i <= end; i++ { tx, err := s.client.GetEnqueue(i) if err != nil { @@ -858,6 +869,38 @@ func (s *SyncService) syncQueueTransactionRange(start, end uint64) error { return nil } +// syncTransactions will sync transactions to the remote tip based on the +// backend +func (s *SyncService) syncTransactions(backend Backend) (*uint64, error) { + getLatest := func() (*uint64, error) { + return s.client.GetLatestTransactionIndex(backend) + } + sync := func(start, end uint64) error { + return s.syncTransactionRange(start, end, backend) + } + index, err := s.sync(getLatest, s.GetNextIndex, sync) + if err != nil { + return nil, fmt.Errorf("Cannot sync transactions with backend %s: %w", backend.String(), err) + } + return index, nil +} + +// syncTransactionRange will sync a range of transactions from +// start to end (inclusive) from a specific Backend +func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error { + log.Info("Syncing transaction range", "start", start, "end", end) + for i := start; i <= end; i++ { + tx, err := s.client.GetTransaction(i, backend) + if err != nil { + return fmt.Errorf("cannot fetch transaction %d: %w", i, err) + } + if err = s.applyTransaction(tx); err != nil { + return fmt.Errorf("Cannot apply transaction: %w", err) + } + } + return nil +} + // updateEthContext will update the OVM execution context's // timestamp and blocknumber if enough time has passed since // it was last updated. This is a sequencer only function. diff --git a/l2geth/rollup/sync_service_test.go b/l2geth/rollup/sync_service_test.go index 29cc9e493c5b..fa298766ca49 100644 --- a/l2geth/rollup/sync_service_test.go +++ b/l2geth/rollup/sync_service_test.go @@ -2,6 +2,7 @@ package rollup import ( "context" + "crypto/rand" "errors" "fmt" "math/big" @@ -153,6 +154,119 @@ func TestSyncServiceTransactionEnqueued(t *testing.T) { } } +func TestTransactionToTipNoIndex(t *testing.T) { + service, txCh, _, err := newTestSyncService(false) + if err != nil { + t.Fatal(err) + } + + // Get a reference to the current next index to compare with the index that + // is set to the transaction that is ingested + nextIndex := service.GetNextIndex() + + timestamp := uint64(24) + target := common.HexToAddress("0x04668ec2f57cc15c381b461b9fedab5d451c8f7f") + l1TxOrigin := common.HexToAddress("0xEA674fdDe714fd979de3EdF0F56AA9716B898ec8") + gasLimit := uint64(66) + data := []byte{0x02, 0x92} + l1BlockNumber := big.NewInt(100) + + tx := types.NewTransaction(0, target, big.NewInt(0), gasLimit, big.NewInt(0), data) + meta := types.NewTransactionMeta( + l1BlockNumber, + timestamp, + &l1TxOrigin, + types.SighashEIP155, + types.QueueOriginL1ToL2, + nil, // The index is `nil`, expect it to be set afterwards + nil, + nil, + ) + tx.SetTransactionMeta(meta) + + go func() { + err = service.applyTransactionToTip(tx) + }() + event := <-txCh + if err != nil { + t.Fatal("Cannot apply transaction to the tip") + } + confirmed := event.Txs[0] + // The transaction was applied without an index so the chain gave it the + // next index + index := confirmed.GetMeta().Index + if index == nil { + t.Fatal("Did not set index after applying tx to tip") + } + if *index != *service.GetLatestIndex() { + t.Fatal("Incorrect latest index") + } + if *index != nextIndex { + t.Fatal("Incorrect index") + } +} + +func TestTransactionToTipTimestamps(t *testing.T) { + service, txCh, _, err := newTestSyncService(false) + if err != nil { + t.Fatal(err) + } + + // Create two mock transactions with `nil` indices. This will allow + // assertions around the indices being updated correctly. Set the timestamp + // to 1 and 2 and assert that the timestamps in the sync service are updated + // correctly + tx1 := setMockTxL1Timestamp(mockTx(), 1) + tx2 := setMockTxL1Timestamp(mockTx(), 2) + + txs := []*types.Transaction{ + tx1, + tx2, + } + + for i, tx := range txs { + go func() { + err = service.applyTransactionToTip(tx) + }() + event := <-txCh + if err != nil { + t.Fatal(err) + } + + conf := event.Txs[0] + // The index should be set to the next + if conf.GetMeta().Index == nil { + t.Fatal("Index is nil") + } + // The indexes should be incrementing by 1 + if *conf.GetMeta().Index != uint64(i) { + t.Fatal("Mismatched index") + } + // The index that the sync service is tracking should be updated + if *conf.GetMeta().Index != *service.GetLatestIndex() { + t.Fatal("Mismatched index") + } + // The tx timestamp should be setting the services timestamp + ts := service.GetLatestL1Timestamp() + if conf.L1Timestamp() != ts { + t.Fatal("Mismatched timestamp") + } + } + + // Send a transaction with no timestamp and then let it be updated + // by the sync service. This will prevent monotonicity errors as well + // as give timestamps to queue origin sequencer transactions + ts := service.GetLatestL1Timestamp() + tx3 := setMockTxL1Timestamp(mockTx(), 0) + go func() { + err = service.applyTransactionToTip(tx3) + }() + result := <-txCh + if result.Txs[0].L1Timestamp() != ts { + t.Fatal("Timestamp not updated correctly") + } +} + func TestSyncServiceL1GasPrice(t *testing.T) { service, _, _, err := newTestSyncService(true) setupMockClient(service, map[string]interface{}{}) @@ -221,7 +335,7 @@ func TestSyncServiceSync(t *testing.T) { err = nil go func() { - err = service.syncTransactionsToTip(BackendL2) + err = service.syncTransactionsToTip() }() event := <-txCh if err != nil { @@ -340,13 +454,15 @@ func newTestSyncService(isVerifier bool) (*SyncService, chan core.NewTxsEvent, e } type mockClient struct { - getEnqueueCallCount int - getEnqueue []*types.Transaction - getTransactionCallCount int - getTransaction []*types.Transaction - getEthContextCallCount int - getEthContext []*EthContext - getLatestEthContext *EthContext + getEnqueueCallCount int + getEnqueue []*types.Transaction + getTransactionCallCount int + getTransaction []*types.Transaction + getEthContextCallCount int + getEthContext []*EthContext + getLatestEthContext *EthContext + getLatestEnqueueIndex []func() (*uint64, error) + getLatestEnqueueIndexCallCount int } func setupMockClient(service *SyncService, responses map[string]interface{}) { @@ -360,6 +476,7 @@ func newMockClient(responses map[string]interface{}) *mockClient { getTransactionResponses := []*types.Transaction{} getEthContextResponses := []*EthContext{} getLatestEthContextResponse := &EthContext{} + getLatestEnqueueIndexResponses := []func() (*uint64, error){} enqueue, ok := responses["GetEnqueue"] if ok { @@ -377,11 +494,17 @@ func newMockClient(responses map[string]interface{}) *mockClient { if ok { getLatestEthContextResponse = getLatestCtx.(*EthContext) } + getLatestEnqueueIdx, ok := responses["GetLatestEnqueueIndex"] + if ok { + getLatestEnqueueIndexResponses = getLatestEnqueueIdx.([]func() (*uint64, error)) + } + return &mockClient{ - getEnqueue: getEnqueueResponses, - getTransaction: getTransactionResponses, - getEthContext: getEthContextResponses, - getLatestEthContext: getLatestEthContextResponse, + getEnqueue: getEnqueueResponses, + getTransaction: getTransactionResponses, + getEthContext: getEthContextResponses, + getLatestEthContext: getLatestEthContextResponse, + getLatestEnqueueIndex: getLatestEnqueueIndexResponses, } } @@ -407,12 +530,12 @@ func (m *mockClient) GetTransaction(index uint64, backend Backend) (*types.Trans m.getTransactionCallCount++ return tx, nil } - return nil, errors.New("") + return nil, fmt.Errorf("Cannot get transaction: mocks (%d), call count (%d)", len(m.getTransaction), m.getTransactionCallCount) } func (m *mockClient) GetLatestTransaction(backend Backend) (*types.Transaction, error) { if len(m.getTransaction) == 0 { - return nil, errors.New("") + return nil, errors.New("No transactions") } return m.getTransaction[len(m.getTransaction)-1], nil } @@ -423,7 +546,7 @@ func (m *mockClient) GetEthContext(index uint64) (*EthContext, error) { m.getEthContextCallCount++ return ctx, nil } - return nil, errors.New("") + return nil, errors.New("Cannot get eth context") } func (m *mockClient) GetLatestEthContext() (*EthContext, error) { @@ -451,3 +574,66 @@ func (m *mockClient) SyncStatus(backend Backend) (*SyncStatus, error) { func (m *mockClient) GetL1GasPrice() (*big.Int, error) { return big.NewInt(100 * int64(params.GWei)), nil } + +func (m *mockClient) GetLatestEnqueueIndex() (*uint64, error) { + enqueue, err := m.GetLatestEnqueue() + if err != nil { + return nil, err + } + if enqueue == nil { + return nil, errElementNotFound + } + return enqueue.GetMeta().Index, nil +} + +func (m *mockClient) GetLatestTransactionBatchIndex() (*uint64, error) { + return nil, nil +} + +func (m *mockClient) GetLatestTransactionIndex(backend Backend) (*uint64, error) { + tx, err := m.GetLatestTransaction(backend) + if err != nil { + return nil, err + } + return tx.GetMeta().Index, nil +} + +func newUint64(n uint64) *uint64 { + return &n +} + +func mockTx() *types.Transaction { + address := make([]byte, 20) + rand.Read(address) + + target := common.BytesToAddress(address) + timestamp := uint64(0) + + rand.Read(address) + l1TxOrigin := common.BytesToAddress(address) + + gasLimit := uint64(0) + data := []byte{0x00, 0x00} + l1BlockNumber := big.NewInt(0) + + tx := types.NewTransaction(0, target, big.NewInt(0), gasLimit, big.NewInt(0), data) + meta := types.NewTransactionMeta( + l1BlockNumber, + timestamp, + &l1TxOrigin, + types.SighashEIP155, + types.QueueOriginL1ToL2, + nil, + nil, + nil, + ) + tx.SetTransactionMeta(meta) + return tx +} + +func setMockTxL1Timestamp(tx *types.Transaction, ts uint64) *types.Transaction { + meta := tx.GetMeta() + meta.L1Timestamp = ts + tx.SetTransactionMeta(meta) + return tx +} From c3bedee70d7a0ff742df70882e3c4dd75ebc7953 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 12 May 2021 15:13:44 -0700 Subject: [PATCH 12/29] sync-service: better logline --- l2geth/rollup/sync_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index d20dc8ab5fd3..d1f991aef376 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -613,7 +613,7 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { bn := tx.L1BlockNumber() s.SetLatestL1Timestamp(ts) s.SetLatestL1BlockNumber(bn.Uint64()) - log.Debug("Updating OVM context based on new transaction", "timestamp", ts, "blocknumber", bn.Uint64()) + log.Debug("Updating OVM context based on new transaction", "timestamp", ts, "blocknumber", bn.Uint64(), "queue-origin", tx.QueueOrigin().Uint64()) } else if tx.L1Timestamp() < s.GetLatestL1Timestamp() { log.Error("Timestamp monotonicity violation", "hash", tx.Hash().Hex()) } From e8a350d1371f97aa0a90f1f89033ee4d5e36198c Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 12 May 2021 15:41:07 -0700 Subject: [PATCH 13/29] l2geth: better logline --- l2geth/rollup/sync_service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index d1f991aef376..001e832591c3 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -59,9 +59,9 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co _ = cancel // satisfy govet if cfg.IsVerifier { - log.Info("Running in verifier mode", "sync-type", cfg.Backend.String()) + log.Info("Running in verifier mode", "sync-backend", cfg.Backend.String()) } else { - log.Info("Running in sequencer mode", "sync-type", cfg.Backend.String()) + log.Info("Running in sequencer mode", "sync-backend", cfg.Backend.String()) } pollInterval := cfg.PollInterval From db1f5470a0e9a093c38921b13c8968d438318694 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 12 May 2021 16:03:16 -0700 Subject: [PATCH 14/29] l2geth: test apply indexed transaction --- l2geth/rollup/sync_service_test.go | 50 ++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/l2geth/rollup/sync_service_test.go b/l2geth/rollup/sync_service_test.go index fa298766ca49..e6d8ec18a702 100644 --- a/l2geth/rollup/sync_service_test.go +++ b/l2geth/rollup/sync_service_test.go @@ -267,6 +267,49 @@ func TestTransactionToTipTimestamps(t *testing.T) { } } +func TestApplyIndexedTransaction(t *testing.T) { + service, txCh, _, err := newTestSyncService(true) + if err != nil { + t.Fatal(err) + } + + // Create three transactions, two of which have a dupliate index. + // The first two transactions can be ingested without a problem and the + // third transaction has a duplicate index so it will not be ingested. + // Expect an error for the third transaction and expect the SyncService + // global index to be updated with the first two transactions + tx0 := setMockTxIndex(mockTx(), 0) + tx1 := setMockTxIndex(mockTx(), 1) + tx1a := setMockTxIndex(mockTx(), 1) + + go func() { + err = service.applyIndexedTransaction(tx0) + }() + <-txCh + if err != nil { + t.Fatal(err) + } + if *tx0.GetMeta().Index != *service.GetLatestIndex() { + t.Fatal("Latest index mismatch") + } + + go func() { + err = service.applyIndexedTransaction(tx1) + }() + <-txCh + if err != nil { + t.Fatal(err) + } + if *tx1.GetMeta().Index != *service.GetLatestIndex() { + t.Fatal("Latest index mismatch") + } + + err = service.applyIndexedTransaction(tx1a) + if err == nil { + t.Fatal(err) + } +} + func TestSyncServiceL1GasPrice(t *testing.T) { service, _, _, err := newTestSyncService(true) setupMockClient(service, map[string]interface{}{}) @@ -637,3 +680,10 @@ func setMockTxL1Timestamp(tx *types.Transaction, ts uint64) *types.Transaction { tx.SetTransactionMeta(meta) return tx } + +func setMockTxIndex(tx *types.Transaction, index uint64) *types.Transaction { + meta := tx.GetMeta() + meta.Index = &index + tx.SetTransactionMeta(meta) + return tx +} From 084970308500bc85e61bfedbe8de5926f922df19 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 12 May 2021 16:03:31 -0700 Subject: [PATCH 15/29] l2geth: better logline --- l2geth/rollup/sync_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 001e832591c3..c3a4a335d42f 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -801,7 +801,7 @@ func (s *SyncService) syncBatches() (*uint64, error) { // syncTransactionBatchRange will sync a range of batched transactions from // start to end (inclusive) func (s *SyncService) syncTransactionBatchRange(start, end uint64) error { - log.Debug("Syncing transaction batch range", "start", start, "end", end) + log.Info("Syncing transaction batch range", "start", start, "end", end) for i := start; i <= end; i++ { log.Debug("Fetching transaction batch", "index", i) _, txs, err := s.client.GetTransactionBatch(i) From 059d7937159843d9d67c1e3cb845fbfa86e6e8d5 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 12 May 2021 16:12:58 -0700 Subject: [PATCH 16/29] linting: fix --- l2geth/rollup/sync_service.go | 2 +- l2geth/rollup/sync_service_test.go | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index c3a4a335d42f..c66449c71ac9 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -607,7 +607,7 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { } else if tx.L1Timestamp() > s.GetLatestL1Timestamp() { // If the timestamp of the transaction is greater than the sync // service's locally maintained timestamp, update the timestamp and - // blocknumber to equal that of the transaction's. This should happend + // blocknumber to equal that of the transaction's. This should happen // with `enqueue` transactions. ts := tx.L1Timestamp() bn := tx.L1BlockNumber() diff --git a/l2geth/rollup/sync_service_test.go b/l2geth/rollup/sync_service_test.go index e6d8ec18a702..2312e803df43 100644 --- a/l2geth/rollup/sync_service_test.go +++ b/l2geth/rollup/sync_service_test.go @@ -641,10 +641,6 @@ func (m *mockClient) GetLatestTransactionIndex(backend Backend) (*uint64, error) return tx.GetMeta().Index, nil } -func newUint64(n uint64) *uint64 { - return &n -} - func mockTx() *types.Transaction { address := make([]byte, 20) rand.Read(address) From 913a14dcc93890cd10060c51390833b1f0ce7fae Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 12 May 2021 16:43:46 -0700 Subject: [PATCH 17/29] syncservice: fix logline --- l2geth/rollup/sync_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index c66449c71ac9..efc176f76037 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -863,7 +863,7 @@ func (s *SyncService) syncTransactions(backend Backend) (*uint64, error) { // syncTransactionRange will sync a range of transactions from // start to end (inclusive) from a specific Backend func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error { - log.Info("Syncing transaction range", "start", start, "end", end) + log.Info("Syncing transaction range", "start", start, "end", end, "backend", backend.String()) for i := start; i <= end; i++ { tx, err := s.client.GetTransaction(i, backend) if err != nil { From 51ef245277bfb3e85b664701692eb8fa21a085c4 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 12 May 2021 18:18:43 -0700 Subject: [PATCH 18/29] l2geth: add error and fix logline --- l2geth/rollup/sync_service.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index efc176f76037..8b5ad3a21237 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -21,6 +21,10 @@ import ( "github.com/ethereum/go-ethereum/eth/gasprice" ) +// errShortRemoteTip is an error for when the remote tip is shorter than the +// local tip +var errShortRemoteTip = errors.New("Unexpected remote less than tip") + // SyncService implements the main functionality around pulling in transactions // and executing them. It can be configured to run in both sequencer mode and in // verifier mode. @@ -576,7 +580,7 @@ func (s *SyncService) applyHistoricalTransaction(tx *types.Transaction) error { return fmt.Errorf("More than one transaction found in block %d", *index+1) } if !isCtcTxEqual(tx, txs[0]) { - log.Error("Mismatched transaction", "index", index) + log.Error("Mismatched transaction", "index", *index) } else { log.Debug("Historical transaction matches", "index", *index, "hash", tx.Hash().Hex()) } @@ -737,6 +741,10 @@ func (s *SyncService) isAtTip(index *uint64, get indexGetter) (bool, error) { if *latest == *index { return true, nil } + // The local tip is greater than the remote tip. This should never happen + if *latest < *index { + return false, fmt.Errorf("is at tip mismatch: remote (%d) - local (%d): %w", *latest, *index, errShortRemoteTip) + } // The indices are not equal return false, nil } From 8b5ead5a0797b9e54bb7c2f970fdc5c206fad2d1 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 12 May 2021 18:19:07 -0700 Subject: [PATCH 19/29] l2geth: sync service tests --- l2geth/rollup/sync_service_test.go | 184 ++++++++++++++++++++++++++++- 1 file changed, 182 insertions(+), 2 deletions(-) diff --git a/l2geth/rollup/sync_service_test.go b/l2geth/rollup/sync_service_test.go index 2312e803df43..138dab75f0dd 100644 --- a/l2geth/rollup/sync_service_test.go +++ b/l2geth/rollup/sync_service_test.go @@ -7,7 +7,9 @@ import ( "fmt" "math/big" "reflect" + "sync" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" @@ -310,6 +312,173 @@ func TestApplyIndexedTransaction(t *testing.T) { } } +func TestApplyBatchedTransaction(t *testing.T) { + service, txCh, _, err := newTestSyncService(true) + if err != nil { + t.Fatal(err) + } + + // Create a transactoin with the index of 0 + tx0 := setMockTxIndex(mockTx(), 0) + + // Ingest through applyBatchedTransaction which should set the latest + // verified index to the index of the transaction + go func() { + err = service.applyBatchedTransaction(tx0) + }() + service.chainHeadCh <- core.ChainHeadEvent{} + <-txCh + + // Catch race conditions with the database write + wg := new(sync.WaitGroup) + wg.Add(1) + go func() { + for { + if service.GetLatestVerifiedIndex() != nil { + wg.Done() + return + } + time.Sleep(100 * time.Millisecond) + } + }() + wg.Wait() + + // Assert that the verified index is the same as the transaction index + if *tx0.GetMeta().Index != *service.GetLatestVerifiedIndex() { + t.Fatal("Latest verified index mismatch") + } +} + +func TestIsAtTip(t *testing.T) { + service, _, _, err := newTestSyncService(true) + if err != nil { + t.Fatal(err) + } + + data := []struct { + tip *uint64 + get indexGetter + expect bool + err error + }{ + { + tip: newUint64(1), + get: func() (*uint64, error) { + return newUint64(1), nil + }, + expect: true, + err: nil, + }, + { + tip: newUint64(0), + get: func() (*uint64, error) { + return newUint64(1), nil + }, + expect: false, + err: nil, + }, + { + tip: newUint64(1), + get: func() (*uint64, error) { + return newUint64(0), nil + }, + expect: false, + err: errShortRemoteTip, + }, + { + tip: nil, + get: func() (*uint64, error) { + return nil, nil + }, + expect: true, + err: nil, + }, + { + tip: nil, + get: func() (*uint64, error) { + return nil, errElementNotFound + }, + expect: true, + err: nil, + }, + { + tip: newUint64(0), + get: func() (*uint64, error) { + return nil, errElementNotFound + }, + expect: false, + err: nil, + }, + } + + for _, d := range data { + isAtTip, err := service.isAtTip(d.tip, d.get) + if isAtTip != d.expect { + t.Fatal("expected does not match") + } + if !errors.Is(err, d.err) { + t.Fatal("error no match") + } + } +} + +func TestSyncQueue(t *testing.T) { + service, txCh, _, err := newTestSyncService(true) + if err != nil { + t.Fatal(err) + } + + setupMockClient(service, map[string]interface{}{ + "GetEnqueue": []*types.Transaction{ + setMockQueueIndex(mockTx(), 0), + setMockQueueIndex(mockTx(), 1), + setMockQueueIndex(mockTx(), 2), + setMockQueueIndex(mockTx(), 3), + }, + }) + + var tip *uint64 + go func() { + tip, err = service.syncQueue() + }() + + for i := 0; i < 4; i++ { + service.chainHeadCh <- core.ChainHeadEvent{} + event := <-txCh + tx := event.Txs[0] + if *tx.GetMeta().QueueIndex != uint64(i) { + t.Fatal("queue index mismatch") + } + } + + wg := new(sync.WaitGroup) + wg.Add(1) + go func() { + for { + if tip != nil { + wg.Done() + return + } + time.Sleep(100 * time.Millisecond) + } + }() + wg.Wait() + if tip == nil { + t.Fatal("tip is nil") + } + // There were a total of 4 transactions synced and the indexing starts at 0 + if *service.GetLatestIndex() != 3 { + t.Fatalf("Latest index mismatch") + } + // All of the transactions are `enqueue()`s + if *service.GetLatestEnqueueIndex() != 3 { + t.Fatal("Latest queue index mismatch") + } + if *tip != 3 { + t.Fatal("Tip mismatch") + } +} + func TestSyncServiceL1GasPrice(t *testing.T) { service, _, _, err := newTestSyncService(true) setupMockClient(service, map[string]interface{}{}) @@ -562,7 +731,7 @@ func (m *mockClient) GetEnqueue(index uint64) (*types.Transaction, error) { func (m *mockClient) GetLatestEnqueue() (*types.Transaction, error) { if len(m.getEnqueue) == 0 { - return &types.Transaction{}, errors.New("") + return &types.Transaction{}, errors.New("enqueue not found") } return m.getEnqueue[len(m.getEnqueue)-1], nil } @@ -626,7 +795,7 @@ func (m *mockClient) GetLatestEnqueueIndex() (*uint64, error) { if enqueue == nil { return nil, errElementNotFound } - return enqueue.GetMeta().Index, nil + return enqueue.GetMeta().QueueIndex, nil } func (m *mockClient) GetLatestTransactionBatchIndex() (*uint64, error) { @@ -683,3 +852,14 @@ func setMockTxIndex(tx *types.Transaction, index uint64) *types.Transaction { tx.SetTransactionMeta(meta) return tx } + +func setMockQueueIndex(tx *types.Transaction, index uint64) *types.Transaction { + meta := tx.GetMeta() + meta.QueueIndex = &index + tx.SetTransactionMeta(meta) + return tx +} + +func newUint64(n uint64) *uint64 { + return &n +} From cac3daff67f8e5d709e56fd03eaac197c4e0db3e Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 12 May 2021 15:57:43 -0700 Subject: [PATCH 20/29] fix: get last confirmed enqueue (#846) * l2geth: fix get last confirmed enqueue * chore: add changeset * client: return error correctly --- .changeset/big-moose-type.md | 5 +++++ l2geth/rollup/client.go | 9 ++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) create mode 100644 .changeset/big-moose-type.md diff --git a/.changeset/big-moose-type.md b/.changeset/big-moose-type.md new file mode 100644 index 000000000000..e87e713c4622 --- /dev/null +++ b/.changeset/big-moose-type.md @@ -0,0 +1,5 @@ +--- +'@eth-optimism/l2geth': patch +--- + +Fixes a bug in L2geth that causes it to skip the first deposit if there have been no deposits batch-submitted yet diff --git a/l2geth/rollup/client.go b/l2geth/rollup/client.go index fb5fdf7f33ff..5fbb134af5ad 100644 --- a/l2geth/rollup/client.go +++ b/l2geth/rollup/client.go @@ -495,23 +495,26 @@ func (c *Client) GetLastConfirmedEnqueue() (*types.Transaction, error) { if err != nil { return nil, fmt.Errorf("Cannot get latest enqueue: %w", err) } - // This should only happen if the database is empty + // This should only happen if there are no L1 to L2 transactions yet if enqueue == nil { - return nil, nil + return nil, errElementNotFound } // Work backwards looking for the first enqueue // to have an index, which means it has been included // in the canonical transaction chain. for { meta := enqueue.GetMeta() + // The enqueue has an index so it has been confirmed if meta.Index != nil { return enqueue, nil } + // There is no queue index so this is a bug if meta.QueueIndex == nil { return nil, fmt.Errorf("queue index is nil") } + // No enqueue transactions have been confirmed yet if *meta.QueueIndex == uint64(0) { - return enqueue, nil + return nil, errElementNotFound } next, err := c.GetEnqueue(*meta.QueueIndex - 1) if err != nil { From 53b12c5b92d589b722b441d6bee094a4a08a2bc8 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Wed, 12 May 2021 20:56:47 -0700 Subject: [PATCH 21/29] batch-submitter: updated config (#847) * batch-submitter: backwards compatible configuration * chore: add changeset * deps: update * js: move bcfg interface to core-utils * batch-submitter: parse USE_SENTRY and add to env example * chore: add changeset * batch-submitter: parse as float instead of int * batch-submitter: better error logging --- .changeset/lovely-plants-clean.md | 7 + .changeset/soft-squids-switch.md | 5 + packages/batch-submitter/.env.example | 1 + packages/batch-submitter/package.json | 1 + .../src/exec/run-batch-submitter.ts | 340 +++++++++++------- packages/core-utils/src/bcfg.ts | 8 + packages/core-utils/src/index.ts | 1 + .../data-transport-layer/src/services/run.ts | 11 +- packages/message-relayer/src/exec/run.ts | 9 +- 9 files changed, 244 insertions(+), 139 deletions(-) create mode 100644 .changeset/lovely-plants-clean.md create mode 100644 .changeset/soft-squids-switch.md create mode 100644 packages/core-utils/src/bcfg.ts diff --git a/.changeset/lovely-plants-clean.md b/.changeset/lovely-plants-clean.md new file mode 100644 index 000000000000..6ea61e2d79ad --- /dev/null +++ b/.changeset/lovely-plants-clean.md @@ -0,0 +1,7 @@ +--- +'@eth-optimism/core-utils': patch +'@eth-optimism/data-transport-layer': patch +'@eth-optimism/message-relayer': patch +--- + +Migrate bcfg interface to core-utils diff --git a/.changeset/soft-squids-switch.md b/.changeset/soft-squids-switch.md new file mode 100644 index 000000000000..0ff4c22153fb --- /dev/null +++ b/.changeset/soft-squids-switch.md @@ -0,0 +1,5 @@ +--- +'@eth-optimism/batch-submitter': patch +--- + +Updates the configuration to use bcfg in a backwards compatible way diff --git a/packages/batch-submitter/.env.example b/packages/batch-submitter/.env.example index 54576ac4f2fe..fe7a77704cd2 100644 --- a/packages/batch-submitter/.env.example +++ b/packages/batch-submitter/.env.example @@ -7,6 +7,7 @@ DEBUG=info*,error*,warn*,debug* # Leave the SENTRY_DSN variable unset during local development SENTRY_DSN= SENTRY_TRACE_RATE= +USE_SENTRY= L1_NODE_WEB3_URL=http://localhost:9545 L2_NODE_WEB3_URL=http://localhost:8545 diff --git a/packages/batch-submitter/package.json b/packages/batch-submitter/package.json index 5ed68527cf99..9a887ab5c410 100644 --- a/packages/batch-submitter/package.json +++ b/packages/batch-submitter/package.json @@ -37,6 +37,7 @@ "@eth-optimism/ynatm": "^0.2.2", "@ethersproject/abstract-provider": "^5.0.5", "@ethersproject/providers": "^5.0.14", + "bcfg": "^0.1.6", "bluebird": "^3.7.2", "dotenv": "^8.2.0", "ethers": "5.0.0", diff --git a/packages/batch-submitter/src/exec/run-batch-submitter.ts b/packages/batch-submitter/src/exec/run-batch-submitter.ts index 00319f59330b..6ef7e8445287 100644 --- a/packages/batch-submitter/src/exec/run-batch-submitter.ts +++ b/packages/batch-submitter/src/exec/run-batch-submitter.ts @@ -1,11 +1,11 @@ /* External Imports */ -import { injectL2Context } from '@eth-optimism/core-utils' +import { injectL2Context, Bcfg } from '@eth-optimism/core-utils' import { Logger, Metrics } from '@eth-optimism/common-ts' import { exit } from 'process' import { Signer, Wallet } from 'ethers' import { JsonRpcProvider, TransactionReceipt } from '@ethersproject/providers' -import { config } from 'dotenv' -config() +import * as dotenv from 'dotenv' +import Config from 'bcfg' /* Internal Imports */ import { @@ -16,89 +16,41 @@ import { TX_BATCH_SUBMITTER_LOG_TAG, } from '..' -const environment = process.env.NODE_ENV -const network = process.env.ETH_NETWORK_NAME -const release = `batch-submitter@${process.env.npm_package_version}` - -/* Logger */ -const name = 'oe:batch_submitter:init' -let logger - -if (network) { - // Initialize Sentry for Batch Submitter deployed to a network - logger = new Logger({ - name, - sentryOptions: { - release, - dsn: process.env.SENTRY_DSN, - tracesSampleRate: parseInt(process.env.SENTRY_TRACE_RATE, 10) || 0.05, - environment: network, // separate our Sentry errors by network instead of node environment - }, - }) -} else { - // Skip initializing Sentry - logger = new Logger({ name }) -} - -/* Metrics */ -const metrics = new Metrics({ - prefix: name, - labels: { environment, release, network }, -}) - interface RequiredEnvVars { // The HTTP provider URL for L1. - L1_NODE_WEB3_URL: 'L1_NODE_WEB3_URL' + L1_NODE_WEB3_URL: string // The HTTP provider URL for L2. - L2_NODE_WEB3_URL: 'L2_NODE_WEB3_URL' + L2_NODE_WEB3_URL: string // The layer one address manager address - ADDRESS_MANAGER_ADDRESS: 'ADDRESS_MANAGER_ADDRESS' + ADDRESS_MANAGER_ADDRESS: string // The minimum size in bytes of any L1 transactions generated by the batch submitter. - MIN_L1_TX_SIZE: 'MIN_L1_TX_SIZE' + MIN_L1_TX_SIZE: number // The maximum size in bytes of any L1 transactions generated by the batch submitter. - MAX_L1_TX_SIZE: 'MAX_L1_TX_SIZE' + MAX_L1_TX_SIZE: number // The maximum number of L2 transactions that can ever be in a batch. - MAX_TX_BATCH_COUNT: 'MAX_TX_BATCH_COUNT' + MAX_TX_BATCH_COUNT: number // The maximum number of L2 state roots that can ever be in a batch. - MAX_STATE_BATCH_COUNT: 'MAX_STATE_BATCH_COUNT' + MAX_STATE_BATCH_COUNT: number // The maximum amount of time (seconds) that we will wait before submitting an under-sized batch. - MAX_BATCH_SUBMISSION_TIME: 'MAX_BATCH_SUBMISSION_TIME' + MAX_BATCH_SUBMISSION_TIME: number // The delay in milliseconds between querying L2 for more transactions / to create a new batch. - POLL_INTERVAL: 'POLL_INTERVAL' + POLL_INTERVAL: number // The number of confirmations which we will wait after appending new batches. - NUM_CONFIRMATIONS: 'NUM_CONFIRMATIONS' + NUM_CONFIRMATIONS: number // The number of seconds to wait before resubmitting a transaction. - RESUBMISSION_TIMEOUT: 'RESUBMISSION_TIMEOUT' + RESUBMISSION_TIMEOUT: number // The number of confirmations that we should wait before submitting state roots for CTC elements. - FINALITY_CONFIRMATIONS: 'FINALITY_CONFIRMATIONS' + FINALITY_CONFIRMATIONS: number // Whether or not to run the tx batch submitter. - RUN_TX_BATCH_SUBMITTER: 'true' | 'false' | 'RUN_TX_BATCH_SUBMITTER' + RUN_TX_BATCH_SUBMITTER: boolean // Whether or not to run the state batch submitter. - RUN_STATE_BATCH_SUBMITTER: 'true' | 'false' | 'RUN_STATE_BATCH_SUBMITTER' + RUN_STATE_BATCH_SUBMITTER: boolean // The safe minimum amount of ether the batch submitter key should // hold before it starts to log errors. - SAFE_MINIMUM_ETHER_BALANCE: 'SAFE_MINIMUM_ETHER_BALANCE' + SAFE_MINIMUM_ETHER_BALANCE: number // A boolean to clear the pending transactions in the mempool // on start up. - CLEAR_PENDING_TXS: 'true' | 'false' | 'CLEAR_PENDING_TXS' -} -const requiredEnvVars: RequiredEnvVars = { - L1_NODE_WEB3_URL: 'L1_NODE_WEB3_URL', - L2_NODE_WEB3_URL: 'L2_NODE_WEB3_URL', - ADDRESS_MANAGER_ADDRESS: 'ADDRESS_MANAGER_ADDRESS', - MIN_L1_TX_SIZE: 'MIN_L1_TX_SIZE', - MAX_L1_TX_SIZE: 'MAX_L1_TX_SIZE', - MAX_TX_BATCH_COUNT: 'MAX_TX_BATCH_COUNT', - MAX_STATE_BATCH_COUNT: 'MAX_STATE_BATCH_COUNT', - MAX_BATCH_SUBMISSION_TIME: 'MAX_BATCH_SUBMISSION_TIME', - POLL_INTERVAL: 'POLL_INTERVAL', - NUM_CONFIRMATIONS: 'NUM_CONFIRMATIONS', - RESUBMISSION_TIMEOUT: 'RESUBMISSION_TIMEOUT', - FINALITY_CONFIRMATIONS: 'FINALITY_CONFIRMATIONS', - RUN_TX_BATCH_SUBMITTER: 'RUN_TX_BATCH_SUBMITTER', - RUN_STATE_BATCH_SUBMITTER: 'RUN_STATE_BATCH_SUBMITTER', - SAFE_MINIMUM_ETHER_BALANCE: 'SAFE_MINIMUM_ETHER_BALANCE', - CLEAR_PENDING_TXS: 'CLEAR_PENDING_TXS', + CLEAR_PENDING_TXS: boolean } /* Optional Env Vars @@ -112,50 +64,196 @@ const requiredEnvVars: RequiredEnvVars = { * SEQUENCER_HD_PATH * PROPOSER_HD_PATH */ -const env = process.env -const FRAUD_SUBMISSION_ADDRESS = env.FRAUD_SUBMISSION_ADDRESS || 'no fraud' -const DISABLE_QUEUE_BATCH_APPEND = !!env.DISABLE_QUEUE_BATCH_APPEND -const MIN_GAS_PRICE_IN_GWEI = parseInt(env.MIN_GAS_PRICE_IN_GWEI, 10) || 0 -const MAX_GAS_PRICE_IN_GWEI = parseInt(env.MAX_GAS_PRICE_IN_GWEI, 10) || 70 -const GAS_RETRY_INCREMENT = parseInt(env.GAS_RETRY_INCREMENT, 10) || 5 -const GAS_THRESHOLD_IN_GWEI = parseInt(env.GAS_THRESHOLD_IN_GWEI, 10) || 100 - -// Private keys & mnemonics -const SEQUENCER_PRIVATE_KEY = env.SEQUENCER_PRIVATE_KEY -const PROPOSER_PRIVATE_KEY = - env.PROPOSER_PRIVATE_KEY || env.SEQUENCER_PRIVATE_KEY // Kept for backwards compatibility -const SEQUENCER_MNEMONIC = env.SEQUENCER_MNEMONIC || env.MNEMONIC -const PROPOSER_MNEMONIC = env.PROPOSER_MNEMONIC || env.MNEMONIC -const SEQUENCER_HD_PATH = env.SEQUENCER_HD_PATH || env.HD_PATH -const PROPOSER_HD_PATH = env.PROPOSER_HD_PATH || env.HD_PATH -// Auto fix batch options -- TODO: Remove this very hacky config -const AUTO_FIX_BATCH_OPTIONS_CONF = env.AUTO_FIX_BATCH_OPTIONS_CONF -const autoFixBatchOptions: AutoFixBatchOptions = { - fixDoublePlayedDeposits: AUTO_FIX_BATCH_OPTIONS_CONF - ? AUTO_FIX_BATCH_OPTIONS_CONF.includes('fixDoublePlayedDeposits') - : false, - fixMonotonicity: AUTO_FIX_BATCH_OPTIONS_CONF - ? AUTO_FIX_BATCH_OPTIONS_CONF.includes('fixMonotonicity') - : false, - fixSkippedDeposits: AUTO_FIX_BATCH_OPTIONS_CONF - ? AUTO_FIX_BATCH_OPTIONS_CONF.includes('fixSkippedDeposits') - : false, -} export const run = async () => { + dotenv.config() + + const config: Bcfg = new Config('batch-submitter') + config.load({ + env: true, + argv: true, + }) + + // Parse config + const env = process.env + const environment = config.str('node-env', env.NODE_ENV) + const network = config.str('eth-network-name', env.ETH_NETWORK_NAME) + const release = `batch-submitter@${env.npm_package_version}` + const sentryDsn = config.str('sentry-dsn', env.SENTRY_DSN) + const sentryTraceRate = config.ufloat( + 'sentry-trace-rate', + parseFloat(env.SENTRY_TRACE_RATE) || 0.05 + ) + + /* Logger */ + const name = 'oe:batch_submitter:init' + let logger + + if (config.bool('use-sentry', env.USE_SENTRY === 'true')) { + // Initialize Sentry for Batch Submitter deployed to a network + logger = new Logger({ + name, + sentryOptions: { + release, + dsn: sentryDsn, + tracesSampleRate: sentryTraceRate, + environment: network, + }, + }) + } else { + // Skip initializing Sentry + logger = new Logger({ name }) + } + + /* Metrics */ + const metrics = new Metrics({ + prefix: name, + labels: { environment, release, network }, + }) + + const FRAUD_SUBMISSION_ADDRESS = config.str( + 'fraud-submisison-address', + env.FRAUD_SUBMISSION_ADDRESS || 'no fraud' + ) + const DISABLE_QUEUE_BATCH_APPEND = config.bool( + 'disable-queue-batch-append', + !!env.DISABLE_QUEUE_BATCH_APPEND + ) + const MIN_GAS_PRICE_IN_GWEI = config.uint( + 'min-gas-price-in-gwei', + parseInt(env.MIN_GAS_PRICE_IN_GWEI, 10) || 0 + ) + const MAX_GAS_PRICE_IN_GWEI = config.uint( + 'max-gas-price-in-gwei', + parseInt(env.MAX_GAS_PRICE_IN_GWEI, 10) || 70 + ) + const GAS_RETRY_INCREMENT = config.uint( + 'gas-retry-increment', + parseInt(env.GAS_RETRY_INCREMENT, 10) || 5 + ) + const GAS_THRESHOLD_IN_GWEI = config.uint( + 'gas-threshold-in-gwei', + parseInt(env.GAS_THRESHOLD_IN_GWEI, 10) || 100 + ) + + // Private keys & mnemonics + const SEQUENCER_PRIVATE_KEY = config.str( + 'sequencer-private-key', + env.SEQUENCER_PRIVATE_KEY + ) + // Kept for backwards compatibility + const PROPOSER_PRIVATE_KEY = config.str( + 'proposer-private-key', + env.PROPOSER_PRIVATE_KEY || env.SEQUENCER_PRIVATE_KEY + ) + const SEQUENCER_MNEMONIC = config.str( + 'sequencer-mnemonic', + env.SEQUENCER_MNEMONIC || env.MNEMONIC + ) + const PROPOSER_MNEMONIC = config.str( + 'proposer-mnemonic', + env.PROPOSER_MNEMONIC || env.MNEMONIC + ) + const SEQUENCER_HD_PATH = config.str( + 'sequencer-hd-path', + env.SEQUENCER_HD_PATH || env.HD_PATH + ) + const PROPOSER_HD_PATH = config.str( + 'proposer-hd-path', + env.PROPOSER_HD_PATH || env.HD_PATH + ) + + // Auto fix batch options -- TODO: Remove this very hacky config + const AUTO_FIX_BATCH_OPTIONS_CONF = config.str( + 'auto-fix-batch-conf', + env.AUTO_FIX_BATCH_OPTIONS_CONF || '' + ) + const autoFixBatchOptions: AutoFixBatchOptions = { + fixDoublePlayedDeposits: AUTO_FIX_BATCH_OPTIONS_CONF + ? AUTO_FIX_BATCH_OPTIONS_CONF.includes('fixDoublePlayedDeposits') + : false, + fixMonotonicity: AUTO_FIX_BATCH_OPTIONS_CONF + ? AUTO_FIX_BATCH_OPTIONS_CONF.includes('fixMonotonicity') + : false, + fixSkippedDeposits: AUTO_FIX_BATCH_OPTIONS_CONF + ? AUTO_FIX_BATCH_OPTIONS_CONF.includes('fixSkippedDeposits') + : false, + } + logger.info('Starting batch submitter...') - for (const [i, val] of Object.entries(requiredEnvVars)) { - if (!process.env[val]) { + const requiredEnvVars: RequiredEnvVars = { + L1_NODE_WEB3_URL: config.str('l1-node-web3-url', env.L1_NODE_WEB3_URL), + L2_NODE_WEB3_URL: config.str('l2-node-web3-url', env.L2_NODE_WEB3_URL), + ADDRESS_MANAGER_ADDRESS: config.str( + 'address-manager-address', + env.ADDRESS_MANAGER_ADDRESS + ), + MIN_L1_TX_SIZE: config.uint( + 'min-l1-tx-size', + parseInt(env.MIN_L1_TX_SIZE, 10) + ), + MAX_L1_TX_SIZE: config.uint( + 'max-l1-tx-size', + parseInt(env.MAX_L1_TX_SIZE, 10) + ), + MAX_TX_BATCH_COUNT: config.uint( + 'max-tx-batch-count', + parseInt(env.MAX_TX_BATCH_COUNT, 10) + ), + MAX_STATE_BATCH_COUNT: config.uint( + 'max-state-batch-count', + parseInt(env.MAX_STATE_BATCH_COUNT, 10) + ), + MAX_BATCH_SUBMISSION_TIME: config.uint( + 'max-batch-submisison-time', + parseInt(env.MAX_BATCH_SUBMISSION_TIME, 10) + ), + POLL_INTERVAL: config.uint( + 'poll-interval', + parseInt(env.POLL_INTERVAL, 10) + ), + NUM_CONFIRMATIONS: config.uint( + 'num-confirmations', + parseInt(env.NUM_CONFIRMATIONS, 10) + ), + RESUBMISSION_TIMEOUT: config.uint( + 'resubmission-timeout', + parseInt(env.RESUBMISSION_TIMEOUT, 10) + ), + FINALITY_CONFIRMATIONS: config.uint( + 'finality-confirmations', + parseInt(env.FINALITY_CONFIRMATIONS, 10) + ), + RUN_TX_BATCH_SUBMITTER: config.bool( + 'run-tx-batch-submitter', + env.RUN_TX_BATCH_SUBMITTER === 'true' + ), + RUN_STATE_BATCH_SUBMITTER: config.bool( + 'run-state-batch-submitter', + env.RUN_STATE_BATCH_SUBMITTER === 'true' + ), + SAFE_MINIMUM_ETHER_BALANCE: config.ufloat( + 'safe-minimum-ether-balance', + parseFloat(env.SAFE_MINIMUM_ETHER_BALANCE) + ), + CLEAR_PENDING_TXS: config.bool( + 'clear-pending-txs', + env.CLEAR_PENDING_TXS === 'true' + ), + } + + for (const [key, val] of Object.entries(requiredEnvVars)) { + if (val === null || val === undefined) { logger.warn('Missing environment variable', { - varName: val, + key, + value: val, }) exit(1) } - requiredEnvVars[val] = process.env[val] } - const clearPendingTxs = requiredEnvVars.CLEAR_PENDING_TXS === 'true' + const clearPendingTxs = requiredEnvVars.CLEAR_PENDING_TXS const l1Provider = new JsonRpcProvider(requiredEnvVars.L1_NODE_WEB3_URL) const l2Provider = injectL2Context( @@ -206,14 +304,14 @@ export const run = async () => { const txBatchSubmitter = new TransactionBatchSubmitter( sequencerSigner, l2Provider, - parseInt(requiredEnvVars.MIN_L1_TX_SIZE, 10), - parseInt(requiredEnvVars.MAX_L1_TX_SIZE, 10), - parseInt(requiredEnvVars.MAX_TX_BATCH_COUNT, 10), - parseInt(requiredEnvVars.MAX_BATCH_SUBMISSION_TIME, 10) * 1_000, - parseInt(requiredEnvVars.NUM_CONFIRMATIONS, 10), - parseInt(requiredEnvVars.RESUBMISSION_TIMEOUT, 10) * 1_000, + requiredEnvVars.MIN_L1_TX_SIZE, + requiredEnvVars.MAX_L1_TX_SIZE, + requiredEnvVars.MAX_TX_BATCH_COUNT, + requiredEnvVars.MAX_BATCH_SUBMISSION_TIME * 1_000, + requiredEnvVars.NUM_CONFIRMATIONS, + requiredEnvVars.RESUBMISSION_TIMEOUT * 1_000, requiredEnvVars.ADDRESS_MANAGER_ADDRESS, - parseFloat(requiredEnvVars.SAFE_MINIMUM_ETHER_BALANCE), + requiredEnvVars.SAFE_MINIMUM_ETHER_BALANCE, MIN_GAS_PRICE_IN_GWEI, MAX_GAS_PRICE_IN_GWEI, GAS_RETRY_INCREMENT, @@ -230,15 +328,15 @@ export const run = async () => { const stateBatchSubmitter = new StateBatchSubmitter( proposerSigner, l2Provider, - parseInt(requiredEnvVars.MIN_L1_TX_SIZE, 10), - parseInt(requiredEnvVars.MAX_L1_TX_SIZE, 10), - parseInt(requiredEnvVars.MAX_STATE_BATCH_COUNT, 10), - parseInt(requiredEnvVars.MAX_BATCH_SUBMISSION_TIME, 10) * 1_000, - parseInt(requiredEnvVars.NUM_CONFIRMATIONS, 10), - parseInt(requiredEnvVars.RESUBMISSION_TIMEOUT, 10) * 1_000, - parseInt(requiredEnvVars.FINALITY_CONFIRMATIONS, 10), + requiredEnvVars.MIN_L1_TX_SIZE, + requiredEnvVars.MAX_L1_TX_SIZE, + requiredEnvVars.MAX_STATE_BATCH_COUNT, + requiredEnvVars.MAX_BATCH_SUBMISSION_TIME * 1_000, + requiredEnvVars.NUM_CONFIRMATIONS, + requiredEnvVars.RESUBMISSION_TIMEOUT * 1_000, + requiredEnvVars.FINALITY_CONFIRMATIONS, requiredEnvVars.ADDRESS_MANAGER_ADDRESS, - parseFloat(requiredEnvVars.SAFE_MINIMUM_ETHER_BALANCE), + requiredEnvVars.SAFE_MINIMUM_ETHER_BALANCE, MIN_GAS_PRICE_IN_GWEI, MAX_GAS_PRICE_IN_GWEI, GAS_RETRY_INCREMENT, @@ -281,7 +379,7 @@ export const run = async () => { }) await sequencerSigner.provider.waitForTransaction( response.hash, - parseInt(requiredEnvVars.NUM_CONFIRMATIONS, 10) + requiredEnvVars.NUM_CONFIRMATIONS ) } } @@ -307,17 +405,15 @@ export const run = async () => { logger.info('Retrying...') } // Sleep - await new Promise((r) => - setTimeout(r, parseInt(requiredEnvVars.POLL_INTERVAL, 10)) - ) + await new Promise((r) => setTimeout(r, requiredEnvVars.POLL_INTERVAL)) } } // Run batch submitters in two seperate infinite loops! - if (requiredEnvVars.RUN_TX_BATCH_SUBMITTER === 'true') { + if (requiredEnvVars.RUN_TX_BATCH_SUBMITTER) { loop(() => txBatchSubmitter.submitNextBatch()) } - if (requiredEnvVars.RUN_STATE_BATCH_SUBMITTER === 'true') { + if (requiredEnvVars.RUN_STATE_BATCH_SUBMITTER) { loop(() => stateBatchSubmitter.submitNextBatch()) } } diff --git a/packages/core-utils/src/bcfg.ts b/packages/core-utils/src/bcfg.ts new file mode 100644 index 000000000000..8b40de591d2b --- /dev/null +++ b/packages/core-utils/src/bcfg.ts @@ -0,0 +1,8 @@ +export interface Bcfg { + load: (options: { env?: boolean; argv?: boolean }) => void + str: (name: string, defaultValue?: string) => string + uint: (name: string, defaultValue?: number) => number + bool: (name: string, defaultValue?: boolean) => boolean + ufloat: (name: string, defaultValue?: number) => number + has: (name: string) => boolean +} diff --git a/packages/core-utils/src/index.ts b/packages/core-utils/src/index.ts index 2cc9c43e9097..aa5525f4dda4 100644 --- a/packages/core-utils/src/index.ts +++ b/packages/core-utils/src/index.ts @@ -4,3 +4,4 @@ export * from './watcher' export * from './l2context' export * from './events' export * from './batches' +export * from './bcfg' diff --git a/packages/data-transport-layer/src/services/run.ts b/packages/data-transport-layer/src/services/run.ts index 9074916a4858..877aaeded2cb 100644 --- a/packages/data-transport-layer/src/services/run.ts +++ b/packages/data-transport-layer/src/services/run.ts @@ -1,18 +1,11 @@ /* Imports: External */ import * as dotenv from 'dotenv' -import Config from 'bcfg' // TODO: Add some types for bcfg if we get the chance. +import { Bcfg } from '@eth-optimism/core-utils' +import Config from 'bcfg' /* Imports: Internal */ import { L1DataTransportService } from './main/service' -interface Bcfg { - load: (options: { env?: boolean; argv?: boolean }) => void - str: (name: string, defaultValue?: string) => string - uint: (name: string, defaultValue?: number) => number - bool: (name: string, defaultValue?: boolean) => boolean - ufloat: (name: string, defaultValue?: number) => number -} - type ethNetwork = 'mainnet' | 'kovan' | 'goerli' ;(async () => { try { diff --git a/packages/message-relayer/src/exec/run.ts b/packages/message-relayer/src/exec/run.ts index 50ff1a72c12f..c12c67d09215 100644 --- a/packages/message-relayer/src/exec/run.ts +++ b/packages/message-relayer/src/exec/run.ts @@ -1,17 +1,10 @@ import { Wallet, providers } from 'ethers' import { MessageRelayerService } from '../service' +import { Bcfg } from '@eth-optimism/core-utils' import SpreadSheet from '../spreadsheet' import * as dotenv from 'dotenv' import Config from 'bcfg' -interface Bcfg { - load: (options: { env?: boolean; argv?: boolean }) => void - str: (name: string, defaultValue?: string) => string - uint: (name: string, defaultValue?: number) => number - bool: (name: string, defaultValue?: boolean) => boolean - ufloat: (name: string, defaultValue?: number) => number -} - dotenv.config() const main = async () => { From 9d91c2cb70378c15a75866d80da92b35aebf0ac8 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Thu, 13 May 2021 12:21:40 -0700 Subject: [PATCH 22/29] l2geth: update rawdb logline Co-authored-by: Georgios Konstantopoulos --- l2geth/core/rawdb/rollup_indexes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/l2geth/core/rawdb/rollup_indexes.go b/l2geth/core/rawdb/rollup_indexes.go index be604becc004..abed95aff40a 100644 --- a/l2geth/core/rawdb/rollup_indexes.go +++ b/l2geth/core/rawdb/rollup_indexes.go @@ -87,6 +87,6 @@ func WriteHeadBatchIndex(db ethdb.KeyValueWriter, index uint64) { value = []byte{0} } if err := db.Put(headBatchKey, value); err != nil { - log.Crit("Failed to store verifier index", "err", err) + log.Crit("Failed to store head batch index", "err", err) } } From 1b83b4b29f0753ecb1301848482918198d448bc8 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Thu, 13 May 2021 12:22:33 -0700 Subject: [PATCH 23/29] l2geth: more robust testing Co-authored-by: Georgios Konstantopoulos --- l2geth/rollup/sync_service_test.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/l2geth/rollup/sync_service_test.go b/l2geth/rollup/sync_service_test.go index 138dab75f0dd..402877f34167 100644 --- a/l2geth/rollup/sync_service_test.go +++ b/l2geth/rollup/sync_service_test.go @@ -226,7 +226,9 @@ func TestTransactionToTipTimestamps(t *testing.T) { tx2, } - for i, tx := range txs { + for _, tx := range txs { + nextIndex := service.GetNextIndex() + go func() { err = service.applyTransactionToTip(tx) }() @@ -240,17 +242,16 @@ func TestTransactionToTipTimestamps(t *testing.T) { if conf.GetMeta().Index == nil { t.Fatal("Index is nil") } - // The indexes should be incrementing by 1 - if *conf.GetMeta().Index != uint64(i) { - t.Fatal("Mismatched index") - } // The index that the sync service is tracking should be updated if *conf.GetMeta().Index != *service.GetLatestIndex() { - t.Fatal("Mismatched index") + t.Fatal("index on the service was not updated") + } + // The indexes should be incrementing by 1 + if *conf.GetMeta().Index != nextIndex { + t.Fatalf("Mismatched index: got %d, expect %d", *conf.GetMeta().Index, nextIndex) } // The tx timestamp should be setting the services timestamp - ts := service.GetLatestL1Timestamp() - if conf.L1Timestamp() != ts { + if conf.L1Timestamp() != service.GetLatestL1Timestamp() { t.Fatal("Mismatched timestamp") } } @@ -275,7 +276,7 @@ func TestApplyIndexedTransaction(t *testing.T) { t.Fatal(err) } - // Create three transactions, two of which have a dupliate index. + // Create three transactions, two of which have a duplicate index. // The first two transactions can be ingested without a problem and the // third transaction has a duplicate index so it will not be ingested. // Expect an error for the third transaction and expect the SyncService From 9ada2d0a3332f0c62f37d8a82abd29fbb2b013f7 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Thu, 13 May 2021 13:18:58 -0700 Subject: [PATCH 24/29] l2geth: add sanity check for L1ToL2 timestamps --- l2geth/rollup/sync_service.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 8b5ad3a21237..74abab68afc5 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -595,6 +595,14 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { if tx == nil { return errors.New("nil transaction passed to applyTransactionToTip") } + // Queue Origin L1 to L2 transactions must have a timestamp that is set by + // the L1 block that holds the transaction. This should never happen but is + // a sanity check to prevent fraudulent execution. + if tx.QueueOrigin().Uint64() == uint64(types.QueueOriginL1ToL2) { + if tx.L1Timestamp() == 0 { + return fmt.Errorf("Queue origin L1 to L2 transaction without a timestamp: %s", tx.Hash().Hex()) + } + } // If there is no OVM timestamp assigned to the transaction, then assign a // timestamp and blocknumber to it. This should only be the case for queue // origin sequencer transactions that come in via RPC. The L1 to L2 From 4c9ceb75be6d4c1f6c089371fe71efb834275006 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Thu, 13 May 2021 13:45:15 -0700 Subject: [PATCH 25/29] l2geth: handle error in single place --- l2geth/rollup/sync_service.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 74abab68afc5..397e924f3326 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -784,9 +784,6 @@ func (s *SyncService) syncToTip(sync syncer, getTip indexGetter) error { // sync will sync a range of items func (s *SyncService) sync(getLatest indexGetter, getNext nextGetter, syncer rangeSyncer) (*uint64, error) { latestIndex, err := getLatest() - if errors.Is(err, errElementNotFound) { - return nil, nil - } if err != nil { return nil, fmt.Errorf("Cannot sync: %w", err) } From 435e25d399468e7c59518e4ffff563efada72148 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Thu, 13 May 2021 17:15:25 -0700 Subject: [PATCH 26/29] l2geth: fix test tx queue origin --- l2geth/rollup/sync_service_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/l2geth/rollup/sync_service_test.go b/l2geth/rollup/sync_service_test.go index 402877f34167..2afe8545de93 100644 --- a/l2geth/rollup/sync_service_test.go +++ b/l2geth/rollup/sync_service_test.go @@ -265,6 +265,8 @@ func TestTransactionToTipTimestamps(t *testing.T) { err = service.applyTransactionToTip(tx3) }() result := <-txCh + service.chainHeadCh <- core.ChainHeadEvent{} + if result.Txs[0].L1Timestamp() != ts { t.Fatal("Timestamp not updated correctly") } @@ -831,7 +833,7 @@ func mockTx() *types.Transaction { timestamp, &l1TxOrigin, types.SighashEIP155, - types.QueueOriginL1ToL2, + types.QueueOriginSequencer, nil, nil, nil, From f7d18c1802994d48519b93839a7e367f3f353ff2 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Thu, 13 May 2021 17:15:59 -0700 Subject: [PATCH 27/29] l2geth: add new arg to start.sh --- l2geth/scripts/start.sh | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/l2geth/scripts/start.sh b/l2geth/scripts/start.sh index 7dca8ce9aa5b..cef5597044db 100755 --- a/l2geth/scripts/start.sh +++ b/l2geth/scripts/start.sh @@ -20,6 +20,7 @@ CACHE=1024 RPC_PORT=8545 WS_PORT=8546 VERBOSITY=3 +ROLLUP_BACKEND=l1 USAGE=" Start the Sequencer or Verifier with most configuration pre-set. @@ -189,6 +190,15 @@ while (( "$#" )); do exit 1 fi ;; + --rollup.backend) + if [ -n "$2" ] && [ ${2:0:1} != "-" ]; then + ROLLUP_BACKEND="$2" + shift 2 + else + echo "Error: Argument for $1 is missing" >&2 + exit 1 + fi + ;; --cache) if [ -n "$2" ] && [ ${2:0:1} != "-" ]; then CACHE="$2" @@ -227,6 +237,7 @@ cmd="$cmd --eth1.l1ethgatewayaddress $ETH1_L1_GATEWAY_ADDRESS" cmd="$cmd --rollup.clienthttp $ROLLUP_CLIENT_HTTP" cmd="$cmd --rollup.pollinterval $ROLLUP_POLL_INTERVAL" cmd="$cmd --rollup.timestamprefresh $ROLLUP_TIMESTAMP_REFRESH" +cmd="$cmd --rollup.backend $ROLLUP_BACKEND" cmd="$cmd --cache $CACHE" cmd="$cmd --rpc" cmd="$cmd --dev" From c38ec873e408ac047cd41d1a6d48fa69a09d37b2 Mon Sep 17 00:00:00 2001 From: Mark Tyneway Date: Thu, 13 May 2021 17:25:18 -0700 Subject: [PATCH 28/29] l2geth: return error in the SyncService.Start() --- l2geth/rollup/sync_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 397e924f3326..9c8fc98d8e16 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -201,7 +201,7 @@ func (s *SyncService) Start() error { return fmt.Errorf("Sequencer cannot sync transactions to tip: %w", err) } if err := s.syncQueueToTip(); err != nil { - log.Error("Sequencer cannot sync queue", "msg", err) + return fmt.Errorf("Sequencer cannot sync queue to tip: %w", err) } s.setSyncStatus(false) go s.SequencerLoop() From 112f0e8450c10cd8af88d7856079096160560e23 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Wed, 26 May 2021 13:01:13 +0300 Subject: [PATCH 29/29] chore: go fmt --- l2geth/cmd/utils/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/l2geth/cmd/utils/flags.go b/l2geth/cmd/utils/flags.go index e35849c99222..6c5c4447401b 100644 --- a/l2geth/cmd/utils/flags.go +++ b/l2geth/cmd/utils/flags.go @@ -1202,7 +1202,7 @@ func setRollup(ctx *cli.Context, cfg *rollup.Config) { backend, _ = rollup.NewBackend("l1") } cfg.Backend = backend - } + } if ctx.GlobalIsSet(RollupGasPriceOracleAddressFlag.Name) { addr := ctx.GlobalString(RollupGasPriceOracleAddressFlag.Name) cfg.GasPriceOracleAddress = common.HexToAddress(addr)