Skip to content

Commit

Permalink
(BEDS-536) Use raw db for resync (#2972)
Browse files Browse the repository at this point in the history
* rpc/erigon: use raw bigtable
* rpc/erigon: use cache raw db
* rpc/erigon: correct path for geth traces
* cmd/reindex: improve performance + fix raw store cache
* rpc/erigon: fix sender address
* db2/store: add remote server + client
  • Loading branch information
Tangui-Bitfly authored Oct 29, 2024
1 parent 20a211e commit 357057a
Show file tree
Hide file tree
Showing 22 changed files with 2,600 additions and 251 deletions.
2 changes: 1 addition & 1 deletion cmd/eth1indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func main() {
return
}

transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
transforms := make([]db.TransformFunc, 0)
transforms = append(transforms,
bt.TransformBlock,
bt.TransformTx,
Expand Down
144 changes: 99 additions & 45 deletions cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func main() {
case "index-missing-blocks":
indexMissingBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient)
case "re-index-blocks":
reIndexBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient, opts.Transformers)
reIndexBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient, opts.Transformers, opts.BatchSize, opts.DataConcurrency)
case "migrate-last-attestation-slot-bigtable":
migrateLastAttestationSlotToBigtable()
case "migrate-app-purchases":
Expand Down Expand Up @@ -1619,7 +1619,15 @@ func indexMissingBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.E
//
// Both [start] and [end] are inclusive
// Pass math.MaxInt64 as [end] to export from [start] to the last block in the blocks table
func reIndexBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.ErigonClient, transformerFlag string) {
func reIndexBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.ErigonClient, transformerFlag string, batchSize uint64, concurrency uint64) {
if start > 0 && end < start {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", end, start), 0)
return
}
if concurrency == 0 {
utils.LogError(nil, "concurrency must be greater than 0", 0)
return
}
if end == math.MaxInt64 {
lastBlockFromBlocksTable, err := bt.GetLastBlockInBlocksTable()
if err != nil {
Expand All @@ -1628,68 +1636,93 @@ func reIndexBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.Erigon
}
end = uint64(lastBlockFromBlocksTable)
}

errFields := map[string]interface{}{
"start": start,
"end": end,
transformers, importENSChanges, err := getTransformers(transformerFlag, bt)
if err != nil {
utils.LogError(nil, err, 0)
return
}

batchSize := uint64(10000)
for from := start; from <= end; from += batchSize {
targetCount := batchSize
if from+targetCount >= end {
targetCount = end - from + 1
if importENSChanges {
if err := bt.ImportEnsUpdates(client.GetNativeClient(), math.MaxInt64); err != nil {
utils.LogError(err, "error importing ens from events", 0)
return
}
to := from + targetCount - 1
}

errFields["from"] = from
errFields["to"] = to
errFields["targetCount"] = targetCount
readGroup := errgroup.Group{}
readGroup.SetLimit(int(concurrency))

for block := from; block <= to; block++ {
bc, _, err := client.GetBlock(int64(block), "parity/geth")
if err != nil {
utils.LogError(err, fmt.Sprintf("error getting block %v from the node", block), 0, errFields)
return
}
if err := bt.SaveBlock(bc); err != nil {
utils.LogError(err, fmt.Sprintf("error saving block: %v ", block), 0, errFields)
return
writeGroup := errgroup.Group{}
writeGroup.SetLimit(int(concurrency*concurrency) + 1)

cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
quit := make(chan struct{})

sink := make(chan *types.Eth1Block)
writeGroup.Go(func() error {
for {
select {
case block, ok := <-sink:
if !ok {
return nil
}
writeGroup.Go(func() error {
if err := bt.SaveBlock(block); err != nil {
return fmt.Errorf("error saving block %v: %w", block.Number, err)
}
err := bt.IndexBlocksWithTransformers([]*types.Eth1Block{block}, transformers, cache)
if err != nil {
return fmt.Errorf("error indexing from bigtable: %w", err)
}
logrus.Infof("%d indexed", block.Number)
return nil
})
case <-quit:
return nil
}
}
indexOldEth1Blocks(from, to, batchSize, 1, transformerFlag, bt, client)
}
}
})

func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, concurrency uint64, transformerFlag string, bt *db.Bigtable, client *rpc.ErigonClient) {
if endBlock > 0 && endBlock < startBlock {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0)
return
for i := start; i <= end; i = i + batchSize {
height := int64(i)
readGroup.Go(func() error {
heightEnd := height + int64(batchSize) - 1
if heightEnd > int64(end) {
heightEnd = int64(end)
}
blocks, err := client.GetBlocks(height, heightEnd, "geth")
if err != nil {
return fmt.Errorf("error getting block %v from the node: %w", height, err)
}
for _, block := range blocks {
sink <- block
}
return nil
})
}
if concurrency == 0 {
utils.LogError(nil, "concurrency must be greater than 0", 0)
return
if err := readGroup.Wait(); err != nil {
panic(err)
}
if bt == nil {
utils.LogError(nil, "no bigtable provided", 0)
return
quit <- struct{}{}
close(sink)
if err := writeGroup.Wait(); err != nil {
panic(err)
}
}

transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
func getTransformers(transformerFlag string, bt *db.Bigtable) ([]db.TransformFunc, bool, error) {
transforms := make([]db.TransformFunc, 0)

logrus.Infof("transformerFlag: %v", transformerFlag)
transformerList := strings.Split(transformerFlag, ",")
if transformerFlag == "all" {
transformerList = []string{"TransformBlock", "TransformTx", "TransformBlobTx", "TransformItx", "TransformERC20", "TransformERC721", "TransformERC1155", "TransformWithdrawals", "TransformUncle", "TransformEnsNameRegistered", "TransformContract"}
} else if len(transformerList) == 0 {
utils.LogError(nil, "no transformer functions provided", 0)
return
return nil, false, fmt.Errorf("no transformer functions provided")
}
logrus.Infof("transformers: %v", transformerList)

importENSChanges := false
/**
* Add additional transformers you want to sync to this switch case
**/
for _, t := range transformerList {
switch t {
case "TransformBlock":
Expand All @@ -1716,10 +1749,31 @@ func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, co
case "TransformContract":
transforms = append(transforms, bt.TransformContract)
default:
utils.LogError(nil, "Invalid transformer flag %v", 0)
return
return nil, false, fmt.Errorf("invalid transformer flag %v", t)
}
}
return transforms, importENSChanges, nil
}

func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, concurrency uint64, transformerFlag string, bt *db.Bigtable, client *rpc.ErigonClient) {
if endBlock > 0 && endBlock < startBlock {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0)
return
}
if concurrency == 0 {
utils.LogError(nil, "concurrency must be greater than 0", 0)
return
}
if bt == nil {
utils.LogError(nil, "no bigtable provided", 0)
return
}

transforms, importENSChanges, err := getTransformers(transformerFlag, bt)
if err != nil {
utils.LogError(nil, err, 0)
return
}

cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit

Expand Down
38 changes: 38 additions & 0 deletions cmd/store/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"errors"
"flag"
"net/http"

"github.com/sirupsen/logrus"

"github.com/gobitfly/eth2-beaconchain-explorer/db2"
"github.com/gobitfly/eth2-beaconchain-explorer/db2/store"
"github.com/gobitfly/eth2-beaconchain-explorer/types"
"github.com/gobitfly/eth2-beaconchain-explorer/utils"
)

// main runs a standalone remote raw-store server: it loads the application
// config, opens the raw-data Bigtable instance and exposes the db2/store
// remote API over HTTP on port 8087 until the process is interrupted.
func main() {
	cfgPath := flag.String("config", "config/default.config.yml", "Path to the config file")
	flag.Parse()

	cfg := &types.Config{}
	if err := utils.ReadConfig(cfg, *cfgPath); err != nil {
		panic(err)
	}

	bt, err := store.NewBigTable(cfg.RawBigtable.Bigtable.Project, cfg.RawBigtable.Bigtable.Instance, nil)
	if err != nil {
		panic(err)
	}

	remote := store.NewRemoteStore(store.Wrap(bt, db2.BlocksRawTable, ""))
	go func() {
		logrus.Info("starting remote raw store on port 8087")
		serveErr := http.ListenAndServe("0.0.0.0:8087", remote.Routes())
		if serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) {
			panic(serveErr)
		}
	}()

	utils.WaitForCtrlC()
}
55 changes: 54 additions & 1 deletion db/bigtable_eth1.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func TimestampToBigtableTimeDesc(ts time.Time) string {
return fmt.Sprintf("%04d%02d%02d%02d%02d%02d", 9999-ts.Year(), 12-ts.Month(), 31-ts.Day(), 23-ts.Hour(), 59-ts.Minute(), 59-ts.Second())
}

func (bigtable *Bigtable) IndexEventsWithTransformers(start, end int64, transforms []func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error), concurrency int64, cache *freecache.Cache) error {
func (bigtable *Bigtable) IndexEventsWithTransformers(start, end int64, transforms []TransformFunc, concurrency int64, cache *freecache.Cache) error {
g := new(errgroup.Group)
g.SetLimit(int(concurrency))

Expand Down Expand Up @@ -766,6 +766,59 @@ func (bigtable *Bigtable) IndexEventsWithTransformers(start, end int64, transfor
return nil
}

type TransformFunc func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error)

// blockKeysMutation builds the row key and the mutation that persist the
// comma-separated list of index keys written for a block, addressed by
// chain id, reversed-padded block number and block hash.
func (bigtable *Bigtable) blockKeysMutation(blockNumber uint64, blockHash []byte, keys string) (string, *gcp_bigtable.Mutation) {
	rowKey := fmt.Sprintf("%s:BLOCK:%s:%x", bigtable.chainId, reversedPaddedBlockNumber(blockNumber), blockHash)

	mut := gcp_bigtable.NewMutation()
	mut.Set(METADATA_UPDATES_FAMILY_BLOCKS, "keys", gcp_bigtable.Now(), []byte(keys))

	return rowKey, mut
}

// IndexBlocksWithTransformers runs every transformer over every block,
// accumulates the resulting mutations and writes them in bulk to the data
// and metadata-updates tables. A failing transformer is logged and skipped
// (best-effort indexing); a failing bulk write aborts the call with an error.
func (bigtable *Bigtable) IndexBlocksWithTransformers(blocks []*types.Eth1Block, transforms []TransformFunc, cache *freecache.Cache) error {
	bulkMutsData := types.BulkMutations{}
	bulkMutsMetadataUpdate := types.BulkMutations{}
	for _, block := range blocks {
		for _, transform := range transforms {
			mutsData, mutsMetadataUpdate, err := transform(block, cache)
			if err != nil {
				logrus.WithError(err).Errorf("error transforming block [%v]", block.Number)
				// On error the transformer returns nil mutations; skip it
				// instead of dereferencing nil below.
				continue
			}
			bulkMutsData.Keys = append(bulkMutsData.Keys, mutsData.Keys...)
			bulkMutsData.Muts = append(bulkMutsData.Muts, mutsData.Muts...)

			if mutsMetadataUpdate != nil {
				bulkMutsMetadataUpdate.Keys = append(bulkMutsMetadataUpdate.Keys, mutsMetadataUpdate.Keys...)
				bulkMutsMetadataUpdate.Muts = append(bulkMutsMetadataUpdate.Muts, mutsMetadataUpdate.Muts...)
			}

			if len(mutsData.Keys) > 0 {
				// save block keys in order to be able to handle chain reorgs
				// NOTE(review): this joins ALL keys accumulated so far — across
				// every block and transformer in this batch — not just this
				// block's keys; confirm that is intended rather than
				// strings.Join(mutsData.Keys, ",").
				metaKeys := strings.Join(bulkMutsData.Keys, ",")
				key, mut := bigtable.blockKeysMutation(block.Number, block.Hash, metaKeys)
				bulkMutsMetadataUpdate.Keys = append(bulkMutsMetadataUpdate.Keys, key)
				bulkMutsMetadataUpdate.Muts = append(bulkMutsMetadataUpdate.Muts, mut)
			}
		}
	}

	if len(bulkMutsData.Keys) > 0 {
		err := bigtable.WriteBulk(&bulkMutsData, bigtable.tableData, DEFAULT_BATCH_INSERTS)
		if err != nil {
			return fmt.Errorf("error writing blocks [%v-%v] to bigtable data table: %w", blocks[0].Number, blocks[len(blocks)-1].Number, err)
		}
	}

	if len(bulkMutsMetadataUpdate.Keys) > 0 {
		err := bigtable.WriteBulk(&bulkMutsMetadataUpdate, bigtable.tableMetadataUpdates, DEFAULT_BATCH_INSERTS)
		if err != nil {
			return fmt.Errorf("error writing blocks [%v-%v] to bigtable metadata updates table: %w", blocks[0].Number, blocks[len(blocks)-1].Number, err)
		}
	}

	return nil
}

// TransformBlock extracts blocks from bigtable more specifically from the table blocks.
// It transforms the block and strips any information that is not necessary for a blocks view
// It writes blocks to table data:
Expand Down
2 changes: 1 addition & 1 deletion db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestTxRevertTransformer(t *testing.T) {
if err := bt.SaveBlock(block); err != nil {
t.Fatal(err)
}
transformers := []func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error){
transformers := []TransformFunc{
bt.TransformItx,
bt.TransformTx,
}
Expand Down
Loading

0 comments on commit 357057a

Please sign in to comment.