From 3b0e184f019058892b33546ddb685317a1be683f Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Fri, 22 Mar 2024 17:05:28 +0100 Subject: [PATCH 01/19] add log broadcast --- services/ingestion/engine.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index 9c478eba2..271d28104 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -26,6 +26,7 @@ type Engine struct { status *models.EngineStatus blocksBroadcaster *engine.Broadcaster transactionsBroadcaster *engine.Broadcaster + logsBroadcaster *engine.Broadcaster } func NewEventIngestionEngine( @@ -36,6 +37,7 @@ func NewEventIngestionEngine( accounts storage.AccountIndexer, blocksBroadcaster *engine.Broadcaster, transactionsBroadcaster *engine.Broadcaster, + logsBroadcaster *engine.Broadcaster, log zerolog.Logger, ) *Engine { log = log.With().Str("component", "ingestion").Logger() @@ -50,6 +52,7 @@ func NewEventIngestionEngine( status: models.NewEngineStatus(), blocksBroadcaster: blocksBroadcaster, transactionsBroadcaster: transactionsBroadcaster, + logsBroadcaster: logsBroadcaster, } } @@ -228,5 +231,11 @@ func (e *Engine) processTransactionEvent(event cadence.Event) error { } e.transactionsBroadcaster.Publish() + + // only notify if we have new logs + if len(receipt.Logs) > 0 { + e.logsBroadcaster.Publish() + } + return nil } From e89d81abd010f5b2d1fb4bd368a58f34d63a8a8b Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Fri, 22 Mar 2024 17:12:00 +0100 Subject: [PATCH 02/19] add logs stream --- api/stream.go | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/api/stream.go b/api/stream.go index 7e9999f0e..fce4baac1 100644 --- a/api/stream.go +++ b/api/stream.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/onflow/flow-evm-gateway/services/logs" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" @@ -30,6 +31,7 @@ type StreamAPI struct { accounts storage.AccountIndexer blocksBroadcaster *engine.Broadcaster transactionsBroadcaster *engine.Broadcaster + logsBroadcaster *engine.Broadcaster } func NewStreamAPI( @@ -41,6 +43,7 @@ func NewStreamAPI( accounts storage.AccountIndexer, blocksBroadcaster *engine.Broadcaster, transactionsBroadcaster *engine.Broadcaster, + logsBroadcaster *engine.Broadcaster, ) *StreamAPI { return &StreamAPI{ logger: logger, @@ -51,6 +54,7 @@ func NewStreamAPI( accounts: accounts, blocksBroadcaster: blocksBroadcaster, transactionsBroadcaster: transactionsBroadcaster, + logsBroadcaster: logsBroadcaster, } } @@ -140,7 +144,7 @@ func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { Number: hexutil.Uint64(block.Height), ParentHash: block.ParentBlockHash, ReceiptsRoot: block.ReceiptRoot, - Transactions: block.TransactionHashes,}, nil + Transactions: block.TransactionHashes}, nil }, ) } @@ -205,3 +209,25 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (* }, ) } + +// Logs creates a subscription that fires for all new log that match the given filter criteria. +func (s *StreamAPI) Logs(ctx context.Context, criteria logs.FilterCriteria) (*rpc.Subscription, error) { + + return s.newSubscription( + ctx, + s.logsBroadcaster, + func(ctx context.Context, height uint64) (interface{}, error) { + block, err := s.blocks.GetByHeight(height) + if err != nil { + return nil, fmt.Errorf("failed to get block at height: %d: %w", height, err) + } + + id, err := block.Hash() + if err != nil { + return nil, err + } + + return logs.NewIDFilter(id, criteria, s.blocks, s.receipts).Match() + }, + ) +} From 6460f2e9eca8ae4009f8a5b72ac088f6b72ebe9a Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Fri, 22 Mar 2024 17:17:09 +0100 Subject: [PATCH 03/19] update api change --- bootstrap/bootstrap.go | 7 +++++++ integration/helpers.go | 2 ++ 2 files changed, 9 insertions(+) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 1c9c7d942..94c2438f0 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -37,6 +37,7 @@ func Start(ctx context.Context, cfg *config.Config) error { blocksBroadcaster := broadcast.NewBroadcaster() transactionsBroadcaster := broadcast.NewBroadcaster() + logsBroadcaster := broadcast.NewBroadcaster() // if database is not initialized require init height if _, err := blocks.LatestCadenceHeight(); errors.Is(err, storageErrs.ErrNotInitialized) { @@ -56,6 +57,7 @@ func Start(ctx context.Context, cfg *config.Config) error { accounts, blocksBroadcaster, transactionsBroadcaster, + logsBroadcaster, logger, ) if err != nil { @@ -73,6 +75,7 @@ func Start(ctx context.Context, cfg *config.Config) error { accounts, blocksBroadcaster, transactionsBroadcaster, + logsBroadcaster, logger, ) if err != nil { @@ -91,6 +94,7 @@ func startIngestion( accounts storage.AccountIndexer, blocksBroadcaster *broadcast.Broadcaster, transactionsBroadcaster *broadcast.Broadcaster, + logsBroadcaster *broadcast.Broadcaster, logger zerolog.Logger, ) error { logger.Info().Msg("starting up event ingestion") @@ -131,6 +135,7 @@ func startIngestion( accounts, blocksBroadcaster, transactionsBroadcaster, + logsBroadcaster, logger, ) const retries = 15 @@ -159,6 +164,7 @@ func startServer( accounts storage.AccountIndexer, blocksBroadcaster *broadcast.Broadcaster, transactionsBroadcaster *broadcast.Broadcaster, + logsBroadcaster *broadcast.Broadcaster, logger zerolog.Logger, ) error { l := logger.With().Str("component", "API").Logger() @@ -215,6 +221,7 @@ func startServer( accounts, blocksBroadcaster, transactionsBroadcaster, + logsBroadcaster, ) supportedAPIs := api.SupportedAPIs(blockchainAPI, streamAPI) diff --git a/integration/helpers.go b/integration/helpers.go index 973054322..d52eb2227 100644 --- a/integration/helpers.go +++ b/integration/helpers.go @@ -117,6 +117,7 @@ func startEventIngestionEngine(ctx context.Context, dbDir string) ( txs := pebble.NewTransactions(db) blocksBroadcaster := broadcast.NewBroadcaster() txBroadcaster := broadcast.NewBroadcaster() + logsBroadcaster := broadcast.NewBroadcaster() err = blocks.InitHeights(config.EmulatorInitCadenceHeight) if err != nil { @@ -132,6 +133,7 @@ func startEventIngestionEngine(ctx context.Context, dbDir string) ( accounts, blocksBroadcaster, txBroadcaster, + logsBroadcaster, log, ) From f899e468838d77ef7f312ef270acdcd1c4d421d4 Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Fri, 22 Mar 2024 18:01:25 +0100 Subject: [PATCH 04/19] add log stream test --- integration/e2e_test.go | 90 +++++++++++++++++++++++++++++++++-------- 1 file changed, 73 insertions(+), 17 deletions(-) diff --git a/integration/e2e_test.go b/integration/e2e_test.go index 767f201d8..d61d60842 100644 --- a/integration/e2e_test.go +++ b/integration/e2e_test.go @@ -1003,7 +1003,7 @@ func TestE2E_Streaming(t *testing.T) { // connect and subscribe to new blocks blkWrite, blkRead, err := rpcTester.wsConnect() require.NoError(t, err) - err = blkWrite(newHeadsRequest()) + err = blkWrite(newHeadsSubscription()) require.NoError(t, err) // first block stream response is for successful subscription @@ -1013,7 +1013,7 @@ func TestE2E_Streaming(t *testing.T) { // connect and subscribe to new transactions txWrite, txRead, err := rpcTester.wsConnect() require.NoError(t, err) - err = txWrite(newTransactionsRequest()) + err = txWrite(newTransactionsSubscription()) require.NoError(t, err) // first block stream response is for successful subscription @@ -1044,15 +1044,15 @@ func TestE2E_Streaming(t *testing.T) { event, err := blkRead() require.NoError(t, err) - var msg streamMsg - require.NoError(t, json.Unmarshal(event, &msg)) + var res map[string]string + require.NoError(t, json.Unmarshal(event.Params.Result, &res)) // this makes sure we receive the events in correct order - h, err := hexutil.DecodeUint64(msg.Params.Result["number"].(string)) + h, err := hexutil.DecodeUint64(res["number"]) require.NoError(t, err) assert.Equal(t, currentHeight, int(h)) currentHeight++ - blkSubID = msg.Params.Subscription + blkSubID = event.Params.Subscription } // iterate over all transactions and make sure all were received @@ -1062,30 +1062,86 @@ func TestE2E_Streaming(t *testing.T) { event, err := txRead() require.NoError(t, err) - var msg streamMsg - require.NoError(t, json.Unmarshal(event, &msg)) - + var res map[string]string + require.NoError(t, json.Unmarshal(event.Params.Result, &res)) // this makes sure we received txs in correct order - h, err := hexutil.DecodeUint64(msg.Params.Result["blockNumber"].(string)) + h, err := hexutil.DecodeUint64(res["blockNumber"]) require.NoError(t, err) assert.Equal(t, currentHeight, int(h)) - require.Equal(t, transferEOAAdress.Hex(), msg.Params.Result["to"].(string)) + require.Equal(t, transferEOAAdress.Hex(), res["to"]) currentHeight++ - txSubID = msg.Params.Subscription + txSubID = event.Params.Subscription } unsubscribe(t, blkWrite, blkRead, blkSubID) unsubscribe(t, txWrite, txRead, txSubID) + + // deploy contract for logs/events stream filtering + nonce := uint64(0) + gasLimit := uint64(4700000) // arbitrarily big + eoaKey, err := crypto.HexToECDSA(fundEOARawKey) + require.NoError(t, err) + + deployData, err := hex.DecodeString(testContractBinary) + require.NoError(t, err) + + signed, _, err := evmSign(nil, gasLimit, eoaKey, nonce, nil, deployData) + hash, err := rpcTester.sendRawTx(signed) + require.NoError(t, err) + require.NotNil(t, hash) + + rcp, err := rpcTester.getReceipt(hash.Hex()) + require.NoError(t, err) + contractAddress := rcp.ContractAddress + + storeContract, err := newContract(testContractBinary, testContractABI) + require.NoError(t, err) + + // different subscription to logs with different filters + allLogsWrite, allLogsRead, err := rpcTester.wsConnect() + require.NoError(t, err) + err = allLogsWrite(newLogsSubscription(contractAddress.String(), []string{})) + require.NoError(t, err) + + _, _ = allLogsRead() // ignore current block todo - we need to skip transmissions if no data, there is a PR merged in master that updates subscriber we need to update to + + // submit transactions that emit logs + logCount := 5 + for i := 0; i < logCount; i++ { + sumA := big.NewInt(5) + sumB := big.NewInt(int64(i)) + callSum, err := storeContract.call("sum", sumA, sumB) + require.NoError(t, err) + + nonce++ + res, _, err := evmSignAndRun(emu, nil, gasLimit, eoaKey, nonce, &contractAddress, callSum) + require.NoError(t, err) + require.NoError(t, res.Error) + time.Sleep(300 * time.Millisecond) + } + + for i := 0; i < logCount; i++ { + event, err := allLogsRead() + require.NoError(t, err) + + var res map[string]string + require.NoError(t, json.Unmarshal(event.Params.Result, &res)) + h, err := hexutil.DecodeUint64(res["blockNumber"]) + require.NoError(t, err) + assert.Equal(t, currentHeight, int(h)) + require.Equal(t, contractAddress.String(), res["address"]) + } } -func unsubscribe(t *testing.T, write func(string) error, read func() ([]byte, error), id string) { +func unsubscribe(t *testing.T, write func(string) error, read func() (streamMsg, error), id string) { require.NotEmpty(t, id) require.NoError(t, write(unsubscribeRequest(id))) - res, err := read() + event, err := read() require.NoError(t, err) - var u map[string]any - require.NoError(t, json.Unmarshal(res, &u)) - require.True(t, u["result"].(bool)) // successfully unsubscribed + + var u map[string]bool + require.NoError(t, json.Unmarshal(event.Params.Result, &u)) + require.True(t, u["result"]) // successfully unsubscribed } // checkSumLogValue makes sure the match is correct by checking sum value From 1f3f0fca64fab2c741eeeeff355e79580c492488 Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Fri, 22 Mar 2024 18:01:32 +0100 Subject: [PATCH 05/19] update stream response type --- integration/helpers.go | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/integration/helpers.go b/integration/helpers.go index d52eb2227..2b6991a29 100644 --- a/integration/helpers.go +++ b/integration/helpers.go @@ -676,7 +676,7 @@ func (r *rpcTest) getCode(from common.Address) ([]byte, error) { // wsConnect creates a new websocket connection and returns a write and read function // that can be used to easily write to the stream as well as read the next data. -func (r *rpcTest) wsConnect() (func(string) error, func() ([]byte, error), error) { +func (r *rpcTest) wsConnect() (func(string) error, func() (*streamMsg, error), error) { u := url.URL{Scheme: "ws", Host: r.url, Path: "/"} c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) if err != nil { @@ -688,26 +688,43 @@ func (r *rpcTest) wsConnect() (func(string) error, func() ([]byte, error), error return c.WriteMessage(websocket.TextMessage, []byte(req)) } - read := func() ([]byte, error) { + read := func() (*streamMsg, error) { _, message, err := c.ReadMessage() if err != nil { return nil, err } log.Println("<-- ws: ", string(message)) - return message, nil + + var msg streamMsg + err = json.Unmarshal(message, &msg) + if err != nil { + return nil, err + } + return &msg, nil } return write, read, nil } -func newHeadsRequest() string { +func newHeadsSubscription() string { return `{"jsonrpc":"2.0","id":0,"method":"eth_subscribe","params":["newHeads"]}` } -func newTransactionsRequest() string { +func newTransactionsSubscription() string { return `{"jsonrpc":"2.0","id":0,"method":"eth_subscribe","params":["newPendingTransactions"]}` } +func newLogsSubscription(address string, topics []string) string { + return fmt.Sprintf(` + { + "jsonrpc": "2.0", + "id": 0, + "method": "eth_subscribe", + "params": ["logs", {"address":"%s","topics": [%s]}] + } + `, address, strings.Join(topics, ",")) +} + func unsubscribeRequest(id string) string { return fmt.Sprintf(`{"jsonrpc":"2.0","id":0,"method":"eth_unsubscribe","params":["%s"]}`, id) } @@ -755,8 +772,8 @@ func (b *rpcBlock) FullTransactions() []map[string]interface{} { } type streamParams struct { - Subscription string `json:"subscription"` - Result map[string]any `json:"result"` + Subscription string `json:"subscription"` + Result json.RawMessage `json:"result"` } type streamMsg struct { From 16f0f4fff76f9524caeca77f8bfd6694287d3395 Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Fri, 22 Mar 2024 18:01:53 +0100 Subject: [PATCH 06/19] typo fix --- integration/e2e_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/e2e_test.go b/integration/e2e_test.go index d61d60842..474059306 100644 --- a/integration/e2e_test.go +++ b/integration/e2e_test.go @@ -1133,7 +1133,7 @@ func TestE2E_Streaming(t *testing.T) { } } -func unsubscribe(t *testing.T, write func(string) error, read func() (streamMsg, error), id string) { +func unsubscribe(t *testing.T, write func(string) error, read func() (*streamMsg, error), id string) { require.NotEmpty(t, id) require.NoError(t, write(unsubscribeRequest(id))) event, err := read() From 1d2691186eaf0429d5b33e57639779d1fe6b0d80 Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Fri, 22 Mar 2024 19:11:29 +0100 Subject: [PATCH 07/19] logs wip --- integration/e2e_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/integration/e2e_test.go b/integration/e2e_test.go index 474059306..36841aba6 100644 --- a/integration/e2e_test.go +++ b/integration/e2e_test.go @@ -1121,15 +1121,14 @@ func TestE2E_Streaming(t *testing.T) { } for i := 0; i < logCount; i++ { + fmt.Println("getting event", i) event, err := allLogsRead() require.NoError(t, err) - var res map[string]string + var res []map[string]string require.NoError(t, json.Unmarshal(event.Params.Result, &res)) - h, err := hexutil.DecodeUint64(res["blockNumber"]) - require.NoError(t, err) - assert.Equal(t, currentHeight, int(h)) - require.Equal(t, contractAddress.String(), res["address"]) + + fmt.Println("events", res) } } From 5dba16f0475eea6b224f511c04f9df91e8f7d71f Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Fri, 22 Mar 2024 19:22:29 +0100 Subject: [PATCH 08/19] use background context --- api/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/stream.go b/api/stream.go index fce4baac1..16ede0c7b 100644 --- a/api/stream.go +++ b/api/stream.go @@ -90,7 +90,7 @@ func (s *StreamAPI) newSubscription( s.config.StreamTimeout, s.config.StreamLimit, sub, - ).Stream(ctx) + ).Stream(context.Background()) // todo investigate why the passed in context is canceled so quickly go func() { for { From 8f8ca53fd4ea938154ab07b14db6a49fc8b95198 Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Fri, 22 Mar 2024 19:36:31 +0100 Subject: [PATCH 09/19] fix if nil --- api/api.go | 7 ++++++- api/stream.go | 7 ++++++- integration/helpers.go | 3 +++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/api/api.go b/api/api.go index 6df4085df..6d8c9ae9b 100644 --- a/api/api.go +++ b/api/api.go @@ -171,12 +171,17 @@ func (b *BlockChainAPI) GetTransactionByHash( v, r, s := tx.RawSignatureValues() index := uint64(rcp.TransactionIndex) + var to string + if tx.To() != nil { + to = tx.To().Hex() + } + txResult := &RPCTransaction{ Hash: txHash, BlockHash: &rcp.BlockHash, BlockNumber: (*hexutil.Big)(rcp.BlockNumber), From: from.Hex(), - To: tx.To().Hex(), + To: to, Gas: hexutil.Uint64(rcp.GasUsed), GasPrice: (*hexutil.Big)(rcp.EffectiveGasPrice), Input: tx.Data(), diff --git a/api/stream.go b/api/stream.go index 16ede0c7b..245b469c2 100644 --- a/api/stream.go +++ b/api/stream.go @@ -190,12 +190,17 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (* v, r, ss := tx.RawSignatureValues() + var to string + if tx.To() != nil { + to = tx.To().Hex() + } + return &RPCTransaction{ Hash: h, BlockHash: &rcp.BlockHash, BlockNumber: (*hexutil.Big)(rcp.BlockNumber), From: from.Hex(), - To: tx.To().Hex(), + To: to, Gas: hexutil.Uint64(rcp.GasUsed), GasPrice: (*hexutil.Big)(rcp.EffectiveGasPrice), Input: tx.Data(), diff --git a/integration/helpers.go b/integration/helpers.go index 2b6991a29..66e114a5c 100644 --- a/integration/helpers.go +++ b/integration/helpers.go @@ -547,6 +547,7 @@ func (r *rpcTest) getReceipt(hash string) (*types.Receipt, error) { return nil, err } + fmt.Println("RCP", string(rpcRes)) var rcp types.Receipt err = json.Unmarshal(rpcRes, &rcp) if err != nil { @@ -780,4 +781,6 @@ type streamMsg struct { Jsonrpc string `json:"jsonrpc"` Method string `json:"method"` Params streamParams `json:"params"` + Result any `json:"result"` + ID any `json:"id"` } From a878628ef741567d67afd9f0894eb120dd2f89d0 Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Fri, 22 Mar 2024 19:38:30 +0100 Subject: [PATCH 10/19] fix test api change --- integration/e2e_test.go | 17 +++++++++-------- integration/helpers.go | 1 - 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/integration/e2e_test.go b/integration/e2e_test.go index 36841aba6..7f73d95a0 100644 --- a/integration/e2e_test.go +++ b/integration/e2e_test.go @@ -973,7 +973,7 @@ func TestE2E_Streaming(t *testing.T) { COAAddress: gwAddress, COAKey: gwKey, CreateCOAResource: true, - GasPrice: new(big.Int).SetUint64(1), + GasPrice: new(big.Int).SetUint64(0), LogWriter: os.Stdout, LogLevel: zerolog.DebugLevel, StreamLimit: 5, @@ -1044,11 +1044,11 @@ func TestE2E_Streaming(t *testing.T) { event, err := blkRead() require.NoError(t, err) - var res map[string]string + var res map[string]any require.NoError(t, json.Unmarshal(event.Params.Result, &res)) // this makes sure we receive the events in correct order - h, err := hexutil.DecodeUint64(res["number"]) + h, err := hexutil.DecodeUint64(res["number"].(string)) require.NoError(t, err) assert.Equal(t, currentHeight, int(h)) currentHeight++ @@ -1077,7 +1077,9 @@ func TestE2E_Streaming(t *testing.T) { unsubscribe(t, txWrite, txRead, txSubID) // deploy contract for logs/events stream filtering - nonce := uint64(0) + nonce, err := rpcTester.getNonce(fundEOAAddress) + require.NoError(t, err) + gasLimit := uint64(4700000) // arbitrarily big eoaKey, err := crypto.HexToECDSA(fundEOARawKey) require.NoError(t, err) @@ -1090,6 +1092,8 @@ func TestE2E_Streaming(t *testing.T) { require.NoError(t, err) require.NotNil(t, hash) + time.Sleep(300 * time.Millisecond) + rcp, err := rpcTester.getReceipt(hash.Hex()) require.NoError(t, err) contractAddress := rcp.ContractAddress @@ -1137,10 +1141,7 @@ func unsubscribe(t *testing.T, write func(string) error, read func() (*streamMsg require.NoError(t, write(unsubscribeRequest(id))) event, err := read() require.NoError(t, err) - - var u map[string]bool - require.NoError(t, json.Unmarshal(event.Params.Result, &u)) - require.True(t, u["result"]) // successfully unsubscribed + require.True(t, event.Result.(bool)) // successfully unsubscribed } // checkSumLogValue makes sure the match is correct by checking sum value diff --git a/integration/helpers.go b/integration/helpers.go index 66e114a5c..c340a6009 100644 --- a/integration/helpers.go +++ b/integration/helpers.go @@ -547,7 +547,6 @@ func (r *rpcTest) getReceipt(hash string) (*types.Receipt, error) { return nil, err } - fmt.Println("RCP", string(rpcRes)) var rcp types.Receipt err = json.Unmarshal(rpcRes, &rcp) if err != nil { From 16f198dd37f65f55de9325eceabbdc03d0c10f18 Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Tue, 26 Mar 2024 12:40:55 +0100 Subject: [PATCH 11/19] add wildcard test --- services/logs/filter_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/services/logs/filter_test.go b/services/logs/filter_test.go index d414fc20f..ee6985c8d 100644 --- a/services/logs/filter_test.go +++ b/services/logs/filter_test.go @@ -148,6 +148,14 @@ func TestIDFilter(t *testing.T) { expectLogs []*gethTypes.Log criteria FilterCriteria }{{ + desc: "wildcard all logs", + id: mustHash(blocks[0]), + criteria: FilterCriteria{ + Addresses: []common.Address{lgs[0].Address}, + Topics: [][]common.Hash{}, + }, + expectLogs: lgs[:2], // first two are all logs from the address + }, { desc: "single topic, single address match single log", id: mustHash(blocks[0]), criteria: FilterCriteria{ From 9d1bedb15e7ac8d4e9a8e6a8c9ae0b7080c68367 Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Tue, 26 Mar 2024 14:16:29 +0100 Subject: [PATCH 12/19] refactor tests to use models --- integration/e2e_test.go | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/integration/e2e_test.go b/integration/e2e_test.go index 7f73d95a0..0f9b4165a 100644 --- a/integration/e2e_test.go +++ b/integration/e2e_test.go @@ -1104,15 +1104,17 @@ func TestE2E_Streaming(t *testing.T) { // different subscription to logs with different filters allLogsWrite, allLogsRead, err := rpcTester.wsConnect() require.NoError(t, err) - err = allLogsWrite(newLogsSubscription(contractAddress.String(), []string{})) + err = allLogsWrite(newLogsSubscription(contractAddress.String(), ``)) require.NoError(t, err) _, _ = allLogsRead() // ignore current block todo - we need to skip transmissions if no data, there is a PR merged in master that updates subscriber we need to update to + // [{"address":"0x35c2cd9bee2ca40f8b91c924188c5018df2984e2","topics":["0x76efea95e5da1fa661f235b2921ae1d89b99e457ec73fb88e34a1d150f95c64b","0x000000000000000000000000facf71692421039876a5bb4f10ef7a439d8ef61e","0x0000000000000000000000000000000000000000000000000000000000000005","0x0000000000000000000000000000000000000000000000000000000000000001"] + // submit transactions that emit logs logCount := 5 + sumA := big.NewInt(5) for i := 0; i < logCount; i++ { - sumA := big.NewInt(5) sumB := big.NewInt(int64(i)) callSum, err := storeContract.call("sum", sumA, sumB) require.NoError(t, err) @@ -1121,18 +1123,33 @@ func TestE2E_Streaming(t *testing.T) { res, _, err := evmSignAndRun(emu, nil, gasLimit, eoaKey, nonce, &contractAddress, callSum) require.NoError(t, err) require.NoError(t, res.Error) - time.Sleep(300 * time.Millisecond) } + time.Sleep(500 * time.Millisecond) + currentHeight = startHeight + logCount + 1 // todo why one extra block - fix maybe use latest block request for i := 0; i < logCount; i++ { - fmt.Println("getting event", i) event, err := allLogsRead() require.NoError(t, err) - var res []map[string]string - require.NoError(t, json.Unmarshal(event.Params.Result, &res)) + var l []gethTypes.Log + assert.NoError(t, json.Unmarshal(event.Params.Result, &l)) - fmt.Println("events", res) + if len(l) == 0 { + fmt.Println("---------") + fmt.Println(event) // todo figure out why + fmt.Println("---------") + continue + } + + require.Len(t, l, 1) + log := l[0] + // this makes sure we received logs in correct order + assert.Equal(t, uint64(currentHeight), log.BlockNumber) + assert.Equal(t, contractAddress.Hex(), log.Address.Hex()) + assert.Len(t, log.Topics, 4) + assert.Equal(t, common.BigToHash(sumA), log.Topics[2]) + + currentHeight++ } } From e473f5ebc1264156f29d7bc7cd90a28021735d53 Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Tue, 26 Mar 2024 14:16:48 +0100 Subject: [PATCH 13/19] convert to correct type --- api/stream.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/api/stream.go b/api/stream.go index 245b469c2..be54dae05 100644 --- a/api/stream.go +++ b/api/stream.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/onflow/flow-evm-gateway/services/logs" "github.com/ethereum/go-ethereum/common/hexutil" @@ -216,7 +217,7 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (* } // Logs creates a subscription that fires for all new log that match the given filter criteria. -func (s *StreamAPI) Logs(ctx context.Context, criteria logs.FilterCriteria) (*rpc.Subscription, error) { +func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (*rpc.Subscription, error) { return s.newSubscription( ctx, @@ -232,7 +233,12 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria logs.FilterCriteria) (*rp return nil, err } - return logs.NewIDFilter(id, criteria, s.blocks, s.receipts).Match() + // convert from the API type + f := logs.FilterCriteria{ + Addresses: criteria.Addresses, + Topics: criteria.Topics, + } + return logs.NewIDFilter(id, f, s.blocks, s.receipts).Match() }, ) } From b933babbc94e28ea22cfd408e39ceb6eb3bc337f Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Tue, 26 Mar 2024 14:16:53 +0100 Subject: [PATCH 14/19] improve helpers --- integration/helpers.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration/helpers.go b/integration/helpers.go index c340a6009..c1ba72d68 100644 --- a/integration/helpers.go +++ b/integration/helpers.go @@ -714,15 +714,15 @@ func newTransactionsSubscription() string { return `{"jsonrpc":"2.0","id":0,"method":"eth_subscribe","params":["newPendingTransactions"]}` } -func newLogsSubscription(address string, topics []string) string { +func newLogsSubscription(address string, topics string) string { return fmt.Sprintf(` { "jsonrpc": "2.0", "id": 0, "method": "eth_subscribe", - "params": ["logs", {"address":"%s","topics": [%s]}] + "params": ["logs", {"address":"%s"}] } - `, address, strings.Join(topics, ",")) + `, address) } func unsubscribeRequest(id string) string { From 9372a42c1ea10bbbfe80d752a889d1f4e8cd0adc Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Tue, 26 Mar 2024 14:17:53 +0100 Subject: [PATCH 15/19] check value of topic --- integration/e2e_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration/e2e_test.go b/integration/e2e_test.go index 0f9b4165a..7e8c9ccfc 100644 --- a/integration/e2e_test.go +++ b/integration/e2e_test.go @@ -1148,6 +1148,8 @@ func TestE2E_Streaming(t *testing.T) { assert.Equal(t, contractAddress.Hex(), log.Address.Hex()) assert.Len(t, log.Topics, 4) assert.Equal(t, common.BigToHash(sumA), log.Topics[2]) + sumB := big.NewInt(int64(i)) + assert.Equal(t, common.BigToHash(sumB), log.Topics[3]) currentHeight++ } From 7dfc13d1c65863ea8b0cb88f5c61782dc0adc062 Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Tue, 26 Mar 2024 14:30:29 +0100 Subject: [PATCH 16/19] fix one off height --- integration/e2e_test.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/integration/e2e_test.go b/integration/e2e_test.go index 7e8c9ccfc..97137ba94 100644 --- a/integration/e2e_test.go +++ b/integration/e2e_test.go @@ -1107,7 +1107,8 @@ func TestE2E_Streaming(t *testing.T) { err = allLogsWrite(newLogsSubscription(contractAddress.String(), ``)) require.NoError(t, err) - _, _ = allLogsRead() // ignore current block todo - we need to skip transmissions if no data, there is a PR merged in master that updates subscriber we need to update to + _, _ = allLogsRead() // ignore successful subscription result + _, _ = allLogsRead() // ignore current block // [{"address":"0x35c2cd9bee2ca40f8b91c924188c5018df2984e2","topics":["0x76efea95e5da1fa661f235b2921ae1d89b99e457ec73fb88e34a1d150f95c64b","0x000000000000000000000000facf71692421039876a5bb4f10ef7a439d8ef61e","0x0000000000000000000000000000000000000000000000000000000000000005","0x0000000000000000000000000000000000000000000000000000000000000001"] @@ -1126,7 +1127,8 @@ func TestE2E_Streaming(t *testing.T) { } time.Sleep(500 * time.Millisecond) - currentHeight = startHeight + logCount + 1 // todo why one extra block - fix maybe use latest block request + + currentHeight = startHeight + logCount + 1 // + 1 height for deploy of contract for i := 0; i < logCount; i++ { event, err := allLogsRead() require.NoError(t, err) @@ -1134,13 +1136,6 @@ func TestE2E_Streaming(t *testing.T) { var l []gethTypes.Log assert.NoError(t, json.Unmarshal(event.Params.Result, &l)) - if len(l) == 0 { - fmt.Println("---------") - fmt.Println(event) // todo figure out why - fmt.Println("---------") - continue - } - require.Len(t, l, 1) log := l[0] // this makes sure we received logs in correct order From 4a8242f476d4e9ae774f7335d93efa689e35f45b Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Tue, 26 Mar 2024 14:49:45 +0100 Subject: [PATCH 17/19] add test for specific topic subs --- integration/e2e_test.go | 44 ++++++++++++++++++++++++++++++++++++++--- integration/helpers.go | 4 ++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/integration/e2e_test.go b/integration/e2e_test.go index 97137ba94..48ec0a917 100644 --- a/integration/e2e_test.go +++ b/integration/e2e_test.go @@ -1092,7 +1092,7 @@ func TestE2E_Streaming(t *testing.T) { require.NoError(t, err) require.NotNil(t, hash) - time.Sleep(300 * time.Millisecond) + time.Sleep(300 * time.Millisecond) // todo replace all sleeps with checking for receipt rcp, err := rpcTester.getReceipt(hash.Hex()) require.NoError(t, err) @@ -1107,9 +1107,27 @@ func TestE2E_Streaming(t *testing.T) { err = allLogsWrite(newLogsSubscription(contractAddress.String(), ``)) require.NoError(t, err) - _, _ = allLogsRead() // ignore successful subscription result - _, _ = allLogsRead() // ignore current block + singleLogWrite, singleLogRead, err := rpcTester.wsConnect() + require.NoError(t, err) + topic4 := common.HexToHash("0x3") + err = singleLogWrite(newLogsSubscription( + contractAddress.String(), + fmt.Sprintf(`null, null, null, "%s"`, topic4), + )) + require.NoError(t, err) + + // ignore successful subscription result + _, err = allLogsRead() + require.NoError(t, err) + _, err = singleLogRead() + require.NoError(t, err) + // ignore current block + _, err = allLogsRead() + require.NoError(t, err) + _, err = singleLogRead() + require.NoError(t, err) + // example event // [{"address":"0x35c2cd9bee2ca40f8b91c924188c5018df2984e2","topics":["0x76efea95e5da1fa661f235b2921ae1d89b99e457ec73fb88e34a1d150f95c64b","0x000000000000000000000000facf71692421039876a5bb4f10ef7a439d8ef61e","0x0000000000000000000000000000000000000000000000000000000000000005","0x0000000000000000000000000000000000000000000000000000000000000001"] // submit transactions that emit logs @@ -1148,6 +1166,26 @@ func TestE2E_Streaming(t *testing.T) { currentHeight++ } + + // todo remove this once the change in broadcaster is made to ignore null results + singleLogRead() + singleLogRead() + singleLogRead() + + event, err := singleLogRead() + require.NoError(t, err) + + var l []gethTypes.Log + assert.NoError(t, json.Unmarshal(event.Params.Result, &l)) + require.Len(t, l, 1) + log := l[0] + fmt.Println(log) + + assert.Equal(t, uint64(startHeight+logCount+1+3), log.BlockNumber) + assert.Equal(t, contractAddress.Hex(), log.Address.Hex()) + assert.Len(t, log.Topics, 4) + assert.Equal(t, common.BigToHash(sumA), log.Topics[2]) + assert.Equal(t, topic4, log.Topics[3]) } func unsubscribe(t *testing.T, write func(string) error, read func() (*streamMsg, error), id string) { diff --git a/integration/helpers.go b/integration/helpers.go index c1ba72d68..f133fe3eb 100644 --- a/integration/helpers.go +++ b/integration/helpers.go @@ -720,9 +720,9 @@ func newLogsSubscription(address string, topics string) string { "jsonrpc": "2.0", "id": 0, "method": "eth_subscribe", - "params": ["logs", {"address":"%s"}] + "params": ["logs", {"address":"%s", "topics": [%s]}] } - `, address) + `, address, topics) } func unsubscribeRequest(id string) string { From 92966ceb349dccd00581f88104f4e81ee1c445a1 Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Fri, 29 Mar 2024 17:52:21 +0100 Subject: [PATCH 18/19] merge update --- api/stream.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/api/stream.go b/api/stream.go index 407b059f6..8f8768233 100644 --- a/api/stream.go +++ b/api/stream.go @@ -40,7 +40,6 @@ func NewStreamAPI( blocks storage.BlockIndexer, transactions storage.TransactionIndexer, receipts storage.ReceiptIndexer, - accounts storage.AccountIndexer, blocksBroadcaster *engine.Broadcaster, transactionsBroadcaster *engine.Broadcaster, logsBroadcaster *engine.Broadcaster, @@ -51,7 +50,6 @@ func NewStreamAPI( blocks: blocks, transactions: transactions, receipts: receipts, - accounts: accounts, blocksBroadcaster: blocksBroadcaster, transactionsBroadcaster: transactionsBroadcaster, logsBroadcaster: logsBroadcaster, @@ -90,7 +88,7 @@ func (s *StreamAPI) newSubscription( s.config.StreamTimeout, s.config.StreamLimit, sub, - ).Stream(context.Background()) // todo investigate why the passed in context is canceled so quickly + ).Stream(context.Background()) // todo investigate why the passed in context is canceled so quickly go func() { for { From 4eda15dcd72ec8b47460fb3d395997056e5f5c1d Mon Sep 17 00:00:00 2001 From: Gregor Gololicic Date: Thu, 4 Apr 2024 20:54:34 +0200 Subject: [PATCH 19/19] limit addresses and topics --- api/models.go | 8 ++++++-- api/stream.go | 6 ++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/api/models.go b/api/models.go index 73f88a3fd..910bac6fb 100644 --- a/api/models.go +++ b/api/models.go @@ -11,13 +11,17 @@ import ( ) var ( - errFilterNotFound = errors.New("filter not found") - errExceedMaxTopics = errors.New("exceed max topics") + errFilterNotFound = errors.New("filter not found") + errExceedMaxTopics = errors.New("exceed max topics") + errExceedMaxAddresses = errors.New("exceed max addresses") ) // The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0 const maxTopics = 4 +// The maximum number of addresses allowed +const maxAddresses = 6 + // TransactionArgs represents the arguments to construct a new transaction // or a message call. type TransactionArgs struct { diff --git a/api/stream.go b/api/stream.go index 8f8768233..ba59abea3 100644 --- a/api/stream.go +++ b/api/stream.go @@ -183,6 +183,12 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (* // Logs creates a subscription that fires for all new log that match the given filter criteria. func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (*rpc.Subscription, error) { + if len(criteria.Topics) > maxTopics { + return nil, errExceedMaxTopics + } + if len(criteria.Addresses) > maxAddresses { + return nil, errExceedMaxAddresses + } return s.newSubscription( ctx,