feat(BEDS-536): implement misc fix-internal-txs-from-node cmd with batch processing from Erigon node (#2967)

* wip: move itx status parsing logic to transformers func

* wip: implementing fix-internal-txs misc cmd

* wip: updated geth client initialisation

* wip: moved blocks by batch logic to erigon.go

* wip: updated block by number batch call

* updated blocks batch processing

* feat: added transformers for bulk mutations

* fix: save current block height in local variable when performing mutation of the block data

* fix: add check for start and end block flags

* fix: updated erigon client

* fix: add check for batchcall size and return if it's empty

* cleanup unused code

* cleanup

* updated cmd name to fix-internal-txs-from-node

* fix: uncles, blobGasUsed & excessBlobGas parsing

* rm timings, update GetBlocksByBatch func

* fix: revised ReindexITxsFromNode func logic

* fix: update the logic for parsing uncles, BlobGasUsed and ExcessBlobGas
Monika-Bitfly authored and Tangui-Bitfly committed Jan 22, 2025
1 parent 9d133f3 commit d25650e
Showing 3 changed files with 467 additions and 1 deletion.
48 changes: 48 additions & 0 deletions cmd/misc/main.go
@@ -439,6 +439,8 @@ func main() {
err = disableUserPerEmail()
case "fix-epochs":
err = fixEpochs()
case "fix-internal-txs-from-node":
fixInternalTxsFromNode(opts.StartBlock, opts.EndBlock, opts.BatchSize, opts.DataConcurrency, bt)
case "validate-firebase-tokens":
err = validateFirebaseTokens()
default:
@@ -543,6 +545,52 @@ func disableUserPerEmail() error {
return nil
}

func fixInternalTxsFromNode(startBlock, endBlock, batchSize, concurrency uint64, bt *db.Bigtable) {
if endBlock > 0 && endBlock < startBlock {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0)
return
}

if concurrency == 0 {
utils.LogError(nil, "concurrency must be greater than 0", 0)
return
}
if bt == nil {
utils.LogError(nil, "no bigtable provided", 0)
return
}

transformers := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
transformers = append(transformers, bt.TransformBlock, bt.TransformTx, bt.TransformItx)

to := endBlock
if endBlock == math.MaxInt64 {
lastBlockFromBlocksTable, err := bt.GetLastBlockInBlocksTable()
if err != nil {
utils.LogError(err, "error retrieving last blocks from blocks table", 0)
return
}

to = uint64(lastBlockFromBlocksTable)
}

cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
blockCount := utilMath.MaxU64(1, batchSize)

logrus.Infof("Starting to reindex all txs for blocks ranging from %d to %d", startBlock, to)
for from := startBlock; from <= to; from = from + blockCount {
toBlock := utilMath.MinU64(to, from+blockCount-1)

logrus.Infof("reindexing txs for blocks from height %v to %v in data table ...", from, toBlock)
err := bt.ReindexITxsFromNode(int64(from), int64(toBlock), int64(batchSize), int64(concurrency), transformers, cache)
if err != nil {
utils.LogError(err, "error indexing from bigtable", 0)
}
cache.Clear()
}
}

func fixEns(erigonClient *rpc.ErigonClient) error {
logrus.WithField("dry", opts.DryRun).Infof("command: fix-ens")
addrs := []struct {
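Worth noting how the batching loop in fixInternalTxsFromNode above carves the requested range into windows of batchSize blocks and clamps the final window to the end of the range. A minimal, runnable sketch of just that arithmetic (the block numbers are hypothetical and chosen only to show the clamp on the last window):

package main

import "fmt"

func main() {
	// Hypothetical inputs: reindex blocks 100..150 in windows of 20 blocks.
	var start, end, batchSize uint64 = 100, 150, 20

	for from := start; from <= end; from += batchSize {
		// Same clamp as utilMath.MinU64(to, from+blockCount-1) in the loop above.
		toBlock := from + batchSize - 1
		if toBlock > end {
			toBlock = end
		}
		fmt.Printf("window: %d..%d\n", from, toBlock)
	}
	// Prints: window: 100..119, window: 120..139, window: 140..150
}

Each window is then handed to ReindexITxsFromNode (shown below), which fetches those blocks from the Erigon node via GetBlocksByBatch and re-applies the transformers.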
93 changes: 92 additions & 1 deletion db/bigtable_eth1.go
@@ -35,7 +35,6 @@ import (
"github.com/ethereum/go-ethereum/common/math"
eth_types "github.com/ethereum/go-ethereum/core/types"
"github.com/go-redis/redis/v8"

"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
@@ -4924,3 +4923,95 @@ func (bigtable *Bigtable) GetGasNowHistory(ts, pastTs time.Time) ([]types.GasNow
}
return history, nil
}

func (bigtable *Bigtable) ReindexITxsFromNode(start, end, batchSize, concurrency int64, transforms []func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error), cache *freecache.Cache) error {
g := new(errgroup.Group)
g.SetLimit(int(concurrency))

if start == 0 && end == 0 {
return fmt.Errorf("start or end block height can't be 0")
}

if end < start {
return fmt.Errorf("end block must be grater or equal to start block")
}

logrus.Infof("reindexing txs for blocks from %d to %d", start, end)

for i := start; i <= end; i += batchSize {
firstBlock := i
lastBlock := firstBlock + batchSize - 1
if lastBlock > end {
lastBlock = end
}

blockNumbers := make([]int64, 0, lastBlock-firstBlock+1)
for b := firstBlock; b <= lastBlock; b++ {
blockNumbers = append(blockNumbers, b)
}

g.Go(func() error {
blocks, err := rpc.CurrentErigonClient.GetBlocksByBatch(blockNumbers)
if err != nil {
return fmt.Errorf("error getting blocks by batch from %v to %v: %v", firstBlock, lastBlock, err)
}

subG := new(errgroup.Group)
subG.SetLimit(int(concurrency))

for _, block := range blocks {
currentBlock := block
subG.Go(func() error {
bulkMutsData := types.BulkMutations{}
bulkMutsMetadataUpdate := types.BulkMutations{}
for _, transform := range transforms {
mutsData, mutsMetadataUpdate, err := transform(currentBlock, cache)
if err != nil {
logrus.WithError(err).Errorf("error transforming block [%v]", currentBlock.Number)
}
bulkMutsData.Keys = append(bulkMutsData.Keys, mutsData.Keys...)
bulkMutsData.Muts = append(bulkMutsData.Muts, mutsData.Muts...)

if mutsMetadataUpdate != nil {
bulkMutsMetadataUpdate.Keys = append(bulkMutsMetadataUpdate.Keys, mutsMetadataUpdate.Keys...)
bulkMutsMetadataUpdate.Muts = append(bulkMutsMetadataUpdate.Muts, mutsMetadataUpdate.Muts...)
}
}

if len(bulkMutsData.Keys) > 0 {
metaKeys := strings.Join(bulkMutsData.Keys, ",") // save block keys in order to be able to handle chain reorgs
err := bigtable.SaveBlockKeys(currentBlock.Number, currentBlock.Hash, metaKeys)
if err != nil {
return fmt.Errorf("error saving block [%v] keys to bigtable metadata updates table: %w", currentBlock.Number, err)
}

err = bigtable.WriteBulk(&bulkMutsData, bigtable.tableData, DEFAULT_BATCH_INSERTS)
if err != nil {
return fmt.Errorf("error writing block [%v] to bigtable data table: %w", currentBlock.Number, err)
}
}

if len(bulkMutsMetadataUpdate.Keys) > 0 {
err := bigtable.WriteBulk(&bulkMutsMetadataUpdate, bigtable.tableMetadataUpdates, DEFAULT_BATCH_INSERTS)
if err != nil {
return fmt.Errorf("error writing block [%v] to bigtable metadata updates table: %w", currentBlock.Number, err)
}
}

return nil
})
}
return subG.Wait()
})
}

if err := g.Wait(); err == nil {
logrus.Info("data table indexing completed")
} else {
utils.LogError(err, "wait group error", 0)
return err
}

return nil
}
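Every transformer handed to ReindexITxsFromNode must match the same signature as bt.TransformBlock, bt.TransformTx and bt.TransformItx. A minimal no-op sketch of that contract, assuming it sits in the same db package as the function above so the types and freecache imports are already available (noopTransform is hypothetical and only illustrates the shape; returning empty BulkMutations values is assumed to be safe here, since the caller only appends their Keys and Muts fields):

// noopTransform is a hypothetical transformer: it takes a block plus the shared
// freecache instance and returns empty data and metadata-update mutation sets.
// Real transformers derive Bigtable mutations from the block contents.
func noopTransform(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error) {
	return &types.BulkMutations{}, &types.BulkMutations{}, nil
}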
