feat(BEDS-536): add re-index-blocks cmd to fix internal transfers status (#2965)

* fix(internal transaction): recursively revert traces

* fix: SQLReaderDb interface missing methods

* fix(eth1tx): remove transfers if reverted

* fix(internal tx): only save the highest root revert

* ci lint

* feat: updated eth1 proto file

* feat: updated Eth1InternalTransaction in eth1.proto file

* feat: added Reverted value to Eth1InternalTransactionIndexed

* feat: add check for internal txs while querying the block from the Erigon node and set the status to 2 if the tx has failed partially

* feat: updated reverted internal txs check logic

* feat(types): add status enum and add status partially executed

* refactor(client/erigon): simplify transaction indexing flow

* feat(client/erigon): rework geth traces

* fix: lint

* feat: add status to Eth1TransactionIndexed msg and update hash formatting

* fix: updated internal tx handling in GetBlock func

* fix: updated internal txs parsing in GetBlock

* fix: optimise memory usage of internal tx parsing

* lint

* fix(internal): remove revert + status logic from client to transformers

* test(internal): test for revert transformer on tx and itx

* feat(cmd): add re-index-blocks cmd

* chore(proto): clean proto Eth1Transaction and Eth1TransactionIndexed

* fix(TransformItx): empty revertSource + error before skipping

* feat(BEDS-536): implement misc `fix-internal-txs-from-node` cmd with batch processing from Erigon node (#2967)

* wip: move itx status parsing logic to transformers func

* wip: implementing fix-internal-txs misc cmd

* wip: updated geth client initialisation

* wip: moved blocks by batch logic to erigon.go

* wip: updated block by number batch call

* updated blocks batch processing

* feat: added transformers for bulk mutations

* fix: save current block height in local variable when performing mutation of the block data

* fix: add check for start and end block flags

* fix: updated erigon client

* fix: add check for batchcall size and return if it's empty

* cleanup unused code

* cleanup

* updated cmd name to fix-internal-txs-from-node

* fix: uncles, blobGasUsed & excessBlobGas parsing

* rm timings, update GetBlocksByBatch func

* fix: revised ReindexITxsFromNode func logic

* fix: update the logic for parsing uncles, BlobGasUsed and ExcessBlobGas

* (BEDS-536) Use raw db for resync (#2972)

* rpc/erigon: use raw bigtable
* rpc/erigon: use cache raw db
* rpc/erigon: correct path for geth traces
* cmd/reindex: improve performance + fix raw store cache
* rpc/erigon: fix sender address
* db2/store: add remote server + client

* fix ci

* store/bigtable: fix range limits

go mod

* store/bigtable: fix grpc error on close

* updated Receipts len check

* updated traceMode to geth

* fix ci

* cleanup

* rpc/erigon: parse traces geth handle CALLCODE

* fix(transform itx): allow internal index == ITX_PER_TX_LIMIT

* fix(bigtable): retry on grpc internal err

* fix(re index): log error rather than returning error and panicking

* feat(re index): re-print read errors at the end

* fix(TransformEnsNameRegistered): return non-nil if ignored chainID

* fix(db2/WithFallback): fallback on syscall.ECONNRESET

* fix(blockHash): prevent wrongly calculated hash

* fix: return error if mismatch between receipts and transactions length

* fix: blockhash read from node response

* fix: merge go mod

---------

Co-authored-by: Monika-Bitfly <[email protected]>
Co-authored-by: Patrick Pfeiffer <[email protected]>
3 people authored Jan 22, 2025
1 parent 07d8905 commit a073010
Showing 37 changed files with 4,639 additions and 989 deletions.
16 changes: 15 additions & 1 deletion cache/tiered_cache.go
@@ -13,6 +13,8 @@ import (
"github.com/sirupsen/logrus"
)

var _ TieredCacher = (*tieredCache)(nil)

// Tiered cache is a cache implementation combining a local in-memory cache with a remote redis cache.
type tieredCache struct {
localGoCache *freecache.Cache
@@ -31,7 +33,19 @@ type RemoteCache interface {
GetBool(ctx context.Context, key string) (bool, error)
}

var TieredCache *tieredCache
type TieredCacher interface {
Set(key string, value interface{}, expiration time.Duration) error
SetString(key string, value string, expiration time.Duration) error
SetUint64(key string, value uint64, expiration time.Duration) error
SetBool(key string, value bool, expiration time.Duration) error

GetStringWithLocalTimeout(key string, localExpiration time.Duration) (string, error)
GetUint64WithLocalTimeout(key string, localExpiration time.Duration) (uint64, error)
GetBoolWithLocalTimeout(key string, localExpiration time.Duration) (bool, error)
GetWithLocalTimeout(key string, localExpiration time.Duration, returnValue interface{}) (interface{}, error)
}

var TieredCache TieredCacher

func MustInitTieredCache(redisAddress string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
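The new TieredCacher interface decouples callers from the concrete tieredCache and lets tests substitute a fake. A minimal usage sketch in Go, assuming a reachable redis at the given address and that the *WithLocalTimeout getters serve from the in-process tier for the given duration before consulting redis; the key and value here are made up for illustration:

package main

import (
	"time"

	"github.com/gobitfly/eth2-beaconchain-explorer/cache"
)

func main() {
	cache.MustInitTieredCache("localhost:6379") // assumed redis address

	// Write through the interface.
	if err := cache.TieredCache.SetString("chain:head", "0xabc", time.Minute); err != nil {
		panic(err)
	}

	// Read back, preferring the local tier for up to 10s before falling back to redis.
	head, err := cache.TieredCache.GetStringWithLocalTimeout("chain:head", 10*time.Second)
	if err != nil {
		panic(err)
	}
	_ = head
}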
4 changes: 2 additions & 2 deletions cmd/eth1indexer/main.go
@@ -48,7 +48,7 @@ func main() {
offsetBlocks := flag.Int64("blocks.offset", 100, "Blocks offset")
checkBlocksGaps := flag.Bool("blocks.gaps", false, "Check for gaps in the blocks table")
checkBlocksGapsLookback := flag.Int("blocks.gaps.lookback", 1000000, "Lookback for gaps check of the blocks table")
- traceMode := flag.String("blocks.tracemode", "parity/geth", "Trace mode to use, can be either 'parity', 'geth' or 'parity/geth' for both")
+ traceMode := flag.String("blocks.tracemode", "geth", "Trace mode to use, can be either 'parity', 'geth' or 'parity/geth' for both")

concurrencyData := flag.Int64("data.concurrency", 30, "Concurrency to use when indexing data from bigtable")
startData := flag.Int64("data.start", 0, "Block to start indexing")
@@ -187,7 +187,7 @@ func main() {
return
}

- transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
+ transforms := make([]db.TransformFunc, 0)
transforms = append(transforms,
bt.TransformBlock,
bt.TransformTx,
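The eth1indexer now builds its transformer list as []db.TransformFunc instead of spelling out the function type inline. Presumably db.TransformFunc aliases the signature visible in the removed line; under that assumption, a custom transformer is just a function with that shape:

package main

import (
	"github.com/coocood/freecache"

	"github.com/gobitfly/eth2-beaconchain-explorer/types"
)

// noopTransform matches the signature the old code spelled out inline:
// func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error)
func noopTransform(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error) {
	// Inspect blk and emit data/metadata mutations here; empty sets make this a no-op.
	return &types.BulkMutations{}, &types.BulkMutations{}, nil
}

// It could then be appended alongside the built-ins:
// transforms = append(transforms, noopTransform)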
204 changes: 182 additions & 22 deletions cmd/misc/main.go
@@ -3,10 +3,10 @@ package main
import (
"bytes"
"context"

"database/sql"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"math"
"math/big"
@@ -17,6 +17,7 @@ import (
"time"

"firebase.google.com/go/v4/messaging"

"github.com/gobitfly/eth2-beaconchain-explorer/cmd/misc/commands"
"github.com/gobitfly/eth2-beaconchain-explorer/db"
"github.com/gobitfly/eth2-beaconchain-explorer/exporter"
@@ -27,19 +28,15 @@
"github.com/gobitfly/eth2-beaconchain-explorer/utils"
"github.com/gobitfly/eth2-beaconchain-explorer/version"

"github.com/Gurpartap/storekit-go"
"github.com/coocood/freecache"
"github.com/ethereum/go-ethereum/common"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/pkg/errors"
utilMath "github.com/protolambda/zrnt/eth2/util/math"
"github.com/sirupsen/logrus"
go_ens "github.com/wealdtech/go-ens/v3"
"golang.org/x/sync/errgroup"

"flag"

"github.com/Gurpartap/storekit-go"

"github.com/sirupsen/logrus"
)

var opts = struct {
@@ -77,7 +74,7 @@ func main() {
statsPartitionCommand := commands.StatsMigratorCommand{}

configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
- flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, disable-user-per-email, validate-firebase-tokens")
+ flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, re-index-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, disable-user-per-email, validate-firebase-tokens")
flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
flag.Uint64Var(&opts.User, "user", 0, "user id")
@@ -320,6 +317,8 @@ func main() {
exportHistoricPrices(opts.StartDay, opts.EndDay)
case "index-missing-blocks":
indexMissingBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient)
case "re-index-blocks":
reIndexBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient, opts.Transformers, opts.BatchSize, opts.DataConcurrency)
case "migrate-last-attestation-slot-bigtable":
migrateLastAttestationSlotToBigtable()
case "migrate-app-purchases":
@@ -440,6 +439,8 @@ func main() {
err = disableUserPerEmail()
case "fix-epochs":
err = fixEpochs()
case "fix-internal-txs-from-node":
fixInternalTxsFromNode(opts.StartBlock, opts.EndBlock, opts.BatchSize, opts.DataConcurrency, bt)
case "validate-firebase-tokens":
err = validateFirebaseTokens()
default:
@@ -544,6 +545,52 @@ func disableUserPerEmail() error {
return nil
}

// fixInternalTxsFromNode re-fetches blocks in [startBlock, endBlock] from the node in batches
// and re-applies the block, tx and itx transformers to repair internal-transaction statuses.
func fixInternalTxsFromNode(startBlock, endBlock, batchSize, concurrency uint64, bt *db.Bigtable) {
if endBlock > 0 && endBlock < startBlock {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0)
return
}

if concurrency == 0 {
utils.LogError(nil, "concurrency must be greater than 0", 0)
return
}
if bt == nil {
utils.LogError(nil, "no bigtable provided", 0)
return
}

transformers := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
transformers = append(transformers, bt.TransformBlock, bt.TransformTx, bt.TransformItx)

to := endBlock
if endBlock == math.MaxInt64 {
lastBlockFromBlocksTable, err := bt.GetLastBlockInBlocksTable()
if err != nil {
utils.LogError(err, "error retrieving last blocks from blocks table", 0)
return
}

to = uint64(lastBlockFromBlocksTable)
}

cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
blockCount := utilMath.MaxU64(1, batchSize)

logrus.Infof("Starting to reindex all txs for blocks ranging from %d to %d", startBlock, to)
for from := startBlock; from <= to; from = from + blockCount {
toBlock := utilMath.MinU64(to, from+blockCount-1)

logrus.Infof("reindexing txs for blocks from height %v to %v in data table ...", from, toBlock)
err := bt.ReindexITxsFromNode(int64(from), int64(toBlock), int64(batchSize), int64(concurrency), transformers, cache)
if err != nil {
utils.LogError(err, "error indexing from bigtable", 0)
}
cache.Clear()

}
}
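A worked example of the batch windowing above. Assuming startBlock=100, a resolved end of to=249 and batchSize=60, the loop produces the windows [100,159], [160,219], [220,249], with the last window clamped to the end block — a sketch using Go's built-in min in place of utilMath.MinU64:

package main

import "fmt"

func main() {
	const start, to, blockCount = uint64(100), uint64(249), uint64(60)
	for from := start; from <= to; from += blockCount {
		toBlock := min(to, from+blockCount-1) // mirrors utilMath.MinU64(to, from+blockCount-1)
		fmt.Printf("reindex window: %d-%d\n", from, toBlock)
	}
}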

func fixEns(erigonClient *rpc.ErigonClient) error {
logrus.WithField("dry", opts.DryRun).Infof("command: fix-ens")
addrs := []struct {
@@ -1082,7 +1129,7 @@ func debugBlocks() error {
}
// logrus.WithFields(logrus.Fields{"block": i, "data": fmt.Sprintf("%+v", b)}).Infof("block from bt")

- elBlock, _, err := elClient.GetBlock(int64(i), "parity/geth")
+ elBlock, _, err := elClient.GetBlock(int64(i), "geth")
if err != nil {
return err
}
@@ -1550,7 +1597,7 @@ func indexMissingBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.E
if _, err := db.BigtableClient.GetBlockFromBlocksTable(block); err != nil {
logrus.Infof("could not load [%v] from blocks table, will try to fetch it from the node and save it", block)

- bc, _, err := client.GetBlock(int64(block), "parity/geth")
+ bc, _, err := client.GetBlock(int64(block), "geth")
if err != nil {
utils.LogError(err, fmt.Sprintf("error getting block %v from the node", block), 0)
return
@@ -1568,35 +1615,127 @@
}
}

func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, concurrency uint64, transformerFlag string, bt *db.Bigtable, client *rpc.ErigonClient) {
if endBlock > 0 && endBlock < startBlock {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0)
// Goes through the blocks in the given range from [start] to [end] and re-indexes them with the provided transformers
//
// Both [start] and [end] are inclusive
// Pass math.MaxInt64 as [end] to export from [start] to the last block in the blocks table
func reIndexBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.ErigonClient, transformerFlag string, batchSize uint64, concurrency uint64) {
if start > 0 && end < start {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", end, start), 0)
return
}
if concurrency == 0 {
utils.LogError(nil, "concurrency must be greater than 0", 0)
return
}
if bt == nil {
utils.LogError(nil, "no bigtable provided", 0)
if end == math.MaxInt64 {
lastBlockFromBlocksTable, err := bt.GetLastBlockInBlocksTable()
if err != nil {
logrus.Errorf("error retrieving last blocks from blocks table: %v", err)
return
}
end = uint64(lastBlockFromBlocksTable)
}
transformers, importENSChanges, err := getTransformers(transformerFlag, bt)
if err != nil {
utils.LogError(nil, err, 0)
return
}
if importENSChanges {
if err := bt.ImportEnsUpdates(client.GetNativeClient(), math.MaxInt64); err != nil {
utils.LogError(err, "error importing ens from events", 0)
return
}
}

readGroup := errgroup.Group{}
readGroup.SetLimit(int(concurrency))

writeGroup := errgroup.Group{}
writeGroup.SetLimit(int(concurrency*concurrency) + 1)

cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
quit := make(chan struct{})

sink := make(chan *types.Eth1Block)
writeGroup.Go(func() error {
for {
select {
case block, ok := <-sink:
if !ok {
return nil
}
writeGroup.Go(func() error {
if err := bt.SaveBlock(block); err != nil {
return fmt.Errorf("error saving block %v: %w", block.Number, err)
}
err := bt.IndexBlocksWithTransformers([]*types.Eth1Block{block}, transformers, cache)
if err != nil {
return fmt.Errorf("error indexing from bigtable: %w", err)
}
logrus.Infof("%d indexed", block.Number)
return nil
})
case <-quit:
return nil
}
}
})

var errs []error
var mu sync.Mutex
for i := start; i <= end; i = i + batchSize {
height := int64(i)
readGroup.Go(func() error {
heightEnd := height + int64(batchSize) - 1
if heightEnd > int64(end) {
heightEnd = int64(end)
}
blocks, err := client.GetBlocks(height, heightEnd, "geth")
if err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("cannot read block range %d-%d: %w", height, heightEnd, err))
mu.Unlock()
logrus.WithFields(map[string]interface{}{
"message": err.Error(),
"start": height,
"end": heightEnd,
}).Error("cannot read block range")
return nil
}
for _, block := range blocks {
sink <- block
}
return nil
})
}
if err := readGroup.Wait(); err != nil {
panic(err)
}
for _, err := range errs {
logrus.Error(err.Error())
}
quit <- struct{}{}
close(sink)
if err := writeGroup.Wait(); err != nil {
panic(err)
}
}
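reIndexBlocks is structured as a bounded producer/consumer pipeline: up to [concurrency] readers fetch block ranges from the node and push individual blocks into a sink channel, while a dispatcher goroutine fans each block out to a write worker that saves and re-indexes it. A stripped-down sketch of that shape — the function name and int payload are illustrative only, and it closes the sink after the readers finish rather than signalling a quit channel:

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func pipeline(items []int, concurrency int, process func(int) error) error {
	readGroup := errgroup.Group{}
	readGroup.SetLimit(concurrency)

	writeGroup := errgroup.Group{}
	sink := make(chan int)

	// Dispatcher: drains the sink until close(sink), fanning writes out.
	writeGroup.Go(func() error {
		for item := range sink {
			item := item
			writeGroup.Go(func() error { return process(item) })
		}
		return nil
	})

	// Producers: bounded by SetLimit, feed the sink.
	for _, it := range items {
		it := it
		readGroup.Go(func() error {
			sink <- it
			return nil
		})
	}

	if err := readGroup.Wait(); err != nil {
		return err
	}
	close(sink) // lets the dispatcher exit so writeGroup.Wait can return
	return writeGroup.Wait()
}

func main() {
	if err := pipeline([]int{1, 2, 3}, 2, func(i int) error {
		fmt.Println("processed", i)
		return nil
	}); err != nil {
		panic(err)
	}
}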

transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
func getTransformers(transformerFlag string, bt *db.Bigtable) ([]db.TransformFunc, bool, error) {
transforms := make([]db.TransformFunc, 0)

logrus.Infof("transformerFlag: %v", transformerFlag)
transformerList := strings.Split(transformerFlag, ",")
if transformerFlag == "all" {
transformerList = []string{"TransformBlock", "TransformTx", "TransformBlobTx", "TransformItx", "TransformERC20", "TransformERC721", "TransformERC1155", "TransformWithdrawals", "TransformUncle", "TransformEnsNameRegistered", "TransformContract"}
} else if len(transformerList) == 0 {
utils.LogError(nil, "no transformer functions provided", 0)
return
return nil, false, fmt.Errorf("no transformer functions provided")
}
logrus.Infof("transformers: %v", transformerList)

importENSChanges := false
/**
* Add additional transformers you want to sync to this switch case
**/
for _, t := range transformerList {
switch t {
case "TransformBlock":
@@ -1623,10 +1762,31 @@ func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, co
case "TransformContract":
transforms = append(transforms, bt.TransformContract)
default:
utils.LogError(nil, "Invalid transformer flag %v", 0)
return
return nil, false, fmt.Errorf("invalid transformer flag %v", t)
}
}
return transforms, importENSChanges, nil
}
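getTransformers is now shared by indexOldEth1Blocks and reIndexBlocks. A minimal caller sketch, assuming bt is an initialized *db.Bigtable; the second return value is presumably only set when "TransformEnsNameRegistered" is requested, since both callers gate bt.ImportEnsUpdates on it:

transforms, importENSChanges, err := getTransformers("TransformBlock,TransformTx,TransformItx", bt)
if err != nil {
	utils.LogError(nil, err, 0)
	return
}
if importENSChanges {
	// run bt.ImportEnsUpdates(...) before indexing, as both callers do
}
_ = transforms // passed on to e.g. bt.IndexBlocksWithTransformers(blocks, transforms, cache)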

func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, concurrency uint64, transformerFlag string, bt *db.Bigtable, client *rpc.ErigonClient) {
if endBlock > 0 && endBlock < startBlock {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0)
return
}
if concurrency == 0 {
utils.LogError(nil, "concurrency must be greater than 0", 0)
return
}
if bt == nil {
utils.LogError(nil, "no bigtable provided", 0)
return
}

transforms, importENSChanges, err := getTransformers(transformerFlag, bt)
if err != nil {
utils.LogError(nil, err, 0)
return
}

cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit

38 changes: 38 additions & 0 deletions cmd/store/main.go
@@ -0,0 +1,38 @@
package main

import (
"errors"
"flag"
"net/http"

"github.com/sirupsen/logrus"

"github.com/gobitfly/eth2-beaconchain-explorer/db2"
"github.com/gobitfly/eth2-beaconchain-explorer/db2/store"
"github.com/gobitfly/eth2-beaconchain-explorer/types"
"github.com/gobitfly/eth2-beaconchain-explorer/utils"
)

func main() {
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
flag.Parse()

cfg := &types.Config{}
err := utils.ReadConfig(cfg, *configPath)
if err != nil {
panic(err)
}

bt, err := store.NewBigTable(cfg.RawBigtable.Bigtable.Project, cfg.RawBigtable.Bigtable.Instance, nil)
if err != nil {
panic(err)
}
remote := store.NewRemoteStore(store.Wrap(bt, db2.BlocksRawTable, ""))
go func() {
logrus.Info("starting remote raw store on port 8087")
if err := http.ListenAndServe("0.0.0.0:8087", remote.Routes()); err != nil && !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
}()
utils.WaitForCtrlC()
}
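The new cmd/store binary serves the raw-block store over HTTP on port 8087 (per the commit bullet "db2/store: add remote server + client"). A hypothetical smoke test against it — the root path is invented for illustration; the real routes come from RemoteStore.Routes() in db2/store and are not shown in this diff:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Assumes the store command above is running locally on :8087.
	resp, err := http.Get("http://localhost:8087/")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("remote store responded:", resp.Status)
}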
