Skip to content

Commit

Permalink
better logging and progress (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
V-Staykov authored Feb 2, 2024
1 parent 5b975d7 commit a2ad6e8
Show file tree
Hide file tree
Showing 18 changed files with 78 additions and 26 deletions.
5 changes: 3 additions & 2 deletions core/genesis_write_zkevm.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package core

import (
"math/big"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/smt/pkg/smt"
"github.com/ledgerwatch/erigon/zkevm/hex"
"math/big"
)

func HermezMainnetGenesisBlock() *types.Genesis {
Expand Down Expand Up @@ -117,7 +118,7 @@ func processAccount(s *smt.SMT, root *big.Int, a *types.GenesisAccount, addr lib

// store the account storage
if len(sm) > 0 {
r, err = s.SetContractStorage(addr.String(), sm)
r, err = s.SetContractStorage(addr.String(), sm, nil)
}
return r, nil
}
8 changes: 7 additions & 1 deletion eth/stagedsync/stage_blockhashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/chain"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
Expand Down Expand Up @@ -37,6 +38,10 @@ func StageBlockHashesCfg(db kv.RwDB, tmpDir string, cc *chain.Config) BlockHashe
}

func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Etl transform started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Etl transform ended", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
Expand All @@ -51,6 +56,8 @@ func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx cont
return fmt.Errorf("getting headers progress: %w", err)
}
if s.BlockNumber == headNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to transform", logPrefix))

return nil
}

Expand All @@ -59,7 +66,6 @@ func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx cont
endKey := dbutils.HeaderKey(headNumber+1, libcommon.Hash{}) // etl.Tranform uses ExractEndKey as exclusive bound, therefore +1

//todo do we need non canonical headers ?
logPrefix := s.LogPrefix()
if err := etl.Transform(
logPrefix,
tx,
Expand Down
6 changes: 5 additions & 1 deletion eth/stagedsync/stage_call_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func StageCallTracesCfg(
}

func SpawnCallTraces(s *StageState, tx kv.RwTx, cfg CallTracesCfg, ctx context.Context) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -62,11 +66,11 @@ func SpawnCallTraces(s *StageState, tx kv.RwTx, cfg CallTracesCfg, ctx context.C
if cfg.ToBlock > 0 && cfg.ToBlock < endBlock {
endBlock = cfg.ToBlock
}
logPrefix := s.LogPrefix()
if err != nil {
return fmt.Errorf("getting last executed block: %w", err)
}
if endBlock == s.BlockNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
return nil
}

Expand Down
7 changes: 6 additions & 1 deletion eth/stagedsync/stage_cumulative_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/rlp"
)

type CumulativeIndexCfg struct {
Expand All @@ -30,6 +30,10 @@ func StageCumulativeIndexCfg(db kv.RwDB) CumulativeIndexCfg {
}

func SpawnStageCumulativeIndex(cfg CumulativeIndexCfg, s *StageState, tx kv.RwTx, ctx context.Context) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil

if !useExternalTx {
Expand All @@ -50,6 +54,7 @@ func SpawnStageCumulativeIndex(cfg CumulativeIndexCfg, s *StageState, tx kv.RwTx
}
// If we are done already, we can exit the stage
if s.BlockNumber == headNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions eth/stagedsync/stage_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func StageFinishCfg(db kv.RwDB, tmpDir string, forkValidator *engineapi.ForkVali
}

func FinishForward(s *StageState, tx kv.RwTx, cfg FinishCfg, initialCycle bool) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -55,6 +59,7 @@ func FinishForward(s *StageState, tx kv.RwTx, cfg FinishCfg, initialCycle bool)
return err
}
if executionAt <= s.BlockNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
return nil
}
rawdb.WriteHeadBlockHash(tx, rawdb.ReadHeadHeaderHash(tx))
Expand Down
10 changes: 9 additions & 1 deletion eth/stagedsync/stage_hashstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func StageHashStateCfg(db kv.RwDB, dirs datadir.Dirs, historyV3 bool, agg *state
}

func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx context.Context, quiet bool) error {
logPrefix := s.LogPrefix()
if !quiet {
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))
}

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -60,7 +66,6 @@ func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx contex
defer tx.Rollback()
}

logPrefix := s.LogPrefix()
to, err := s.ExecutionAt(tx)
if err != nil {
return err
Expand All @@ -69,6 +74,9 @@ func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx contex
if s.BlockNumber == to {
// we already did hash check for this block
// we don't do the obvious `if s.BlockNumber > to` to support reorgs more naturally
if !quiet {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
}
return nil
}
if s.BlockNumber > to {
Expand Down
13 changes: 11 additions & 2 deletions eth/stagedsync/stage_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func StageHistoryCfg(db kv.RwDB, prune prune.Mode, tmpDir string) HistoryCfg {
}

func SpawnAccountHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx context.Context) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -60,11 +64,12 @@ func SpawnAccountHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx con
quitCh := ctx.Done()

endBlock, err := s.ExecutionAt(tx)
logPrefix := s.LogPrefix()
if err != nil {
return fmt.Errorf(" getting last executed block: %w", err)
}
if endBlock <= s.BlockNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))

return nil
}

Expand Down Expand Up @@ -96,6 +101,10 @@ func SpawnAccountHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx con
}

func SpawnStorageHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx context.Context) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -108,11 +117,11 @@ func SpawnStorageHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx con
quitCh := ctx.Done()

executionAt, err := s.ExecutionAt(tx)
logPrefix := s.LogPrefix()
if err != nil {
return fmt.Errorf("getting last executed block: %w", err)
}
if executionAt <= s.BlockNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion eth/stagedsync/stage_log_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func StageLogIndexCfg(db kv.RwDB, prune prune.Mode, tmpDir string) LogIndexCfg {
}

func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Context, prematureEndBlock uint64) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -60,7 +64,6 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
}

endBlock, err := s.ExecutionAt(tx)
logPrefix := s.LogPrefix()
if err != nil {
return fmt.Errorf("getting last executed block: %w", err)
}
Expand All @@ -73,6 +76,7 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
// in which case it is important that we skip this stage,
// or else we could overwrite stage_at with prematureEndBlock
if endBlock <= s.BlockNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion eth/stagedsync/stage_senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/ledgerwatch/erigon/common/debug"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)
Expand Down Expand Up @@ -103,6 +103,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
logPrefix := s.LogPrefix()
if !quiet && to > s.BlockNumber+16 {
log.Info(fmt.Sprintf("[%s] Started", logPrefix), "from", s.BlockNumber, "to", to)
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))
}

logEvery := time.NewTicker(30 * time.Second)
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_txlookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func StageTxLookupCfg(
}

func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, ctx context.Context) (err error) {

logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Skipping", logPrefix))
// TODO: abstract
return nil

Expand All @@ -58,7 +59,6 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c
}
defer tx.Rollback()
}
logPrefix := s.LogPrefix()
endBlock, err := s.ExecutionAt(tx)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions smt/pkg/smt/entity_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *SMT) SetContractBytecode(ethAddr string, bytecode string) error {
return err
}

func (s *SMT) SetContractStorage(ethAddr string, storage map[string]string) (*big.Int, error) {
func (s *SMT) SetContractStorage(ethAddr string, storage map[string]string, progressChan chan uint64) (*big.Int, error) {
storageKeys := make([]string, len(storage))
ii := 0
for k := range storage {
Expand Down Expand Up @@ -201,7 +201,7 @@ func (s *SMT) SetContractStorage(ethAddr string, storage map[string]string) (*bi
}
}

auxRes, err := s.InsertStorage(ethAddr, &storage, &chm, &vhm)
auxRes, err := s.InsertStorage(ethAddr, &storage, &chm, &vhm, progressChan)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions smt/pkg/smt/entity_storage_mdbx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func runGenesisTestMdbx(tb testing.TB, filename string) {
}
// add storage if defined
if len(addr.Storage) > 0 {
_, _ = smt.SetContractStorage(addr.Address, addr.Storage)
_, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil)
}
}

Expand Down Expand Up @@ -281,7 +281,7 @@ func runTestVectorsMdbx(t *testing.T, filename string) {
}
// add storage if defined
if len(addr.Storage) > 0 {
_, _ = smt.SetContractStorage(addr.Address, addr.Storage)
_, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil)
}
}

Expand Down
5 changes: 3 additions & 2 deletions smt/pkg/smt/entity_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"context"

"github.com/ledgerwatch/erigon/smt/pkg/utils"
)

Expand Down Expand Up @@ -85,7 +86,7 @@ func runGenesisTest(tb testing.TB, filename string) {
}
// add storage if defined
if len(addr.Storage) > 0 {
_, _ = smt.SetContractStorage(addr.Address, addr.Storage)
_, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil)
}
}

Expand Down Expand Up @@ -142,7 +143,7 @@ func runTestVectors(t *testing.T, filename string) {
}
// add storage if defined
if len(addr.Storage) > 0 {
_, _ = smt.SetContractStorage(addr.Address, addr.Storage)
_, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil)
}
}

Expand Down
6 changes: 5 additions & 1 deletion smt/pkg/smt/smt.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *SMT) InsertKA(key utils.NodeKey, value *big.Int) (*SMTResponse, error)
return s.insertSingle(key, *v, [4]uint64{})
}

func (s *SMT) InsertStorage(ethAddr string, storage *map[string]string, chm *map[string]*utils.NodeValue8, vhm *map[string][4]uint64) (*SMTResponse, error) {
func (s *SMT) InsertStorage(ethAddr string, storage *map[string]string, chm *map[string]*utils.NodeValue8, vhm *map[string][4]uint64, progressChan chan uint64) (*SMTResponse, error) {
s.clearUpMutex.Lock()
defer s.clearUpMutex.Unlock()

Expand Down Expand Up @@ -166,6 +166,10 @@ func (s *SMT) InsertStorage(ethAddr string, storage *map[string]string, chm *map
if err != nil {
return nil, err
}

if progressChan != nil {
progressChan <- 1
}
}

if err = s.setLastRoot(*smtr.NewRootScalar); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion smt/pkg/smt/witness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func prepareSMT(t *testing.T) (*SMT, *trie.RetainList) {

storage[sKey.String()] = sVal.String()

smtTrie.SetContractStorage(contract.String(), storage)
smtTrie.SetContractStorage(contract.String(), storage, nil)

return smtTrie, rl
}
Expand Down
1 change: 1 addition & 0 deletions zk/stages/stage_dataStreamCatchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func SpawnStageDataStreamCatchup(

if stream == nil {
// skip the stage if there is no streamer provided
log.Info(fmt.Sprintf("[%s]: no streamer provided, skipping stage", logPrefix))
return nil
}

Expand Down
Loading

0 comments on commit a2ad6e8

Please sign in to comment.