diff --git a/docs/design/synchronizer/l1_sync_channels_flow_v2.drawio.png b/docs/design/synchronizer/l1_sync_channels_flow_v2.drawio.png new file mode 100644 index 0000000000..b0f24bf57f Binary files /dev/null and b/docs/design/synchronizer/l1_sync_channels_flow_v2.drawio.png differ diff --git a/docs/design/synchronizer/l1_synchronization.md b/docs/design/synchronizer/l1_synchronization.md index f44d26c89a..547df4c75e 100644 --- a/docs/design/synchronizer/l1_synchronization.md +++ b/docs/design/synchronizer/l1_synchronization.md @@ -1,63 +1,63 @@ - +# L1 parallel synchronization This is a refactor of L1 synchronization to improve speed. -- It ask data in parallel to L1 meanwhile another goroutine is execution the rollup info. -- It makes that executor be ocupied 100% of time. - -## Pending to do - - - All the stuff related to updating last block on L1 could be moved to another class - - Check context usage: - It need a context to cancel itself and create another context to cancel workers? - - Emit metrics - - if nothing to update reduce code to be executed (not sure, because functionality to keep update beyond last block on L1) - - Improve the unittest of all objects - - Check all log.fatals to remove it or add a status before the panic - - Missing **feature update beyond last block on L1**: Old syncBlocks method try to ask for blocks over last L1 block, I suppose that is to keep synchronizing even a long the synchronization have new blocks. This is not implemented here - This is the behaviour of ethman in that situation: - - GetRollupInfoByBlockRange returns no errors, zero blocks... - - EthBlockByNumber returns error: "not found" +- It asks L1 for data in parallel while another goroutine executes the rollup info. +- It keeps the executor occupied 100% of the time. + +## Pending to do - Some tests in ` synchronizer/synchronizer_test.go` rely on this feature, so they still run against the legacy code -- Move to configuration file some 'hardcoded' values ## Configuration -This feature is experimental for that reason you can configure to use old sequential one: +You can choose between the new L1 parallel sync and the legacy sequential one: ``` [Synchronizer] UseParallelModeForL1Synchronization = false ``` If you activate this feature you can configure: -- `NumberOfParallelOfEthereumClients`: how many parallel request can be done. Currently this create multiples instances of etherman over same server, in the future maybe make sense to use differents servers -- `CapacityOfBufferingRollupInfoFromL1`: buffer of data pending to be processed +- `NumberOfParallelOfEthereumClients`: how many parallel requests can be made. Note that 1 client is reserved just for requesting the last block on L1; the rest are used for rollup info +- `CapacityOfBufferingRollupInfoFromL1`: buffer of data pending to be processed. This is the queue of data waiting to be executed by the consumer. + +For a full description of the fields please check the config-file documentation.
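To make the effect of these two settings concrete, the following is a minimal sketch of the pattern (hypothetical names, not the actual zkevm-node types): several producer goroutines fetch block ranges from L1 in parallel and push the results into a buffered channel that a single consumer drains, so the executor rarely has to wait for data.

```
// Illustrative sketch only; the real implementation lives in synchronizer/.
package main

import (
	"fmt"
	"sync"
)

type rollupInfo struct {
	fromBlock, toBlock uint64
}

func main() {
	const parallelClients = 2 // cf. NumberOfParallelOfEthereumClients (one client is kept for last-block requests)
	const bufferCapacity = 10 // cf. CapacityOfBufferingRollupInfoFromL1

	ranges := make(chan rollupInfo)                // block ranges to request
	queue := make(chan rollupInfo, bufferCapacity) // rollup info waiting for the consumer

	var wg sync.WaitGroup
	for i := 0; i < parallelClients; i++ {
		wg.Add(1)
		go func() { // producer worker: asks L1 for a block range and queues the result
			defer wg.Done()
			for r := range ranges {
				queue <- r // in the real code this would be the response of an etherman call
			}
		}()
	}

	go func() { // feed the workers and close the pipeline once everything is requested
		for b := uint64(0); b < 500; b += 100 {
			ranges <- rollupInfo{fromBlock: b, toBlock: b + 99}
		}
		close(ranges)
		wg.Wait()
		close(queue)
	}()

	for info := range queue { // consumer: executes rollup info sequentially
		fmt.Printf("executing blocks [%d, %d]\n", info.fromBlock, info.toBlock)
	}
}
```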
+ +Example: ``` UseParallelModeForL1Synchronization = true [Synchronizer.L1ParallelSynchronization] NumberOfParallelOfEthereumClients = 2 CapacityOfBufferingRollupInfoFromL1 = 10 + TimeForCheckLastBlockOnL1Time = "5s" + TimeoutForRequestLastBlockOnL1 = "5s" + MaxNumberOfRetriesForRequestLastBlockOnL1 = 3 + TimeForShowUpStatisticsLog = "5m" + TimeOutMainLoop = "5m" + MinTimeBetweenRetriesForRollupInfo = "5s" + [Synchronizer.L1ParallelSynchronization.PerformanceCheck] + AcceptableTimeWaitingForNewRollupInfo = "5s" + NumIterationsBeforeStartCheckingTimeWaitinfForNewRollupInfo = 10 + ``` ## Remarkable logs ### How to know the occupation of the executor To check that the executor is fully occupied you can check the following log: ``` -INFO synchronizer/l1_processor_consumer.go:110 consumer: processing rollupInfo #1291: range:[188064, 188164] num_blocks [0] wasted_time_waiting_for_data [74.17575ms] last_process_time [2.534115ms] block_per_second [0.000000] +INFO synchronizer/l1_rollup_info_consumer.go:128 consumer: processing rollupInfo #1553: range:[8720385, 8720485] num_blocks [37] statistics:wasted_time_waiting_for_data [0s] last_process_time [6m2.635208117s] block_per_second [2.766837] ``` -The `wasted_time_waiting_for_data` show the waiting time between this call and the previous to executor. If this value (after 20 interations) are greater to 1 seconds a warning is show. +The `wasted_time_waiting_for_data` field shows the waiting time between this call to the executor and the previous one. A warning can be emitted by configuring `Synchronizer.L1ParallelSynchronization.PerformanceCheck` ### Estimated time to be fully synchronized with L1 -This log show the estimated time (**ETA**) to reach the block goal +This log shows the estimated time (**ETA**) to reach the block goal. You can configure its frequency with the `TimeForShowUpStatisticsLog` setting ``` -INFO synchronizer/l1_data_retriever_producer.go:255 producer: Statistics:ETA: 3h40m1.311379085s percent:1.35 blocks_per_seconds:706.80 pending_block:127563/9458271 num_errors:0 +INFO synchronizer/l1_rollup_info_producer.go:357 producer: Statistics:ETA: 54h7m47.594422312s percent:12.26 blocks_per_seconds:5.48 pending_block:149278/1217939 num_errors:8 ``` ## Flow of data -![l1_sync_channels_flow_v2 drawio](https://github.com/0xPolygonHermez/zkevm-node/assets/129153821/430abeb3-13b2-4c13-8d5e-4996a134a353) +![l1_sync_channels_flow_v2 drawio](l1_sync_channels_flow_v2.drawio.png) -## Class diagram -This is a class diagram of principal class an relationships. -The entry point is `synchronizer.go:276` function `syncBlocksParallel`.
-- It create all objects needed and launch `l1SyncOrchestration` that wait until the job is done to return ### The main objects are: -- `l1RollupInfoProducer`: is the object that send rollup data through the channel +- `l1SyncOrchestration`: the entry point, responsible for launching the producer and the consumer +- `l1RollupInfoProducer`: this object sends rollup data through the channel to the consumer - `l1RollupInfoConsumer`: receives the data and executes it -![image](https://github.com/0xPolygonHermez/zkevm-node/assets/129153821/957a3e95-77c7-446b-a6ec-ef28cc44cb18) + +## Future changes +- Configure multiple servers for L1 information: instead of calling the same server, it makes sense to configure each URL individually so that multiple sources can be used diff --git a/synchronizer/l1_rollup_info_producer.go b/synchronizer/l1_rollup_info_producer.go index d36aded12b..ccab954339 100644 --- a/synchronizer/l1_rollup_info_producer.go +++ b/synchronizer/l1_rollup_info_producer.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/0xPolygonHermez/zkevm-node" "github.com/0xPolygonHermez/zkevm-node/log" ) @@ -30,6 +31,7 @@ const ( minTimeOutMainLoop = time.Minute * 5 timeForShowUpStatisticsLog = time.Second * 60 conversionFactorPercentage = 100 + lenCommandsChannels = 5 ) type filter interface { @@ -48,10 +50,11 @@ type syncStatusInterface interface { isNodeFullySynchronizedWithL1() bool haveRequiredAllBlocksToBeSynchronized() bool isSetLastBlockOnL1Value() bool + doesItHaveAllTheNeedDataToWork() bool getLastBlockOnL1() uint64 onStartedNewWorker(br blockRange) - onFinishWorker(br blockRange, successful bool) + onFinishWorker(br blockRange, successful bool) bool onNewLastBlockOnL1(lastBlock uint64) onNewLastBlockResponse } @@ -74,10 +77,11 @@ const ( producerIdle producerStatusEnum = 0 producerWorking producerStatusEnum = 1 producerSynchronized producerStatusEnum = 2 + producerNoRunning producerStatusEnum = 3 ) func (s producerStatusEnum) String() string { - return [...]string{"idle", "working", "synchronized"}[s] + return [...]string{"idle", "working", "synchronized", "no_running"}[s] } type configProducer struct { @@ -120,6 +124,23 @@ func (cfg *configProducer) normalize() { } } +type producerCmdEnum int32 + +const ( + producerNop producerCmdEnum = 0 + producerStop producerCmdEnum = 1 + producerReset producerCmdEnum = 2 +) + +func (s producerCmdEnum) String() string { + return [...]string{"nop", "stop", "reset"}[s] +} + +type producerCmd struct { + cmd producerCmdEnum + param1 uint64 +} + type l1RollupInfoProducer struct { mutex sync.Mutex ctxParent context.Context @@ -133,9 +154,16 @@ type l1RollupInfoProducer struct { filterToSendOrdererResultsToConsumer filter statistics l1RollupInfoProducerStatistics cfg configProducer + channelCmds chan producerCmd } func (l *l1RollupInfoProducer) toStringBrief() string { + l.mutex.Lock() + defer l.mutex.Unlock() + return l.toStringBriefUnsafe() +} + +func (l *l1RollupInfoProducer) toStringBriefUnsafe() string { return fmt.Sprintf("status:%s syncStatus:[%s] workers:[%s] filter:[%s] cfg:[%s]", l.status, l.syncStatus.toStringBrief(), l.workers.String(), l.filterToSendOrdererResultsToConsumer.ToStringBrief(), l.cfg.String()) } @@ -155,42 +183,66 @@ func newL1DataRetriever(cfg configProducer, ethermans []EthermanInterface, outgo filterToSendOrdererResultsToConsumer: newFilterToSendOrdererResultsToConsumer(invalidBlockNumber), outgoingChannel: outgoingChannel, statistics: newRollupInfoProducerStatistics(invalidBlockNumber), - status: producerIdle, +
status: producerNoRunning, cfg: cfg, + channelCmds: make(chan producerCmd, lenCommandsChannels), } return &result } // ResetAndStop: reset the object and stop the current process. Set first block to be retrieved -func (l *l1RollupInfoProducer) ResetAndStop(startingBlockNumber uint64) { +// This function could be call from outside of main goroutine +func (l *l1RollupInfoProducer) Reset(startingBlockNumber uint64) { + log.Infof("producer: ResetAndStop(%d) queue cmd", startingBlockNumber) + l.channelCmds <- producerCmd{cmd: producerReset, param1: startingBlockNumber} +} + +func (l *l1RollupInfoProducer) resetUnsafe(startingBlockNumber uint64) { log.Infof("producer: Reset L1 sync process to blockNumber %d st=%s", startingBlockNumber, l.toStringBrief()) log.Debugf("producer: Reset(%d): stop previous run (state=%s)", startingBlockNumber, l.status.String()) - l.Stop() - - l.mutex.Lock() log.Debugf("producer: Reset(%d): syncStatus.reset", startingBlockNumber) l.syncStatus.reset(startingBlockNumber) l.statistics.reset(startingBlockNumber) - l.mutex.Unlock() + log.Debugf("producer: Reset(%d): stopping workers", startingBlockNumber) + l.workers.stop() // Empty pending rollupinfos log.Debugf("producer: Reset(%d): emptyChannel", startingBlockNumber) l.emptyChannel() log.Debugf("producer: Reset(%d): reset Filter", startingBlockNumber) l.filterToSendOrdererResultsToConsumer.Reset(startingBlockNumber) - log.Debugf("producer: Reset(%d): reset done!", startingBlockNumber) + l.setStatus(producerIdle) + log.Infof("producer: Reset(%d): reset done!", startingBlockNumber) +} + +func (l *l1RollupInfoProducer) isProducerRunning() bool { + return l.status != producerNoRunning +} +func (l *l1RollupInfoProducer) setStatus(newStatus producerStatusEnum) { + previousStatus := l.status + l.status = newStatus + if previousStatus != newStatus { + log.Infof("producer: Status changed from [%s] to [%s]", previousStatus.String(), newStatus.String()) + if newStatus == producerSynchronized { + log.Infof("producer: send a message to consumer to indicate that we are synchronized") + l.sendPackages([]l1SyncMessage{*newL1SyncMessageControl(eventProducerIsFullySynced)}) + } + } } func (l *l1RollupInfoProducer) Stop() { - log.Debugf("producer: stop() called st=%s", l.toStringBrief()) + log.Infof("producer: Stop() queue cmd") + l.channelCmds <- producerCmd{cmd: producerStop} +} - if l.status != producerIdle { +func (l *l1RollupInfoProducer) stopUnsafe() { + log.Infof("producer: stop() called st=%s", l.toStringBrief()) + + if l.isProducerRunning() { log.Infof("producer:Stop:was running -> stopping producer") + l.ctxWithCancel.cancel() } - l.ctxWithCancel.cancel() - l.status = producerIdle - l.mutex.Lock() - defer l.mutex.Unlock() + l.setStatus(producerNoRunning) log.Debugf("producer:Stop: stop workers and wait for finish (%s)", l.workers.String()) l.workers.stop() } @@ -210,7 +262,7 @@ func (l *l1RollupInfoProducer) initialize(ctx context.Context) error { log.Debug("producer: initialize") err := l.verify() if err != nil { - return err + log.Debug("producer: initialize, syncstatus not ready: %s", err.Error()) } if l.ctxParent != ctx || l.ctxWithCancel.isInvalid() { log.Debug("producer: start called and need to create a new context") @@ -221,18 +273,6 @@ func (l *l1RollupInfoProducer) initialize(ctx context.Context) error { if err != nil { return err } - if l.syncStatus.isSetLastBlockOnL1Value() { - log.Infof("producer: Need a initial value for Last Block On L1, doing the request (maxRetries:%v, timeRequest:%v)", - 
l.cfg.numOfAllowedRetriesForRequestLastBlockOnL1, l.cfg.timeoutForRequestLastBlockOnL1) - //result := l.retrieveInitialValueOfLastBlock(maxRetriesForRequestnitialValueOfLastBlock, timeRequestInitialValueOfLastBlock) - result := l.workers.requestLastBlockWithRetries(l.ctxWithCancel.ctx, l.cfg.timeoutForRequestLastBlockOnL1, l.cfg.numOfAllowedRetriesForRequestLastBlockOnL1) - if result.generic.err != nil { - log.Error(result.generic.err) - return result.generic.err - } - l.onNewLastBlock(result.result.block, false) - } - return nil } @@ -244,57 +284,94 @@ func (l *l1RollupInfoProducer) Start(ctx context.Context) error { log.Infof("producer: can't start because: %s", err.Error()) return err } + l.setStatus(producerIdle) log.Debugf("producer: starting configuration: %s", l.cfg.String()) var waitDuration = time.Duration(0) for l.step(&waitDuration) { } + l.setStatus(producerNoRunning) l.workers.waitFinishAllWorkers() return nil } func (l *l1RollupInfoProducer) step(waitDuration *time.Duration) bool { - previousStatus := l.status - res := l.stepInner(waitDuration) - newStatus := l.status - if previousStatus != newStatus { - log.Infof("producer: Status changed from [%s] to [%s]", previousStatus.String(), newStatus.String()) - if newStatus == producerSynchronized { - log.Infof("producer: send a message to consumer to indicate that we are synchronized") - l.sendPackages([]l1SyncMessage{*newL1SyncMessageControl(eventProducerIsFullySynced)}) - } + if l.status == producerNoRunning { + log.Info("producer: step: status is no running, changing to idle") + l.setStatus(producerIdle) } - return res -} - -func (l *l1RollupInfoProducer) stepInner(waitDuration *time.Duration) bool { + log.Infof("producer:%s step: status:%s", zkevm.BuildDate, l.toStringBrief()) select { case <-l.ctxWithCancel.Done(): log.Debugf("producer: context canceled") return false + case cmd := <-l.channelCmds: + log.Infof("producer: received a command") + res := l.executeCmd(cmd) + if !res { + log.Info("producer: cmd %s stop the process", cmd.cmd.String()) + return false + } // That timeout is not need, but just in case that stop launching request case <-time.After(*waitDuration): log.Debugf("producer: reach timeout of step loop it was of %s", *waitDuration) case resultRollupInfo := <-l.workers.getResponseChannelForRollupInfo(): l.onResponseRollupInfo(resultRollupInfo) } - if l.syncStatus.haveRequiredAllBlocksToBeSynchronized() { - // Try to nenew last block on L1 if needed - log.Debugf("producer: we have required (maybe not responsed yet) all blocks, so getting last block on L1") - l.renewLastBlockOnL1IfNeeded(false) + switch l.status { + case producerIdle: + // Is ready to start working? + l.renewLastBlockOnL1IfNeeded() + if l.syncStatus.doesItHaveAllTheNeedDataToWork() { + log.Infof("producer: producerIdle: have all the data to work, moving to working status. status:%s", l.syncStatus.toStringBrief()) + l.setStatus(producerWorking) + // This is for wakeup the step again to launch a new work + l.channelCmds <- producerCmd{cmd: producerNop} + } else { + log.Infof("producer: producerIdle: still dont have all the data to work status:%s", l.syncStatus.toStringBrief()) + } + case producerWorking: + // launch new Work + l.launchWork() + // If I'm have required all blocks to L1? 
+ if l.syncStatus.haveRequiredAllBlocksToBeSynchronized() { + log.Debugf("producer: producerWorking: haveRequiredAllBlocksToBeSynchronized -> renewLastBlockOnL1IfNeeded") + l.renewLastBlockOnL1IfNeeded() + } + // If after asking for a new lastBlockOnL1 we are still synchronized then we are synchronized + if l.syncStatus.isNodeFullySynchronizedWithL1() { + l.setStatus(producerSynchronized) + } + case producerSynchronized: + // renew last block on L1 if needed + log.Debugf("producer: producerSynchronized") + l.renewLastBlockOnL1IfNeeded() + + if l.launchWork() > 0 { + l.setStatus(producerWorking) + } } - // Try to launch retrieve more rollupInfo from L1 - l.launchWork() + if l.cfg.timeForShowUpStatisticsLog != 0 && time.Since(l.statistics.lastShowUpTime) > l.cfg.timeForShowUpStatisticsLog { log.Infof("producer: Statistics:%s", l.statistics.getETA()) l.statistics.lastShowUpTime = time.Now() } - if l.syncStatus.isNodeFullySynchronizedWithL1() { - l.status = producerSynchronized - } else { - l.status = producerWorking - } *waitDuration = l.getNextTimeout() - log.Debugf("producer: Next timeout: %s status:%s sync_status: %s", *waitDuration, l.status, l.syncStatus.toStringBrief()) + log.Debugf("producer: Next timeout: %s status:%s ", *waitDuration, l.toStringBrief()) + return true +} + +// return if the producer must keep running (false -> stop) +func (l *l1RollupInfoProducer) executeCmd(cmd producerCmd) bool { + switch cmd.cmd { + case producerStop: + log.Infof("producer: received a stop, so it stops processing") + l.stopUnsafe() + return false + case producerReset: + log.Infof("producer: received a reset(%d)", cmd.param1) + l.resetUnsafe(cmd.param1) + return true + } return true } @@ -312,23 +389,22 @@ func (l *l1RollupInfoProducer) getNextTimeout() time.Duration { case producerSynchronized: nextRenewLastBlock := time.Since(l.timeLastBLockOnL1) + l.ttlOfLastBlockOnL1() return max(nextRenewLastBlock, time.Second) + case producerNoRunning: + return timeOutMainLoop default: - log.Fatalf("producer: Unknown status: %s", l.status) + log.Fatalf("producer: Unknown status: %s", l.status.String()) } return timeOutMainLoop } // OnNewLastBlock is called when a new last block on L1 is received -func (l *l1RollupInfoProducer) onNewLastBlock(lastBlock uint64, launchWork bool) onNewLastBlockResponse { +func (l *l1RollupInfoProducer) onNewLastBlock(lastBlock uint64) onNewLastBlockResponse { resp := l.syncStatus.onNewLastBlockOnL1(lastBlock) l.statistics.updateLastBlockNumber(resp.fullRange.toBlock) l.timeLastBLockOnL1 = time.Now() if resp.extendedRange != nil { log.Infof("producer: New last block on L1: %v -> %s", resp.fullRange.toBlock, resp.toString()) } - if launchWork { - l.launchWork() - } return resp } @@ -348,8 +424,6 @@ func (l *l1RollupInfoProducer) canISendNewRequestsUnsafe() (bool, string) { // launchWork: launch new workers if possible and returns new channels created // returns the number of workers launched func (l *l1RollupInfoProducer) launchWork() int { - l.mutex.Lock() - defer l.mutex.Unlock() launchedWorker := 0 allowNewRequests, allowNewRequestMsg := l.canISendNewRequestsUnsafe() accDebugStr := "[" + allowNewRequestMsg + "] " @@ -387,13 +461,11 @@ func (l *l1RollupInfoProducer) outgoingPackageStatusDebugString() string { return fmt.Sprintf("outgoint_channel[%d/%d], filter:%s workers:%s", len(l.outgoingChannel), cap(l.outgoingChannel), l.filterToSendOrdererResultsToConsumer.ToStringBrief(), l.workers.String()) } -func (l *l1RollupInfoProducer) renewLastBlockOnL1IfNeeded(forced bool) { - 
l.mutex.Lock() +func (l *l1RollupInfoProducer) renewLastBlockOnL1IfNeeded() { elapsed := time.Since(l.timeLastBLockOnL1) ttl := l.ttlOfLastBlockOnL1() oldBlock := l.syncStatus.getLastBlockOnL1() - l.mutex.Unlock() - if elapsed > ttl || forced { + if elapsed > ttl { log.Infof("producer: Need a new value for Last Block On L1, doing the request") result := l.workers.requestLastBlockWithRetries(l.ctxWithCancel.ctx, l.cfg.timeoutForRequestLastBlockOnL1, l.cfg.numOfAllowedRetriesForRequestLastBlockOnL1) log.Infof("producer: Need a new value for Last Block On L1, doing the request old_block:%v -> new block:%v", oldBlock, result.result.block) @@ -401,7 +473,7 @@ func (l *l1RollupInfoProducer) renewLastBlockOnL1IfNeeded(forced bool) { log.Error(result.generic.err) return } - l.onNewLastBlock(result.result.block, true) + l.onNewLastBlock(result.result.block) } } @@ -409,7 +481,10 @@ func (l *l1RollupInfoProducer) onResponseRollupInfo(result responseRollupInfoByB log.Infof("producer: Received responseRollupInfoByBlockRange: %s", result.toStringBrief()) l.statistics.onResponseRollupInfo(result) isOk := (result.generic.err == nil) - l.syncStatus.onFinishWorker(result.result.blockRange, isOk) + if !l.syncStatus.onFinishWorker(result.result.blockRange, isOk) { + log.Infof("producer: Ignoring result because the range is not longer valid: %s", result.toStringBrief()) + return + } if isOk { outgoingPackages := l.filterToSendOrdererResultsToConsumer.Filter(*newL1SyncMessageData(result.result)) l.sendPackages(outgoingPackages) diff --git a/synchronizer/l1_rollup_info_producer_test.go b/synchronizer/l1_rollup_info_producer_test.go index 70cd998883..ce3665b5f2 100644 --- a/synchronizer/l1_rollup_info_producer_test.go +++ b/synchronizer/l1_rollup_info_producer_test.go @@ -29,15 +29,15 @@ func TestExploratoryL1Get(t *testing.T) { func TestGivenNeedSyncWhenStartThenAskForRollupInfo(t *testing.T) { sut, ethermans, _ := setup(t) - etherman := ethermans[0] - expectedForGettingL1LastBlock(t, etherman, 150) - expectedRollupInfoCalls(t, etherman, 1) + expectedForGettingL1LastBlock(t, ethermans[0], 150) + expectedRollupInfoCalls(t, ethermans[1], 1) err := sut.initialize(context.Background()) require.NoError(t, err) sut.launchWork() var waitDuration = time.Duration(0) - sut.stepInner(&waitDuration) + sut.step(&waitDuration) + sut.step(&waitDuration) sut.workers.waitFinishAllWorkers() } @@ -47,17 +47,10 @@ func TestGivenNoNeedSyncWhenStartsSendAndEventOfSynchronized(t *testing.T) { // Our last block is 100 in DB and it returns 100 as last block on L1 // so is synchronized expectedForGettingL1LastBlock(t, etherman, 100) - //expectedRollupInfoCalls(t, etherman, 1) - err := sut.initialize(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + defer cancel() + err := sut.Start(ctx) require.NoError(t, err) - sut.launchWork() - var waitDuration = time.Duration(0) - - sut.step(&waitDuration) - - waitDuration = time.Duration(0) - res := sut.step(&waitDuration) - require.True(t, res) // read everything in channel ch for len(ch) > 0 { data := <-ch @@ -73,23 +66,16 @@ func TestGivenNoNeedSyncWhenStartsSendAndEventOfSynchronized(t *testing.T) { // Then: Ask for rollupinfo func TestGivenNeedSyncWhenReachLastBlockThenSendAndEventOfSynchronized(t *testing.T) { sut, ethermans, ch := setup(t) - etherman := ethermans[0] // Our last block is 100 in DB and it returns 101 as last block on L1 // so it need to retrieve 1 rollupinfo - expectedForGettingL1LastBlock(t, etherman, 101) - 
expectedRollupInfoCalls(t, etherman, 1) - err := sut.initialize(context.Background()) - require.NoError(t, err) - var waitDuration = time.Duration(0) + expectedForGettingL1LastBlock(t, ethermans[0], 101) + expectedRollupInfoCalls(t, ethermans[1], 1) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + defer cancel() + res := sut.Start(ctx) + require.NoError(t, res) - // Is going to ask for last block again because it'll launch all request - expectedForGettingL1LastBlock(t, etherman, 101) - sut.step(&waitDuration) - require.Equal(t, sut.status, producerWorking) - waitDuration = time.Millisecond * 100 // need a bit of time to receive the response to rollupinfo - res := sut.step(&waitDuration) - require.True(t, res) - require.Equal(t, sut.status, producerSynchronized) // read everything in channel ch for len(ch) > 0 { data := <-ch @@ -100,25 +86,24 @@ func TestGivenNeedSyncWhenReachLastBlockThenSendAndEventOfSynchronized(t *testin require.Fail(t, "should not have send a eventProducerIsFullySynced in channel") } -func TestGivenNoSetFirstBlockWhenCallStartThenReturnError(t *testing.T) { - sut, _, _ := setupNoResetCall(t) +func TestGivenNoSetFirstBlockWhenCallStartThenDontReturnError(t *testing.T) { + sut, ethermans, _ := setupNoResetCall(t) ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() + cancel() + expectedForGettingL1LastBlock(t, ethermans[0], 101) err := sut.Start(ctx) - require.Error(t, err) - require.Equal(t, errStartingBlockNumberMustBeDefined, err) + require.NoError(t, err) } func setup(t *testing.T) (*l1RollupInfoProducer, []*ethermanMock, chan l1SyncMessage) { sut, ethermansMock, resultChannel := setupNoResetCall(t) - sut.ResetAndStop(100) + sut.Reset(100) return sut, ethermansMock, resultChannel } func setupNoResetCall(t *testing.T) (*l1RollupInfoProducer, []*ethermanMock, chan l1SyncMessage) { - etherman := newEthermanMock(t) - ethermansMock := []*ethermanMock{etherman} - ethermans := []EthermanInterface{etherman} + ethermansMock := []*ethermanMock{newEthermanMock(t), newEthermanMock(t)} + ethermans := []EthermanInterface{ethermansMock[0], ethermansMock[1]} resultChannel := make(chan l1SyncMessage, 100) cfg := configProducer{ syncChunkSize: 100, diff --git a/synchronizer/l1_sync_orchestration.go b/synchronizer/l1_sync_orchestration.go index 3045c09a54..15d7cc8192 100644 --- a/synchronizer/l1_sync_orchestration.go +++ b/synchronizer/l1_sync_orchestration.go @@ -18,7 +18,7 @@ type l1RollupProducerInterface interface { // Stop cancel current process Stop() // ResetAndStop set a new starting point and cancel current process if any - ResetAndStop(startingBlockNumber uint64) + Reset(startingBlockNumber uint64) } type l1RollupConsumerInterface interface { @@ -59,11 +59,11 @@ func newL1SyncOrchestration(ctx context.Context, producer l1RollupProducerInterf } func (l *l1SyncOrchestration) reset(startingBlockNumber uint64) { - log.Warnf("Reset L1 sync process to blockNumber %d", startingBlockNumber) + log.Warnf("orchestration: Reset L1 sync process to blockNumber %d", startingBlockNumber) if l.isRunning { - log.Infof("orchestration: reset(%d) is going to stop producer", startingBlockNumber) + log.Infof("orchestration: reset(%d) is going to reset producer", startingBlockNumber) } - l.producer.ResetAndStop(startingBlockNumber) + l.producer.Reset(startingBlockNumber) // If orchestrator is running then producer is going to be started by orchestrate() select function when detects that producer has finished } diff --git 
a/synchronizer/l1_sync_orchestration_test.go b/synchronizer/l1_sync_orchestration_test.go index 106defaf3a..2b05abfdf9 100644 --- a/synchronizer/l1_sync_orchestration_test.go +++ b/synchronizer/l1_sync_orchestration_test.go @@ -19,14 +19,17 @@ func TestGivenOrquestrationWhenHappyPathThenReturnsBlockAndNoErrorAndProducerIsR ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() sut, mocks := setupOrchestrationTest(t, ctxTimeout) - mocks.producer.On("ResetAndStop", mock.Anything).Return() + mocks.producer.On("Reset", mock.Anything).Return() mocks.producer.On("Start", mock.Anything).Return(func(context.Context) error { time.Sleep(time.Second * 2) return nil }) block := state.Block{} mocks.consumer.On("GetLastEthBlockSynced").Return(block, true) - mocks.consumer.On("Start", mock.Anything).Return(nil) + mocks.consumer.On("Start", mock.Anything).Return(func(context.Context) error { + time.Sleep(time.Millisecond * 100) + return nil + }) sut.reset(123) returnedBlock, err := sut.start() require.NoError(t, err) diff --git a/synchronizer/l1_syncstatus.go b/synchronizer/l1_syncstatus.go index 0fae05a4f3..dbf70ace70 100644 --- a/synchronizer/l1_syncstatus.go +++ b/synchronizer/l1_syncstatus.go @@ -60,7 +60,7 @@ func (s *syncStatus) reset(lastBlockStoreOnStateDB uint64) { s.lastBlockStoreOnStateDB = lastBlockStoreOnStateDB s.highestBlockRequested = lastBlockStoreOnStateDB s.processingRanges = newLiveBlockRanges() - s.lastBlockOnL1 = invalidLastBlock + //s.lastBlockOnL1 = invalidLastBlock } func (s *syncStatus) getLastBlockOnL1() uint64 { @@ -69,10 +69,11 @@ func (s *syncStatus) getLastBlockOnL1() uint64 { return s.lastBlockOnL1 } +// All pending blocks have been requested or are currently being requested func (s *syncStatus) haveRequiredAllBlocksToBeSynchronized() bool { s.mutex.Lock() defer s.mutex.Unlock() - return s.lastBlockOnL1 <= s.highestBlockRequested && s.errorRanges.len() == 0 + return s.lastBlockOnL1 <= s.highestBlockRequested } // isNodeFullySynchronizedWithL1 returns true if the node is fully synchronized with L1 @@ -158,7 +159,8 @@ func (s *syncStatus) onStartedNewWorker(br blockRange) { } } -func (s *syncStatus) onFinishWorker(br blockRange, successful bool) { +// return true is a valid blockRange +func (s *syncStatus) onFinishWorker(br blockRange, successful bool) bool { s.mutex.Lock() defer s.mutex.Unlock() log.Debugf("onFinishWorker(br=%s, successful=%v) initial_status: %s", br.String(), successful, s.toStringBrief()) @@ -167,7 +169,7 @@ func (s *syncStatus) onFinishWorker(br blockRange, successful bool) { err := s.processingRanges.removeBlockRange(br) if err != nil { log.Infof("Unexpected finished block_range %s, ignoring it: %s", br.String(), err) - return + return false } if successful { @@ -190,6 +192,7 @@ func (s *syncStatus) onFinishWorker(br blockRange, successful bool) { } } log.Debugf("onFinishWorker final_status: %s", s.toStringBrief()) + return true } func getNextBlockRangeFromUnsafe(lastBlockInState uint64, lastBlockInL1 uint64, amountOfBlocksInEachRange uint64) *blockRange { @@ -272,6 +275,12 @@ func (s *syncStatus) isSetLastBlockOnL1Value() bool { return s.lastBlockOnL1 == invalidLastBlock } +func (s *syncStatus) doesItHaveAllTheNeedDataToWork() bool { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.lastBlockOnL1 != invalidLastBlock && s.lastBlockStoreOnStateDB != invalidBlockNumber +} + func (s *syncStatus) verify() error { if s.amountOfBlocksInEachRange == 0 { return errSyncChunkSizeMustBeGreaterThanZero diff --git 
a/synchronizer/l1_syncstatus_test.go b/synchronizer/l1_syncstatus_test.go index a21816594e..9effa68075 100644 --- a/synchronizer/l1_syncstatus_test.go +++ b/synchronizer/l1_syncstatus_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestGivenObjectWithDataWhenResetThenForgetLastBlockOnL1AndgetNextRangeReturnsNil(t *testing.T) { +func TestGivenObjectWithDataWhenResetThenDontForgetLastBlockOnL1AndgetNextRangeReturnsNil(t *testing.T) { s := newSyncStatus(1617, 10) s.setLastBlockOnL1(1982) s.onStartedNewWorker(blockRange{fromBlock: 1820, toBlock: 1920}) @@ -15,7 +15,7 @@ func TestGivenObjectWithDataWhenResetThenForgetLastBlockOnL1AndgetNextRangeRetur // lose lastBlockOnL1 so it returns a nil br := s.getNextRange() - require.Nil(t, br) + require.Equal(t, *br, blockRange{fromBlock: 1235, toBlock: 1245}) } func TestGivenObjectWithDataWhenResetAndSetLastBlockOnL1ThenGetNextRangeReturnsNextRange(t *testing.T) { @@ -60,7 +60,8 @@ func TestFirstRunWithPendingBlocksToRetrieve(t *testing.T) { func TestWhenReceiveAndNoStartedBlockRangeThenIgnore(t *testing.T) { s := newSyncStatus(1617, 10) s.setLastBlockOnL1(1982) - s.onFinishWorker(blockRange{fromBlock: 1618, toBlock: 1628}, true) + res := s.onFinishWorker(blockRange{fromBlock: 1618, toBlock: 1628}, true) + require.False(t, res) br := s.getNextRange() require.Equal(t, blockRange{fromBlock: 1618, toBlock: 1628}, *br) } @@ -97,7 +98,8 @@ func TestGenerateNextRangeWithProcessedResult(t *testing.T) { s := newSyncStatus(100, 10) s.setLastBlockOnL1(150) s.onStartedNewWorker(blockRange{fromBlock: 101, toBlock: 111}) - s.onFinishWorker(blockRange{fromBlock: 101, toBlock: 111}, true) + res := s.onFinishWorker(blockRange{fromBlock: 101, toBlock: 111}, true) + require.True(t, res) br := s.getNextRange() require.NotNil(t, br) require.Equal(t, *br, blockRange{fromBlock: 112, toBlock: 122}) @@ -111,7 +113,8 @@ func TestGivenMultiplesWorkersWhenBrInMiddleFinishThenDontChangeLastBlock(t *tes s.onStartedNewWorker(blockRange{fromBlock: 101, toBlock: 111}) s.onStartedNewWorker(blockRange{fromBlock: 112, toBlock: 122}) s.onStartedNewWorker(blockRange{fromBlock: 123, toBlock: 133}) - s.onFinishWorker(blockRange{fromBlock: 112, toBlock: 122}, true) + res := s.onFinishWorker(blockRange{fromBlock: 112, toBlock: 122}, true) + require.True(t, res) require.Equal(t, previousValue, s.lastBlockStoreOnStateDB) br := s.getNextRange() @@ -122,11 +125,11 @@ func TestGivenMultiplesWorkersWhenBrInMiddleFinishThenDontChangeLastBlock(t *tes func TestGivenMultiplesWorkersWhenFirstFinishThenChangeLastBlock(t *testing.T) { s := newSyncStatus(100, 10) s.setLastBlockOnL1(150) - s.onStartedNewWorker(blockRange{fromBlock: 101, toBlock: 111}) s.onStartedNewWorker(blockRange{fromBlock: 112, toBlock: 122}) s.onStartedNewWorker(blockRange{fromBlock: 123, toBlock: 133}) - s.onFinishWorker(blockRange{fromBlock: 101, toBlock: 111}, true) + res := s.onFinishWorker(blockRange{fromBlock: 101, toBlock: 111}, true) + require.True(t, res) require.Equal(t, uint64(111), s.lastBlockStoreOnStateDB) br := s.getNextRange() @@ -141,7 +144,8 @@ func TestGivenMultiplesWorkersWhenLastFinishThenDontChangeLastBlock(t *testing.T s.onStartedNewWorker(blockRange{fromBlock: 101, toBlock: 111}) s.onStartedNewWorker(blockRange{fromBlock: 112, toBlock: 122}) s.onStartedNewWorker(blockRange{fromBlock: 123, toBlock: 133}) - s.onFinishWorker(blockRange{fromBlock: 123, toBlock: 133}, true) + res := s.onFinishWorker(blockRange{fromBlock: 123, toBlock: 133}, true) + require.True(t, res) require.Equal(t, 
previousValue, s.lastBlockStoreOnStateDB) br := s.getNextRange() @@ -156,7 +160,8 @@ func TestGivenMultiplesWorkersWhenLastFinishAndFinishAlsoNextOneThenDontChangeLa s.onStartedNewWorker(blockRange{fromBlock: 101, toBlock: 111}) s.onStartedNewWorker(blockRange{fromBlock: 112, toBlock: 122}) s.onStartedNewWorker(blockRange{fromBlock: 123, toBlock: 133}) - s.onFinishWorker(blockRange{fromBlock: 123, toBlock: 133}, true) + res := s.onFinishWorker(blockRange{fromBlock: 123, toBlock: 133}, true) + require.True(t, res) s.onStartedNewWorker(blockRange{fromBlock: 134, toBlock: 144}) require.Equal(t, previousValue, s.lastBlockStoreOnStateDB) diff --git a/synchronizer/l1_worker_etherman.go b/synchronizer/l1_worker_etherman.go index 4852cddbd8..7ade6f7fa9 100644 --- a/synchronizer/l1_worker_etherman.go +++ b/synchronizer/l1_worker_etherman.go @@ -173,8 +173,8 @@ func (w *workerEtherman) asyncRequestRollupInfoByBlockRange(ctx contextWithCance duration := time.Since(now) result := newResponseRollupInfo(err, duration, typeRequestRollupInfo, &rollupInfoByBlockRangeResult{blockRange, blocks, order, lastBlock}) w.setStatus(ethermanIdle) - if !errors.Is(err, context.Canceled) { - log.Debugf("worker: RollUpInfo(%s) cancelled result err=%s", blockRange.String(), err) + if err != nil && !errors.Is(err, context.Canceled) { + log.Debugf("worker: RollUpInfo(%s) cancelled result err=%s", blockRange.String(), err.Error()) } ch <- result } diff --git a/synchronizer/l1_workers.go b/synchronizer/l1_workers.go index dc24d99141..74332592e9 100644 --- a/synchronizer/l1_workers.go +++ b/synchronizer/l1_workers.go @@ -11,7 +11,8 @@ import ( ) const ( - noSleepTime = time.Duration(0) + noSleepTime = time.Duration(0) + minimumNumberOfEthermans = 2 ) var ( @@ -41,8 +42,10 @@ func (w *workerData) String() string { } type workers struct { - mutex sync.Mutex - workers []workerData + mutex sync.Mutex + // worker for asking lastBlock on L1 (to avoid that all of them are busy) + workerForLastBlock workerData + workers []workerData // Channel to send to outside the responses from worker | workers --> client chOutgoingRollupInfo chan responseRollupInfoByBlockRange @@ -67,11 +70,15 @@ func (w *workers) String() string { func newWorkers(ethermans []EthermanInterface, cfg workersConfig) *workers { result := workers{chIncommingRollupInfo: make(chan responseRollupInfoByBlockRange, len(ethermans)+1), cfg: cfg} - - result.workers = make([]workerData, len(ethermans)) + if (len(ethermans)) < minimumNumberOfEthermans { + log.Fatalf("workers: at least %d ethermans are required, got %d", minimumNumberOfEthermans, len(ethermans)) + } + workers := make([]workerData, len(ethermans)) for i, etherman := range ethermans { - result.workers[i].worker = newWorker(etherman) + workers[i].worker = newWorker(etherman) } + result.workers = workers[1:] + result.workerForLastBlock = workers[0] result.chOutgoingRollupInfo = make(chan responseRollupInfoByBlockRange, len(ethermans)+1) return &result } @@ -84,7 +91,7 @@ func (w *workers) initialize() error { } func (w *workers) stop() { - log.Debugf("workers: stopping workers %s", w.String()) + log.Infof("workers: stopping workers %s", w.String()) for i := range w.workers { wd := &w.workers[i] if !wd.worker.isIdle() { @@ -92,7 +99,7 @@ func (w *workers) stop() { } wd.ctx.cancel() } - w.waitFinishAllWorkers() + //w.waitFinishAllWorkers() } func (w *workers) getResponseChannelForRollupInfo() chan responseRollupInfoByBlockRange { @@ -131,15 +138,16 @@ func (w *workers) requestLastBlock(ctx context.Context, timeout 
time.Duration) r defer ctxTimeout.cancel() w.mutex.Lock() defer w.mutex.Unlock() - workerIndex, worker := w.getIdleWorkerUnsafe() + //workerIndex, worker := w.getIdleWorkerUnsafe() + worker := &w.workerForLastBlock if worker == nil { log.Debugf("workers: call:[%s] failed err:%s", "requestLastBlock", errAllWorkersBusy) return newResponseL1LastBlock(errAllWorkersBusy, time.Duration(0), typeRequestLastBlock, nil) } - w.workers[workerIndex].ctx = ctxTimeout + worker.ctx = ctxTimeout - log.Debugf("workers: worker[%d] : launching requestLatBlock (timeout=%s)", workerIndex, timeout.String()) - result := worker.requestLastBlock(ctxTimeout.ctx) + log.Debugf("workers: worker : launching requestLatBlock (timeout=%s)", timeout.String()) + result := worker.worker.requestLastBlock(ctxTimeout.ctx) return result } diff --git a/synchronizer/l1_workers_decorator_limit_retries_by_time.go b/synchronizer/l1_workers_decorator_limit_retries_by_time.go index d820682771..5d160931cc 100644 --- a/synchronizer/l1_workers_decorator_limit_retries_by_time.go +++ b/synchronizer/l1_workers_decorator_limit_retries_by_time.go @@ -18,6 +18,10 @@ type controlWorkerFlux struct { retries int } +func (c *controlWorkerFlux) String() string { + return fmt.Sprintf("time:%s retries:%d", c.time, c.retries) +} + type workerDecoratorLimitRetriesByTime struct { mutex sync.Mutex workersInterface @@ -33,6 +37,12 @@ func (w *workerDecoratorLimitRetriesByTime) String() string { return fmt.Sprintf("[FILTERED_LRBT Active/%s]", w.minTimeBetweenCalls) + w.workersInterface.String() } +func (w *workerDecoratorLimitRetriesByTime) stop() { + w.mutex.Lock() + defer w.mutex.Unlock() + w.processingRanges = newLiveBlockRangesWithTag[controlWorkerFlux]() +} + func (w *workerDecoratorLimitRetriesByTime) asyncRequestRollupInfoByBlockRange(ctx context.Context, blockRange blockRange, sleepBefore time.Duration) (chan responseRollupInfoByBlockRange, error) { w.mutex.Lock() defer w.mutex.Unlock() @@ -52,7 +62,7 @@ func (w *workerDecoratorLimitRetriesByTime) asyncRequestRollupInfoByBlockRange(c ctrl = controlWorkerFlux{time: time.Now(), retries: 0} err = w.processingRanges.addBlockRangeWithTag(blockRange, ctrl) if err != nil { - log.Warnf("workerDecoratorLimitRetriesByTime: error adding blockRange %s with tag %s", blockRange, ctrl) + log.Warnf("workerDecoratorLimitRetriesByTime: error adding blockRange %s err:%s", blockRange.String(), err.Error()) } } diff --git a/synchronizer/mock_l1_rollup_producer_interface.go b/synchronizer/mock_l1_rollup_producer_interface.go index a38872a72c..a4689e6be8 100644 --- a/synchronizer/mock_l1_rollup_producer_interface.go +++ b/synchronizer/mock_l1_rollup_producer_interface.go @@ -13,8 +13,8 @@ type l1RollupProducerInterfaceMock struct { mock.Mock } -// ResetAndStop provides a mock function with given fields: startingBlockNumber -func (_m *l1RollupProducerInterfaceMock) ResetAndStop(startingBlockNumber uint64) { +// Reset provides a mock function with given fields: startingBlockNumber +func (_m *l1RollupProducerInterfaceMock) Reset(startingBlockNumber uint64) { _m.Called(startingBlockNumber) } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 529a5954dd..b72623badd 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -352,8 +352,9 @@ func (s *ClientSynchronizer) syncBlocksParallel(lastEthBlockSynced *state.Block) } if !s.l1SyncOrchestration.isProducerRunning() { log.Infof("producer is not running. 
Resetting the state to start from block %v (last on DB)", lastEthBlockSynced.BlockNumber) - s.l1SyncOrchestration.producer.ResetAndStop(lastEthBlockSynced.BlockNumber) + s.l1SyncOrchestration.producer.Reset(lastEthBlockSynced.BlockNumber) } + log.Infof("Starting L1 sync orchestrator in parallel") return s.l1SyncOrchestration.start() }
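A central change in this diff is that `Reset` and `Stop` no longer touch the producer state directly: they only enqueue a `producerCmd` on `channelCmds`, and the producer main loop (`step`/`executeCmd`) is the single goroutine that applies them. Below is a minimal, self-contained sketch of that command-channel pattern, using hypothetical names rather than the real zkevm-node types.

```
// Sketch of the command-channel pattern (hypothetical names, not the real
// zkevm-node types): public methods only enqueue commands, and the main loop
// is the only goroutine that mutates state, so no extra locking is required.
package main

import "fmt"

type cmdEnum int

const (
	cmdNop cmdEnum = iota
	cmdStop
	cmdReset
)

type command struct {
	cmd    cmdEnum
	param1 uint64
}

type producer struct {
	cmds       chan command
	startBlock uint64
}

// Reset and Stop may be called from any goroutine: they only queue a command.
func (p *producer) Reset(startingBlock uint64) {
	p.cmds <- command{cmd: cmdReset, param1: startingBlock}
}

func (p *producer) Stop() {
	p.cmds <- command{cmd: cmdStop}
}

// Start runs the main loop; it returns when a stop command is received.
func (p *producer) Start() {
	for c := range p.cmds {
		switch c.cmd {
		case cmdReset:
			p.startBlock = c.param1 // safe: only this goroutine touches the state
			fmt.Println("reset to block", p.startBlock)
		case cmdStop:
			fmt.Println("stopping")
			return
		}
	}
}

func main() {
	p := &producer{cmds: make(chan command, 5)} // buffered, like lenCommandsChannels
	go func() {
		p.Reset(100)
		p.Stop()
	}()
	p.Start()
}
```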