diff --git a/cmd/misc/main.go b/cmd/misc/main.go
index 941bd6ee0b..6d3d955d88 100644
--- a/cmd/misc/main.go
+++ b/cmd/misc/main.go
@@ -439,6 +439,8 @@ func main() {
 		err = disableUserPerEmail()
 	case "fix-epochs":
 		err = fixEpochs()
+	case "fix-internal-txs-from-node":
+		fixInternalTxsFromNode(opts.StartBlock, opts.EndBlock, opts.BatchSize, opts.DataConcurrency, bt)
 	case "validate-firebase-tokens":
 		err = validateFirebaseTokens()
 	default:
@@ -543,6 +545,52 @@ func disableUserPerEmail() error {
 	return nil
 }
 
+func fixInternalTxsFromNode(startBlock, endBlock, batchSize, concurrency uint64, bt *db.Bigtable) {
+	if endBlock > 0 && endBlock < startBlock {
+		utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0)
+		return
+	}
+
+	if concurrency == 0 {
+		utils.LogError(nil, "concurrency must be greater than 0", 0)
+		return
+	}
+	if bt == nil {
+		utils.LogError(nil, "no bigtable provided", 0)
+		return
+	}
+
+	transformers := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
+	transformers = append(transformers, bt.TransformBlock, bt.TransformTx, bt.TransformItx)
+
+	to := endBlock
+	if endBlock == math.MaxInt64 {
+		lastBlockFromBlocksTable, err := bt.GetLastBlockInBlocksTable()
+		if err != nil {
+			utils.LogError(err, "error retrieving last block from blocks table", 0)
+			return
+		}
+
+		to = uint64(lastBlockFromBlocksTable)
+	}
+
+	cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
+	blockCount := utilMath.MaxU64(1, batchSize)
+
+	logrus.Infof("Starting to reindex all txs for blocks ranging from %d to %d", startBlock, to)
+	for from := startBlock; from <= to; from = from + blockCount {
+		toBlock := utilMath.MinU64(to, from+blockCount-1)
+
+		logrus.Infof("reindexing txs for blocks from height %v to %v in data table ...", from, toBlock)
+		err := bt.ReindexITxsFromNode(int64(from), int64(toBlock), int64(batchSize), int64(concurrency), transformers, cache)
+		if err != nil {
+			utils.LogError(err, "error indexing from bigtable", 0)
+		}
+		cache.Clear()
+
+	}
+}
+
 func fixEns(erigonClient *rpc.ErigonClient) error {
 	logrus.WithField("dry", opts.DryRun).Infof("command: fix-ens")
 	addrs := []struct {
diff --git a/db/bigtable_eth1.go b/db/bigtable_eth1.go
index 8933ca40a1..8bc5ce8292 100644
--- a/db/bigtable_eth1.go
+++ b/db/bigtable_eth1.go
@@ -35,7 +35,6 @@ import (
 	"github.com/ethereum/go-ethereum/common/math"
 	eth_types "github.com/ethereum/go-ethereum/core/types"
 	"github.com/go-redis/redis/v8"
-	"github.com/sirupsen/logrus"
 	"google.golang.org/protobuf/proto"
 )
 
@@ -4924,3 +4923,95 @@ func (bigtable *Bigtable) GetGasNowHistory(ts, pastTs time.Time) ([]types.GasNow
 	}
 	return history, nil
 }
+
+func (bigtable *Bigtable) ReindexITxsFromNode(start, end, batchSize, concurrency int64, transforms []func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error), cache *freecache.Cache) error {
+	g := new(errgroup.Group)
+	g.SetLimit(int(concurrency))
+
+	if start == 0 && end == 0 {
+		return fmt.Errorf("start or end block height can't be 0")
+	}
+
+	if end < start {
+		return fmt.Errorf("end block must be greater than or equal to start block")
+	}
+
+	logrus.Infof("reindexing txs for blocks from %d to %d", start, end)
+
+	for i := start; i <= end; i += batchSize {
+		firstBlock := i
+		lastBlock := firstBlock + batchSize - 1
+		if lastBlock > end {
+			lastBlock = end
+		}
+
+		blockNumbers := make([]int64, 0, lastBlock-firstBlock+1)
+		for b := firstBlock; b <= lastBlock; b++ {
+			blockNumbers = append(blockNumbers, b)
+		}
+
+		g.Go(func() error {
+			blocks, err := rpc.CurrentErigonClient.GetBlocksByBatch(blockNumbers)
+			if err != nil {
+				return fmt.Errorf("error getting blocks by batch from %v to %v: %v", firstBlock, lastBlock, err)
+			}
+
+			subG := new(errgroup.Group)
+			subG.SetLimit(int(concurrency))
+
+			for _, block := range blocks {
+				currentBlock := block
+				subG.Go(func() error {
+					bulkMutsData := types.BulkMutations{}
+					bulkMutsMetadataUpdate := types.BulkMutations{}
+					for _, transform := range transforms {
+						mutsData, mutsMetadataUpdate, err := transform(currentBlock, cache)
+						if err != nil {
+							logrus.WithError(err).Errorf("error transforming block [%v]", currentBlock.Number)
+						}
+						bulkMutsData.Keys = append(bulkMutsData.Keys, mutsData.Keys...)
+						bulkMutsData.Muts = append(bulkMutsData.Muts, mutsData.Muts...)
+
+						if mutsMetadataUpdate != nil {
+							bulkMutsMetadataUpdate.Keys = append(bulkMutsMetadataUpdate.Keys, mutsMetadataUpdate.Keys...)
+							bulkMutsMetadataUpdate.Muts = append(bulkMutsMetadataUpdate.Muts, mutsMetadataUpdate.Muts...)
+						}
+					}
+
+					if len(bulkMutsData.Keys) > 0 {
+						metaKeys := strings.Join(bulkMutsData.Keys, ",") // save block keys in order to be able to handle chain reorgs
+						err := bigtable.SaveBlockKeys(currentBlock.Number, currentBlock.Hash, metaKeys)
+						if err != nil {
+							return fmt.Errorf("error saving block [%v] keys to bigtable metadata updates table: %w", currentBlock.Number, err)
+						}
+
+						err = bigtable.WriteBulk(&bulkMutsData, bigtable.tableData, DEFAULT_BATCH_INSERTS)
+						if err != nil {
+							return fmt.Errorf("error writing block [%v] to bigtable data table: %w", currentBlock.Number, err)
+						}
+					}
+
+					if len(bulkMutsMetadataUpdate.Keys) > 0 {
+						err := bigtable.WriteBulk(&bulkMutsMetadataUpdate, bigtable.tableMetadataUpdates, DEFAULT_BATCH_INSERTS)
+						if err != nil {
+							return fmt.Errorf("error writing block [%v] to bigtable metadata updates table: %w", currentBlock.Number, err)
+						}
+					}
+
+					return nil
+				})
+			}
+			return subG.Wait()
+		})
+
+	}
+
+	if err := g.Wait(); err == nil {
+		logrus.Info("data table indexing completed")
+	} else {
+		utils.LogError(err, "wait group error", 0)
+		return err
+	}
+
+	return nil
+}
diff --git a/rpc/erigon.go b/rpc/erigon.go
index 2a70952e85..a76e865a40 100644
--- a/rpc/erigon.go
+++ b/rpc/erigon.go
@@ -3,8 +3,10 @@ package rpc
 import (
 	"context"
 	"encoding/hex"
+	"encoding/json"
 	"fmt"
 	"math/big"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -857,3 +859,328 @@ type Eth1InternalTransactionWithPosition struct {
 	types.Eth1InternalTransaction
 	txPosition int
 }
+
+type BlockResponse struct {
+	Hash          string                    `json:"hash"`
+	ParentHash    string                    `json:"parentHash"`
+	UncleHash     string                    `json:"sha3Uncles"`
+	Coinbase      string                    `json:"miner"`
+	Root          string                    `json:"stateRoot"`
+	TxHash        string                    `json:"transactionsRoot"`
+	ReceiptHash   string                    `json:"receiptsRoot"`
+	Difficulty    string                    `json:"difficulty"`
+	Number        string                    `json:"number"`
+	GasLimit      string                    `json:"gasLimit"`
+	GasUsed       string                    `json:"gasUsed"`
+	Time          string                    `json:"timestamp"`
+	Extra         string                    `json:"extraData"`
+	MixDigest     string                    `json:"mixHash"`
+	Bloom         string                    `json:"logsBloom"`
+	Transactions  []*geth_types.Transaction `json:"transactions"`
+	Withdrawals   []*geth_types.Withdrawal  `json:"withdrawals"`
+	BlobGasUsed   *string                   `json:"blobGasUsed"`
+	ExcessBlobGas *string                   `json:"excessBlobGas"`
+	BaseFee       string                    `json:"baseFeePerGas"`
+}
+
+type BlockResponseWithUncles struct {
+	BlockResponse
+	Uncles []*geth_types.Block
+}
+
+type RPCBlock struct {
+	Hash        common.Hash   `json:"hash"`
+	UncleHashes []common.Hash `json:"uncles"`
+}
+
+func (client *ErigonClient) GetBlocksByBatch(blockNumbers []int64) ([]*types.Eth1Block, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	defer cancel()
+
+	var ethBlock []*types.Eth1Block
+	var batchCall []geth_rpc.BatchElem
+	batchCallNums := 3
+
+	if len(blockNumbers) == 0 {
+		return nil, fmt.Errorf("block numbers slice is empty")
+	}
+
+	for _, blockNumber := range blockNumbers {
+		batchCall = append(batchCall, geth_rpc.BatchElem{
+			Method: "eth_getBlockByNumber",
+			Args:   []interface{}{blockNumber, true},
+			Result: new(json.RawMessage),
+		})
+
+		batchCall = append(batchCall, geth_rpc.BatchElem{
+			Method: "eth_getBlockReceipts",
+			Args:   []interface{}{blockNumber},
+			Result: new([]geth_types.Receipt),
+		})
+
+		batchCall = append(batchCall, geth_rpc.BatchElem{
+			Method: "trace_block",
+			Args:   []interface{}{blockNumber},
+			Result: new([]ParityTraceResult),
+		})
+	}
+
+	if len(batchCall) == 0 {
+		return ethBlock, nil
+	}
+
+	err := client.rpcClient.BatchCallContext(ctx, batchCall)
+	if err != nil {
+		logger.Errorf("error while batch calling rpc for block details, error: %s", err)
+		return nil, err
+	}
+
+	for i := 0; i < len(batchCall)/batchCallNums; i++ {
+		blockResult := batchCall[i*batchCallNums].Result.(*json.RawMessage)
+		receiptsResult := batchCall[i*batchCallNums+1].Result.(*[]geth_types.Receipt)
+		tracesResults := batchCall[i*batchCallNums+2].Result.(*[]ParityTraceResult)
+
+		var head *geth_types.Header
+		if err := json.Unmarshal(*blockResult, &head); err != nil {
+			return nil, fmt.Errorf("error while unmarshalling block results to Header type, error: %v", err)
+		}
+		var body RPCBlock
+		if err := json.Unmarshal(*blockResult, &body); err != nil {
+			return nil, fmt.Errorf("error while unmarshalling block results to RPCBlock type, error: %v", err)
+		}
+
+		if head.UncleHash == geth_types.EmptyUncleHash && len(body.UncleHashes) > 0 {
+			return nil, fmt.Errorf("server returned non-empty uncle list but block header indicates no uncles")
+		}
+		if head.UncleHash != geth_types.EmptyUncleHash && len(body.UncleHashes) == 0 {
+			return nil, fmt.Errorf("server returned empty uncle list but block header indicates uncles")
+		}
+
+		var uncles []*geth_types.Block
+		if len(body.UncleHashes) > 0 {
+			uncles = make([]*geth_types.Block, len(body.UncleHashes))
+			uncleHashes := make([]geth_rpc.BatchElem, len(body.UncleHashes))
+			for i := range uncleHashes {
+				uncleHashes[i] = geth_rpc.BatchElem{
+					Method: "eth_getUncleByBlockHashAndIndex",
+					Args:   []interface{}{body.Hash, hexutil.EncodeUint64(uint64(i))},
+					Result: &uncles[i],
+				}
+			}
+			if err := client.rpcClient.BatchCallContext(ctx, uncleHashes); err != nil {
+				return nil, fmt.Errorf("error while batch calling uncle hashes, error: %v", err)
+			}
+
+			for i := range uncleHashes {
+				if uncleHashes[i].Error != nil {
+					return nil, fmt.Errorf("error in uncle hash, error: %v", uncleHashes[i].Error)
+				}
+				if uncles[i] == nil {
+					return nil, fmt.Errorf("got null header for uncle %d of block %x", i, body.Hash[:])
+				}
+			}
+		}
+
+		var blockResponse BlockResponse
+		err := json.Unmarshal(*blockResult, &blockResponse)
+		if err != nil {
+			logger.Errorf("error while unmarshalling block results to BlockResponse type: %s", err)
+			continue
+		}
+
+		blockResp := BlockResponseWithUncles{
+			BlockResponse: blockResponse,
+			Uncles:        uncles,
+		}
+
+		blockDetails := client.processBlockResult(blockResp)
+		client.processReceiptsAndTraces(blockDetails, *receiptsResult, *tracesResults)
+		ethBlock = append(ethBlock, blockDetails)
+	}
+
+	return ethBlock, nil
+}
+
+func (client *ErigonClient) processBlockResult(block BlockResponseWithUncles) *types.Eth1Block {
+	blockNumber, err := strconv.ParseUint(block.Number, 0, 64)
+	if err != nil {
+		logger.Errorf("error while parsing block number to uint64, error: %s", err)
+	}
+	gasLimit, err := strconv.ParseUint(block.GasLimit, 0, 64)
+	if err != nil {
+		logger.Errorf("error while parsing gas limit, block: %d, error: %s", blockNumber, err)
+	}
+	gasUsed, err := strconv.ParseUint(block.GasUsed, 0, 64)
+	if err != nil {
+		logger.Errorf("error while parsing gas used, block: %d, error: %s", blockNumber, err)
+	}
+	blockTime, err := strconv.ParseInt(block.Time, 0, 64)
+	if err != nil {
+		logger.Errorf("error while parsing block time, block: %d, error: %s", blockNumber, err)
+	}
+
+	var blobGasUsed, excessBlobGas uint64
+	if block.BlobGasUsed != nil {
+		blobGasUsedStr := *block.BlobGasUsed
+		blobGasUsed, err = strconv.ParseUint(blobGasUsedStr[2:], 16, 64) // remove "0x" and parse as hex
+		if err != nil {
+			logger.Errorf("error while parsing blob gas used, block: %d, error: %s", blockNumber, err)
+		}
+	}
+	if block.ExcessBlobGas != nil {
+		excessBlobGasStr := *block.ExcessBlobGas
+		excessBlobGas, err = strconv.ParseUint(excessBlobGasStr[2:], 16, 64)
+		if err != nil {
+			logger.Errorf("error while parsing excess blob gas, block: %d, error: %s", blockNumber, err)
+		}
+	}
+
+	ethBlock := &types.Eth1Block{
+		Hash:          []byte(block.Hash),
+		ParentHash:    []byte(block.ParentHash),
+		UncleHash:     []byte(block.UncleHash),
+		Coinbase:      []byte(block.Coinbase),
+		Root:          []byte(block.Root),
+		TxHash:        []byte(block.TxHash),
+		ReceiptHash:   []byte(block.ReceiptHash),
+		Difficulty:    []byte(block.Difficulty),
+		Number:        blockNumber,
+		GasLimit:      gasLimit,
+		GasUsed:       gasUsed,
+		Time:          timestamppb.New(time.Unix(blockTime, 0)),
+		Extra:         []byte(block.Extra),
+		MixDigest:     []byte(block.MixDigest),
+		Bloom:         []byte(block.Bloom),
+		Uncles:        []*types.Eth1Block{},
+		Transactions:  []*types.Eth1Transaction{},
+		Withdrawals:   []*types.Eth1Withdrawal{},
+		BlobGasUsed:   blobGasUsed,
+		ExcessBlobGas: excessBlobGas,
+		BaseFee:       []byte(block.BaseFee),
+	}
+
+	if len(block.Withdrawals) > 0 {
+		withdrawalsIndexed := make([]*types.Eth1Withdrawal, 0, len(block.Withdrawals))
+		for _, w := range block.Withdrawals {
+			withdrawalsIndexed = append(withdrawalsIndexed, &types.Eth1Withdrawal{
+				Index:          w.Index,
+				ValidatorIndex: w.Validator,
+				Address:        w.Address.Bytes(),
+				Amount:         new(big.Int).SetUint64(w.Amount).Bytes(),
+			})
+		}
+		ethBlock.Withdrawals = withdrawalsIndexed
+	}
+
+	txs := block.Transactions
+
+	for _, tx := range txs {
+
+		var from []byte
+		sender, err := geth_types.Sender(geth_types.NewCancunSigner(tx.ChainId()), tx)
+		if err != nil {
+			from, _ = hex.DecodeString("abababababababababababababababababababab")
+			logrus.Errorf("error converting tx %v to msg: %v", tx.Hash(), err)
+		} else {
+			from = sender.Bytes()
+		}
+
+		pbTx := &types.Eth1Transaction{
+			Type:                 uint32(tx.Type()),
+			Nonce:                tx.Nonce(),
+			GasPrice:             tx.GasPrice().Bytes(),
+			MaxPriorityFeePerGas: tx.GasTipCap().Bytes(),
+			MaxFeePerGas:         tx.GasFeeCap().Bytes(),
+			Gas:                  tx.Gas(),
+			Value:                tx.Value().Bytes(),
+			Data:                 tx.Data(),
+			From:                 from,
+			ChainId:              tx.ChainId().Bytes(),
+			AccessList:           []*types.AccessList{},
+			Hash:                 tx.Hash().Bytes(),
+			Itx:                  []*types.Eth1InternalTransaction{},
+			BlobVersionedHashes:  [][]byte{},
+		}
+
+		if tx.BlobGasFeeCap() != nil {
+			pbTx.MaxFeePerBlobGas = tx.BlobGasFeeCap().Bytes()
+		}
+		for _, h := range tx.BlobHashes() {
+			pbTx.BlobVersionedHashes = append(pbTx.BlobVersionedHashes, h.Bytes())
+		}
+
+		if tx.To() != nil {
+			pbTx.To = tx.To().Bytes()
+		}
+
+		ethBlock.Transactions = append(ethBlock.Transactions, pbTx)
+
+	}
+
+	return ethBlock
+}
+
+func (client *ErigonClient) processReceiptsAndTraces(ethBlock *types.Eth1Block, receipts []geth_types.Receipt, traces []ParityTraceResult) {
+	traceIndex := 0
+	var indexedTraces []*Eth1InternalTransactionWithPosition
+
+	for _, trace := range traces {
+		if trace.Type == "reward" {
+			continue
+		}
+		if trace.TransactionHash == "" {
+			continue
+		}
+		if trace.TransactionPosition >= len(ethBlock.Transactions) {
+			logrus.Errorf("transaction position %v out of range", trace.TransactionPosition)
+			return
+		}
+
+		from, to, value, traceType := trace.ConvertFields()
+		indexedTraces = append(indexedTraces, &Eth1InternalTransactionWithPosition{
+			Eth1InternalTransaction: types.Eth1InternalTransaction{
+				Type:     traceType,
+				From:     from,
+				To:       to,
+				Value:    value,
+				ErrorMsg: trace.Error,
+				Path:     fmt.Sprint(trace.TraceAddress),
+			},
+			txPosition: trace.TransactionPosition,
+		})
+	}
+
+	for txPosition, receipt := range receipts {
+		ethBlock.Transactions[txPosition].ContractAddress = receipt.ContractAddress[:]
+		ethBlock.Transactions[txPosition].CommulativeGasUsed = receipt.CumulativeGasUsed
+		ethBlock.Transactions[txPosition].GasUsed = receipt.GasUsed
+		ethBlock.Transactions[txPosition].LogsBloom = receipt.Bloom[:]
+		ethBlock.Transactions[txPosition].Logs = make([]*types.Eth1Log, 0, len(receipt.Logs))
+		ethBlock.Transactions[txPosition].Status = receipt.Status
+
+		if receipt.BlobGasPrice != nil {
+			ethBlock.Transactions[txPosition].BlobGasPrice = receipt.BlobGasPrice.Bytes()
+		}
+		ethBlock.Transactions[txPosition].BlobGasUsed = receipt.BlobGasUsed
+
+		for _, l := range receipt.Logs {
+			topics := make([][]byte, 0, len(l.Topics))
+			for _, t := range l.Topics {
+				topics = append(topics, t.Bytes())
+			}
+			ethBlock.Transactions[txPosition].Logs = append(ethBlock.Transactions[txPosition].Logs, &types.Eth1Log{
+				Address: l.Address.Bytes(),
+				Data:    l.Data,
+				Removed: l.Removed,
+				Topics:  topics,
+			})
+		}
+		if len(indexedTraces) == 0 {
+			continue
+		}
+		for ; traceIndex < len(indexedTraces) && indexedTraces[traceIndex].txPosition == txPosition; traceIndex++ {
+			ethBlock.Transactions[txPosition].Itx = append(ethBlock.Transactions[txPosition].Itx, &indexedTraces[traceIndex].Eth1InternalTransaction)
+		}
+	}
+
+}