Streaming of logs #177

Merged
7 changes: 6 additions & 1 deletion api/api.go
@@ -171,12 +171,17 @@ func (b *BlockChainAPI) GetTransactionByHash(
v, r, s := tx.RawSignatureValues()
index := uint64(rcp.TransactionIndex)

var to string
if tx.To() != nil {
to = tx.To().Hex()
}

txResult := &RPCTransaction{
Hash: txHash,
BlockHash: &rcp.BlockHash,
BlockNumber: (*hexutil.Big)(rcp.BlockNumber),
From: from.Hex(),
To: tx.To().Hex(),
To: to,
Gas: hexutil.Uint64(rcp.GasUsed),
GasPrice: (*hexutil.Big)(rcp.EffectiveGasPrice),
Input: tx.Data(),
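For context on the change above: `tx.To()` returns nil for contract-creation transactions, so calling `.Hex()` on it unconditionally panics. A minimal standalone sketch of that behaviour using go-ethereum's types package (values are arbitrary):

```go
package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/core/types"
)

func main() {
	// A transaction with no recipient is a contract deployment;
	// its To() is nil, so it must be checked before calling Hex().
	deploy := types.NewContractCreation(0, big.NewInt(0), 21_000, big.NewInt(1), nil)

	var to string
	if deploy.To() != nil {
		to = deploy.To().Hex()
	}
	fmt.Printf("to=%q (empty for contract creation)\n", to)
}
```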
43 changes: 40 additions & 3 deletions api/stream.go
@@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/onflow/flow-evm-gateway/services/logs"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
@@ -30,6 +32,7 @@ type StreamAPI struct {
accounts storage.AccountIndexer
blocksBroadcaster *engine.Broadcaster
transactionsBroadcaster *engine.Broadcaster
logsBroadcaster *engine.Broadcaster
}

func NewStreamAPI(
@@ -41,6 +44,7 @@ func NewStreamAPI(
accounts storage.AccountIndexer,
blocksBroadcaster *engine.Broadcaster,
transactionsBroadcaster *engine.Broadcaster,
logsBroadcaster *engine.Broadcaster,
) *StreamAPI {
return &StreamAPI{
logger: logger,
@@ -51,6 +55,7 @@ func NewStreamAPI(
accounts: accounts,
blocksBroadcaster: blocksBroadcaster,
transactionsBroadcaster: transactionsBroadcaster,
logsBroadcaster: logsBroadcaster,
}
}

@@ -86,7 +91,7 @@ func (s *StreamAPI) newSubscription(
s.config.StreamTimeout,
s.config.StreamLimit,
sub,
).Stream(ctx)
).Stream(context.Background()) // todo investigate why the passed in context is canceled so quickly
Collaborator:

Is this happening during the E2E tests, or even in a local environment?

Contributor Author:

During tests, yeah; I didn't test otherwise yet. I'm not sure how the timeout is defined for the tests; it might be that we need to fix the requests.
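One possible middle ground for the todo above (assuming the module targets Go 1.21 or newer) is `context.WithoutCancel`, which keeps the request context's values while detaching its cancellation. This is only a sketch of that option, not what the PR does:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Simulate a request context that is canceled almost immediately.
	reqCtx, cancel := context.WithCancel(context.Background())
	cancel()

	// WithoutCancel (Go 1.21+) keeps reqCtx's values but ignores its
	// cancellation, so a long-lived stream started from it keeps running.
	streamCtx := context.WithoutCancel(reqCtx)

	select {
	case <-streamCtx.Done():
		fmt.Println("stream context canceled") // never reached: Done() is nil
	case <-time.After(100 * time.Millisecond):
		fmt.Println("stream still running after the request context was canceled")
	}
}
```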


go func() {
for {
@@ -140,7 +145,7 @@ func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
Number: hexutil.Uint64(block.Height),
ParentHash: block.ParentBlockHash,
ReceiptsRoot: block.ReceiptRoot,
Transactions: block.TransactionHashes,}, nil
Transactions: block.TransactionHashes}, nil
},
)
}
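For reference, a client-side sketch of consuming the newHeads subscription exposed here, using go-ethereum's ethclient over WebSocket; the endpoint URL is a placeholder:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	// Placeholder endpoint for the gateway's WebSocket server.
	client, err := ethclient.Dial("ws://localhost:8545")
	if err != nil {
		log.Fatal(err)
	}

	headers := make(chan *types.Header)
	sub, err := client.SubscribeNewHead(context.Background(), headers)
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case h := <-headers:
			fmt.Println("new head:", h.Number)
		}
	}
}
```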
@@ -186,12 +191,17 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*

v, r, ss := tx.RawSignatureValues()

var to string
if tx.To() != nil {
to = tx.To().Hex()
}

return &RPCTransaction{
Hash: h,
BlockHash: &rcp.BlockHash,
BlockNumber: (*hexutil.Big)(rcp.BlockNumber),
From: from.Hex(),
To: tx.To().Hex(),
To: to,
Gas: hexutil.Uint64(rcp.GasUsed),
GasPrice: (*hexutil.Big)(rcp.EffectiveGasPrice),
Input: tx.Data(),
@@ -205,3 +215,30 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*
},
)
}

// Logs creates a subscription that fires for all new logs that match the given filter criteria.
func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (*rpc.Subscription, error) {

return s.newSubscription(
ctx,
s.logsBroadcaster,
func(ctx context.Context, height uint64) (interface{}, error) {
block, err := s.blocks.GetByHeight(height)
if err != nil {
return nil, fmt.Errorf("failed to get block at height: %d: %w", height, err)
}

id, err := block.Hash()
if err != nil {
return nil, err
}

// convert from the API type
f := logs.FilterCriteria{
Addresses: criteria.Addresses,
Topics: criteria.Topics,
Comment on lines +209 to +210

Contributor:

Do we need to validate these, or is that already done by go-ethereum?

}
return logs.NewIDFilter(id, f, s.blocks, s.receipts).Match()
},
)
}
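And the client-side counterpart of the new Logs subscription, sketched with ethclient's `SubscribeFilterLogs`; the endpoint is a placeholder, and the contract address is borrowed from the example event in the test further below:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	// Placeholder endpoint for the gateway's WebSocket server.
	client, err := ethclient.Dial("ws://localhost:8545")
	if err != nil {
		log.Fatal(err)
	}

	// Filter on a single contract address; topics follow the usual
	// positional semantics, with nil entries acting as wildcards.
	query := ethereum.FilterQuery{
		Addresses: []common.Address{
			common.HexToAddress("0x35c2cd9bee2ca40f8b91c924188c5018df2984e2"),
		},
	}

	logsCh := make(chan types.Log)
	sub, err := client.SubscribeFilterLogs(context.Background(), query, logsCh)
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case l := <-logsCh:
			fmt.Println("log from", l.Address.Hex(), "in block", l.BlockNumber)
		}
	}
}
```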
7 changes: 7 additions & 0 deletions bootstrap/bootstrap.go
@@ -37,6 +37,7 @@ func Start(ctx context.Context, cfg *config.Config) error {

blocksBroadcaster := broadcast.NewBroadcaster()
transactionsBroadcaster := broadcast.NewBroadcaster()
logsBroadcaster := broadcast.NewBroadcaster()

// if database is not initialized require init height
if _, err := blocks.LatestCadenceHeight(); errors.Is(err, storageErrs.ErrNotInitialized) {
@@ -56,6 +57,7 @@ func Start(ctx context.Context, cfg *config.Config) error {
accounts,
blocksBroadcaster,
transactionsBroadcaster,
logsBroadcaster,
logger,
)
if err != nil {
@@ -73,6 +75,7 @@ func Start(ctx context.Context, cfg *config.Config) error {
accounts,
blocksBroadcaster,
transactionsBroadcaster,
logsBroadcaster,
logger,
)
if err != nil {
@@ -91,6 +94,7 @@ func startIngestion(
accounts storage.AccountIndexer,
blocksBroadcaster *broadcast.Broadcaster,
transactionsBroadcaster *broadcast.Broadcaster,
logsBroadcaster *broadcast.Broadcaster,
logger zerolog.Logger,
) error {
logger.Info().Msg("starting up event ingestion")
@@ -131,6 +135,7 @@ func startIngestion(
accounts,
blocksBroadcaster,
transactionsBroadcaster,
logsBroadcaster,
logger,
)
const retries = 15
@@ -159,6 +164,7 @@ func startServer(
accounts storage.AccountIndexer,
blocksBroadcaster *broadcast.Broadcaster,
transactionsBroadcaster *broadcast.Broadcaster,
logsBroadcaster *broadcast.Broadcaster,
logger zerolog.Logger,
) error {
l := logger.With().Str("component", "API").Logger()
@@ -215,6 +221,7 @@ func startServer(
accounts,
blocksBroadcaster,
transactionsBroadcaster,
logsBroadcaster,
)

supportedAPIs := api.SupportedAPIs(blockchainAPI, streamAPI)
144 changes: 126 additions & 18 deletions integration/e2e_test.go
@@ -973,7 +973,7 @@ func TestE2E_Streaming(t *testing.T) {
COAAddress: gwAddress,
COAKey: gwKey,
CreateCOAResource: true,
GasPrice: new(big.Int).SetUint64(1),
GasPrice: new(big.Int).SetUint64(0),
LogWriter: os.Stdout,
LogLevel: zerolog.DebugLevel,
StreamLimit: 5,
@@ -1003,7 +1003,7 @@
// connect and subscribe to new blocks
blkWrite, blkRead, err := rpcTester.wsConnect()
require.NoError(t, err)
err = blkWrite(newHeadsRequest())
err = blkWrite(newHeadsSubscription())
require.NoError(t, err)

// first block stream response is for successful subscription
@@ -1013,7 +1013,7 @@
// connect and subscribe to new transactions
txWrite, txRead, err := rpcTester.wsConnect()
require.NoError(t, err)
err = txWrite(newTransactionsRequest())
err = txWrite(newTransactionsSubscription())
require.NoError(t, err)

// first block stream response is for successful subscription
@@ -1044,15 +1044,15 @@
event, err := blkRead()
require.NoError(t, err)

var msg streamMsg
require.NoError(t, json.Unmarshal(event, &msg))
var res map[string]any
require.NoError(t, json.Unmarshal(event.Params.Result, &res))

// this makes sure we receive the events in correct order
h, err := hexutil.DecodeUint64(msg.Params.Result["number"].(string))
h, err := hexutil.DecodeUint64(res["number"].(string))
require.NoError(t, err)
assert.Equal(t, currentHeight, int(h))
currentHeight++
blkSubID = msg.Params.Subscription
blkSubID = event.Params.Subscription
}

// iterate over all transactions and make sure all were received
@@ -1062,30 +1062,138 @@
event, err := txRead()
require.NoError(t, err)

var msg streamMsg
require.NoError(t, json.Unmarshal(event, &msg))

var res map[string]string
require.NoError(t, json.Unmarshal(event.Params.Result, &res))
// this makes sure we received txs in correct order
h, err := hexutil.DecodeUint64(msg.Params.Result["blockNumber"].(string))
h, err := hexutil.DecodeUint64(res["blockNumber"])
require.NoError(t, err)
assert.Equal(t, currentHeight, int(h))
require.Equal(t, transferEOAAdress.Hex(), msg.Params.Result["to"].(string))
require.Equal(t, transferEOAAdress.Hex(), res["to"])
currentHeight++
txSubID = msg.Params.Subscription
txSubID = event.Params.Subscription
}

unsubscribe(t, blkWrite, blkRead, blkSubID)
unsubscribe(t, txWrite, txRead, txSubID)

// deploy contract for logs/events stream filtering
nonce, err := rpcTester.getNonce(fundEOAAddress)
require.NoError(t, err)

gasLimit := uint64(4700000) // arbitrarily big
eoaKey, err := crypto.HexToECDSA(fundEOARawKey)
require.NoError(t, err)

deployData, err := hex.DecodeString(testContractBinary)
require.NoError(t, err)

signed, _, err := evmSign(nil, gasLimit, eoaKey, nonce, nil, deployData)
require.NoError(t, err)
hash, err := rpcTester.sendRawTx(signed)
require.NoError(t, err)
require.NotNil(t, hash)

time.Sleep(300 * time.Millisecond) // todo replace all sleeps with checking for receipt

rcp, err := rpcTester.getReceipt(hash.Hex())
require.NoError(t, err)
contractAddress := rcp.ContractAddress

storeContract, err := newContract(testContractBinary, testContractABI)
require.NoError(t, err)

// different subscription to logs with different filters
allLogsWrite, allLogsRead, err := rpcTester.wsConnect()
require.NoError(t, err)
err = allLogsWrite(newLogsSubscription(contractAddress.String(), ``))
require.NoError(t, err)

singleLogWrite, singleLogRead, err := rpcTester.wsConnect()
require.NoError(t, err)
topic4 := common.HexToHash("0x3")
err = singleLogWrite(newLogsSubscription(
contractAddress.String(),
fmt.Sprintf(`null, null, null, "%s"`, topic4),
))
require.NoError(t, err)

// ignore successful subscription result
_, err = allLogsRead()
require.NoError(t, err)
_, err = singleLogRead()
require.NoError(t, err)
// ignore current block
_, err = allLogsRead()
require.NoError(t, err)
_, err = singleLogRead()
require.NoError(t, err)

// example event
// [{"address":"0x35c2cd9bee2ca40f8b91c924188c5018df2984e2","topics":["0x76efea95e5da1fa661f235b2921ae1d89b99e457ec73fb88e34a1d150f95c64b","0x000000000000000000000000facf71692421039876a5bb4f10ef7a439d8ef61e","0x0000000000000000000000000000000000000000000000000000000000000005","0x0000000000000000000000000000000000000000000000000000000000000001"]

// submit transactions that emit logs
logCount := 5
sumA := big.NewInt(5)
for i := 0; i < logCount; i++ {
sumB := big.NewInt(int64(i))
callSum, err := storeContract.call("sum", sumA, sumB)
require.NoError(t, err)

nonce++
res, _, err := evmSignAndRun(emu, nil, gasLimit, eoaKey, nonce, &contractAddress, callSum)
require.NoError(t, err)
require.NoError(t, res.Error)
}

time.Sleep(500 * time.Millisecond)

currentHeight = startHeight + logCount + 1 // + 1 height for deploy of contract
for i := 0; i < logCount; i++ {
event, err := allLogsRead()
require.NoError(t, err)

var l []gethTypes.Log
assert.NoError(t, json.Unmarshal(event.Params.Result, &l))

require.Len(t, l, 1)
log := l[0]
// this makes sure we received logs in correct order
assert.Equal(t, uint64(currentHeight), log.BlockNumber)
assert.Equal(t, contractAddress.Hex(), log.Address.Hex())
assert.Len(t, log.Topics, 4)
assert.Equal(t, common.BigToHash(sumA), log.Topics[2])
sumB := big.NewInt(int64(i))
assert.Equal(t, common.BigToHash(sumB), log.Topics[3])

currentHeight++
}

// todo remove this once the change in broadcaster is made to ignore null results
singleLogRead()
singleLogRead()
singleLogRead()

event, err := singleLogRead()
require.NoError(t, err)

var l []gethTypes.Log
assert.NoError(t, json.Unmarshal(event.Params.Result, &l))
require.Len(t, l, 1)
log := l[0]
fmt.Println(log)

assert.Equal(t, uint64(startHeight+logCount+1+3), log.BlockNumber)
assert.Equal(t, contractAddress.Hex(), log.Address.Hex())
assert.Len(t, log.Topics, 4)
assert.Equal(t, common.BigToHash(sumA), log.Topics[2])
assert.Equal(t, topic4, log.Topics[3])
}

func unsubscribe(t *testing.T, write func(string) error, read func() ([]byte, error), id string) {
func unsubscribe(t *testing.T, write func(string) error, read func() (*streamMsg, error), id string) {
require.NotEmpty(t, id)
require.NoError(t, write(unsubscribeRequest(id)))
res, err := read()
event, err := read()
require.NoError(t, err)
var u map[string]any
require.NoError(t, json.Unmarshal(res, &u))
require.True(t, u["result"].(bool)) // successfully unsubscribed
require.True(t, event.Result.(bool)) // successfully unsubscribed
}

// checkSumLogValue makes sure the match is correct by checking sum value
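Note that `newLogsSubscription`, `newHeadsSubscription`, and the other request builders used in the test are helpers not shown in this diff. Assuming standard `eth_subscribe` semantics, the raw payload the logs helper assembles likely resembles the following hypothetical sketch; null topic positions act as wildcards, so this filter matches only logs whose fourth topic equals 0x3 (the address here is a placeholder for the deployed contract):

```go
package main

import "fmt"

func main() {
	// Hypothetical shape of the "logs" subscription request built by the
	// test helper; the topics string mirrors the one passed in the test.
	address := "0x35c2cd9bee2ca40f8b91c924188c5018df2984e2"
	topics := `null, null, null, "0x0000000000000000000000000000000000000000000000000000000000000003"`

	payload := fmt.Sprintf(
		`{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["logs",{"address":"%s","topics":[%s]}]}`,
		address, topics,
	)
	fmt.Println(payload)
}
```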