From 7f4d54b253b55116bbffd1411e41bc4d4b90117b Mon Sep 17 00:00:00 2001 From: Joan Esteban <129153821+joanestebanr@users.noreply.github.com> Date: Tue, 27 Feb 2024 16:54:06 +0100 Subject: [PATCH] sync reprocess full batch on close, check GER, and fix minor bugs (#3374) * sync reprocess full batch on close * reduce the of live of batch cache from 1hour to 5min * fix reprocess, check virtualBatch before reseting state * #3376 - check GlobalExitRoot before working on a batch * update same data as updateWIPBatch do in the cache of batches --- config/config_test.go | 4 + config/default.go | 1 + docs/config-file/node-config-doc.html | 2 +- docs/config-file/node-config-doc.md | 21 +- docs/config-file/node-config-schema.json | 5 + state/batch.go | 2 +- state/pgstatestorage/l2block.go | 3 +- synchronizer/ext_control.go | 203 ++++++++++++++---- synchronizer/l2_sync/config.go | 3 + .../mocks/l1_sync_global_exit_root_checker.go | 89 ++++++++ .../l2_shared/processor_trusted_batch_sync.go | 46 +++- .../processor_trusted_batch_sync_test.go | 36 +++- .../check_sync_status_to_process_batch.go | 8 +- ...check_sync_status_to_process_batch_test.go | 14 +- .../executor_trusted_batch_sync.go | 62 +++--- .../executor_trusted_batch_sync_test.go | 32 ++- .../l2_sync_etrog/mocks/l1_sync_checker.go | 89 -------- .../l2_sync_etrog/mocks/state_interface.go | 57 +++++ synchronizer/synchronizer.go | 3 +- 19 files changed, 481 insertions(+), 199 deletions(-) create mode 100644 synchronizer/l2_sync/l2_shared/mocks/l1_sync_global_exit_root_checker.go delete mode 100644 synchronizer/l2_sync/l2_sync_etrog/mocks/l1_sync_checker.go diff --git a/config/config_test.go b/config/config_test.go index 7350421fe2..aebc82fe90 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -56,6 +56,10 @@ func Test_Defaults(t *testing.T) { path: "Synchronizer.L2Synchronization.AcceptEmptyClosedBatches", expectedValue: false, }, + { + path: "Synchronizer.L2Synchronization.ReprocessFullBatchOnClose", + 
expectedValue: true, + }, { path: "Sequencer.DeletePoolTxsL1BlockConfirmations", expectedValue: uint64(100), diff --git a/config/default.go b/config/default.go index c087b0fa6f..e5a77ab641 100644 --- a/config/default.go +++ b/config/default.go @@ -118,6 +118,7 @@ L1SynchronizationMode = "sequential" ApplyAfterNumRollupReceived = 10 [Synchronizer.L2Synchronization] AcceptEmptyClosedBatches = false + ReprocessFullBatchOnClose = true [Sequencer] DeletePoolTxsL1BlockConfirmations = 100 diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index 1ab65c4a31..6b95d5c91b 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -28,7 +28,7 @@
"300ms"
RollupInfoRetriesSpacing is the minimum time between retries to request rollup info (it will sleep for fulfill this time) to avoid spamming L1
"1m"
"300ms"
-
FallbackToSequentialModeOnSynchronized if true switch to sequential mode if the system is synchronized
AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches.
if true, the synchronizer will accept empty batches and process them.
DeletePoolTxsL1BlockConfirmations is blocks amount after which txs will be deleted from the pool
DeletePoolTxsCheckInterval is frequency with which txs will be checked for deleting
"1m"
+
FallbackToSequentialModeOnSynchronized if true switch to sequential mode if the system is synchronized
AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches.
if true, the synchronizer will accept empty batches and process them.
ReprocessFullBatchOnClose if is true when a batch is closed is force to reprocess again
DeletePoolTxsL1BlockConfirmations is blocks amount after which txs will be deleted from the pool
DeletePoolTxsCheckInterval is frequency with which txs will be checked for deleting
"1m"
"300ms"
TxLifetimeCheckInterval is the time the sequencer waits to check txs lifetime
"1m"
"300ms"
diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md
index 4492a475cf..2a7c8b9810 100644
--- a/docs/config-file/node-config-doc.md
+++ b/docs/config-file/node-config-doc.md
@@ -1683,9 +1683,10 @@ FallbackToSequentialModeOnSynchronized=false
**Type:** : `object`
**Description:** L2Synchronization Configuration for L2 synchronization
-| Property | Pattern | Type | Deprecated | Definition | Title/Description |
-| --------------------------------------------------------------------------------------- | ------- | ------- | ---------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| - [AcceptEmptyClosedBatches](#Synchronizer_L2Synchronization_AcceptEmptyClosedBatches ) | No | boolean | No | - | AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches.
if true, the synchronizer will accept empty batches and process them. |
+| Property | Pattern | Type | Deprecated | Definition | Title/Description |
+| ----------------------------------------------------------------------------------------- | ------- | ------- | ---------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| - [AcceptEmptyClosedBatches](#Synchronizer_L2Synchronization_AcceptEmptyClosedBatches ) | No | boolean | No | - | AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches.
if true, the synchronizer will accept empty batches and process them. |
+| - [ReprocessFullBatchOnClose](#Synchronizer_L2Synchronization_ReprocessFullBatchOnClose ) | No | boolean | No | - | ReprocessFullBatchOnClose if is true when a batch is closed is force to reprocess again |
#### 9.6.1. `Synchronizer.L2Synchronization.AcceptEmptyClosedBatches`
@@ -1702,6 +1703,20 @@ if true, the synchronizer will accept empty batches and process them.
AcceptEmptyClosedBatches=false
```
+#### 9.6.2. `Synchronizer.L2Synchronization.ReprocessFullBatchOnClose`
+
+**Type:** : `boolean`
+
+**Default:** `true`
+
+**Description:** ReprocessFullBatchOnClose if is true when a batch is closed is force to reprocess again
+
+**Example setting the default value** (true):
+```
+[Synchronizer.L2Synchronization]
+ReprocessFullBatchOnClose=true
+```
+
## 10. `[Sequencer]`
**Type:** : `object`
diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json
index aeb3345481..867558ea25 100644
--- a/docs/config-file/node-config-schema.json
+++ b/docs/config-file/node-config-schema.json
@@ -631,6 +631,11 @@
"type": "boolean",
"description": "AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches.\nif true, the synchronizer will accept empty batches and process them.",
"default": false
+ },
+ "ReprocessFullBatchOnClose": {
+ "type": "boolean",
+ "description": "ReprocessFullBatchOnClose if is true when a batch is closed is force to reprocess again",
+ "default": true
}
},
"additionalProperties": false,
diff --git a/state/batch.go b/state/batch.go
index 09b329ee5f..7cf10ebeab 100644
--- a/state/batch.go
+++ b/state/batch.go
@@ -173,7 +173,7 @@ func (s *State) OpenBatch(ctx context.Context, processingContext ProcessingConte
return err
}
if prevTimestamp.Unix() > processingContext.Timestamp.Unix() {
- return ErrTimestampGE
+ return fmt.Errorf("oldBatch(%d) tstamp=%d > openingBatch(%d) tstamp=%d err: %w", lastBatchNum, prevTimestamp.Unix(), processingContext.BatchNumber, processingContext.Timestamp.Unix(), ErrTimestampGE)
}
return s.OpenBatchInStorage(ctx, processingContext, dbTx)
}
diff --git a/state/pgstatestorage/l2block.go b/state/pgstatestorage/l2block.go
index f86b10d26d..fac23ce6ce 100644
--- a/state/pgstatestorage/l2block.go
+++ b/state/pgstatestorage/l2block.go
@@ -197,7 +197,8 @@ func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2
}
uncles = string(unclesBytes)
}
-
+ l2blockNumber := l2Block.Number().Uint64()
+ log.Debugf("[AddL2Block] adding L2 block %d", l2blockNumber)
if _, err := e.Exec(ctx, addL2BlockSQL,
l2Block.Number().Uint64(), l2Block.Hash().String(), header, uncles,
l2Block.ParentHash().String(), l2Block.Root().String(),
diff --git a/synchronizer/ext_control.go b/synchronizer/ext_control.go
index 0ca2840dca..289d0e27ea 100644
--- a/synchronizer/ext_control.go
+++ b/synchronizer/ext_control.go
@@ -2,6 +2,8 @@ package synchronizer
import (
"bufio"
+ "errors"
+ "fmt"
"io"
"os"
"strconv"
@@ -14,6 +16,7 @@ import (
const (
externalControlFilename = "/tmp/synchronizer_in"
+ externalOutputFilename = "/tmp/synchronizer_out"
filePermissions = 0644
sleepTimeToReadFile = 500 * time.Millisecond
)
@@ -28,16 +31,58 @@ const (
// example of usage (first you need to run the service):
// echo "l1_producer_stop" >> /tmp/synchronizer_in
// echo "l1_orchestrator_reset|8577060" >> /tmp/synchronizer_in
-type externalControl struct {
- producer *l1_parallel_sync.L1RollupInfoProducer
- orquestrator *l1_parallel_sync.L1SyncOrchestration
+
+// ExtCmdArgs is the type of the arguments of the command
+type ExtCmdArgs []string
+
+// ExtControlCmd is the interface of the external command
+type ExtControlCmd interface {
+ // FunctionName returns the name of the function to be called example: "l1_producer_stop"
+ FunctionName() string
+ // ValidateArguments validates the arguments of the command, returns nil if ok, error if not
+ ValidateArguments(ExtCmdArgs) error
+ // Process the command
+ // args: the arguments of the command
+ // return: string with the output and an error
+ Process(ExtCmdArgs) (string, error)
+ // Help returns the help of the command
+ Help() string
+}
+
+type externalCmdControl struct {
+ //producer *l1_parallel_sync.L1RollupInfoProducer
+ //orquestrator *l1_parallel_sync.L1SyncOrchestration
+ RegisteredCmds map[string]ExtControlCmd
+}
+
+func newExternalCmdControl(producer *l1_parallel_sync.L1RollupInfoProducer, orquestrator *l1_parallel_sync.L1SyncOrchestration) *externalCmdControl {
+ res := &externalCmdControl{
+ RegisteredCmds: make(map[string]ExtControlCmd),
+ }
+ res.RegisterCmd(&helpCmd{externalControl: res})
+ res.RegisterCmd(&l1OrchestratorResetCmd{orquestrator: orquestrator})
+ res.RegisterCmd(&l1ProducerStopCmd{producer: producer})
+ return res
}
-func newExternalControl(producer *l1_parallel_sync.L1RollupInfoProducer, orquestrator *l1_parallel_sync.L1SyncOrchestration) *externalControl {
- return &externalControl{producer: producer, orquestrator: orquestrator}
+// RegisterCmd registers a command
+func (e *externalCmdControl) RegisterCmd(cmd ExtControlCmd) {
+ if e.RegisteredCmds == nil {
+ e.RegisteredCmds = make(map[string]ExtControlCmd)
+ }
+ e.RegisteredCmds[cmd.FunctionName()] = cmd
}
-func (e *externalControl) start() {
+// GetCmd returns a command by its name
+func (e *externalCmdControl) GetCmd(functionName string) (ExtControlCmd, error) {
+ cmd, ok := e.RegisteredCmds[functionName]
+ if !ok {
+ return nil, errors.New("command not found")
+ }
+ return cmd, nil
+}
+
+func (e *externalCmdControl) start() {
log.Infof("EXT:start: starting external control opening %s", externalControlFilename)
file, err := os.OpenFile(externalControlFilename, os.O_APPEND|os.O_CREATE|os.O_RDONLY, filePermissions)
if err != nil {
@@ -52,7 +97,7 @@ func (e *externalControl) start() {
}
// https://medium.com/@arunprabhu.1/tailing-a-file-in-golang-72944204f22b
-func (e *externalControl) readFile(file *os.File) {
+func (e *externalCmdControl) readFile(file *os.File) {
defer file.Close()
reader := bufio.NewReader(file)
for {
@@ -65,66 +110,134 @@ func (e *externalControl) readFile(file *os.File) {
time.Sleep(sleepTimeToReadFile)
continue
}
-
break
}
log.Infof("EXT:readFile: new command: %s", line)
- e.process(line)
+ cmd, cmdArgs, err := e.parse(line)
+ if err != nil {
+ log.Warnf("EXT:readFile: error parsing command %s:err %s", line, err)
+ continue
+ }
+ e.process(cmd, cmdArgs)
}
}
}
-func (e *externalControl) process(line string) {
+func (e *externalCmdControl) parse(line string) (ExtControlCmd, ExtCmdArgs, error) {
cmd := strings.Split(line, "|")
if len(cmd) < 1 {
- return
+ return nil, nil, errors.New("invalid command")
}
- switch strings.TrimSpace(cmd[0]) {
- case "l1_producer_stop":
- e.cmdL1ProducerStop(cmd[1:])
- case "l1_orchestrator_reset":
- e.cmdL1OrchestratorReset(cmd[1:])
- case "l1_orchestrator_stop":
- e.cmdL1OrchestratorAbort(cmd[1:])
- default:
- log.Warnf("EXT:process: unknown command: %s", cmd[0])
+ functionName := strings.TrimSpace(cmd[0])
+ args := cmd[1:]
+ cmdObj, err := e.GetCmd(functionName)
+ if err != nil {
+ return nil, nil, err
}
+ err = cmdObj.ValidateArguments(args)
+ if err != nil {
+ return nil, nil, err
+ }
+ return cmdObj, args, nil
}
-func (e *externalControl) cmdL1OrchestratorReset(args []string) {
- log.Infof("EXT:cmdL1OrchestratorReset: %s", args)
- if len(args) < 1 {
- log.Infof("EXT:cmdL1OrchestratorReset: missing block number")
+func (e *externalCmdControl) process(cmd ExtControlCmd, args ExtCmdArgs) {
+ fullFunc := fmt.Sprintf("%s(%s)", cmd.FunctionName(), strings.Join(args, ","))
+ if fullFunc == "()" { // empty function name: nothing to process
+ log.Warnf("EXT:readFile: error composing cmd %s: empty function name", cmd.FunctionName())
return
}
- blockNumber, err := strconv.ParseUint(strings.TrimSpace(args[0]), 10, 64)
+ output, err := cmd.Process(args)
if err != nil {
- log.Infof("EXT:cmdL1OrchestratorReset: error parsing block number: %s", err)
+ log.Warnf("EXT:readFile: error processing command %s:err %s", fullFunc, err)
return
}
- log.Infof("EXT:cmdL1OrchestratorReset: calling orchestrator reset(%d)", blockNumber)
- e.orquestrator.Reset(blockNumber)
- log.Infof("EXT:cmdL1OrchestratorReset: calling orchestrator reset(%d) returned", blockNumber)
+ log.Warnf("EXT:readFile: command %s processed with output: %s", fullFunc, output)
}
-func (e *externalControl) cmdL1OrchestratorAbort(args []string) {
- log.Infof("EXT:cmdL1OrchestratorAbort: %s", args)
- if e.orquestrator == nil {
- log.Infof("EXT:cmdL1OrchestratorAbort: orquestrator is nil")
- return
+// COMMANDS IMPLEMENTATION
+// HELP
+type helpCmd struct {
+ externalControl *externalCmdControl
+}
+
+func (h *helpCmd) FunctionName() string {
+ return "help"
+}
+func (h *helpCmd) ValidateArguments(args ExtCmdArgs) error {
+ if len(args) > 0 {
+ return errors.New(h.FunctionName() + " command does not accept arguments")
}
- log.Infof("EXT:cmdL1OrchestratorAbort: calling orquestrator stop")
- e.orquestrator.Abort()
- log.Infof("EXT:cmdL1OrchestratorAbort: calling orquestrator stop returned")
+ return nil
}
-func (e *externalControl) cmdL1ProducerStop(args []string) {
- log.Infof("EXT:cmdL1Stop: %s", args)
- if e.producer == nil {
- log.Infof("EXT:cmdL1Stop: producer is nil")
- return
+func (h *helpCmd) Process(args ExtCmdArgs) (string, error) {
+ var help string
+ for _, cmd := range h.externalControl.RegisteredCmds {
+ help += cmd.Help() + "\n"
}
- log.Infof("EXT:cmdL1Stop: calling producer stop")
- e.producer.Stop()
- log.Infof("EXT:cmdL1Stop: calling producer stop returned")
+ return help, nil
+}
+func (h *helpCmd) Help() string {
+ return h.FunctionName() + ": show the help of the commands"
+}
+
+// COMMANDS "l1_orchestrator_reset"
+type l1OrchestratorResetCmd struct {
+ orquestrator *l1_parallel_sync.L1SyncOrchestration
+}
+
+func (h *l1OrchestratorResetCmd) FunctionName() string {
+ return "l1_orchestrator_reset"
+}
+
+func (h *l1OrchestratorResetCmd) ValidateArguments(args ExtCmdArgs) error {
+ if len(args) != 1 {
+ return errors.New(h.FunctionName() + " needs 1 argument")
+ }
+ _, err := strconv.ParseUint(strings.TrimSpace(args[0]), 10, 64)
+ if err != nil {
+ return fmt.Errorf("error parsing block number: %s err:%w", args[0], err)
+ }
+ return nil
+}
+func (h *l1OrchestratorResetCmd) Process(args ExtCmdArgs) (string, error) {
+ blockNumber, err := strconv.ParseUint(strings.TrimSpace(args[0]), 10, 64)
+ if err != nil {
+ return "error param", err
+ }
+ log.Warnf("EXT:"+h.FunctionName()+": calling orchestrator reset(%d)", blockNumber)
+ h.orquestrator.Reset(blockNumber)
+ res := fmt.Sprintf("EXT: "+h.FunctionName()+": reset to block %d", blockNumber)
+ return res, nil
+}
+
+func (h *l1OrchestratorResetCmd) Help() string {
+ return h.FunctionName() + ": reset L1 parallel sync orchestrator to a given block number"
+}
+
+// COMMANDS l1_producer_stop
+type l1ProducerStopCmd struct {
+ producer *l1_parallel_sync.L1RollupInfoProducer
+}
+
+func (h *l1ProducerStopCmd) FunctionName() string {
+ return "l1_producer_stop"
+}
+
+func (h *l1ProducerStopCmd) ValidateArguments(args ExtCmdArgs) error {
+ if len(args) > 0 {
+ return errors.New(h.FunctionName() + " command does not accept arguments")
+ }
+ return nil
+}
+func (h *l1ProducerStopCmd) Process(args ExtCmdArgs) (string, error) {
+ log.Warnf("EXT:" + h.FunctionName() + ": calling producer stop")
+ h.producer.Stop()
+ res := "EXT: " + h.FunctionName() + ": producer stopped"
+ return res, nil
+}
+
+func (h *l1ProducerStopCmd) Help() string {
+ return h.FunctionName() + ": stop L1 rollup info producer"
}
diff --git a/synchronizer/l2_sync/config.go b/synchronizer/l2_sync/config.go
index 292d726caf..7166765b88 100644
--- a/synchronizer/l2_sync/config.go
+++ b/synchronizer/l2_sync/config.go
@@ -5,4 +5,7 @@ type Config struct {
// AcceptEmptyClosedBatches is a flag to enable or disable the acceptance of empty batches.
// if true, the synchronizer will accept empty batches and process them.
AcceptEmptyClosedBatches bool `mapstructure:"AcceptEmptyClosedBatches"`
+
+ // ReprocessFullBatchOnClose if is true when a batch is closed is force to reprocess again
+ ReprocessFullBatchOnClose bool `mapstructure:"ReprocessFullBatchOnClose"`
}
diff --git a/synchronizer/l2_sync/l2_shared/mocks/l1_sync_global_exit_root_checker.go b/synchronizer/l2_sync/l2_shared/mocks/l1_sync_global_exit_root_checker.go
new file mode 100644
index 0000000000..0c50000d4d
--- /dev/null
+++ b/synchronizer/l2_sync/l2_shared/mocks/l1_sync_global_exit_root_checker.go
@@ -0,0 +1,89 @@
+// Code generated by mockery. DO NOT EDIT.
+
+package mock_l2_shared
+
+import (
+ context "context"
+
+ common "github.com/ethereum/go-ethereum/common"
+
+ mock "github.com/stretchr/testify/mock"
+
+ pgx "github.com/jackc/pgx/v4"
+)
+
+// L1SyncGlobalExitRootChecker is an autogenerated mock type for the L1SyncGlobalExitRootChecker type
+type L1SyncGlobalExitRootChecker struct {
+ mock.Mock
+}
+
+type L1SyncGlobalExitRootChecker_Expecter struct {
+ mock *mock.Mock
+}
+
+func (_m *L1SyncGlobalExitRootChecker) EXPECT() *L1SyncGlobalExitRootChecker_Expecter {
+ return &L1SyncGlobalExitRootChecker_Expecter{mock: &_m.Mock}
+}
+
+// CheckL1SyncGlobalExitRootEnoughToProcessBatch provides a mock function with given fields: ctx, batchNumber, globalExitRoot, dbTx
+func (_m *L1SyncGlobalExitRootChecker) CheckL1SyncGlobalExitRootEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error {
+ ret := _m.Called(ctx, batchNumber, globalExitRoot, dbTx)
+
+ if len(ret) == 0 {
+ panic("no return value specified for CheckL1SyncGlobalExitRootEnoughToProcessBatch")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(context.Context, uint64, common.Hash, pgx.Tx) error); ok {
+ r0 = rf(ctx, batchNumber, globalExitRoot, dbTx)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckL1SyncGlobalExitRootEnoughToProcessBatch'
+type L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call struct {
+ *mock.Call
+}
+
+// CheckL1SyncGlobalExitRootEnoughToProcessBatch is a helper method to define mock.On call
+// - ctx context.Context
+// - batchNumber uint64
+// - globalExitRoot common.Hash
+// - dbTx pgx.Tx
+func (_e *L1SyncGlobalExitRootChecker_Expecter) CheckL1SyncGlobalExitRootEnoughToProcessBatch(ctx interface{}, batchNumber interface{}, globalExitRoot interface{}, dbTx interface{}) *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call {
+ return &L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call{Call: _e.mock.On("CheckL1SyncGlobalExitRootEnoughToProcessBatch", ctx, batchNumber, globalExitRoot, dbTx)}
+}
+
+func (_c *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call) Run(run func(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx)) *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(context.Context), args[1].(uint64), args[2].(common.Hash), args[3].(pgx.Tx))
+ })
+ return _c
+}
+
+func (_c *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call) Return(_a0 error) *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call) RunAndReturn(run func(context.Context, uint64, common.Hash, pgx.Tx) error) *L1SyncGlobalExitRootChecker_CheckL1SyncGlobalExitRootEnoughToProcessBatch_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// NewL1SyncGlobalExitRootChecker creates a new instance of L1SyncGlobalExitRootChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewL1SyncGlobalExitRootChecker(t interface {
+ mock.TestingT
+ Cleanup(func())
+}) *L1SyncGlobalExitRootChecker {
+ mock := &L1SyncGlobalExitRootChecker{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/synchronizer/l2_sync/l2_shared/processor_trusted_batch_sync.go b/synchronizer/l2_sync/l2_shared/processor_trusted_batch_sync.go
index 6f5c2b48c1..94535ebe4e 100644
--- a/synchronizer/l2_sync/l2_shared/processor_trusted_batch_sync.go
+++ b/synchronizer/l2_sync/l2_shared/processor_trusted_batch_sync.go
@@ -127,24 +127,31 @@ type SyncTrustedBatchExecutor interface {
NothingProcess(ctx context.Context, data *ProcessData, dbTx pgx.Tx) (*ProcessResponse, error)
}
+// L1SyncGlobalExitRootChecker is the interface to check if the required GlobalExitRoot is already synced from L1
+type L1SyncGlobalExitRootChecker interface {
+ CheckL1SyncGlobalExitRootEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error
+}
+
// ProcessorTrustedBatchSync is a template to sync trusted state. It classify what kind of update is needed and call to SyncTrustedStateBatchExecutorSteps
//
// that is the one that execute the sync process
//
// the real implementation of the steps is in the SyncTrustedStateBatchExecutorSteps interface that known how to process a batch
type ProcessorTrustedBatchSync struct {
- Steps SyncTrustedBatchExecutor
- timeProvider syncCommon.TimeProvider
- Cfg l2_sync.Config
+ Steps SyncTrustedBatchExecutor
+ timeProvider syncCommon.TimeProvider
+ l1SyncChecker L1SyncGlobalExitRootChecker
+ Cfg l2_sync.Config
}
// NewProcessorTrustedBatchSync creates a new SyncTrustedStateBatchExecutorTemplate
func NewProcessorTrustedBatchSync(steps SyncTrustedBatchExecutor,
- timeProvider syncCommon.TimeProvider, cfg l2_sync.Config) *ProcessorTrustedBatchSync {
+ timeProvider syncCommon.TimeProvider, l1SyncChecker L1SyncGlobalExitRootChecker, cfg l2_sync.Config) *ProcessorTrustedBatchSync {
return &ProcessorTrustedBatchSync{
- Steps: steps,
- timeProvider: timeProvider,
- Cfg: cfg,
+ Steps: steps,
+ timeProvider: timeProvider,
+ l1SyncChecker: l1SyncChecker,
+ Cfg: cfg,
}
}
@@ -152,6 +159,15 @@ func NewProcessorTrustedBatchSync(steps SyncTrustedBatchExecutor,
func (s *ProcessorTrustedBatchSync) ProcessTrustedBatch(ctx context.Context, trustedBatch *types.Batch, status TrustedState, dbTx pgx.Tx, debugPrefix string) (*TrustedState, error) {
log.Debugf("%s Processing trusted batch: %v", debugPrefix, trustedBatch.Number)
stateCurrentBatch, statePreviousBatch := s.GetCurrentAndPreviousBatchFromCache(&status)
+ if s.l1SyncChecker != nil {
+ err := s.l1SyncChecker.CheckL1SyncGlobalExitRootEnoughToProcessBatch(ctx, uint64(trustedBatch.Number), trustedBatch.GlobalExitRoot, dbTx)
+ if err != nil {
+ log.Errorf("%s error checking GlobalExitRoot from TrustedBatch. Error: %s", debugPrefix, err)
+ return nil, err
+ }
+ } else {
+ log.Infof("Disabled check L1 sync status for process batch")
+ }
processMode, err := s.GetModeForProcessBatch(trustedBatch, stateCurrentBatch, statePreviousBatch, debugPrefix)
if err != nil {
log.Error("%s error getting processMode. Error: ", debugPrefix, trustedBatch.Number, err)
@@ -275,7 +291,7 @@ func (s *ProcessorTrustedBatchSync) GetModeForProcessBatch(trustedNodeBatch *typ
Description: "Batch is not on database, so is the first time we process it",
}
} else {
- _, strDiffsBatches := AreEqualStateBatchAndTrustedBatch(stateBatch, trustedNodeBatch, CMP_BATCH_IGNORE_TSTAMP)
+ areBatchesExactlyEqual, strDiffsBatches := AreEqualStateBatchAndTrustedBatch(stateBatch, trustedNodeBatch, CMP_BATCH_IGNORE_TSTAMP)
newL2DataFlag, err := ThereAreNewBatchL2Data(stateBatch.BatchL2Data, trustedNodeBatch.BatchL2Data)
if err != nil {
return ProcessData{}, err
@@ -288,6 +304,10 @@ func (s *ProcessorTrustedBatchSync) GetModeForProcessBatch(trustedNodeBatch *typ
BatchMustBeClosed: isTrustedBatchClosed(trustedNodeBatch) && stateBatch.WIP,
Description: "no new data on batch. Diffs: " + strDiffsBatches,
}
+ if areBatchesExactlyEqual {
+ result.BatchMustBeClosed = false
+ result.Description = "exactly batches: " + strDiffsBatches
+ }
} else {
// We have a previous batch, but in node something change
// We have processed this batch before, and we have the intermediate state root, so is going to be process only new Tx.
@@ -310,6 +330,16 @@ func (s *ProcessorTrustedBatchSync) GetModeForProcessBatch(trustedNodeBatch *typ
}
}
+ if s.Cfg.ReprocessFullBatchOnClose && result.BatchMustBeClosed {
+ if result.Mode == IncrementalProcessMode || result.Mode == NothingProcessMode {
+ result.Description = "forced reprocess due to batch closed and ReprocessFullBatchOnClose"
+ log.Infof("%s Batch %v: Converted mode %s to %s because cfg.ReprocessFullBatchOnClose", debugPrefix, trustedNodeBatch.Number, result.Mode, ReprocessProcessMode)
+ result.Mode = ReprocessProcessMode
+ result.OldStateRoot = statePreviousBatch.StateRoot
+ result.BatchMustBeClosed = true
+ }
+ }
+
if result.Mode == "" {
return result, fmt.Errorf("batch %v: failed to get mode for process ", trustedNodeBatch.Number)
}
diff --git a/synchronizer/l2_sync/l2_shared/tests/processor_trusted_batch_sync_test.go b/synchronizer/l2_sync/l2_shared/tests/processor_trusted_batch_sync_test.go
index 9b5ac1f466..79e62c93d5 100644
--- a/synchronizer/l2_sync/l2_shared/tests/processor_trusted_batch_sync_test.go
+++ b/synchronizer/l2_sync/l2_shared/tests/processor_trusted_batch_sync_test.go
@@ -27,7 +27,8 @@ var (
func TestCacheEmpty(t *testing.T) {
mockExecutor := mock_l2_shared.NewSyncTrustedBatchExecutor(t)
mockTimer := &commonSync.MockTimerProvider{}
- sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, cfg)
+ mockL1SyncChecker := mock_l2_shared.NewL1SyncGlobalExitRootChecker(t)
+ sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, mockL1SyncChecker, cfg)
current, previous := sut.GetCurrentAndPreviousBatchFromCache(&l2_shared.TrustedState{
LastTrustedBatches: []*state.Batch{nil, nil},
@@ -57,7 +58,7 @@ func TestCacheJustCurrent(t *testing.T) {
status := l2_shared.TrustedState{
LastTrustedBatches: []*state.Batch{&batchA},
}
- sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, cfg)
+ sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, nil, cfg)
current, previous := sut.GetCurrentAndPreviousBatchFromCache(&status)
require.Nil(t, previous)
@@ -75,7 +76,7 @@ func TestCacheJustPrevious(t *testing.T) {
status := l2_shared.TrustedState{
LastTrustedBatches: []*state.Batch{nil, &batchA},
}
- sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, cfg)
+ sut := l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, nil, cfg)
current, previous := sut.GetCurrentAndPreviousBatchFromCache(&status)
require.Nil(t, current)
@@ -98,7 +99,7 @@ func newTestDataForProcessorTrustedBatchSync(t *testing.T) *TestDataForProcessor
return &TestDataForProcessorTrustedBatchSync{
mockTimer: mockTimer,
mockExecutor: mockExecutor,
- sut: l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, cfg),
+ sut: l2_shared.NewProcessorTrustedBatchSync(mockExecutor, mockTimer, nil, cfg),
stateCurrentBatch: &state.Batch{
BatchNumber: 123,
Coinbase: common.HexToAddress("0x1230"),
@@ -211,6 +212,7 @@ func TestGetModeForProcessBatchNothing(t *testing.T) {
func TestGetModeForEmptyAndClosedBatchConfiguredToReject(t *testing.T) {
testData := newTestDataForProcessorTrustedBatchSync(t)
testData.sut.Cfg.AcceptEmptyClosedBatches = false
+ testData.sut.Cfg.ReprocessFullBatchOnClose = true
testData.stateCurrentBatch.WIP = true
testData.trustedNodeBatch.Closed = true
processData, err := testData.sut.GetModeForProcessBatch(testData.trustedNodeBatch, testData.stateCurrentBatch, testData.statePreviousBatch, "test")
@@ -241,6 +243,32 @@ func TestGetModeForEmptyAndClosedBatchConfiguredToReject(t *testing.T) {
require.Error(t, err)
}
+func TestGetModeReprocessFullBatchOnCloseTrue(t *testing.T) {
+ testData := newTestDataForProcessorTrustedBatchSync(t)
+ testData.sut.Cfg.AcceptEmptyClosedBatches = true
+ testData.sut.Cfg.ReprocessFullBatchOnClose = true
+ testData.stateCurrentBatch.WIP = true
+ testData.stateCurrentBatch.BatchL2Data = common.Hex2Bytes("112233")
+ testData.trustedNodeBatch.BatchL2Data = common.Hex2Bytes("11223344")
+ testData.trustedNodeBatch.Closed = true
+ // Is a incremental converted to reprocess
+ testData.sut.Cfg.ReprocessFullBatchOnClose = true
+ processData, err := testData.sut.GetModeForProcessBatch(testData.trustedNodeBatch, testData.stateCurrentBatch, testData.statePreviousBatch, "test")
+ require.NoError(t, err)
+ require.Equal(t, l2_shared.ReprocessProcessMode, processData.Mode, "current batch and trusted batch are the same, just need to be closed")
+ // Is a incremental to close
+ testData.sut.Cfg.ReprocessFullBatchOnClose = false
+ processData, err = testData.sut.GetModeForProcessBatch(testData.trustedNodeBatch, testData.stateCurrentBatch, testData.statePreviousBatch, "test")
+ require.NoError(t, err)
+ require.Equal(t, l2_shared.IncrementalProcessMode, processData.Mode, "increment of batchl2data, need to incremental execution")
+ // No previous batch, is a fullprocess
+ testData.sut.Cfg.ReprocessFullBatchOnClose = true
+ processData, err = testData.sut.GetModeForProcessBatch(testData.trustedNodeBatch, nil, testData.statePreviousBatch, "test")
+ require.NoError(t, err)
+ require.Equal(t, l2_shared.FullProcessMode, processData.Mode, "no previous batch and close, fullprocess")
+
+}
+
func TestGetNextStatusClear(t *testing.T) {
testData := newTestDataForProcessorTrustedBatchSync(t)
previousStatus := l2_shared.TrustedState{
diff --git a/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch.go b/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch.go
index da6fc75bdd..019c487231 100644
--- a/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch.go
+++ b/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch.go
@@ -33,11 +33,11 @@ func NewCheckSyncStatusToProcessBatch(zkevmRPCClient syncinterfaces.ZKEVMClientG
}
}
-// CheckL1SyncStatusEnoughToProcessBatch returns nil if the are sync and could process the batch
+// CheckL1SyncGlobalExitRootEnoughToProcessBatch returns nil if we are synced from L1 and could process the batch
// if not:
// - returns syncinterfaces.ErrMissingSyncFromL1 if we are behind the block number that contains the GlobalExitRoot
// - returns l2_shared.NewDeSyncPermissionlessAndTrustedNodeError if trusted and and permissionless are not in same page! pass also the discrepance point
-func (c *CheckSyncStatusToProcessBatch) CheckL1SyncStatusEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error {
+func (c *CheckSyncStatusToProcessBatch) CheckL1SyncGlobalExitRootEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error {
// Find out if this node have GlobalExitRoot
// If not: ask to zkevm-RPC the block number of this GlobalExitRoot
// If we are behind this block number returns ErrMissingSyncFromL1
@@ -47,14 +47,14 @@ func (c *CheckSyncStatusToProcessBatch) CheckL1SyncStatusEnoughToProcessBatch(ct
return nil
}
debugStr := fmt.Sprintf("CheckL1SyncStatusEnoughToProcessBatch batchNumber:%d globalExitRoot: %s ", batchNumber, globalExitRoot.Hex())
- _, err := c.state.GetExitRootByGlobalExitRoot(ctx, globalExitRoot, dbTx)
+ localGERInfo, err := c.state.GetExitRootByGlobalExitRoot(ctx, globalExitRoot, dbTx)
if err != nil && !errors.Is(err, state.ErrNotFound) {
log.Errorf("error getting GetExitRootByGlobalExitRoot %s . Error: ", debugStr, err)
return err
}
if err == nil {
// We have this GlobalExitRoot, so we are synced from L1
- log.Infof("We have this GlobalExitRoot, so we are synced from L1 %s", debugStr)
+ log.Infof("We have this GlobalExitRoot (%s) in L1block %d, so we are synced from L1 %s", globalExitRoot.String(), localGERInfo.BlockNumber, debugStr)
return nil
}
// this means err != state.ErrNotFound -> so we have to ask to zkevm-RPC the block number of this GlobalExitRoot
diff --git a/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch_test.go b/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch_test.go
index 9acee59955..84d11dddc8 100644
--- a/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch_test.go
+++ b/synchronizer/l2_sync/l2_sync_etrog/check_sync_status_to_process_batch_test.go
@@ -42,13 +42,13 @@ func NewTestData(t *testing.T) *testData {
func TestCheckL1SyncStatusEnoughToProcessBatchGerZero(t *testing.T) {
testData := NewTestData(t)
- err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, state.ZeroHash, nil)
+ err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, state.ZeroHash, nil)
require.NoError(t, err)
}
func TestCheckL1SyncStatusEnoughToProcessBatchGerOnDB(t *testing.T) {
testData := NewTestData(t)
testData.stateMock.EXPECT().GetExitRootByGlobalExitRoot(testData.ctx, globalExitRootNonZero, nil).Return(&state.GlobalExitRoot{}, nil).Once()
- err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
+ err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
require.NoError(t, err)
}
@@ -57,7 +57,7 @@ func TestCheckL1SyncStatusEnoughToProcessBatchGerDatabaseFails(t *testing.T) {
testData.stateMock.EXPECT().GetExitRootByGlobalExitRoot(testData.ctx, globalExitRootNonZero, nil).Return(nil, randomError).Once()
- err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
+ err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
require.Error(t, err)
}
@@ -67,7 +67,7 @@ func TestCheckL1SyncStatusEnoughToProcessBatchGerNoOnDBFailsCallToZkevm(t *testi
testData.stateMock.EXPECT().GetExitRootByGlobalExitRoot(testData.ctx, globalExitRootNonZero, nil).Return(nil, state.ErrNotFound).Once()
testData.zkevmMock.EXPECT().ExitRootsByGER(testData.ctx, globalExitRootNonZero).Return(nil, randomError).Once()
- err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
+ err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
require.Error(t, err)
}
@@ -79,7 +79,7 @@ func TestCheckL1SyncStatusEnoughToProcessBatchGerNoOnDBWeAre1BlockBehind(t *test
testData.zkevmMock.EXPECT().ExitRootsByGER(testData.ctx, globalExitRootNonZero).Return(&types.ExitRoots{BlockNumber: types.ArgUint64(l1Block)}, nil).Once()
testData.stateMock.EXPECT().GetLastBlock(testData.ctx, nil).Return(&state.Block{BlockNumber: l1Block - 1}, nil).Once()
- err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
+ err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
require.ErrorIs(t, err, syncinterfaces.ErrMissingSyncFromL1)
}
@@ -91,7 +91,7 @@ func TestCheckL1SyncStatusEnoughToProcessBatchGerNoOnDBWeAre1BlockBeyond(t *test
testData.zkevmMock.EXPECT().ExitRootsByGER(testData.ctx, globalExitRootNonZero).Return(&types.ExitRoots{BlockNumber: types.ArgUint64(l1Block)}, nil).Once()
testData.stateMock.EXPECT().GetLastBlock(testData.ctx, nil).Return(&state.Block{BlockNumber: l1Block + 1}, nil).Once()
- err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
+ err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
require.ErrorIs(t, err, syncinterfaces.ErrFatalDesyncFromL1)
l1BlockNumber := err.(*l2_shared.DeSyncPermissionlessAndTrustedNodeError).L1BlockNumber
require.Equal(t, l1Block, l1BlockNumber, "returns the block where is the discrepancy")
@@ -105,7 +105,7 @@ func TestCheckL1SyncStatusEnoughToProcessBatchGerNoOnDBWeAreLastBlockSynced(t *t
testData.zkevmMock.EXPECT().ExitRootsByGER(testData.ctx, globalExitRootNonZero).Return(&types.ExitRoots{BlockNumber: types.ArgUint64(l1Block)}, nil).Once()
testData.stateMock.EXPECT().GetLastBlock(testData.ctx, nil).Return(&state.Block{BlockNumber: l1Block}, nil).Once()
- err := testData.sut.CheckL1SyncStatusEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
+ err := testData.sut.CheckL1SyncGlobalExitRootEnoughToProcessBatch(testData.ctx, 1, globalExitRootNonZero, nil)
require.ErrorIs(t, err, syncinterfaces.ErrFatalDesyncFromL1)
l1BlockNumber := err.(*l2_shared.DeSyncPermissionlessAndTrustedNodeError).L1BlockNumber
require.Equal(t, l1Block, l1BlockNumber, "returns the block where is the discrepancy")
diff --git a/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync.go b/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync.go
index 05bfdf915d..1975f6c42b 100644
--- a/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync.go
+++ b/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync.go
@@ -18,6 +18,10 @@ import (
"github.com/jackc/pgx/v4"
)
+const (
+ timeOfLiveBatchOnCache = 5 * time.Minute
+)
+
var (
// ErrNotImplemented is returned when a method is not implemented
ErrNotImplemented = errors.New("not implemented")
@@ -41,34 +45,28 @@ type StateInterface interface {
ProcessBatchV2(ctx context.Context, request state.ProcessRequest, updateMerkleTree bool) (*state.ProcessBatchResponse, error)
StoreL2Block(ctx context.Context, batchNumber uint64, l2Block *state.ProcessBlockResponse, txsEGPLog []*state.EffectiveGasPriceLog, dbTx pgx.Tx) error
GetL1InfoTreeDataFromBatchL2Data(ctx context.Context, batchL2Data []byte, dbTx pgx.Tx) (map[uint32]state.L1DataV2, common.Hash, common.Hash, error)
-}
-
-// L1SyncChecker is the interface to check if we are synced from L1 to process a batch
-type L1SyncChecker interface {
- CheckL1SyncStatusEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error
+ GetLastVirtualBatchNum(ctx context.Context, dbTx pgx.Tx) (uint64, error)
}
// SyncTrustedBatchExecutorForEtrog is the implementation of the SyncTrustedStateBatchExecutorSteps that
// have the functions to sync a fullBatch, incrementalBatch and reprocessBatch
type SyncTrustedBatchExecutorForEtrog struct {
- state StateInterface
- sync syncinterfaces.SynchronizerFlushIDManager
- l1SyncChecker L1SyncChecker
+ state StateInterface
+ sync syncinterfaces.SynchronizerFlushIDManager
}
// NewSyncTrustedBatchExecutorForEtrog creates a new prcessor for sync with L2 batches
func NewSyncTrustedBatchExecutorForEtrog(zkEVMClient syncinterfaces.ZKEVMClientTrustedBatchesGetter,
state l2_shared.StateInterface, stateBatchExecutor StateInterface,
- sync syncinterfaces.SynchronizerFlushIDManager, timeProvider syncCommon.TimeProvider, l1SyncChecker L1SyncChecker,
+ sync syncinterfaces.SynchronizerFlushIDManager, timeProvider syncCommon.TimeProvider, l1SyncChecker l2_shared.L1SyncGlobalExitRootChecker,
cfg l2_sync.Config) *l2_shared.TrustedBatchesRetrieve {
executorSteps := &SyncTrustedBatchExecutorForEtrog{
- state: stateBatchExecutor,
- sync: sync,
- l1SyncChecker: l1SyncChecker,
+ state: stateBatchExecutor,
+ sync: sync,
}
- executor := l2_shared.NewProcessorTrustedBatchSync(executorSteps, timeProvider, cfg)
- a := l2_shared.NewTrustedBatchesRetrieve(executor, zkEVMClient, state, sync, *l2_shared.NewTrustedStateManager(timeProvider, time.Hour))
+ executor := l2_shared.NewProcessorTrustedBatchSync(executorSteps, timeProvider, l1SyncChecker, cfg)
+ a := l2_shared.NewTrustedBatchesRetrieve(executor, zkEVMClient, state, sync, *l2_shared.NewTrustedStateManager(timeProvider, timeOfLiveBatchOnCache))
return a
}
@@ -144,12 +142,7 @@ func (b *SyncTrustedBatchExecutorForEtrog) FullProcess(ctx context.Context, data
data.DebugPrefix += " (emptyBatch) "
return b.CreateEmptyBatch(ctx, data, dbTx)
}
- err := b.checkIfWeAreSyncedFromL1ToProcessGlobalExitRoot(ctx, data, dbTx)
- if err != nil {
- log.Errorf("%s error checkIfWeAreSyncedFromL1ToProcessGlobalExitRoot. Error: %v", data.DebugPrefix, err)
- return nil, err
- }
- err = b.openBatch(ctx, data.TrustedBatch, dbTx, data.DebugPrefix)
+ err := b.openBatch(ctx, data.TrustedBatch, dbTx, data.DebugPrefix)
if err != nil {
log.Errorf("%s error openning batch. Error: %v", data.DebugPrefix, err)
return nil, err
@@ -209,11 +202,6 @@ func (b *SyncTrustedBatchExecutorForEtrog) IncrementalProcess(ctx context.Contex
log.Errorf("%s error checkThatL2DataIsIncremental. Error: %v", data.DebugPrefix, err)
return nil, err
}
- err = b.checkIfWeAreSyncedFromL1ToProcessGlobalExitRoot(ctx, data, dbTx)
- if err != nil {
- log.Errorf("%s error checkIfWeAreSyncedFromL1ToProcessGlobalExitRoot. Error: %v", data.DebugPrefix, err)
- return nil, err
- }
PartialBatchL2Data, err := b.composePartialBatch(data.StateBatch, data.TrustedBatch)
if err != nil {
@@ -260,21 +248,17 @@ func (b *SyncTrustedBatchExecutorForEtrog) IncrementalProcess(ctx context.Contex
}
updatedBatch := *data.StateBatch
+ updatedBatch.LocalExitRoot = data.TrustedBatch.LocalExitRoot
+ updatedBatch.AccInputHash = data.TrustedBatch.AccInputHash
+ updatedBatch.GlobalExitRoot = data.TrustedBatch.GlobalExitRoot
updatedBatch.BatchL2Data = data.TrustedBatch.BatchL2Data
updatedBatch.WIP = !data.BatchMustBeClosed
+
res := l2_shared.NewProcessResponse()
res.UpdateCurrentBatchWithExecutionResult(&updatedBatch, processBatchResp)
return &res, nil
}
-func (b *SyncTrustedBatchExecutorForEtrog) checkIfWeAreSyncedFromL1ToProcessGlobalExitRoot(ctx context.Context, data *l2_shared.ProcessData, dbTx pgx.Tx) error {
- if b.l1SyncChecker == nil {
- log.Infof("Disabled check L1 sync status for process batch")
- return nil
- }
- return b.l1SyncChecker.CheckL1SyncStatusEnoughToProcessBatch(ctx, data.BatchNumber, data.TrustedBatch.GlobalExitRoot, dbTx)
-}
-
func (b *SyncTrustedBatchExecutorForEtrog) updateWIPBatch(ctx context.Context, data *l2_shared.ProcessData, NewStateRoot common.Hash, dbTx pgx.Tx) error {
receipt := state.ProcessingReceipt{
BatchNumber: data.BatchNumber,
@@ -296,7 +280,17 @@ func (b *SyncTrustedBatchExecutorForEtrog) updateWIPBatch(ctx context.Context, d
// ReProcess process a batch that we have processed before, but we don't have the intermediate state root, so we need to reprocess it
func (b *SyncTrustedBatchExecutorForEtrog) ReProcess(ctx context.Context, data *l2_shared.ProcessData, dbTx pgx.Tx) (*l2_shared.ProcessResponse, error) {
log.Warnf("%s needs to be reprocessed! deleting batches from this batch, because it was partially processed but the intermediary stateRoot is lost", data.DebugPrefix)
- err := b.state.ResetTrustedState(ctx, uint64(data.TrustedBatch.Number)-1, dbTx)
+ // Check that there are no VirtualBatches nor VerifiedBatches that are newer than or equal to this batch
+ lastVirtualBatchNum, err := b.state.GetLastVirtualBatchNum(ctx, dbTx)
+ if err != nil {
+ log.Errorf("%s error getting lastVirtualBatchNum. Error: %v", data.DebugPrefix, err)
+ return nil, err
+ }
+ if lastVirtualBatchNum >= uint64(data.TrustedBatch.Number) {
+ log.Errorf("%s there are newer or equal virtualBatches than this batch. Can't reprocess because that would delete a virtualBatch", data.DebugPrefix)
+ return nil, syncinterfaces.ErrMissingSyncFromL1
+ }
+ err = b.state.ResetTrustedState(ctx, uint64(data.TrustedBatch.Number)-1, dbTx)
if err != nil {
log.Warnf("%s error deleting batches from this batch: %v", data.DebugPrefix, err)
return nil, err
diff --git a/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync_test.go b/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync_test.go
index 01de172a5d..97a7125b96 100644
--- a/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync_test.go
+++ b/synchronizer/l2_sync/l2_sync_etrog/executor_trusted_batch_sync_test.go
@@ -196,6 +196,7 @@ func TestNothingProcessDoesntMatchBatchReprocess(t *testing.T) {
StateRoot: common.HexToHash(hashExamplesValues[2]),
},
}
+ testData.stateMock.EXPECT().GetLastVirtualBatchNum(testData.ctx, mock.Anything).Return(uint64(122), nil).Maybe()
testData.stateMock.EXPECT().ResetTrustedState(testData.ctx, data.BatchNumber-1, mock.Anything).Return(nil).Once()
testData.stateMock.EXPECT().OpenBatch(testData.ctx, mock.Anything, mock.Anything).Return(nil).Once()
testData.stateMock.EXPECT().GetL1InfoTreeDataFromBatchL2Data(testData.ctx, mock.Anything, mock.Anything).Return(map[uint32]state.L1DataV2{}, common.Hash{}, common.Hash{}, nil).Once()
@@ -211,6 +212,35 @@ func TestNothingProcessDoesntMatchBatchReprocess(t *testing.T) {
require.NoError(t, err)
}
+func TestReprocessRejectDeleteVirtualBatch(t *testing.T) {
+ testData := newTestData(t)
+ // Arrange
+ data := l2_shared.ProcessData{
+ BatchNumber: 123,
+ Mode: l2_shared.NothingProcessMode,
+ BatchMustBeClosed: false,
+ DebugPrefix: "test",
+ StateBatch: &state.Batch{
+ BatchNumber: 123,
+ StateRoot: common.HexToHash(hashExamplesValues[1]),
+ BatchL2Data: []byte{1, 2, 3, 4},
+ WIP: true,
+ },
+ TrustedBatch: &types.Batch{
+ Number: 123,
+ StateRoot: common.HexToHash(hashExamplesValues[0]),
+ BatchL2Data: []byte{1, 2, 3, 4},
+ },
+ PreviousStateBatch: &state.Batch{
+ BatchNumber: 122,
+ StateRoot: common.HexToHash(hashExamplesValues[2]),
+ },
+ }
+ testData.stateMock.EXPECT().GetLastVirtualBatchNum(testData.ctx, mock.Anything).Return(uint64(123), nil).Maybe()
+ _, err := testData.sut.ReProcess(testData.ctx, &data, nil)
+ require.Error(t, err)
+}
+
func TestNothingProcessIfBatchMustBeClosedThenCloseBatch(t *testing.T) {
testData := newTestData(t)
// Arrange
@@ -265,7 +295,7 @@ func TestCloseBatchGivenAlreadyClosedAndTheDataAreRightThenNoError(t *testing.T)
require.NoError(t, res)
}
-func TestEmptyBatch(t *testing.T) {
+func TestEmptyWIPBatch(t *testing.T) {
testData := newTestData(t)
// Arrange
expectedBatch := state.Batch{
diff --git a/synchronizer/l2_sync/l2_sync_etrog/mocks/l1_sync_checker.go b/synchronizer/l2_sync/l2_sync_etrog/mocks/l1_sync_checker.go
deleted file mode 100644
index ced3c7a54f..0000000000
--- a/synchronizer/l2_sync/l2_sync_etrog/mocks/l1_sync_checker.go
+++ /dev/null
@@ -1,89 +0,0 @@
-// Code generated by mockery. DO NOT EDIT.
-
-package mock_l2_sync_etrog
-
-import (
- context "context"
-
- common "github.com/ethereum/go-ethereum/common"
-
- mock "github.com/stretchr/testify/mock"
-
- pgx "github.com/jackc/pgx/v4"
-)
-
-// L1SyncChecker is an autogenerated mock type for the L1SyncChecker type
-type L1SyncChecker struct {
- mock.Mock
-}
-
-type L1SyncChecker_Expecter struct {
- mock *mock.Mock
-}
-
-func (_m *L1SyncChecker) EXPECT() *L1SyncChecker_Expecter {
- return &L1SyncChecker_Expecter{mock: &_m.Mock}
-}
-
-// CheckL1SyncStatusEnoughToProcessBatch provides a mock function with given fields: ctx, batchNumber, globalExitRoot, dbTx
-func (_m *L1SyncChecker) CheckL1SyncStatusEnoughToProcessBatch(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx) error {
- ret := _m.Called(ctx, batchNumber, globalExitRoot, dbTx)
-
- if len(ret) == 0 {
- panic("no return value specified for CheckL1SyncStatusEnoughToProcessBatch")
- }
-
- var r0 error
- if rf, ok := ret.Get(0).(func(context.Context, uint64, common.Hash, pgx.Tx) error); ok {
- r0 = rf(ctx, batchNumber, globalExitRoot, dbTx)
- } else {
- r0 = ret.Error(0)
- }
-
- return r0
-}
-
-// L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckL1SyncStatusEnoughToProcessBatch'
-type L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call struct {
- *mock.Call
-}
-
-// CheckL1SyncStatusEnoughToProcessBatch is a helper method to define mock.On call
-// - ctx context.Context
-// - batchNumber uint64
-// - globalExitRoot common.Hash
-// - dbTx pgx.Tx
-func (_e *L1SyncChecker_Expecter) CheckL1SyncStatusEnoughToProcessBatch(ctx interface{}, batchNumber interface{}, globalExitRoot interface{}, dbTx interface{}) *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call {
- return &L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call{Call: _e.mock.On("CheckL1SyncStatusEnoughToProcessBatch", ctx, batchNumber, globalExitRoot, dbTx)}
-}
-
-func (_c *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call) Run(run func(ctx context.Context, batchNumber uint64, globalExitRoot common.Hash, dbTx pgx.Tx)) *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run(args[0].(context.Context), args[1].(uint64), args[2].(common.Hash), args[3].(pgx.Tx))
- })
- return _c
-}
-
-func (_c *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call) Return(_a0 error) *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call {
- _c.Call.Return(_a0)
- return _c
-}
-
-func (_c *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call) RunAndReturn(run func(context.Context, uint64, common.Hash, pgx.Tx) error) *L1SyncChecker_CheckL1SyncStatusEnoughToProcessBatch_Call {
- _c.Call.Return(run)
- return _c
-}
-
-// NewL1SyncChecker creates a new instance of L1SyncChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
-// The first argument is typically a *testing.T value.
-func NewL1SyncChecker(t interface {
- mock.TestingT
- Cleanup(func())
-}) *L1SyncChecker {
- mock := &L1SyncChecker{}
- mock.Mock.Test(t)
-
- t.Cleanup(func() { mock.AssertExpectations(t) })
-
- return mock
-}
diff --git a/synchronizer/l2_sync/l2_sync_etrog/mocks/state_interface.go b/synchronizer/l2_sync/l2_sync_etrog/mocks/state_interface.go
index 1772294e0c..5101bb4b6a 100644
--- a/synchronizer/l2_sync/l2_sync_etrog/mocks/state_interface.go
+++ b/synchronizer/l2_sync/l2_sync_etrog/mocks/state_interface.go
@@ -317,6 +317,63 @@ func (_c *StateInterface_GetL1InfoTreeDataFromBatchL2Data_Call) RunAndReturn(run
return _c
}
+// GetLastVirtualBatchNum provides a mock function with given fields: ctx, dbTx
+func (_m *StateInterface) GetLastVirtualBatchNum(ctx context.Context, dbTx pgx.Tx) (uint64, error) {
+ ret := _m.Called(ctx, dbTx)
+
+ if len(ret) == 0 {
+ panic("no return value specified for GetLastVirtualBatchNum")
+ }
+
+ var r0 uint64
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, pgx.Tx) (uint64, error)); ok {
+ return rf(ctx, dbTx)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, pgx.Tx) uint64); ok {
+ r0 = rf(ctx, dbTx)
+ } else {
+ r0 = ret.Get(0).(uint64)
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, pgx.Tx) error); ok {
+ r1 = rf(ctx, dbTx)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// StateInterface_GetLastVirtualBatchNum_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLastVirtualBatchNum'
+type StateInterface_GetLastVirtualBatchNum_Call struct {
+ *mock.Call
+}
+
+// GetLastVirtualBatchNum is a helper method to define mock.On call
+// - ctx context.Context
+// - dbTx pgx.Tx
+func (_e *StateInterface_Expecter) GetLastVirtualBatchNum(ctx interface{}, dbTx interface{}) *StateInterface_GetLastVirtualBatchNum_Call {
+ return &StateInterface_GetLastVirtualBatchNum_Call{Call: _e.mock.On("GetLastVirtualBatchNum", ctx, dbTx)}
+}
+
+func (_c *StateInterface_GetLastVirtualBatchNum_Call) Run(run func(ctx context.Context, dbTx pgx.Tx)) *StateInterface_GetLastVirtualBatchNum_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(context.Context), args[1].(pgx.Tx))
+ })
+ return _c
+}
+
+func (_c *StateInterface_GetLastVirtualBatchNum_Call) Return(_a0 uint64, _a1 error) *StateInterface_GetLastVirtualBatchNum_Call {
+ _c.Call.Return(_a0, _a1)
+ return _c
+}
+
+func (_c *StateInterface_GetLastVirtualBatchNum_Call) RunAndReturn(run func(context.Context, pgx.Tx) (uint64, error)) *StateInterface_GetLastVirtualBatchNum_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
// OpenBatch provides a mock function with given fields: ctx, processingContext, dbTx
func (_m *StateInterface) OpenBatch(ctx context.Context, processingContext state.ProcessingContext, dbTx pgx.Tx) error {
ret := _m.Called(ctx, processingContext, dbTx)
diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go
index 04fe6dd6f5..591b2f4357 100644
--- a/synchronizer/synchronizer.go
+++ b/synchronizer/synchronizer.go
@@ -164,7 +164,7 @@ func newL1SyncParallel(ctx context.Context, cfg Config, etherManForL1 []syncinte
l1SyncOrchestration := l1_parallel_sync.NewL1SyncOrchestration(ctx, l1DataRetriever, L1DataProcessor)
if runExternalControl {
log.Infof("Starting external control")
- externalControl := newExternalControl(l1DataRetriever, l1SyncOrchestration)
+ externalControl := newExternalCmdControl(l1DataRetriever, l1SyncOrchestration)
externalControl.start()
}
return l1SyncOrchestration
@@ -370,6 +370,7 @@ func (s *ClientSynchronizer) Sync() error {
metrics.FullL1SyncTime(time.Since(startL1))
if err != nil {
log.Warn("error syncing blocks: ", err)
+ s.CleanTrustedState()
lastEthBlockSynced, err = s.state.GetLastBlock(s.ctx, nil)
if err != nil {
log.Fatal("error getting lastEthBlockSynced to resume the synchronization... Error: ", err)