Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

better logging and progress #102

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/genesis_write_zkevm.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package core

import (
"math/big"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/smt/pkg/smt"
"github.com/ledgerwatch/erigon/zkevm/hex"
"math/big"
)

func HermezMainnetGenesisBlock() *types.Genesis {
Expand Down Expand Up @@ -117,7 +118,7 @@ func processAccount(s *smt.SMT, root *big.Int, a *types.GenesisAccount, addr lib

// store the account storage
if len(sm) > 0 {
r, err = s.SetContractStorage(addr.String(), sm)
r, err = s.SetContractStorage(addr.String(), sm, nil)
}
return r, nil
}
8 changes: 7 additions & 1 deletion eth/stagedsync/stage_blockhashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/chain"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
Expand Down Expand Up @@ -37,6 +38,10 @@ func StageBlockHashesCfg(db kv.RwDB, tmpDir string, cc *chain.Config) BlockHashe
}

func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Etl transform started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Etl transform ended", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
Expand All @@ -51,6 +56,8 @@ func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx cont
return fmt.Errorf("getting headers progress: %w", err)
}
if s.BlockNumber == headNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to transform", logPrefix))

return nil
}

Expand All @@ -59,7 +66,6 @@ func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx cont
endKey := dbutils.HeaderKey(headNumber+1, libcommon.Hash{}) // etl.Tranform uses ExractEndKey as exclusive bound, therefore +1

//todo do we need non canonical headers ?
logPrefix := s.LogPrefix()
if err := etl.Transform(
logPrefix,
tx,
Expand Down
6 changes: 5 additions & 1 deletion eth/stagedsync/stage_call_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func StageCallTracesCfg(
}

func SpawnCallTraces(s *StageState, tx kv.RwTx, cfg CallTracesCfg, ctx context.Context) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -62,11 +66,11 @@ func SpawnCallTraces(s *StageState, tx kv.RwTx, cfg CallTracesCfg, ctx context.C
if cfg.ToBlock > 0 && cfg.ToBlock < endBlock {
endBlock = cfg.ToBlock
}
logPrefix := s.LogPrefix()
if err != nil {
return fmt.Errorf("getting last executed block: %w", err)
}
if endBlock == s.BlockNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
return nil
}

Expand Down
7 changes: 6 additions & 1 deletion eth/stagedsync/stage_cumulative_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/rlp"
)

type CumulativeIndexCfg struct {
Expand All @@ -30,6 +30,10 @@ func StageCumulativeIndexCfg(db kv.RwDB) CumulativeIndexCfg {
}

func SpawnStageCumulativeIndex(cfg CumulativeIndexCfg, s *StageState, tx kv.RwTx, ctx context.Context) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil

if !useExternalTx {
Expand All @@ -50,6 +54,7 @@ func SpawnStageCumulativeIndex(cfg CumulativeIndexCfg, s *StageState, tx kv.RwTx
}
// If we are done already, we can exit the stage
if s.BlockNumber == headNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions eth/stagedsync/stage_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func StageFinishCfg(db kv.RwDB, tmpDir string, forkValidator *engineapi.ForkVali
}

func FinishForward(s *StageState, tx kv.RwTx, cfg FinishCfg, initialCycle bool) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -55,6 +59,7 @@ func FinishForward(s *StageState, tx kv.RwTx, cfg FinishCfg, initialCycle bool)
return err
}
if executionAt <= s.BlockNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
return nil
}
rawdb.WriteHeadBlockHash(tx, rawdb.ReadHeadHeaderHash(tx))
Expand Down
10 changes: 9 additions & 1 deletion eth/stagedsync/stage_hashstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func StageHashStateCfg(db kv.RwDB, dirs datadir.Dirs, historyV3 bool, agg *state
}

func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx context.Context, quiet bool) error {
logPrefix := s.LogPrefix()
if !quiet {
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))
}

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -60,7 +66,6 @@ func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx contex
defer tx.Rollback()
}

logPrefix := s.LogPrefix()
to, err := s.ExecutionAt(tx)
if err != nil {
return err
Expand All @@ -69,6 +74,9 @@ func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx contex
if s.BlockNumber == to {
// we already did hash check for this block
// we don't do the obvious `if s.BlockNumber > to` to support reorgs more naturally
if !quiet {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
}
return nil
}
if s.BlockNumber > to {
Expand Down
13 changes: 11 additions & 2 deletions eth/stagedsync/stage_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func StageHistoryCfg(db kv.RwDB, prune prune.Mode, tmpDir string) HistoryCfg {
}

func SpawnAccountHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx context.Context) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -60,11 +64,12 @@ func SpawnAccountHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx con
quitCh := ctx.Done()

endBlock, err := s.ExecutionAt(tx)
logPrefix := s.LogPrefix()
if err != nil {
return fmt.Errorf(" getting last executed block: %w", err)
}
if endBlock <= s.BlockNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))

return nil
}

Expand Down Expand Up @@ -96,6 +101,10 @@ func SpawnAccountHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx con
}

func SpawnStorageHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx context.Context) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -108,11 +117,11 @@ func SpawnStorageHistoryIndex(s *StageState, tx kv.RwTx, cfg HistoryCfg, ctx con
quitCh := ctx.Done()

executionAt, err := s.ExecutionAt(tx)
logPrefix := s.LogPrefix()
if err != nil {
return fmt.Errorf("getting last executed block: %w", err)
}
if executionAt <= s.BlockNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion eth/stagedsync/stage_log_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func StageLogIndexCfg(db kv.RwDB, prune prune.Mode, tmpDir string) LogIndexCfg {
}

func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Context, prematureEndBlock uint64) error {
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))

useExternalTx := tx != nil
if !useExternalTx {
var err error
Expand All @@ -60,7 +64,6 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
}

endBlock, err := s.ExecutionAt(tx)
logPrefix := s.LogPrefix()
if err != nil {
return fmt.Errorf("getting last executed block: %w", err)
}
Expand All @@ -73,6 +76,7 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
// in which case it is important that we skip this stage,
// or else we could overwrite stage_at with prematureEndBlock
if endBlock <= s.BlockNumber {
log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix))
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion eth/stagedsync/stage_senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/ledgerwatch/erigon/common/debug"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)
Expand Down Expand Up @@ -103,6 +103,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
logPrefix := s.LogPrefix()
if !quiet && to > s.BlockNumber+16 {
log.Info(fmt.Sprintf("[%s] Started", logPrefix), "from", s.BlockNumber, "to", to)
defer log.Info(fmt.Sprintf("[%s] Finished", logPrefix))
}

logEvery := time.NewTicker(30 * time.Second)
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_txlookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func StageTxLookupCfg(
}

func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, ctx context.Context) (err error) {

logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Skipping", logPrefix))
// TODO: abstract
return nil

Expand All @@ -58,7 +59,6 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c
}
defer tx.Rollback()
}
logPrefix := s.LogPrefix()
endBlock, err := s.ExecutionAt(tx)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions smt/pkg/smt/entity_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *SMT) SetContractBytecode(ethAddr string, bytecode string) error {
return err
}

func (s *SMT) SetContractStorage(ethAddr string, storage map[string]string) (*big.Int, error) {
func (s *SMT) SetContractStorage(ethAddr string, storage map[string]string, progressChan chan uint64) (*big.Int, error) {
storageKeys := make([]string, len(storage))
ii := 0
for k := range storage {
Expand Down Expand Up @@ -201,7 +201,7 @@ func (s *SMT) SetContractStorage(ethAddr string, storage map[string]string) (*bi
}
}

auxRes, err := s.InsertStorage(ethAddr, &storage, &chm, &vhm)
auxRes, err := s.InsertStorage(ethAddr, &storage, &chm, &vhm, progressChan)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions smt/pkg/smt/entity_storage_mdbx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func runGenesisTestMdbx(tb testing.TB, filename string) {
}
// add storage if defined
if len(addr.Storage) > 0 {
_, _ = smt.SetContractStorage(addr.Address, addr.Storage)
_, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil)
}
}

Expand Down Expand Up @@ -281,7 +281,7 @@ func runTestVectorsMdbx(t *testing.T, filename string) {
}
// add storage if defined
if len(addr.Storage) > 0 {
_, _ = smt.SetContractStorage(addr.Address, addr.Storage)
_, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil)
}
}

Expand Down
5 changes: 3 additions & 2 deletions smt/pkg/smt/entity_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"context"

"github.com/ledgerwatch/erigon/smt/pkg/utils"
)

Expand Down Expand Up @@ -85,7 +86,7 @@ func runGenesisTest(tb testing.TB, filename string) {
}
// add storage if defined
if len(addr.Storage) > 0 {
_, _ = smt.SetContractStorage(addr.Address, addr.Storage)
_, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil)
}
}

Expand Down Expand Up @@ -142,7 +143,7 @@ func runTestVectors(t *testing.T, filename string) {
}
// add storage if defined
if len(addr.Storage) > 0 {
_, _ = smt.SetContractStorage(addr.Address, addr.Storage)
_, _ = smt.SetContractStorage(addr.Address, addr.Storage, nil)
}
}

Expand Down
6 changes: 5 additions & 1 deletion smt/pkg/smt/smt.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *SMT) InsertKA(key utils.NodeKey, value *big.Int) (*SMTResponse, error)
return s.insertSingle(key, *v, [4]uint64{})
}

func (s *SMT) InsertStorage(ethAddr string, storage *map[string]string, chm *map[string]*utils.NodeValue8, vhm *map[string][4]uint64) (*SMTResponse, error) {
func (s *SMT) InsertStorage(ethAddr string, storage *map[string]string, chm *map[string]*utils.NodeValue8, vhm *map[string][4]uint64, progressChan chan uint64) (*SMTResponse, error) {
s.clearUpMutex.Lock()
defer s.clearUpMutex.Unlock()

Expand Down Expand Up @@ -166,6 +166,10 @@ func (s *SMT) InsertStorage(ethAddr string, storage *map[string]string, chm *map
if err != nil {
return nil, err
}

if progressChan != nil {
progressChan <- 1
}
}

if err = s.setLastRoot(*smtr.NewRootScalar); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion smt/pkg/smt/witness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func prepareSMT(t *testing.T) (*SMT, *trie.RetainList) {

storage[sKey.String()] = sVal.String()

smtTrie.SetContractStorage(contract.String(), storage)
smtTrie.SetContractStorage(contract.String(), storage, nil)

return smtTrie, rl
}
Expand Down
1 change: 1 addition & 0 deletions zk/stages/stage_dataStreamCatchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func SpawnStageDataStreamCatchup(

if stream == nil {
// skip the stage if there is no streamer provided
log.Info(fmt.Sprintf("[%s]: no streamer provided, skipping stage", logPrefix))
return nil
}

Expand Down
Loading