From 4928981c29c6905c2eba7783b61a9fe755eb767c Mon Sep 17 00:00:00 2001 From: Valentin Staykov Date: Fri, 2 Feb 2024 15:21:42 +0000 Subject: [PATCH] better logging and progress --- core/genesis_write_zkevm.go | 5 +++-- eth/stagedsync/stage_blockhashes.go | 8 +++++++- eth/stagedsync/stage_call_traces.go | 6 +++++- eth/stagedsync/stage_cumulative_index.go | 7 ++++++- eth/stagedsync/stage_finish.go | 5 +++++ eth/stagedsync/stage_hashstate.go | 10 +++++++++- eth/stagedsync/stage_indexes.go | 13 +++++++++++-- eth/stagedsync/stage_log_index.go | 6 +++++- eth/stagedsync/stage_senders.go | 3 ++- eth/stagedsync/stage_txlookup.go | 4 ++-- smt/pkg/smt/entity_storage.go | 4 ++-- smt/pkg/smt/entity_storage_mdbx_test.go | 4 ++-- smt/pkg/smt/entity_storage_test.go | 5 +++-- smt/pkg/smt/smt.go | 6 +++++- smt/pkg/smt/witness_test.go | 2 +- zk/stages/stage_dataStreamCatchup.go | 1 + zk/stages/stage_interhashes.go | 13 ++++++++----- zk/utils.go | 2 +- 18 files changed, 78 insertions(+), 26 deletions(-) diff --git a/core/genesis_write_zkevm.go b/core/genesis_write_zkevm.go index 2877dbc9406..4358ae02c21 100644 --- a/core/genesis_write_zkevm.go +++ b/core/genesis_write_zkevm.go @@ -1,12 +1,13 @@ package core import ( + "math/big" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/smt/pkg/smt" "github.com/ledgerwatch/erigon/zkevm/hex" - "math/big" ) func HermezMainnetGenesisBlock() *types.Genesis { @@ -117,7 +118,7 @@ func processAccount(s *smt.SMT, root *big.Int, a *types.GenesisAccount, addr lib // store the account storage if len(sm) > 0 { - r, err = s.SetContractStorage(addr.String(), sm) + r, err = s.SetContractStorage(addr.String(), sm, nil) } return r, nil } diff --git a/eth/stagedsync/stage_blockhashes.go b/eth/stagedsync/stage_blockhashes.go index 76ec2bd305b..4e815c3935f 100644 --- a/eth/stagedsync/stage_blockhashes.go +++ b/eth/stagedsync/stage_blockhashes.go @@ -9,6 +9,7 @@ import ( "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/chain" + "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" @@ -37,6 +38,10 @@ func StageBlockHashesCfg(db kv.RwDB, tmpDir string, cc *chain.Config) BlockHashe } func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) { + logPrefix := s.LogPrefix() + log.Info(fmt.Sprintf("[%s] Etl transform started", logPrefix)) + defer log.Info(fmt.Sprintf("[%s] Etl transform ended", logPrefix)) + useExternalTx := tx != nil if !useExternalTx { tx, err = cfg.db.BeginRw(ctx) @@ -51,6 +56,8 @@ func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx cont return fmt.Errorf("getting headers progress: %w", err) } if s.BlockNumber == headNumber { + log.Info(fmt.Sprintf("[%s] Nothing new to transform", logPrefix)) + return nil } @@ -59,7 +66,6 @@ func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx cont endKey := dbutils.HeaderKey(headNumber+1, libcommon.Hash{}) // etl.Tranform uses ExractEndKey as exclusive bound, therefore +1 //todo do we need non canonical headers ? - logPrefix := s.LogPrefix() if err := etl.Transform( logPrefix, tx, diff --git a/eth/stagedsync/stage_call_traces.go b/eth/stagedsync/stage_call_traces.go index fe5e620fa3b..a5e3dbbf093 100644 --- a/eth/stagedsync/stage_call_traces.go +++ b/eth/stagedsync/stage_call_traces.go @@ -47,6 +47,10 @@ func StageCallTracesCfg( } func SpawnCallTraces(s *StageState, tx kv.RwTx, cfg CallTracesCfg, ctx context.Context) error { + logPrefix := s.LogPrefix() + log.Info(fmt.Sprintf("[%s] Started", logPrefix)) + defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix)) + useExternalTx := tx != nil if !useExternalTx { var err error @@ -62,11 +66,11 @@ func SpawnCallTraces(s *StageState, tx kv.RwTx, cfg CallTracesCfg, ctx context.C if cfg.ToBlock > 0 && cfg.ToBlock < endBlock { endBlock = cfg.ToBlock } - logPrefix := s.LogPrefix() if err != nil { return fmt.Errorf("getting last executed block: %w", err) } if endBlock == s.BlockNumber { + log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix)) return nil } diff --git a/eth/stagedsync/stage_cumulative_index.go b/eth/stagedsync/stage_cumulative_index.go index 37dae2b1755..231f6bac6c5 100644 --- a/eth/stagedsync/stage_cumulative_index.go +++ b/eth/stagedsync/stage_cumulative_index.go @@ -15,8 +15,8 @@ import ( "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/rlp" ) type CumulativeIndexCfg struct { @@ -30,6 +30,10 @@ func StageCumulativeIndexCfg(db kv.RwDB) CumulativeIndexCfg { } func SpawnStageCumulativeIndex(cfg CumulativeIndexCfg, s *StageState, tx kv.RwTx, ctx context.Context) error { + logPrefix := s.LogPrefix() + log.Info(fmt.Sprintf("[%s] Started", logPrefix)) + defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix)) + useExternalTx := tx != nil if !useExternalTx { @@ -50,6 +54,7 @@ func SpawnStageCumulativeIndex(cfg CumulativeIndexCfg, s *StageState, tx kv.RwTx } // If we are done already, we can exit the stage if s.BlockNumber == headNumber { + log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix)) return nil } diff --git a/eth/stagedsync/stage_finish.go b/eth/stagedsync/stage_finish.go index 7b85c1bcebb..da68387335e 100644 --- a/eth/stagedsync/stage_finish.go +++ b/eth/stagedsync/stage_finish.go @@ -39,6 +39,10 @@ func StageFinishCfg(db kv.RwDB, tmpDir string, forkValidator *engineapi.ForkVali } func FinishForward(s *StageState, tx kv.RwTx, cfg FinishCfg, initialCycle bool) error { + logPrefix := s.LogPrefix() + log.Info(fmt.Sprintf("[%s] Started", logPrefix)) + defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix)) + useExternalTx := tx != nil if !useExternalTx { var err error @@ -55,6 +59,7 @@ func FinishForward(s *StageState, tx kv.RwTx, cfg FinishCfg, initialCycle bool) return err } if executionAt <= s.BlockNumber { + log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix)) return nil } rawdb.WriteHeadBlockHash(tx, rawdb.ReadHeadHeaderHash(tx)) diff --git a/eth/stagedsync/stage_hashstate.go b/eth/stagedsync/stage_hashstate.go index 832a3cd2b5a..02d162ab282 100644 --- a/eth/stagedsync/stage_hashstate.go +++ b/eth/stagedsync/stage_hashstate.go @@ -50,6 +50,12 @@ func StageHashStateCfg(db kv.RwDB, dirs datadir.Dirs, historyV3 bool, agg *state } func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx context.Context, quiet bool) error { + logPrefix := s.LogPrefix() + if !quiet { + log.Info(fmt.Sprintf("[%s] Started", logPrefix)) + defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix)) + } + useExternalTx := tx != nil if !useExternalTx { var err error @@ -60,7 +66,6 @@ func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx contex defer tx.Rollback() } - logPrefix := s.LogPrefix() to, err := s.ExecutionAt(tx) if err != nil { return err @@ -69,6 +74,9 @@ func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx contex if s.BlockNumber == to { // we already did hash check for this block // we don't do the obvious `if s.BlockNumber > to` to support reorgs more naturally + if !quiet { + log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix)) + } return nil } if s.BlockNumber > to { diff --git a/eth/stagedsync/stage_indexes.go b/eth/stagedsync/stage_indexes.go index 2c6ff2b6310..a299d5cc1c1 100644 --- a/eth/stagedsync/stage_indexes.go +++ b/eth/stagedsync/stage_indexes.go @@ -48,6 +48,10 @@ func StageHistoryCfg(db kv.RwDB, prune prune.Mode, tmpDir string) HistoryCfg { } func SpawnAccountHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx context.Context) error { + logPrefix := s.LogPrefix() + log.Info(fmt.Sprintf("[%s] Started", logPrefix)) + defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix)) + useExternalTx := tx != nil if !useExternalTx { var err error @@ -60,11 +64,12 @@ func SpawnAccountHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx con quitCh := ctx.Done() endBlock, err := s.ExecutionAt(tx) - logPrefix := s.LogPrefix() if err != nil { return fmt.Errorf(" getting last executed block: %w", err) } if endBlock <= s.BlockNumber { + log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix)) + return nil } @@ -96,6 +101,10 @@ func SpawnAccountHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx con } func SpawnStorageHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx context.Context) error { + logPrefix := s.LogPrefix() + log.Info(fmt.Sprintf("[%s] Started", logPrefix)) + defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix)) + useExternalTx := tx != nil if !useExternalTx { var err error @@ -108,11 +117,11 @@ func SpawnStorageHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx con quitCh := ctx.Done() executionAt, err := s.ExecutionAt(tx) - logPrefix := s.LogPrefix() if err != nil { return fmt.Errorf("getting last executed block: %w", err) } if executionAt <= s.BlockNumber { + log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix)) return nil } diff --git a/eth/stagedsync/stage_log_index.go b/eth/stagedsync/stage_log_index.go index 01a6947dff3..1b34ebf0f18 100644 --- a/eth/stagedsync/stage_log_index.go +++ b/eth/stagedsync/stage_log_index.go @@ -49,6 +49,10 @@ func StageLogIndexCfg(db kv.RwDB, prune prune.Mode, tmpDir string) LogIndexCfg { } func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Context, prematureEndBlock uint64) error { + logPrefix := s.LogPrefix() + log.Info(fmt.Sprintf("[%s] Started", logPrefix)) + defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix)) + useExternalTx := tx != nil if !useExternalTx { var err error @@ -60,7 +64,6 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte } endBlock, err := s.ExecutionAt(tx) - logPrefix := s.LogPrefix() if err != nil { return fmt.Errorf("getting last executed block: %w", err) } @@ -73,6 +76,7 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte // in which case it is important that we skip this stage, // or else we could overwrite stage_at with prematureEndBlock if endBlock <= s.BlockNumber { + log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix)) return nil } diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index 156223eeb49..487f7284aa4 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -23,8 +23,8 @@ import ( "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" ) @@ -103,6 +103,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R logPrefix := s.LogPrefix() if !quiet && to > s.BlockNumber+16 { log.Info(fmt.Sprintf("[%s] Started", logPrefix), "from", s.BlockNumber, "to", to) + defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix)) } logEvery := time.NewTicker(30 * time.Second) diff --git a/eth/stagedsync/stage_txlookup.go b/eth/stagedsync/stage_txlookup.go index 7889e4200e1..22e81d44865 100644 --- a/eth/stagedsync/stage_txlookup.go +++ b/eth/stagedsync/stage_txlookup.go @@ -45,7 +45,8 @@ func StageTxLookupCfg( } func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, ctx context.Context) (err error) { - + logPrefix := s.LogPrefix() + log.Info(fmt.Sprintf("[%s] Skipping", logPrefix)) // TODO: abstract return nil @@ -58,7 +59,6 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c } defer tx.Rollback() } - logPrefix := s.LogPrefix() endBlock, err := s.ExecutionAt(tx) if err != nil { return err diff --git a/smt/pkg/smt/entity_storage.go b/smt/pkg/smt/entity_storage.go index 8d1ef13532e..82a7a2a2a8a 100644 --- a/smt/pkg/smt/entity_storage.go +++ b/smt/pkg/smt/entity_storage.go @@ -111,7 +111,7 @@ func (s *SMT) SetContractBytecode(ethAddr string, bytecode string) error { return err } -func (s *SMT) SetContractStorage(ethAddr string, storage map[string]string) (*big.Int, error) { +func (s *SMT) SetContractStorage(ethAddr string, storage map[string]string, progressChan chan uint64) (*big.Int, error) { storageKeys := make([]string, len(storage)) ii := 0 for k := range storage { @@ -201,7 +201,7 @@ func (s *SMT) SetContractStorage(ethAddr string, storage map[string]string) (*bi } } - auxRes, err := s.InsertStorage(ethAddr, &storage, &chm, &vhm) + auxRes, err := s.InsertStorage(ethAddr, &storage, &chm, &vhm, progressChan) if err != nil { return nil, err } diff --git a/smt/pkg/smt/entity_storage_mdbx_test.go b/smt/pkg/smt/entity_storage_mdbx_test.go index 0a2720eae8a..33cdb24e9f6 100644 --- a/smt/pkg/smt/entity_storage_mdbx_test.go +++ b/smt/pkg/smt/entity_storage_mdbx_test.go @@ -224,7 +224,7 @@ func runGenesisTestMdbx(tb testing.TB, filename string) { } // add storage if defined if len(addr.Storage) > 0 { - _, _ = smt.SetContractStorage(addr.Address, addr.Storage) + _, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil) } } @@ -281,7 +281,7 @@ func runTestVectorsMdbx(t *testing.T, filename string) { } // add storage if defined if len(addr.Storage) > 0 { - _, _ = smt.SetContractStorage(addr.Address, addr.Storage) + _, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil) } } diff --git a/smt/pkg/smt/entity_storage_test.go b/smt/pkg/smt/entity_storage_test.go index 47cf95e1bf9..3688f590e59 100644 --- a/smt/pkg/smt/entity_storage_test.go +++ b/smt/pkg/smt/entity_storage_test.go @@ -10,6 +10,7 @@ import ( "testing" "context" + "github.com/ledgerwatch/erigon/smt/pkg/utils" ) @@ -85,7 +86,7 @@ func runGenesisTest(tb testing.TB, filename string) { } // add storage if defined if len(addr.Storage) > 0 { - _, _ = smt.SetContractStorage(addr.Address, addr.Storage) + _, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil) } } @@ -142,7 +143,7 @@ func runTestVectors(t *testing.T, filename string) { } // add storage if defined if len(addr.Storage) > 0 { - _, _ = smt.SetContractStorage(addr.Address, addr.Storage) + _, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil) } } diff --git a/smt/pkg/smt/smt.go b/smt/pkg/smt/smt.go index 64e7101fe53..1c50162fe38 100644 --- a/smt/pkg/smt/smt.go +++ b/smt/pkg/smt/smt.go @@ -132,7 +132,7 @@ func (s *SMT) InsertKA(key utils.NodeKey, value *big.Int) (*SMTResponse, error) return s.insertSingle(key, *v, [4]uint64{}) } -func (s *SMT) InsertStorage(ethAddr string, storage *map[string]string, chm *map[string]*utils.NodeValue8, vhm *map[string][4]uint64) (*SMTResponse, error) { +func (s *SMT) InsertStorage(ethAddr string, storage *map[string]string, chm *map[string]*utils.NodeValue8, vhm *map[string][4]uint64, progressChan chan uint64) (*SMTResponse, error) { s.clearUpMutex.Lock() defer s.clearUpMutex.Unlock() @@ -166,6 +166,10 @@ func (s *SMT) InsertStorage(ethAddr string, storage *map[string]string, chm *map if err != nil { return nil, err } + + if progressChan != nil { + progressChan <- 1 + } } if err = s.setLastRoot(*smtr.NewRootScalar); err != nil { diff --git a/smt/pkg/smt/witness_test.go b/smt/pkg/smt/witness_test.go index acb224e6a62..e019bce5018 100644 --- a/smt/pkg/smt/witness_test.go +++ b/smt/pkg/smt/witness_test.go @@ -76,7 +76,7 @@ func prepareSMT(t *testing.T) (*SMT, *trie.RetainList) { storage[sKey.String()] = sVal.String() - smtTrie.SetContractStorage(contract.String(), storage) + smtTrie.SetContractStorage(contract.String(), storage, nil) return smtTrie, rl } diff --git a/zk/stages/stage_dataStreamCatchup.go b/zk/stages/stage_dataStreamCatchup.go index 27880766edb..4bed7967eed 100644 --- a/zk/stages/stage_dataStreamCatchup.go +++ b/zk/stages/stage_dataStreamCatchup.go @@ -44,6 +44,7 @@ func SpawnStageDataStreamCatchup( if stream == nil { // skip the stage if there is no streamer provided + log.Info(fmt.Sprintf("[%s]: no streamer provided, skipping stage", logPrefix)) return nil } diff --git a/zk/stages/stage_interhashes.go b/zk/stages/stage_interhashes.go index 6cdc98e7722..965c79e8081 100644 --- a/zk/stages/stage_interhashes.go +++ b/zk/stages/stage_interhashes.go @@ -412,10 +412,14 @@ func zkIncrementIntermediateHashes(logPrefix string, s *stagedsync.StageState, d } } - total := len(accChanges) + len(codeChanges) + len(storageChanges) + storageTotal := 0 + for _, v := range storageChanges { + storageTotal += len(v) + } - progressChan, stopProgressPrinter := zk.ProgressPrinter(fmt.Sprintf("[%s] Progress inserting values", logPrefix), uint64(total)) + total := len(accChanges) + len(codeChanges) + storageTotal + progressChan, stopProgressPrinter := zk.ProgressPrinter(fmt.Sprintf("[%s] Progress inserting values", logPrefix), uint64(total)) // update the tree for addr, acc := range accChanges { if err := updateAccInTree(dbSmt, addr, acc); err != nil { @@ -434,11 +438,10 @@ func zkIncrementIntermediateHashes(logPrefix string, s *stagedsync.StageState, d } for addr, storage := range storageChanges { - if _, err := dbSmt.SetContractStorage(addr.String(), storage); err != nil { + if _, err := dbSmt.SetContractStorage(addr.String(), storage, progressChan); err != nil { stopProgressPrinter() return trie.EmptyRoot, err } - progressChan <- 1 } stopProgressPrinter() @@ -592,7 +595,7 @@ func unwindZkSMT(logPrefix string, from, to uint64, db kv.RwTx, checkRoot bool, } for addr, storage := range storageChanges { - if _, err := dbSmt.SetContractStorage(addr.String(), storage); err != nil { + if _, err := dbSmt.SetContractStorage(addr.String(), storage, nil); err != nil { return trie.EmptyRoot, err } } diff --git a/zk/utils.go b/zk/utils.go index 50e306de4eb..d382275f471 100644 --- a/zk/utils.go +++ b/zk/utils.go @@ -26,7 +26,7 @@ func ProgressPrinter(message string, total uint64) (chan uint64, func()) { for { select { case newPc := <-progress: - pc = newPc + pc += newPc if total > 0 { pct = (pc * 100) / total }