diff --git a/config/config_test.go b/config/config_test.go index 7350421fe2..aebc82fe90 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -56,6 +56,10 @@ func Test_Defaults(t *testing.T) { path: "Synchronizer.L2Synchronization.AcceptEmptyClosedBatches", expectedValue: false, }, + { + path: "Synchronizer.L2Synchronization.ReprocessFullBatchOnClose", + expectedValue: true, + }, { path: "Sequencer.DeletePoolTxsL1BlockConfirmations", expectedValue: uint64(100), diff --git a/config/default.go b/config/default.go index c087b0fa6f..e5a77ab641 100644 --- a/config/default.go +++ b/config/default.go @@ -118,6 +118,7 @@ L1SynchronizationMode = "sequential" ApplyAfterNumRollupReceived = 10 [Synchronizer.L2Synchronization] AcceptEmptyClosedBatches = false + ReprocessFullBatchOnClose = true [Sequencer] DeletePoolTxsL1BlockConfirmations = 100 diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index 1ab65c4a31..6b95d5c91b 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -28,7 +28,7 @@
"300ms"
 

Default: "5s"Type: string

RollupInfoRetriesSpacing is the minimum time between retries to request rollup info (it will sleep for fulfill this time) to avoid spamming L1


Examples:

"1m"
 
"300ms"
-

Default: falseType: boolean

FallbackToSequentialModeOnSynchronized if true switch to sequential mode if the system is synchronized


L2Synchronization Configuration for L2 synchronization
Default: falseType: boolean

AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches.
if true, the synchronizer will accept empty batches and process them.


Configuration of the sequencer service
Default: 100Type: integer

DeletePoolTxsL1BlockConfirmations is blocks amount after which txs will be deleted from the pool


Default: "12h0m0s"Type: string

DeletePoolTxsCheckInterval is frequency with which txs will be checked for deleting


Examples:

"1m"
+

Default: falseType: boolean

FallbackToSequentialModeOnSynchronized if true switch to sequential mode if the system is synchronized


L2Synchronization Configuration for L2 synchronization
Default: falseType: boolean

AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches.
if true, the synchronizer will accept empty batches and process them.


Default: trueType: boolean

ReprocessFullBatchOnClose if is true when a batch is closed is force to reprocess again


Configuration of the sequencer service
Default: 100Type: integer

DeletePoolTxsL1BlockConfirmations is blocks amount after which txs will be deleted from the pool


Default: "12h0m0s"Type: string

DeletePoolTxsCheckInterval is frequency with which txs will be checked for deleting


Examples:

"1m"
 
"300ms"
 

Default: "10m0s"Type: string

TxLifetimeCheckInterval is the time the sequencer waits to check txs lifetime


Examples:

"1m"
 
"300ms"
diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md
index 4492a475cf..2a7c8b9810 100644
--- a/docs/config-file/node-config-doc.md
+++ b/docs/config-file/node-config-doc.md
@@ -1683,9 +1683,10 @@ FallbackToSequentialModeOnSynchronized=false
 **Type:** : `object`
 **Description:** L2Synchronization Configuration for L2 synchronization
 
-| Property                                                                                | Pattern | Type    | Deprecated | Definition | Title/Description                                                                                                                                                   |
-| --------------------------------------------------------------------------------------- | ------- | ------- | ---------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| - [AcceptEmptyClosedBatches](#Synchronizer_L2Synchronization_AcceptEmptyClosedBatches ) | No      | boolean | No         | -          | AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches.
if true, the synchronizer will accept empty batches and process them. | +| Property | Pattern | Type | Deprecated | Definition | Title/Description | +| ----------------------------------------------------------------------------------------- | ------- | ------- | ---------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| - [AcceptEmptyClosedBatches](#Synchronizer_L2Synchronization_AcceptEmptyClosedBatches ) | No | boolean | No | - | AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches.
if true, the synchronizer will accept empty batches and process them. | +| - [ReprocessFullBatchOnClose](#Synchronizer_L2Synchronization_ReprocessFullBatchOnClose ) | No | boolean | No | - | ReprocessFullBatchOnClose if is true when a batch is closed is force to reprocess again | #### 9.6.1. `Synchronizer.L2Synchronization.AcceptEmptyClosedBatches` @@ -1702,6 +1703,20 @@ if true, the synchronizer will accept empty batches and process them. AcceptEmptyClosedBatches=false ``` +#### 9.6.2. `Synchronizer.L2Synchronization.ReprocessFullBatchOnClose` + +**Type:** : `boolean` + +**Default:** `true` + +**Description:** ReprocessFullBatchOnClose if is true when a batch is closed is force to reprocess again + +**Example setting the default value** (true): +``` +[Synchronizer.L2Synchronization] +ReprocessFullBatchOnClose=true +``` + ## 10. `[Sequencer]` **Type:** : `object` diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json index aeb3345481..867558ea25 100644 --- a/docs/config-file/node-config-schema.json +++ b/docs/config-file/node-config-schema.json @@ -631,6 +631,11 @@ "type": "boolean", "description": "AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches.\nif true, the synchronizer will accept empty batches and process them.", "default": false + }, + "ReprocessFullBatchOnClose": { + "type": "boolean", + "description": "ReprocessFullBatchOnClose if is true when a batch is closed is force to reprocess again", + "default": true } }, "additionalProperties": false, diff --git a/state/batch.go b/state/batch.go index 09b329ee5f..7cf10ebeab 100644 --- a/state/batch.go +++ b/state/batch.go @@ -173,7 +173,7 @@ func (s *State) OpenBatch(ctx context.Context, processingContext ProcessingConte return err } if prevTimestamp.Unix() > processingContext.Timestamp.Unix() { - return ErrTimestampGE + return fmt.Errorf(" oldBatch(%d) tstamp=%d > openingBatch(%d)=%d err: %w", lastBatchNum, prevTimestamp.Unix(), processingContext.BatchNumber, processingContext.Timestamp.Unix(), ErrTimestampGE) } return s.OpenBatchInStorage(ctx, processingContext, dbTx) } diff --git a/state/pgstatestorage/l2block.go b/state/pgstatestorage/l2block.go index f86b10d26d..fac23ce6ce 100644 --- a/state/pgstatestorage/l2block.go +++ b/state/pgstatestorage/l2block.go @@ -197,7 +197,8 @@ func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2 } uncles = string(unclesBytes) } - + l2blockNumber := l2Block.Number().Uint64() + log.Debugf("[AddL2Block] adding L2 block %d", l2blockNumber) if _, err := e.Exec(ctx, addL2BlockSQL, l2Block.Number().Uint64(), l2Block.Hash().String(), header, uncles, l2Block.ParentHash().String(), l2Block.Root().String(), diff --git a/synchronizer/ext_control.go b/synchronizer/ext_control.go index 0ca2840dca..289d0e27ea 100644 --- a/synchronizer/ext_control.go +++ b/synchronizer/ext_control.go @@ -2,6 +2,8 @@ package synchronizer import ( "bufio" + "errors" + "fmt" "io" "os" "strconv" @@ -14,6 +16,7 @@ import ( const ( externalControlFilename = "/tmp/synchronizer_in" + externalOutputFilename = "/tmp/synchronizer_out" filePermissions = 0644 sleepTimeToReadFile = 500 * time.Millisecond ) @@ -28,16 +31,58 @@ const ( // example of usage (first you need to run the service): // echo "l1_producer_stop" >> /tmp/synchronizer_in // echo "l1_orchestrator_reset|8577060" >> /tmp/synchronizer_in -type externalControl struct { - producer *l1_parallel_sync.L1RollupInfoProducer - orquestrator *l1_parallel_sync.L1SyncOrchestration + +// ExtCmdArgs is the type of the arguments of the command +type ExtCmdArgs []string + +// ExtControlCmd is the interface of the external command +type ExtControlCmd interface { + // FunctionName returns the name of the function to be called example: "l1_producer_stop" + FunctionName() string + // ValidateArguments validates the arguments of the command, returns nil if ok, error if not + ValidateArguments(ExtCmdArgs) error + // Process the command + // args: the arguments of the command + // return: string with the output and an error + Process(ExtCmdArgs) (string, error) + // Help returns the help of the command + Help() string +} + +type externalCmdControl struct { + //producer *l1_parallel_sync.L1RollupInfoProducer + //orquestrator *l1_parallel_sync.L1SyncOrchestration + RegisteredCmds map[string]ExtControlCmd +} + +func newExternalCmdControl(producer *l1_parallel_sync.L1RollupInfoProducer, orquestrator *l1_parallel_sync.L1SyncOrchestration) *externalCmdControl { + res := &externalCmdControl{ + RegisteredCmds: make(map[string]ExtControlCmd), + } + res.RegisterCmd(&helpCmd{externalControl: res}) + res.RegisterCmd(&l1OrchestratorResetCmd{orquestrator: orquestrator}) + res.RegisterCmd(&l1ProducerStopCmd{producer: producer}) + return res } -func newExternalControl(producer *l1_parallel_sync.L1RollupInfoProducer, orquestrator *l1_parallel_sync.L1SyncOrchestration) *externalControl { - return &externalControl{producer: producer, orquestrator: orquestrator} +// RegisterCmd registers a command +func (e *externalCmdControl) RegisterCmd(cmd ExtControlCmd) { + if e.RegisteredCmds == nil { + e.RegisteredCmds = make(map[string]ExtControlCmd) + } + e.RegisteredCmds[cmd.FunctionName()] = cmd } -func (e *externalControl) start() { +// GetCmd returns a command by its name +func (e *externalCmdControl) GetCmd(functionName string) (ExtControlCmd, error) { + cmd, ok := e.RegisteredCmds[functionName] + if !ok { + return nil, errors.New("command not found") + } + return cmd, nil +} + +func (e *externalCmdControl) start() { log.Infof("EXT:start: starting external control opening %s", externalControlFilename) file, err := os.OpenFile(externalControlFilename, os.O_APPEND|os.O_CREATE|os.O_RDONLY, filePermissions) if err != nil { @@ -52,7 +97,7 @@ func (e *externalControl) start() { } // https://medium.com/@arunprabhu.1/tailing-a-file-in-golang-72944204f22b -func (e *externalControl) readFile(file *os.File) { +func (e *externalCmdControl) readFile(file *os.File) { defer file.Close() reader := bufio.NewReader(file) for { @@ -65,66 +110,134 @@ func (e *externalControl) readFile(file *os.File) { time.Sleep(sleepTimeToReadFile) continue } - break } log.Infof("EXT:readFile: new command: %s", line) - e.process(line) + cmd, cmdArgs, err := e.parse(line) + if err != nil { + log.Warnf("EXT:readFile: error parsing command %s:err %s", line, err) + continue + } + e.process(cmd, cmdArgs) } } } -func (e *externalControl) process(line string) { +func (e *externalCmdControl) parse(line string) (ExtControlCmd, ExtCmdArgs, error) { cmd := strings.Split(line, "|") if len(cmd) < 1 { - return + return nil, nil, errors.New("invalid command") } - switch strings.TrimSpace(cmd[0]) { - case "l1_producer_stop": - e.cmdL1ProducerStop(cmd[1:]) - case "l1_orchestrator_reset": - e.cmdL1OrchestratorReset(cmd[1:]) - case "l1_orchestrator_stop": - e.cmdL1OrchestratorAbort(cmd[1:]) - default: - log.Warnf("EXT:process: unknown command: %s", cmd[0]) + functionName := strings.TrimSpace(cmd[0]) + args := cmd[1:] + cmdObj, err := e.GetCmd(functionName) + if err != nil { + return nil, nil, err } + err = cmdObj.ValidateArguments(args) + if err != nil { + return nil, nil, err + } + return cmdObj, args, nil } -func (e *externalControl) cmdL1OrchestratorReset(args []string) { - log.Infof("EXT:cmdL1OrchestratorReset: %s", args) - if len(args) < 1 { - log.Infof("EXT:cmdL1OrchestratorReset: missing block number") +func (e *externalCmdControl) process(cmd ExtControlCmd, args ExtCmdArgs) { + fullFunc, err := fmt.Printf("%s(%s)", cmd.FunctionName(), strings.Join(args, ",")) + if err != nil { + log.Warnf("EXT:readFile: error composing cmd %s:err %s", cmd.FunctionName(), err) return } - blockNumber, err := strconv.ParseUint(strings.TrimSpace(args[0]), 10, 64) + output, err := cmd.Process(args) if err != nil { - log.Infof("EXT:cmdL1OrchestratorReset: error parsing block number: %s", err) + log.Warnf("EXT:readFile: error processing command %s:err %s", fullFunc, err) return } - log.Infof("EXT:cmdL1OrchestratorReset: calling orchestrator reset(%d)", blockNumber) - e.orquestrator.Reset(blockNumber) - log.Infof("EXT:cmdL1OrchestratorReset: calling orchestrator reset(%d) returned", blockNumber) + log.Warnf("EXT:readFile: command %s processed with output: %s", fullFunc, output) } -func (e *externalControl) cmdL1OrchestratorAbort(args []string) { - log.Infof("EXT:cmdL1OrchestratorAbort: %s", args) - if e.orquestrator == nil { - log.Infof("EXT:cmdL1OrchestratorAbort: orquestrator is nil") - return +// COMMANDS IMPLEMENTATION +// HELP +type helpCmd struct { + externalControl *externalCmdControl +} + +func (h *helpCmd) FunctionName() string { + return "help" +} +func (h *helpCmd) ValidateArguments(args ExtCmdArgs) error { + if len(args) > 0 { + return errors.New(h.FunctionName() + " command does not accept arguments") } - log.Infof("EXT:cmdL1OrchestratorAbort: calling orquestrator stop") - e.orquestrator.Abort() - log.Infof("EXT:cmdL1OrchestratorAbort: calling orquestrator stop returned") + return nil } -func (e *externalControl) cmdL1ProducerStop(args []string) { - log.Infof("EXT:cmdL1Stop: %s", args) - if e.producer == nil { - log.Infof("EXT:cmdL1Stop: producer is nil") - return +func (h *helpCmd) Process(args ExtCmdArgs) (string, error) { + var help string + for _, cmd := range h.externalControl.RegisteredCmds { + help += cmd.Help() + "\n" } - log.Infof("EXT:cmdL1Stop: calling producer stop") - e.producer.Stop() - log.Infof("EXT:cmdL1Stop: calling producer stop returned") + return help, nil +} +func (h *helpCmd) Help() string { + return h.FunctionName() + ": show the help of the commands" +} + +// COMMANDS "l1_orchestrator_reset" +type l1OrchestratorResetCmd struct { + orquestrator *l1_parallel_sync.L1SyncOrchestration +} + +func (h *l1OrchestratorResetCmd) FunctionName() string { + return "l1_orchestrator_reset" +} + +func (h *l1OrchestratorResetCmd) ValidateArguments(args ExtCmdArgs) error { + if len(args) != 1 { + return errors.New(h.FunctionName() + " needs 1 argument") + } + _, err := strconv.ParseUint(strings.TrimSpace(args[0]), 10, 64) + if err != nil { + return fmt.Errorf("error parsing block number: %s err:%w", args[0], err) + } + return nil +} +func (h *l1OrchestratorResetCmd) Process(args ExtCmdArgs) (string, error) { + blockNumber, err := strconv.ParseUint(strings.TrimSpace(args[0]), 10, 64) + if err != nil { + return "error param", err + } + log.Warnf("EXT:"+h.FunctionName()+": calling orchestrator reset(%d)", blockNumber) + h.orquestrator.Reset(blockNumber) + res := fmt.Sprintf("EXT: "+h.FunctionName()+": reset to block %d", blockNumber) + return res, nil +} + +func (h *l1OrchestratorResetCmd) Help() string { + return h.FunctionName() + ": reset L1 parallel sync orchestrator to a given block number" +} + +// COMMANDS l1_producer_stop +type l1ProducerStopCmd struct { + producer *l1_parallel_sync.L1RollupInfoProducer +} + +func (h *l1ProducerStopCmd) FunctionName() string { + return "l1_producer_stop" +} + +func (h *l1ProducerStopCmd) ValidateArguments(args ExtCmdArgs) error { + if len(args) > 0 { + return errors.New(h.FunctionName() + " command does not accept arguments") + } + return nil +} +func (h *l1ProducerStopCmd) Process(args ExtCmdArgs) (string, error) { + log.Warnf("EXT:" + h.FunctionName() + ": calling producer stop") + h.producer.Stop() + res := "EXT: " + h.FunctionName() + ": producer stopped" + return res, nil +} + +func (h *l1ProducerStopCmd) Help() string { + return h.FunctionName() + ": stop L1 rollup info producer" } diff --git a/synchronizer/l2_sync/config.go b/synchronizer/l2_sync/config.go index 292d726caf..7166765b88 100644 --- a/synchronizer/l2_sync/config.go +++ b/synchronizer/l2_sync/config.go @@ -5,4 +5,7 @@ type Config struct { // AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches. // if true, the synchronizer will accept empty batches and process them. AcceptEmptyClosedBatches bool `mapstructure:"AcceptEmptyClosedBatches"` + + // ReprocessFullBatchOnClose if is true when a batch is closed is force to reprocess again + ReprocessFullBatchOnClose bool `mapstructure:"ReprocessFullBatchOnClose"` } diff --git a/synchronizer/l2_sync/l2_shared/mocks/l1_sync_global_exit_root_checker.go b/synchronizer/l2_sync/l2_shared/mocks/l1_sync_global_exit_root_checker.go new file mode 100644 index 0000000000..0c50000d4d --- /dev/null +++ b/synchronizer/l2_sync/l2_shared/mocks/l1_sync_global_exit_root_checker.go @@ -0,0 +1,89 @@ +// Code generated by mockery. DO NOT EDIT. + +package mock_l2_shared + +import ( + context "context" + + common "github.com/ethereum/go-ethereum/common" + + mock "github.com/stretchr/testify/mock" + + pgx "github.com/jackc/pgx/v4" +) + +// L1SyncGlobalExitRootChecker is an autogenerated mock type for the L1SyncGlobalExitRootChecker type +type L1SyncGlobalExitRootChecker struct { + mock.Mock +} + +type L1SyncGlobalExitRootChecker_Expecter struct { + mock *mock.Mock +} + +func (_m *L1SyncGlobalExitRootChecker) EXPECT() *L1SyncGlobalExitRootChecker_Expecter { + return &L1SyncGlobalExitRootChecker_Expecter{mock: &_m.Mock} +} + +// CheckL1SyncGlobalExitRootEnoughToProcessBatch provides a mock function with given fields: ctx, batchNumber, globalExitRoot, dbTx +func (_m *L1SyncGlobalExitRootChecker) CheckL1SyncGlobalExitRootEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error { + ret := _m.Called(ctx, batchNumber, globalExitRoot, dbTx) + + if len(ret) == 0 { + panic("no return value specified for CheckL1SyncGlobalExitRootEnoughToProcessBatch") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, common.Hash, pgx.Tx) error); ok { + r0 = rf(ctx, batchNumber, globalExitRoot, dbTx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckL1SyncGlobalExitRootEnoughToProcessBatch' +type L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call struct { + *mock.Call +} + +// CheckL1SyncGlobalExitRootEnoughToProcessBatch is a helper method to define mock.On call +// - ctx context.Context +// - batchNumber uint64 +// - globalExitRoot common.Hash +// - dbTx pgx.Tx +func (_e *L1SyncGlobalExitRootChecker_Expecter) CheckL1SyncGlobalExitRootEnoughToProcessBatch(ctx interface{}, batchNumber interface{}, globalExitRoot interface{}, dbTx interface{}) *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call { + return &L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call{Call: _e.mock.On("CheckL1SyncGlobalExitRootEnoughToProcessBatch", ctx, batchNumber, globalExitRoot, dbTx)} +} + +func (_c *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call) Run(run func(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx)) *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint64), args[2].(common.Hash), args[3].(pgx.Tx)) + }) + return _c +} + +func (_c *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call) Return(_a0 error) *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call) RunAndReturn(run func(context.Context, uint64, common.Hash, pgx.Tx) error) *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call { + _c.Call.Return(run) + return _c +} + +// NewL1SyncGlobalExitRootChecker creates a new instance of L1SyncGlobalExitRootChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewL1SyncGlobalExitRootChecker(t interface { + mock.TestingT + Cleanup(func()) +}) *L1SyncGlobalExitRootChecker { + mock := &L1SyncGlobalExitRootChecker{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/synchronizer/l2_sync/l2_shared/processor_trusted_batch_sync.go b/synchronizer/l2_sync/l2_shared/processor_trusted_batch_sync.go index 6f5c2b48c1..94535ebe4e 100644 --- a/synchronizer/l2_sync/l2_shared/processor_trusted_batch_sync.go +++ b/synchronizer/l2_sync/l2_shared/processor_trusted_batch_sync.go @@ -127,24 +127,31 @@ type SyncTrustedBatchExecutor interface { NothingProcess(ctx context.Context, data *ProcessData, dbTx pgx.Tx) (*ProcessResponse, error) } +// L1SyncGlobalExitRootChecker is the interface to check if the required GlobalExitRoot is already synced from L1 +type L1SyncGlobalExitRootChecker interface { + CheckL1SyncGlobalExitRootEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error +} + // ProcessorTrustedBatchSync is a template to sync trusted state. It classify what kind of update is needed and call to SyncTrustedStateBatchExecutorSteps // // that is the one that execute the sync process // // the real implementation of the steps is in the SyncTrustedStateBatchExecutorSteps interface that known how to process a batch type ProcessorTrustedBatchSync struct { - Steps SyncTrustedBatchExecutor - timeProvider syncCommon.TimeProvider - Cfg l2_sync.Config + Steps SyncTrustedBatchExecutor + timeProvider syncCommon.TimeProvider + l1SyncChecker L1SyncGlobalExitRootChecker + Cfg l2_sync.Config } // NewProcessorTrustedBatchSync creates a new SyncTrustedStateBatchExecutorTemplate func NewProcessorTrustedBatchSync(steps SyncTrustedBatchExecutor, - timeProvider syncCommon.TimeProvider, cfg l2_sync.Config) *ProcessorTrustedBatchSync { + timeProvider syncCommon.TimeProvider, l1SyncChecker L1SyncGlobalExitRootChecker, cfg l2_sync.Config) *ProcessorTrustedBatchSync { return &ProcessorTrustedBatchSync{ - Steps: steps, - timeProvider: timeProvider, - Cfg: cfg, + Steps: steps, + timeProvider: timeProvider, + l1SyncChecker: l1SyncChecker, + Cfg: cfg, } } @@ -152,6 +159,15 @@ func NewProcessorTrustedBatchSync(steps SyncTrustedBatchExecutor, func (s *ProcessorTrustedBatchSync) ProcessTrustedBatch(ctx context.Context, trustedBatch *types.Batch, status TrustedState, dbTx pgx.Tx, debugPrefix string) (*TrustedState, error) { log.Debugf("%s Processing trusted batch: %v", debugPrefix, trustedBatch.Number) stateCurrentBatch, statePreviousBatch := s.GetCurrentAndPreviousBatchFromCache(&status) + if s.l1SyncChecker != nil { + err := s.l1SyncChecker.CheckL1SyncGlobalExitRootEnoughToProcessBatch(ctx, uint64(trustedBatch.Number), trustedBatch.GlobalExitRoot, dbTx) + if err != nil { + log.Errorf("%s error checking GlobalExitRoot from TrustedBatch. Error: ", debugPrefix, err) + return nil, err + } + } else { + log.Infof("Disabled check L1 sync status for process batch") + } processMode, err := s.GetModeForProcessBatch(trustedBatch, stateCurrentBatch, statePreviousBatch, debugPrefix) if err != nil { log.Error("%s error getting processMode. Error: ", debugPrefix, trustedBatch.Number, err) @@ -275,7 +291,7 @@ func (s *ProcessorTrustedBatchSync) GetModeForProcessBatch(trustedNodeBatch *typ Description: "Batch is not on database, so is the first time we process it", } } else { - _, strDiffsBatches := AreEqualStateBatchAndTrustedBatch(stateBatch, trustedNodeBatch, CMP_BATCH_IGNORE_TSTAMP) + areBatchesExactlyEqual, strDiffsBatches := AreEqualStateBatchAndTrustedBatch(stateBatch, trustedNodeBatch, CMP_BATCH_IGNORE_TSTAMP) newL2DataFlag, err := ThereAreNewBatchL2Data(stateBatch.BatchL2Data, trustedNodeBatch.BatchL2Data) if err != nil { return ProcessData{}, err @@ -288,6 +304,10 @@ func (s *ProcessorTrustedBatchSync) GetModeForProcessBatch(trustedNodeBatch *typ BatchMustBeClosed: isTrustedBatchClosed(trustedNodeBatch) && stateBatch.WIP, Description: "no new data on batch. Diffs: " + strDiffsBatches, } + if areBatchesExactlyEqual { + result.BatchMustBeClosed = false + result.Description = "exactly batches: " + strDiffsBatches + } } else { // We have a previous batch, but in node something change // We have processed this batch before, and we have the intermediate state root, so is going to be process only new Tx. @@ -310,6 +330,16 @@ func (s *ProcessorTrustedBatchSync) GetModeForProcessBatch(trustedNodeBatch *typ } } + if s.Cfg.ReprocessFullBatchOnClose && result.BatchMustBeClosed { + if result.Mode == IncrementalProcessMode || result.Mode == NothingProcessMode { + result.Description = "forced reprocess due to batch closed and ReprocessFullBatchOnClose" + log.Infof("%s Batch %v: Converted mode %s to %s because cfg.ReprocessFullBatchOnClose", debugPrefix, trustedNodeBatch.Number, result.Mode, ReprocessProcessMode) + result.Mode = ReprocessProcessMode + result.OldStateRoot = statePreviousBatch.StateRoot + result.BatchMustBeClosed = true + } + } + if result.Mode == "" { return result, fmt.Errorf("batch %v: failed to get mode for process ", trustedNodeBatch.Number) } diff --git a/synchronizer/l2_sync/l2_shared/tests/processor_trusted_batch_sync_test.go b/synchronizer/l2_sync/l2_shared/tests/processor_trusted_batch_sync_test.go index 9b5ac1f466..79e62c93d5 100644 --- a/synchronizer/l2_sync/l2_shared/tests/processor_trusted_batch_sync_test.go +++ b/synchronizer/l2_sync/l2_shared/tests/processor_trusted_batch_sync_test.go @@ -27,7 +27,8 @@ var ( func TestCacheEmpty(t *testing.T) { mockExecutor := mock_l2_shared.NewSyncTrustedBatchExecutor(t) mockTimer := &commonSync.MockTimerProvider{} - sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, cfg) + mockL1SyncChecker := mock_l2_shared.NewL1SyncGlobalExitRootChecker(t) + sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, mockL1SyncChecker, cfg) current, previous := sut.GetCurrentAndPreviousBatchFromCache(&l2_shared.TrustedState{ LastTrustedBatches: []*state.Batch{nil, nil}, @@ -57,7 +58,7 @@ func TestCacheJustCurrent(t *testing.T) { status := l2_shared.TrustedState{ LastTrustedBatches: []*state.Batch{&batchA}, } - sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, cfg) + sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, nil, cfg) current, previous := sut.GetCurrentAndPreviousBatchFromCache(&status) require.Nil(t, previous) @@ -75,7 +76,7 @@ func TestCacheJustPrevious(t *testing.T) { status := l2_shared.TrustedState{ LastTrustedBatches: []*state.Batch{nil, &batchA}, } - sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, cfg) + sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, nil, cfg) current, previous := sut.GetCurrentAndPreviousBatchFromCache(&status) require.Nil(t, current) @@ -98,7 +99,7 @@ func newTestDataForProcessorTrustedBatchSync(t *testing.T) *TestDataForProcessor return &TestDataForProcessorTrustedBatchSync{ mockTimer: mockTimer, mockExecutor: mockExecutor, - sut: l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, cfg), + sut: l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, nil, cfg), stateCurrentBatch: &state.Batch{ BatchNumber: 123, Coinbase: common.HexToAddress("0x1230"), @@ -211,6 +212,7 @@ func TestGetModeForProcessBatchNothing(t *testing.T) { func TestGetModeForEmptyAndClosedBatchConfiguredToReject(t *testing.T) { testData := newTestDataForProcessorTrustedBatchSync(t) testData.sut.Cfg.AcceptEmptyClosedBatches = false + testData.sut.Cfg.ReprocessFullBatchOnClose = true testData.stateCurrentBatch.WIP = true testData.trustedNodeBatch.Closed = true processData, err := testData.sut.GetModeForProcessBatch(testData.trustedNodeBatch, testData.stateCurrentBatch, testData.statePreviousBatch, "test") @@ -241,6 +243,32 @@ func TestGetModeForEmptyAndClosedBatchConfiguredToReject(t *testing.T) { require.Error(t, err) } +func TestGetModeReprocessFullBatchOnCloseTrue(t *testing.T) { + testData := newTestDataForProcessorTrustedBatchSync(t) + testData.sut.Cfg.AcceptEmptyClosedBatches = true + testData.sut.Cfg.ReprocessFullBatchOnClose = true + testData.stateCurrentBatch.WIP = true + testData.stateCurrentBatch.BatchL2Data = common.Hex2Bytes("112233") + testData.trustedNodeBatch.BatchL2Data = common.Hex2Bytes("11223344") + testData.trustedNodeBatch.Closed = true + // Is a incremental converted to reprocess + testData.sut.Cfg.ReprocessFullBatchOnClose = true + processData, err := testData.sut.GetModeForProcessBatch(testData.trustedNodeBatch, testData.stateCurrentBatch, testData.statePreviousBatch, "test") + require.NoError(t, err) + require.Equal(t, l2_shared.ReprocessProcessMode, processData.Mode, "current batch and trusted batch are the same, just need to be closed") + // Is a incremental to close + testData.sut.Cfg.ReprocessFullBatchOnClose = false + processData, err = testData.sut.GetModeForProcessBatch(testData.trustedNodeBatch, testData.stateCurrentBatch, testData.statePreviousBatch, "test") + require.NoError(t, err) + require.Equal(t, l2_shared.IncrementalProcessMode, processData.Mode, "increment of batchl2data, need to incremental execution") + // No previous batch, is a fullprocess + testData.sut.Cfg.ReprocessFullBatchOnClose = true + processData, err = testData.sut.GetModeForProcessBatch(testData.trustedNodeBatch, nil, testData.statePreviousBatch, "test") + require.NoError(t, err) + require.Equal(t, l2_shared.FullProcessMode, processData.Mode, "no previous batch and close, fullprocess") + +} + func TestGetNextStatusClear(t *testing.T) { testData := newTestDataForProcessorTrustedBatchSync(t) previousStatus := l2_shared.TrustedState{ diff --git a/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch.go b/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch.go index da6fc75bdd..019c487231 100644 --- a/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch.go +++ b/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch.go @@ -33,11 +33,11 @@ func NewCheckSyncStatusToProcessBatch(zkevmRPCClient syncinterfaces.ZKEVMClientG } } -// CheckL1SyncStatusEnoughToProcessBatch returns nil if the are sync and could process the batch +// CheckL1SyncGlobalExitRootEnoughToProcessBatch returns nil if the are sync and could process the batch // if not: // - returns syncinterfaces.ErrMissingSyncFromL1 if we are behind the block number that contains the GlobalExitRoot // - returns l2_shared.NewDeSyncPermissionlessAndTrustedNodeError if trusted and and permissionless are not in same page! pass also the discrepance point -func (c *CheckSyncStatusToProcessBatch) CheckL1SyncStatusEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error { +func (c *CheckSyncStatusToProcessBatch) CheckL1SyncGlobalExitRootEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error { // Find out if this node have GlobalExitRoot // If not: ask to zkevm-RPC the block number of this GlobalExitRoot // If we are behind this block number returns ErrMissingSyncFromL1 @@ -47,14 +47,14 @@ func (c *CheckSyncStatusToProcessBatch) CheckL1SyncStatusEnoughToProcessBatch(ct return nil } debugStr := fmt.Sprintf("CheckL1SyncStatusEnoughToProcessBatch batchNumber:%d globalExitRoot: %s ", batchNumber, globalExitRoot.Hex()) - _, err := c.state.GetExitRootByGlobalExitRoot(ctx, globalExitRoot, dbTx) + localGERInfo, err := c.state.GetExitRootByGlobalExitRoot(ctx, globalExitRoot, dbTx) if err != nil && !errors.Is(err, state.ErrNotFound) { log.Errorf("error getting GetExitRootByGlobalExitRoot %s . Error: ", debugStr, err) return err } if err == nil { // We have this GlobalExitRoot, so we are synced from L1 - log.Infof("We have this GlobalExitRoot, so we are synced from L1 %s", debugStr) + log.Infof("We have this GlobalExitRoot (%s) in L1block %d, so we are synced from L1 %s", globalExitRoot.String(), localGERInfo.BlockNumber, debugStr) return nil } // this means err != state.ErrNotFound -> so we have to ask to zkevm-RPC the block number of this GlobalExitRoot diff --git a/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch_test.go b/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch_test.go index 9acee59955..84d11dddc8 100644 --- a/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch_test.go +++ b/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch_test.go @@ -42,13 +42,13 @@ func NewTestData(t *testing.T) *testData { func TestCheckL1SyncStatusEnoughToProcessBatchGerZero(t *testing.T) { testData := NewTestData(t) - err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, state.ZeroHash, nil) + err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, state.ZeroHash, nil) require.NoError(t, err) } func TestCheckL1SyncStatusEnoughToProcessBatchGerOnDB(t *testing.T) { testData := NewTestData(t) testData.stateMock.EXPECT().GetExitRootByGlobalExitRoot(testData.ctx, globalExitRootNonZero, nil).Return(&state.GlobalExitRoot{}, nil).Once() - err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) + err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) require.NoError(t, err) } @@ -57,7 +57,7 @@ func TestCheckL1SyncStatusEnoughToProcessBatchGerDatabaseFails(t *testing.T) { testData.stateMock.EXPECT().GetExitRootByGlobalExitRoot(testData.ctx, globalExitRootNonZero, nil).Return(nil, randomError).Once() - err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) + err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) require.Error(t, err) } @@ -67,7 +67,7 @@ func TestCheckL1SyncStatusEnoughToProcessBatchGerNoOnDBFailsCallToZkevm(t *testi testData.stateMock.EXPECT().GetExitRootByGlobalExitRoot(testData.ctx, globalExitRootNonZero, nil).Return(nil, state.ErrNotFound).Once() testData.zkevmMock.EXPECT().ExitRootsByGER(testData.ctx, globalExitRootNonZero).Return(nil, randomError).Once() - err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) + err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) require.Error(t, err) } @@ -79,7 +79,7 @@ func TestCheckL1SyncStatusEnoughToProcessBatchGerNoOnDBWeAre1BlockBehind(t *test testData.zkevmMock.EXPECT().ExitRootsByGER(testData.ctx, globalExitRootNonZero).Return(&types.ExitRoots{BlockNumber: types.ArgUint64(l1Block)}, nil).Once() testData.stateMock.EXPECT().GetLastBlock(testData.ctx, nil).Return(&state.Block{BlockNumber: l1Block - 1}, nil).Once() - err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) + err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) require.ErrorIs(t, err, syncinterfaces.ErrMissingSyncFromL1) } @@ -91,7 +91,7 @@ func TestCheckL1SyncStatusEnoughToProcessBatchGerNoOnDBWeAre1BlockBeyond(t *test testData.zkevmMock.EXPECT().ExitRootsByGER(testData.ctx, globalExitRootNonZero).Return(&types.ExitRoots{BlockNumber: types.ArgUint64(l1Block)}, nil).Once() testData.stateMock.EXPECT().GetLastBlock(testData.ctx, nil).Return(&state.Block{BlockNumber: l1Block + 1}, nil).Once() - err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) + err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) require.ErrorIs(t, err, syncinterfaces.ErrFatalDesyncFromL1) l1BlockNumber := err.(*l2_shared.DeSyncPermissionlessAndTrustedNodeError).L1BlockNumber require.Equal(t, l1Block, l1BlockNumber, "returns the block where is the discrepancy") @@ -105,7 +105,7 @@ func TestCheckL1SyncStatusEnoughToProcessBatchGerNoOnDBWeAreLastBlockSynced(t *t testData.zkevmMock.EXPECT().ExitRootsByGER(testData.ctx, globalExitRootNonZero).Return(&types.ExitRoots{BlockNumber: types.ArgUint64(l1Block)}, nil).Once() testData.stateMock.EXPECT().GetLastBlock(testData.ctx, nil).Return(&state.Block{BlockNumber: l1Block}, nil).Once() - err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) + err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil) require.ErrorIs(t, err, syncinterfaces.ErrFatalDesyncFromL1) l1BlockNumber := err.(*l2_shared.DeSyncPermissionlessAndTrustedNodeError).L1BlockNumber require.Equal(t, l1Block, l1BlockNumber, "returns the block where is the discrepancy") diff --git a/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync.go b/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync.go index 05bfdf915d..1975f6c42b 100644 --- a/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync.go +++ b/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync.go @@ -18,6 +18,10 @@ import ( "github.com/jackc/pgx/v4" ) +const ( + timeOfLiveBatchOnCache = 5 * time.Minute +) + var ( // ErrNotImplemented is returned when a method is not implemented ErrNotImplemented = errors.New("not implemented") @@ -41,34 +45,28 @@ type StateInterface interface { ProcessBatchV2(ctx context.Context, request state.ProcessRequest, updateMerkleTree bool) (*state.ProcessBatchResponse, error) StoreL2Block(ctx context.Context, batchNumber uint64, l2Block *state.ProcessBlockResponse, txsEGPLog []*state.EffectiveGasPriceLog, dbTx pgx.Tx) error GetL1InfoTreeDataFromBatchL2Data(ctx context.Context, batchL2Data []byte, dbTx pgx.Tx) (map[uint32]state.L1DataV2, common.Hash, common.Hash, error) -} - -// L1SyncChecker is the interface to check if we are synced from L1 to process a batch -type L1SyncChecker interface { - CheckL1SyncStatusEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error + GetLastVirtualBatchNum(ctx context.Context, dbTx pgx.Tx) (uint64, error) } // SyncTrustedBatchExecutorForEtrog is the implementation of the SyncTrustedStateBatchExecutorSteps that // have the functions to sync a fullBatch, incrementalBatch and reprocessBatch type SyncTrustedBatchExecutorForEtrog struct { - state StateInterface - sync syncinterfaces.SynchronizerFlushIDManager - l1SyncChecker L1SyncChecker + state StateInterface + sync syncinterfaces.SynchronizerFlushIDManager } // NewSyncTrustedBatchExecutorForEtrog creates a new prcessor for sync with L2 batches func NewSyncTrustedBatchExecutorForEtrog(zkEVMClient syncinterfaces.ZKEVMClientTrustedBatchesGetter, state l2_shared.StateInterface, stateBatchExecutor StateInterface, - sync syncinterfaces.SynchronizerFlushIDManager, timeProvider syncCommon.TimeProvider, l1SyncChecker L1SyncChecker, + sync syncinterfaces.SynchronizerFlushIDManager, timeProvider syncCommon.TimeProvider, l1SyncChecker l2_shared.L1SyncGlobalExitRootChecker, cfg l2_sync.Config) *l2_shared.TrustedBatchesRetrieve { executorSteps := &SyncTrustedBatchExecutorForEtrog{ - state: stateBatchExecutor, - sync: sync, - l1SyncChecker: l1SyncChecker, + state: stateBatchExecutor, + sync: sync, } - executor := l2_shared.NewProcessorTrustedBatchSync(executorSteps, timeProvider, cfg) - a := l2_shared.NewTrustedBatchesRetrieve(executor, zkEVMClient, state, sync, *l2_shared.NewTrustedStateManager(timeProvider, time.Hour)) + executor := l2_shared.NewProcessorTrustedBatchSync(executorSteps, timeProvider, l1SyncChecker, cfg) + a := l2_shared.NewTrustedBatchesRetrieve(executor, zkEVMClient, state, sync, *l2_shared.NewTrustedStateManager(timeProvider, timeOfLiveBatchOnCache)) return a } @@ -144,12 +142,7 @@ func (b *SyncTrustedBatchExecutorForEtrog) FullProcess(ctx context.Context, data data.DebugPrefix += " (emptyBatch) " return b.CreateEmptyBatch(ctx, data, dbTx) } - err := b.checkIfWeAreSyncedFromL1ToProcessGlobalExitRoot(ctx, data, dbTx) - if err != nil { - log.Errorf("%s error checkIfWeAreSyncedFromL1ToProcessGlobalExitRoot. Error: %v", data.DebugPrefix, err) - return nil, err - } - err = b.openBatch(ctx, data.TrustedBatch, dbTx, data.DebugPrefix) + err := b.openBatch(ctx, data.TrustedBatch, dbTx, data.DebugPrefix) if err != nil { log.Errorf("%s error openning batch. Error: %v", data.DebugPrefix, err) return nil, err @@ -209,11 +202,6 @@ func (b *SyncTrustedBatchExecutorForEtrog) IncrementalProcess(ctx context.Contex log.Errorf("%s error checkThatL2DataIsIncremental. Error: %v", data.DebugPrefix, err) return nil, err } - err = b.checkIfWeAreSyncedFromL1ToProcessGlobalExitRoot(ctx, data, dbTx) - if err != nil { - log.Errorf("%s error checkIfWeAreSyncedFromL1ToProcessGlobalExitRoot. Error: %v", data.DebugPrefix, err) - return nil, err - } PartialBatchL2Data, err := b.composePartialBatch(data.StateBatch, data.TrustedBatch) if err != nil { @@ -260,21 +248,17 @@ func (b *SyncTrustedBatchExecutorForEtrog) IncrementalProcess(ctx context.Contex } updatedBatch := *data.StateBatch + updatedBatch.LocalExitRoot = data.TrustedBatch.LocalExitRoot + updatedBatch.AccInputHash = data.TrustedBatch.AccInputHash + updatedBatch.GlobalExitRoot = data.TrustedBatch.GlobalExitRoot updatedBatch.BatchL2Data = data.TrustedBatch.BatchL2Data updatedBatch.WIP = !data.BatchMustBeClosed + res := l2_shared.NewProcessResponse() res.UpdateCurrentBatchWithExecutionResult(&updatedBatch, processBatchResp) return &res, nil } -func (b *SyncTrustedBatchExecutorForEtrog) checkIfWeAreSyncedFromL1ToProcessGlobalExitRoot(ctx context.Context, data *l2_shared.ProcessData, dbTx pgx.Tx) error { - if b.l1SyncChecker == nil { - log.Infof("Disabled check L1 sync status for process batch") - return nil - } - return b.l1SyncChecker.CheckL1SyncStatusEnoughToProcessBatch(ctx, data.BatchNumber, data.TrustedBatch.GlobalExitRoot, dbTx) -} - func (b *SyncTrustedBatchExecutorForEtrog) updateWIPBatch(ctx context.Context, data *l2_shared.ProcessData, NewStateRoot common.Hash, dbTx pgx.Tx) error { receipt := state.ProcessingReceipt{ BatchNumber: data.BatchNumber, @@ -296,7 +280,17 @@ func (b *SyncTrustedBatchExecutorForEtrog) updateWIPBatch(ctx context.Context, d // ReProcess process a batch that we have processed before, but we don't have the intermediate state root, so we need to reprocess it func (b *SyncTrustedBatchExecutorForEtrog) ReProcess(ctx context.Context, data *l2_shared.ProcessData, dbTx pgx.Tx) (*l2_shared.ProcessResponse, error) { log.Warnf("%s needs to be reprocessed! deleting batches from this batch, because it was partially processed but the intermediary stateRoot is lost", data.DebugPrefix) - err := b.state.ResetTrustedState(ctx, uint64(data.TrustedBatch.Number)-1, dbTx) + // Check that there are no VirtualBatches neither VerifiedBatches that are newer than this batch + lastVirtualBatchNum, err := b.state.GetLastVirtualBatchNum(ctx, dbTx) + if err != nil { + log.Errorf("%s error getting lastVirtualBatchNum. Error: %v", data.DebugPrefix, err) + return nil, err + } + if lastVirtualBatchNum >= uint64(data.TrustedBatch.Number) { + log.Errorf("%s there are newer or equal virtualBatches than this batch. Can't reprocess because then will delete a virtualBatch", data.DebugPrefix) + return nil, syncinterfaces.ErrMissingSyncFromL1 + } + err = b.state.ResetTrustedState(ctx, uint64(data.TrustedBatch.Number)-1, dbTx) if err != nil { log.Warnf("%s error deleting batches from this batch: %v", data.DebugPrefix, err) return nil, err diff --git a/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync_test.go b/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync_test.go index 01de172a5d..97a7125b96 100644 --- a/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync_test.go +++ b/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync_test.go @@ -196,6 +196,7 @@ func TestNothingProcessDoesntMatchBatchReprocess(t *testing.T) { StateRoot: common.HexToHash(hashExamplesValues[2]), }, } + testData.stateMock.EXPECT().GetLastVirtualBatchNum(testData.ctx, mock.Anything).Return(uint64(122), nil).Maybe() testData.stateMock.EXPECT().ResetTrustedState(testData.ctx, data.BatchNumber-1, mock.Anything).Return(nil).Once() testData.stateMock.EXPECT().OpenBatch(testData.ctx, mock.Anything, mock.Anything).Return(nil).Once() testData.stateMock.EXPECT().GetL1InfoTreeDataFromBatchL2Data(testData.ctx, mock.Anything, mock.Anything).Return(map[uint32]state.L1DataV2{}, common.Hash{}, common.Hash{}, nil).Once() @@ -211,6 +212,35 @@ func TestNothingProcessDoesntMatchBatchReprocess(t *testing.T) { require.NoError(t, err) } +func TestReprocessRejectDeleteVirtualBatch(t *testing.T) { + testData := newTestData(t) + // Arrange + data := l2_shared.ProcessData{ + BatchNumber: 123, + Mode: l2_shared.NothingProcessMode, + BatchMustBeClosed: false, + DebugPrefix: "test", + StateBatch: &state.Batch{ + BatchNumber: 123, + StateRoot: common.HexToHash(hashExamplesValues[1]), + BatchL2Data: []byte{1, 2, 3, 4}, + WIP: true, + }, + TrustedBatch: &types.Batch{ + Number: 123, + StateRoot: common.HexToHash(hashExamplesValues[0]), + BatchL2Data: []byte{1, 2, 3, 4}, + }, + PreviousStateBatch: &state.Batch{ + BatchNumber: 122, + StateRoot: common.HexToHash(hashExamplesValues[2]), + }, + } + testData.stateMock.EXPECT().GetLastVirtualBatchNum(testData.ctx, mock.Anything).Return(uint64(123), nil).Maybe() + _, err := testData.sut.ReProcess(testData.ctx, &data, nil) + require.Error(t, err) +} + func TestNothingProcessIfBatchMustBeClosedThenCloseBatch(t *testing.T) { testData := newTestData(t) // Arrange @@ -265,7 +295,7 @@ func TestCloseBatchGivenAlreadyClosedAndTheDataAreRightThenNoError(t *testing.T) require.NoError(t, res) } -func TestEmptyBatch(t *testing.T) { +func TestEmptyWIPBatch(t *testing.T) { testData := newTestData(t) // Arrange expectedBatch := state.Batch{ diff --git a/synchronizer/l2_sync/l2_sync_etrog/mocks/l1_sync_checker.go b/synchronizer/l2_sync/l2_sync_etrog/mocks/l1_sync_checker.go deleted file mode 100644 index ced3c7a54f..0000000000 --- a/synchronizer/l2_sync/l2_sync_etrog/mocks/l1_sync_checker.go +++ /dev/null @@ -1,89 +0,0 @@ -// Code generated by mockery. DO NOT EDIT. - -package mock_l2_sync_etrog - -import ( - context "context" - - common "github.com/ethereum/go-ethereum/common" - - mock "github.com/stretchr/testify/mock" - - pgx "github.com/jackc/pgx/v4" -) - -// L1SyncChecker is an autogenerated mock type for the L1SyncChecker type -type L1SyncChecker struct { - mock.Mock -} - -type L1SyncChecker_Expecter struct { - mock *mock.Mock -} - -func (_m *L1SyncChecker) EXPECT() *L1SyncChecker_Expecter { - return &L1SyncChecker_Expecter{mock: &_m.Mock} -} - -// CheckL1SyncStatusEnoughToProcessBatch provides a mock function with given fields: ctx, batchNumber, globalExitRoot, dbTx -func (_m *L1SyncChecker) CheckL1SyncStatusEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error { - ret := _m.Called(ctx, batchNumber, globalExitRoot, dbTx) - - if len(ret) == 0 { - panic("no return value specified for CheckL1SyncStatusEnoughToProcessBatch") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, common.Hash, pgx.Tx) error); ok { - r0 = rf(ctx, batchNumber, globalExitRoot, dbTx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckL1SyncStatusEnoughToProcessBatch' -type L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call struct { - *mock.Call -} - -// CheckL1SyncStatusEnoughToProcessBatch is a helper method to define mock.On call -// - ctx context.Context -// - batchNumber uint64 -// - globalExitRoot common.Hash -// - dbTx pgx.Tx -func (_e *L1SyncChecker_Expecter) CheckL1SyncStatusEnoughToProcessBatch(ctx interface{}, batchNumber interface{}, globalExitRoot interface{}, dbTx interface{}) *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call { - return &L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call{Call: _e.mock.On("CheckL1SyncStatusEnoughToProcessBatch", ctx, batchNumber, globalExitRoot, dbTx)} -} - -func (_c *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call) Run(run func(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx)) *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(common.Hash), args[3].(pgx.Tx)) - }) - return _c -} - -func (_c *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call) Return(_a0 error) *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call) RunAndReturn(run func(context.Context, uint64, common.Hash, pgx.Tx) error) *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call { - _c.Call.Return(run) - return _c -} - -// NewL1SyncChecker creates a new instance of L1SyncChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewL1SyncChecker(t interface { - mock.TestingT - Cleanup(func()) -}) *L1SyncChecker { - mock := &L1SyncChecker{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/synchronizer/l2_sync/l2_sync_etrog/mocks/state_interface.go b/synchronizer/l2_sync/l2_sync_etrog/mocks/state_interface.go index 1772294e0c..5101bb4b6a 100644 --- a/synchronizer/l2_sync/l2_sync_etrog/mocks/state_interface.go +++ b/synchronizer/l2_sync/l2_sync_etrog/mocks/state_interface.go @@ -317,6 +317,63 @@ func (_c *StateInterface_GetL1InfoTreeDataFromBatchL2Data_Call) RunAndReturn(run return _c } +// GetLastVirtualBatchNum provides a mock function with given fields: ctx, dbTx +func (_m *StateInterface) GetLastVirtualBatchNum(ctx context.Context, dbTx pgx.Tx) (uint64, error) { + ret := _m.Called(ctx, dbTx) + + if len(ret) == 0 { + panic("no return value specified for GetLastVirtualBatchNum") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, pgx.Tx) (uint64, error)); ok { + return rf(ctx, dbTx) + } + if rf, ok := ret.Get(0).(func(context.Context, pgx.Tx) uint64); ok { + r0 = rf(ctx, dbTx) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(context.Context, pgx.Tx) error); ok { + r1 = rf(ctx, dbTx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// StateInterface_GetLastVirtualBatchNum_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLastVirtualBatchNum' +type StateInterface_GetLastVirtualBatchNum_Call struct { + *mock.Call +} + +// GetLastVirtualBatchNum is a helper method to define mock.On call +// - ctx context.Context +// - dbTx pgx.Tx +func (_e *StateInterface_Expecter) GetLastVirtualBatchNum(ctx interface{}, dbTx interface{}) *StateInterface_GetLastVirtualBatchNum_Call { + return &StateInterface_GetLastVirtualBatchNum_Call{Call: _e.mock.On("GetLastVirtualBatchNum", ctx, dbTx)} +} + +func (_c *StateInterface_GetLastVirtualBatchNum_Call) Run(run func(ctx context.Context, dbTx pgx.Tx)) *StateInterface_GetLastVirtualBatchNum_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(pgx.Tx)) + }) + return _c +} + +func (_c *StateInterface_GetLastVirtualBatchNum_Call) Return(_a0 uint64, _a1 error) *StateInterface_GetLastVirtualBatchNum_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *StateInterface_GetLastVirtualBatchNum_Call) RunAndReturn(run func(context.Context, pgx.Tx) (uint64, error)) *StateInterface_GetLastVirtualBatchNum_Call { + _c.Call.Return(run) + return _c +} + // OpenBatch provides a mock function with given fields: ctx, processingContext, dbTx func (_m *StateInterface) OpenBatch(ctx context.Context, processingContext state.ProcessingContext, dbTx pgx.Tx) error { ret := _m.Called(ctx, processingContext, dbTx) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 04fe6dd6f5..591b2f4357 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -164,7 +164,7 @@ func newL1SyncParallel(ctx context.Context, cfg Config, etherManForL1 []syncinte l1SyncOrchestration := l1_parallel_sync.NewL1SyncOrchestration(ctx, l1DataRetriever, L1DataProcessor) if runExternalControl { log.Infof("Starting external control") - externalControl := newExternalControl(l1DataRetriever, l1SyncOrchestration) + externalControl := newExternalCmdControl(l1DataRetriever, l1SyncOrchestration) externalControl.start() } return l1SyncOrchestration @@ -370,6 +370,7 @@ func (s *ClientSynchronizer) Sync() error { metrics.FullL1SyncTime(time.Since(startL1)) if err != nil { log.Warn("error syncing blocks: ", err) + s.CleanTrustedState() lastEthBlockSynced, err = s.state.GetLastBlock(s.ctx, nil) if err != nil { log.Fatal("error getting lastEthBlockSynced to resume the synchronization... Error: ", err)