Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(BEDS-536): implement misc fix-internal-txs-from-node cmd with batch processing from Erigon node #2967

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
70f0627
wip: move itx status parsing logic to transformers func
Monika-Bitfly Oct 9, 2024
8f8a45e
wip: implementing fix-internal-txs misc cmd
Monika-Bitfly Oct 9, 2024
47d9361
wip: updated geth client initialisation
Monika-Bitfly Oct 10, 2024
40e5818
wip: moved blocks by batch logic to erigon.go
Monika-Bitfly Oct 10, 2024
f1c8c24
wip: updated block by number batch call
Monika-Bitfly Oct 10, 2024
6e1899b
updated blocks batch processing
Monika-Bitfly Oct 11, 2024
343dd10
feat: added transformers for bulk mutations
Monika-Bitfly Oct 11, 2024
2f61112
fix: save current block height in local variable when performing muta…
Monika-Bitfly Oct 11, 2024
8a38d51
fix: add check for start and end block flags
Monika-Bitfly Oct 11, 2024
ee94692
fix: updated erigon client
Monika-Bitfly Oct 11, 2024
650a0a6
fix: add check for batchcall size and return if it's empty
Monika-Bitfly Oct 14, 2024
3c726e3
cleanup unused code
Monika-Bitfly Oct 14, 2024
28d71a9
cleanup
Monika-Bitfly Oct 14, 2024
8e9ef04
Merge branch 'BEDS-536/fix-internal-transfer-merge' of github.com:gob…
Monika-Bitfly Oct 18, 2024
855010c
updated cmd name to fix-internal-txs-from-node
Monika-Bitfly Oct 18, 2024
869a1fe
fix: uncles, blobGasUsed & excessBlobGas parsing
Monika-Bitfly Oct 18, 2024
6bd549b
rm timings, update GetBlocksByBatch func
Monika-Bitfly Oct 22, 2024
97d162a
fix: revised ReindexITxsFromNode func logic
Monika-Bitfly Oct 22, 2024
f678b98
fix: update the logic for parsing uncles, BlobGasUsed and ExcessBlobGas
Monika-Bitfly Oct 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip: updated block by number batch call
  • Loading branch information
Monika-Bitfly committed Oct 10, 2024
commit f1c8c24dc45c9bebcedb0aebcec580aa25885250
2 changes: 1 addition & 1 deletion cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func main() {
case "fix-epochs":
err = fixEpochs()
case "fix-internal-txs":
fixInternalTxs(opts.StartBlock, opts.EndBlock, opts.BatchSize, opts.DataConcurrency, erigonClient)
fixInternalTxs(opts.StartBlock, opts.EndBlock, opts.BatchSize, opts.DataConcurrency, bt, erigonClient)
case "validate-firebase-tokens":
err = validateFirebaseTokens()
default:
Expand Down
266 changes: 248 additions & 18 deletions rpc/erigon.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,37 @@ type Eth1InternalTransactionWithPosition struct {
txPosition int
}

type BlockResponse struct {
Hash string `json:"hash"`
ParentHash string `json:"parentHash"`
UncleHash string `json:"uncleHash"`
Coinbase string `json:"coinbase"`
Root string `json:"stateRoot"`
TxHash string `json:"transactionsHash"`
ReceiptHash string `json:"receiptsHash"`
Difficulty string `json:"difficulty"`
Number string `json:"number"`
GasLimit string `json:"gasLimit"`
GasUsed string `json:"gasUsed"`
Time string `json:"timestamp"`
Extra string `json:"extraData"`
MixDigest string `json:"mixHash"`
Bloom string `json:"logsBloom"`
Uncles []*geth_types.Block `json:"uncles"`
Transactions []*geth_types.Transaction `json:"transactions"`
Withdrawals []*geth_types.Withdrawal `json:"withdrawals"`
}

func (client *ErigonClient) GetBlocksByBatch(blocksChan chan *types.Eth1Block) error {
startTime := time.Now()
defer func() {
metrics.TaskDuration.WithLabelValues("rpc_el_get_block").Observe(time.Since(startTime).Seconds())
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// timings := &types.GetBlockTimings{}
var batchCall []geth_rpc.BatchElem

for block := range blocksChan {
Expand All @@ -835,6 +865,7 @@ func (client *ErigonClient) GetBlocksByBatch(blocksChan chan *types.Eth1Block) e
batchCall = append(batchCall, geth_rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{block.Number, true},
// Result: new(geth_types.Block),
Result: new(map[string]interface{}),
})

Expand All @@ -851,35 +882,234 @@ func (client *ErigonClient) GetBlocksByBatch(blocksChan chan *types.Eth1Block) e
})
}

headers := make(map[uint64]*geth_types.Block, len(blocksChan))
errors := make([]error, 0, len(blocksChan))

for block := range blocksChan {
result := &geth_types.Block{}
err := error(nil)
batchCall = append(batchCall, geth_rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{block.Number, true},
Result: result,
Error: err,
})
headers[block.Number] = result
errors = append(errors, err)
}

fmt.Printf("\n batchCall %v \n", batchCall)
err := client.GetRPCClient().BatchCall(batchCall)

gethClient, err := NewGethClient(utils.Config.Eth1ErigonEndpoint) // for testing only
if err != nil {
logger.Errorf("error when connecting to the client, error: %s", err)
}

err = gethClient.rpcClient.BatchCallContext(ctx, batchCall)
if err != nil {
logger.Errorf("error while batch calling rpc, error: %s", err)
}

for i, b := range batchCall {
if b.Error != nil {
logger.Errorf("error in batch call %d, error: %s", i, err)
batchCallNums := 3

for i := 0; i < len(batchCall)/batchCallNums; i++ {
blockResult := batchCall[i*batchCallNums].Result.(*geth_types.Block)
receiptsResult := batchCall[i*batchCallNums+1].Result.([]*geth_types.Receipt)
tracesResults := batchCall[i*batchCallNums+2].Result.([]*ParityTraceResult)

fmt.Printf("\n Block Results %v \n ", blockResult)

blockDetails := client.processBlockResult(*blockResult)
blockDetails = client.processReceiptsAndTraces(blockDetails, receiptsResult, tracesResults)

// blockDetails := client.processBlockResult(&blockResult)

// var blockResponse *geth_types.Block

// brJSON, err := json.Marshal(blockResult)
// if err != nil {
// logger.Errorf("error while marshaling block results, error: %s", err)
// }

// err = json.Unmarshal(brJSON, &blockResponse)
// if err != nil {
// logger.Errorf("error while unmarshaling block results, error: %s", err)
// }
// fmt.Printf("\n blockResponse %v \n ", blockResponse)

}

return nil

}

func (client *ErigonClient) processBlockResult(block geth_types.Block) *types.Eth1Block {
startTime := time.Now()
defer func() {
metrics.TaskDuration.WithLabelValues("rpc_el_process_block").Observe(time.Since(startTime).Seconds())
}()

timings := &types.GetBlockTimings{}

timings.Headers = time.Since(startTime)

ethBlock := &types.Eth1Block{
Hash: block.Hash().Bytes(),
ParentHash: block.ParentHash().Bytes(),
UncleHash: block.UncleHash().Bytes(),
Coinbase: block.Coinbase().Bytes(),
Root: block.Root().Bytes(),
TxHash: block.TxHash().Bytes(),
ReceiptHash: block.ReceiptHash().Bytes(),
Difficulty: block.Difficulty().Bytes(),
Number: block.NumberU64(),
GasLimit: block.GasLimit(),
GasUsed: block.GasUsed(),
Time: timestamppb.New(time.Unix(int64(block.Time()), 0)),
Extra: block.Extra(),
MixDigest: block.MixDigest().Bytes(),
Bloom: block.Bloom().Bytes(),
Uncles: []*types.Eth1Block{},
Transactions: []*types.Eth1Transaction{},
Withdrawals: []*types.Eth1Withdrawal{},
}
blobGasUsed := block.BlobGasUsed()
if blobGasUsed != nil {
ethBlock.BlobGasUsed = *blobGasUsed
}
excessBlobGas := block.ExcessBlobGas()
if excessBlobGas != nil {
ethBlock.ExcessBlobGas = *excessBlobGas
}

if block.BaseFee() != nil {
ethBlock.BaseFee = block.BaseFee().Bytes()
}

for _, uncle := range block.Uncles() {
pbUncle := &types.Eth1Block{
Hash: uncle.Hash().Bytes(),
ParentHash: uncle.ParentHash.Bytes(),
UncleHash: uncle.UncleHash.Bytes(),
Coinbase: uncle.Coinbase.Bytes(),
Root: uncle.Root.Bytes(),
TxHash: uncle.TxHash.Bytes(),
ReceiptHash: uncle.ReceiptHash.Bytes(),
Difficulty: uncle.Difficulty.Bytes(),
Number: uncle.Number.Uint64(),
GasLimit: uncle.GasLimit,
GasUsed: uncle.GasUsed,
Time: timestamppb.New(time.Unix(int64(uncle.Time), 0)),
Extra: uncle.Extra,
MixDigest: uncle.MixDigest.Bytes(),
Bloom: uncle.Bloom.Bytes(),
}

ethBlock.Uncles = append(ethBlock.Uncles, pbUncle)
}

if len(block.Withdrawals()) > 0 {
withdrawalsIndexed := make([]*types.Eth1Withdrawal, 0, len(block.Withdrawals()))
for _, w := range block.Withdrawals() {
withdrawalsIndexed = append(withdrawalsIndexed, &types.Eth1Withdrawal{
Index: w.Index,
ValidatorIndex: w.Validator,
Address: w.Address.Bytes(),
Amount: new(big.Int).SetUint64(w.Amount).Bytes(),
})
}
ethBlock.Withdrawals = withdrawalsIndexed
}

switch i {
case 0:
blockResults := (*b.Result.(*geth_types.Block))
fmt.Printf("\n Block: %v \n", blockResults)
// @TODO process block results
case 1:
blockReceipts := (*b.Result.(*[]interface{}))
fmt.Printf("\n Receipts: %v \n", blockReceipts)
// @TODO process block receipts
txs := block.Transactions()

case 2:
tracesResults := (*b.Result.(*[]interface{}))
fmt.Printf("\n Traces: %v \n", tracesResults)
// @TODO process block traces
for _, tx := range txs {

var from []byte
sender, err := geth_types.Sender(geth_types.NewCancunSigner(tx.ChainId()), tx)
if err != nil {
from, _ = hex.DecodeString("abababababababababababababababababababab")
logrus.Errorf("error converting tx %v to msg: %v", tx.Hash(), err)
} else {
from = sender.Bytes()
}

pbTx := &types.Eth1Transaction{
Type: uint32(tx.Type()),
Nonce: tx.Nonce(),
GasPrice: tx.GasPrice().Bytes(),
MaxPriorityFeePerGas: tx.GasTipCap().Bytes(),
MaxFeePerGas: tx.GasFeeCap().Bytes(),
Gas: tx.Gas(),
Value: tx.Value().Bytes(),
Data: tx.Data(),
From: from,
ChainId: tx.ChainId().Bytes(),
AccessList: []*types.AccessList{},
Hash: tx.Hash().Bytes(),
Itx: []*types.Eth1InternalTransaction{},
BlobVersionedHashes: [][]byte{},
}

if tx.BlobGasFeeCap() != nil {
pbTx.MaxFeePerBlobGas = tx.BlobGasFeeCap().Bytes()
}
for _, h := range tx.BlobHashes() {
pbTx.BlobVersionedHashes = append(pbTx.BlobVersionedHashes, h.Bytes())
}

if tx.To() != nil {
pbTx.To = tx.To().Bytes()
}

ethBlock.Transactions = append(ethBlock.Transactions, pbTx)

}

return nil
return ethBlock

}

// @TODO update processReceiptsAndTraces method to correctly parse traces
func (client *ErigonClient) processReceiptsAndTraces(ethBlock *types.Eth1Block, receipts []*geth_types.Receipt, traces []*ParityTraceResult) *types.Eth1Block {
traceIndex := 0
var traces []*Eth1InternalTransactionWithPosition

for txPosition, receipt := range receipts {
ethBlock.Transactions[txPosition].ContractAddress = receipt.ContractAddress[:]
ethBlock.Transactions[txPosition].CommulativeGasUsed = receipt.CumulativeGasUsed
ethBlock.Transactions[txPosition].GasUsed = receipt.GasUsed
ethBlock.Transactions[txPosition].LogsBloom = receipt.Bloom[:]
ethBlock.Transactions[txPosition].Logs = make([]*types.Eth1Log, 0, len(receipt.Logs))
ethBlock.Transactions[txPosition].Status = types.StatusType(receipt.Status)

if receipt.BlobGasPrice != nil {
ethBlock.Transactions[txPosition].BlobGasPrice = receipt.BlobGasPrice.Bytes()
}
ethBlock.Transactions[txPosition].BlobGasUsed = receipt.BlobGasUsed

for _, l := range receipt.Logs {
topics := make([][]byte, 0, len(l.Topics))
for _, t := range l.Topics {
topics = append(topics, t.Bytes())
}
ethBlock.Transactions[txPosition].Logs = append(c.Transactions[txPosition].Logs, &types.Eth1Log{
Address: l.Address.Bytes(),
Data: l.Data,
Removed: l.Removed,
Topics: topics,
})
}
if len(traces) == 0 {
continue
}
for ; traceIndex < len(traces) && traces[traceIndex].txPosition == txPosition; traceIndex++ {
ethBlock.Transactions[txPosition].Itx = append(ethBlock.Transactions[txPosition].Itx, &traces[traceIndex].Eth1InternalTransaction)
if traces[traceIndex].Reverted && ethBlock.Transactions[txPosition].Status == types.StatusType_SUCCESS {
ethBlock.Transactions[txPosition].Status = types.StatusType_PARTIAL
}
}
}

return ethBlock
}
1 change: 1 addition & 0 deletions rpc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Client interface {

type Eth1Client interface {
GetBlock(number uint64) (*types.Eth1Block, *types.GetBlockTimings, error)
GetBlocksByBatch(blocksChan chan *types.Eth1Block) error
GetLatestEth1BlockNumber() (uint64, error)
GetChainID() *big.Int
Close()
Expand Down