Skip to content

Commit

Permalink
add receipt fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Dec 4, 2023
1 parent 71b5a40 commit 98bd59d
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 47 deletions.
87 changes: 52 additions & 35 deletions block/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

type ToEthBlock func(in *rpc.Block, logs []*rpc.LogEntry) (*pbeth.Block, map[string]bool)
func RpcToEthBlock(in *rpc.Block, logs []*rpc.LogEntry, receipts map[string]*rpc.TransactionReceipt) (*pbeth.Block, map[string]bool) {

func RpcToEthBlock(in *rpc.Block, logs []*rpc.LogEntry) (*pbeth.Block, map[string]bool) {

trx, hashesWithoutTo := toFirehoseTraces(in.Transactions, logs)
trx, hashesWithoutTo := toFirehoseTraces(in.Transactions, logs, receipts)

out := &pbeth.Block{
DetailLevel: pbeth.Block_DETAILLEVEL_BASE,
Expand Down Expand Up @@ -81,47 +79,42 @@ func toAccessList(in rpc.AccessList) []*pbeth.AccessTuple {
return out
}

func toFirehoseTraces(in *rpc.BlockTransactions, logs []*rpc.LogEntry) (traces []*pbeth.TransactionTrace, hashesWithoutTo map[string]bool) {

func toFirehoseTraces(in *rpc.BlockTransactions, logs []*rpc.LogEntry, receipt map[string]*rpc.TransactionReceipt) (traces []*pbeth.TransactionTrace, hashesWithoutTo map[string]bool) {
ordinal := uint64(0)

receipts, _ := in.Receipts()
out := make([]*pbeth.TransactionTrace, len(receipts))
transactions, _ := in.Receipts() //todo: this is confusing, Why is it not call Transactions?
out := make([]*pbeth.TransactionTrace, len(transactions))
hashesWithoutTo = make(map[string]bool)
for i := range receipts {
txHash := eth.Hash(receipts[i].Hash.Bytes()).String()
for i := range transactions {
txHash := eth.Hash(transactions[i].Hash.Bytes()).String()
var toBytes []byte
if receipts[i].To != nil {
toBytes = receipts[i].To.Bytes()
if transactions[i].To != nil {
toBytes = transactions[i].To.Bytes()
} else {
hashesWithoutTo[txHash] = true
}

receipt := receipt[transactions[i].Hash.Pretty()]

out[i] = &pbeth.TransactionTrace{
Hash: receipts[i].Hash.Bytes(),
To: toBytes,
Nonce: uint64(receipts[i].Nonce),
GasLimit: uint64(receipts[i].Gas),
GasPrice: BigIntFromEthUint256(receipts[i].GasPrice),
Input: receipts[i].Input.Bytes(),
Value: BigIntFromEthUint256(receipts[i].Value),
From: receipts[i].From.Bytes(),
Index: uint32(receipts[i].TransactionIndex),
Receipt: &pbeth.TransactionReceipt{
// Logs: , // filled below
// CumulativeGasUsed: // only available on getTransactionReceipt
// StateRoot: // only available on getTransactionReceipt
// LogsBloom: // only available on getTransactionReceipt
},
V: pbeth.NewBigInt(int64(receipts[i].V)).Bytes,
R: BigIntFromEthUint256(receipts[i].R).Bytes,
S: BigIntFromEthUint256(receipts[i].S).Bytes,
AccessList: toAccessList(receipts[i].AccessList),
Hash: transactions[i].Hash.Bytes(),
To: toBytes,
Nonce: uint64(transactions[i].Nonce),
GasLimit: uint64(transactions[i].Gas),
GasPrice: BigIntFromEthUint256(transactions[i].GasPrice),
Input: transactions[i].Input.Bytes(),
Value: BigIntFromEthUint256(transactions[i].Value),
From: transactions[i].From.Bytes(),
Index: uint32(transactions[i].TransactionIndex),
Receipt: toFirehoseReceipts(receipt),
V: pbeth.NewBigInt(int64(transactions[i].V)).Bytes,
R: BigIntFromEthUint256(transactions[i].R).Bytes,
S: BigIntFromEthUint256(transactions[i].S).Bytes,
AccessList: toAccessList(transactions[i].AccessList),
BeginOrdinal: ordinal,

// Status: // only available on getTransactionReceipt
// Type: // only available on getTransactionReceipt
// GasUsed: // only available on getTransactionReceipt
Status: pbeth.TransactionTraceStatus(receipt.Status),
Type: pbeth.TransactionTrace_Type(receipt.Type),
GasUsed: uint64(receipt.GasUsed),
// MaxFeePerGas: // not available on RPC
// MaxPriorityFeePerGas: // not available on RPC
// ReturnData: // not available on RPC
Expand Down Expand Up @@ -157,6 +150,30 @@ func toFirehoseTraces(in *rpc.BlockTransactions, logs []*rpc.LogEntry) (traces [
return out, hashesWithoutTo
}

func toFirehoseReceipts(receipt *rpc.TransactionReceipt) *pbeth.TransactionReceipt {

return &pbeth.TransactionReceipt{
StateRoot: receipt.Root,
CumulativeGasUsed: uint64(receipt.CumulativeGasUsed),
LogsBloom: receipt.LogsBloom.Bytes(),
Logs: toFirehoseLogs(receipt.Logs),
}
}

func toFirehoseLogs(logs []*rpc.LogEntry) []*pbeth.Log {
out := make([]*pbeth.Log, len(logs))
for i, log := range logs {
out[i] = &pbeth.Log{
Address: log.Address.Bytes(),
Topics: HashesToBytes(logs[i].Topics),
Data: log.Data.Bytes(),
BlockIndex: log.ToLog().BlockIndex,
Ordinal: uint64(log.LogIndex),
}
}
return out
}

func BigIntFromEthUint256(in *eth.Uint256) *pbeth.BigInt {
if in == nil {
return &pbeth.BigInt{}
Expand Down
29 changes: 29 additions & 0 deletions blockfetcher/abrone.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package blockfetcher

import (
"context"
"time"

"go.uber.org/zap"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/eth-go/rpc"
"github.com/streamingfast/firehose-ethereum/block"
)

type ArbOneBlockFetcher struct {
fetcher *BlockFetcher
}

func NewArbOneBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *OptimismBlockFetcher {
fetcher := NewBlockFetcher(rpcClient, intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger)
return &OptimismBlockFetcher{
fetcher: fetcher,
}
}

func (f *ArbOneBlockFetcher) PollingInterval() time.Duration { return 1 * time.Second }

func (f *ArbOneBlockFetcher) Fetch(ctx context.Context, blockNum uint64) (*pbbstream.Block, error) {
return f.fetcher.Fetch(ctx, blockNum)
}
39 changes: 35 additions & 4 deletions blockfetcher/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/abourget/llerrgroup"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/eth-go/rpc"
pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2"
Expand All @@ -13,7 +14,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

type ToEthBlock func(in *rpc.Block, logs []*rpc.LogEntry) (*pbeth.Block, map[string]bool)
type ToEthBlock func(in *rpc.Block, logs []*rpc.LogEntry, receipts map[string]*rpc.TransactionReceipt) (*pbeth.Block, map[string]bool)

type BlockFetcher struct {
rpcClient *rpc.Client
Expand All @@ -37,15 +38,15 @@ func NewBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch, latestBlockRet

func (f *BlockFetcher) Fetch(ctx context.Context, blockNum uint64) (block *pbbstream.Block, err error) {
f.logger.Debug("fetching block", zap.Uint64("block_num", blockNum))
for f.latest <= blockNum {
for f.latest < blockNum {
f.latest, err = f.rpcClient.LatestBlockNum(ctx)
if err != nil {
return nil, fmt.Errorf("fetching latest block num: %w", err)
}

f.logger.Info("got latest block", zap.Uint64("latest", f.latest), zap.Uint64("block_num", blockNum))

if f.latest <= blockNum {
if f.latest < blockNum {
time.Sleep(f.latestBlockRetryInterval)
continue
}
Expand All @@ -67,13 +68,18 @@ func (f *BlockFetcher) Fetch(ctx context.Context, blockNum uint64) (block *pbbst
ToBlock: rpc.BlockNumber(blockNum),
})

receipts, err := FetchReceipts(ctx, rpcBlock, f.rpcClient)
if err != nil {
return nil, fmt.Errorf("fetching receipts for block %d %q: %w", rpcBlock.Number, rpcBlock.Hash.Pretty(), err)
}

f.lastFetchAt = time.Now()

if err != nil {
return nil, fmt.Errorf("fetching logs for block %d %q: %w", rpcBlock.Number, rpcBlock.Hash.Pretty(), err)
}

ethBlock, _ := f.toEthBlock(rpcBlock, logs)
ethBlock, _ := f.toEthBlock(rpcBlock, logs, receipts)
anyBlock, err := anypb.New(ethBlock)
if err != nil {
return nil, fmt.Errorf("create any block: %w", err)
Expand All @@ -89,3 +95,28 @@ func (f *BlockFetcher) Fetch(ctx context.Context, blockNum uint64) (block *pbbst
Payload: anyBlock,
}, nil
}

func FetchReceipts(ctx context.Context, block *rpc.Block, client *rpc.Client) (out map[string]*rpc.TransactionReceipt, err error) {
out = make(map[string]*rpc.TransactionReceipt)

eg := llerrgroup.New(10)
for _, tx := range block.Transactions.Transactions {
if eg.Stop() {
continue // short-circuit the loop if we got an error
}
eg.Go(func() error {
receipt, err := client.TransactionReceipt(ctx, tx.Hash)
if err != nil {
return fmt.Errorf("fetching receipt for tx %q: %w", tx.Hash.Pretty(), err)
}
out[tx.Hash.Pretty()] = receipt
return err
})
}

if err := eg.Wait(); err != nil {
return nil, err
}

return
}
13 changes: 10 additions & 3 deletions cmd/fireeth/tools_compare_blocks_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"os"
"strconv"

"github.com/streamingfast/firehose-ethereum/blockfetcher"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"

jd "github.com/josephburnett/jd/lib"
Expand Down Expand Up @@ -134,7 +136,12 @@ func createCompareBlocksRPCE(logger *zap.Logger) firecore.CommandExecutor {
panic(err)
}

identical, diffs := CompareFirehoseToRPC(firehoseBlock, rpcBlock, logs)
receipts, err := blockfetcher.FetchReceipts(ctx, rpcBlock, cli)
if err != nil {
panic(err)
}

identical, diffs := CompareFirehoseToRPC(firehoseBlock, rpcBlock, logs, receipts)
if !identical {
fmt.Println("different", diffs)
} else {
Expand Down Expand Up @@ -375,12 +382,12 @@ func stripFirehoseTrxReceipt(in *pbeth.TransactionReceipt) {
in.CumulativeGasUsed = 0 // only available on getTransactionReceipt
}

func CompareFirehoseToRPC(fhBlock *pbeth.Block, rpcBlock *rpc.Block, logs []*rpc.LogEntry) (isEqual bool, differences []string) {
func CompareFirehoseToRPC(fhBlock *pbeth.Block, rpcBlock *rpc.Block, logs []*rpc.LogEntry, receipts map[string]*rpc.TransactionReceipt) (isEqual bool, differences []string) {
if fhBlock == nil && rpcBlock == nil {
return true, nil
}

rpcAsPBEth, hashesWithoutTo := block.RpcToEthBlock(rpcBlock, logs)
rpcAsPBEth, hashesWithoutTo := block.RpcToEthBlock(rpcBlock, logs, receipts)
stripFirehoseBlock(fhBlock, hashesWithoutTo)

// tweak that new block for comparison
Expand Down
9 changes: 8 additions & 1 deletion cmd/fireeth/tools_compare_oneblock_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"fmt"
"os"

"github.com/streamingfast/firehose-ethereum/blockfetcher"

"github.com/klauspost/compress/zstd"
"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
Expand Down Expand Up @@ -64,7 +66,12 @@ func compareOneblockRPCE(cmd *cobra.Command, args []string) error {
return err
}

identical, diffs := CompareFirehoseToRPC(fhBlock, rpcBlock, logs)
receipts, err := blockfetcher.FetchReceipts(ctx, rpcBlock, cli)
if err != nil {
return err
}

identical, diffs := CompareFirehoseToRPC(fhBlock, rpcBlock, logs, receipts)
if !identical {
fmt.Println("different", diffs)
} else {
Expand Down
10 changes: 9 additions & 1 deletion cmd/fireeth/tools_poll_rpc_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strconv"
"time"

"github.com/streamingfast/firehose-ethereum/blockfetcher"

"github.com/spf13/cobra"
"github.com/streamingfast/eth-go/rpc"
firecore "github.com/streamingfast/firehose-core"
Expand Down Expand Up @@ -86,7 +88,13 @@ func createPollRPCBlocksE(logger *zap.Logger) firecore.CommandExecutor {
continue
}

ethBlock, _ := block.RpcToEthBlock(rpcBlock, logs)
receipts, err := blockfetcher.FetchReceipts(ctx, rpcBlock, client)
if err != nil {
delay(fmt.Errorf("fetching receipts for block %d %q: %w", rpcBlock.Number, rpcBlock.Hash.Pretty(), err))
continue
}

ethBlock, _ := block.RpcToEthBlock(rpcBlock, logs, receipts)
cnt, err := proto.Marshal(ethBlock)
if err != nil {
return fmt.Errorf("failed to proto marshal pb sol block: %w", err)
Expand Down
5 changes: 4 additions & 1 deletion devel/merged-blocks-compare/compare-merged-blocks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ function process_block_range() {
local stop_block="$3"

local output_file="$version-$start_block-$stop_block.jsonl"
local block_range="$start_block:$stop_block"

# local block_range="$start_block:$stop_block"
local block_range="$start_block"

fireeth tools print merged-blocks "gs://dfuseio-global-blocks-uscentral/arb-one/$version?project=dfuseio-global" "$block_range" -o jsonl | \
jq 'del(
.detail_level,
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ module github.com/streamingfast/firehose-ethereum

go 1.21

replace (
github.com/streamingfast/eth-go => ../eth-go
github.com/streamingfast/firehose-core => ../firehose-core
)

require (
github.com/RoaringBitmap/roaring v0.9.4
github.com/bobg/go-generics/v2 v2.1.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,6 @@ github.com/streamingfast/dstore v0.1.1-0.20230620124109-3924b3b36c77 h1:u7FWLqz3
github.com/streamingfast/dstore v0.1.1-0.20230620124109-3924b3b36c77/go.mod h1:ngKU7WzHwVjOFpt2g+Wtob5mX4IvN90HYlnARcTRbmQ=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1jAL+/gBS7Bh9jyzWaTib6N47m06gZOTUPwQ=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns=
github.com/streamingfast/eth-go v0.0.0-20231123190800-9a523ad6eaf0 h1:rJLdBduXBtlpIj8WUJ3t6ONqTOMaN6/bLB96znNdN1g=
github.com/streamingfast/eth-go v0.0.0-20231123190800-9a523ad6eaf0/go.mod h1:UEm8dqibr3c3A1iIA3CHpkhN7j3X78prN7/55sXf3A0=
github.com/streamingfast/firehose v0.1.1-0.20231109192301-ebfed7417cf6 h1:hcSx7R9f1y+wWoAkJc3XBUXi2p9bYlc2dbt+mZUwdbQ=
github.com/streamingfast/firehose v0.1.1-0.20231109192301-ebfed7417cf6/go.mod h1:lGC1T6mpAAApjBQNF5COSXb3SbrYRI3dBR1f6/PZE54=
github.com/streamingfast/firehose-core v0.2.4-0.20231128190553-f5e9f304eac2 h1:/S7IY6TDiEYpKL85uxZDuIdPPkDAmyhURJitEPr4bew=
Expand Down

0 comments on commit 98bd59d

Please sign in to comment.