diff --git a/Makefile b/Makefile index 4040009f7..6061e7aa0 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ generate: mockery --dir=storage --name=ReceiptIndexer --output=storage/mocks mockery --dir=storage --name=TransactionIndexer --output=storage/mocks mockery --dir=storage --name=AccountIndexer --output=storage/mocks - mockery --all --dir=services/events --output=services/events/mocks + mockery --all --dir=services/ingestion --output=services/ingestion/mocks mockery --dir=models --name=Engine --output=models/mocks .PHONY: ci diff --git a/README.md b/README.md index 34d5adcd2..348e574a7 100644 --- a/README.md +++ b/README.md @@ -90,23 +90,26 @@ it should return: The application can be configured using the following flags at runtime: -| Flag | Default Value | Description | -|---------------------------|------------------|------------------------------------------------------------------------------------------------------------------------| -| `--database-dir` | `./db` | Path to the directory for the database. | -| `--rpc-host` | `localhost` | Host for the JSON RPC API server. | -| `--rpc-port` | `8545` | Port for the JSON RPC API server. | -| `--access-node-grpc-host` | `localhost:3569` | Host to the Flow access node (AN) gRPC API. | -| `--evm-network-id` | `testnet` | EVM network ID (options: `testnet`, `mainnet`). | -| `--flow-network-id` | `emulator` | Flow network ID (options: `emulator`, `previewnet`). | -| `--coinbase` | (required) | Coinbase address to use for fee collection. | -| `--gas-price` | `1` | Static gas price used for EVM transactions. | -| `--coa-address` | (required) | Flow address that holds COA account used for submitting transactions. | -| `--coa-key` | (required) | *WARNING*: Do not use this flag in production! Private key value for the COA address used for submitting transactions. | -| `--coa-resource-create` | `false` | Auto-create the COA resource in the Flow COA account provided if one doesn't exist. | -| `--log-level` | `debug` | Define verbosity of the log output ('debug', 'info', 'error') | -| `--stream-limit` | 10 | Rate-limits the events sent to the client within one second | -| `--stream-timeout` | 3sec | Defines the timeout in seconds the server waits for the event to be sent to the client | - +| Flag | Default Value | Description | +|-----------------------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------| +| `--database-dir` | `./db` | Path to the directory for the database. | +| `--rpc-host` | `localhost` | Host for the JSON RPC API server. | +| `--rpc-port` | `8545` | Port for the JSON RPC API server. | +| `--access-node-grpc-host` | `localhost:3569` | Host to the current spork Flow access node (AN) gRPC API. | +| `--access-node-spork-hosts` | | Previous spork AN hosts, defined following the schema: `{latest height}@{host}` as comma separated list (e.g. `"200@host-1.com,300@host2.com"`) | +| `--evm-network-id` | `testnet` | EVM network ID (options: `testnet`, `mainnet`). | +| `--flow-network-id` | `emulator` | Flow network ID (options: `emulator`, `previewnet`). | +| `--coinbase` | (required) | Coinbase address to use for fee collection. | +| `--init-cadence-height` | 0 | Define the Cadence block height at which to start the indexing. | +| `--gas-price` | `1` | Static gas price used for EVM transactions. | +| `--coa-address` | (required) | Flow address that holds COA account used for submitting transactions. | +| `--coa-key` | (required) | *WARNING*: Do not use this flag in production! Private key value for the COA address used for submitting transactions. | +| `--coa-key-file` | | File path that contains JSON array of COA keys used in key-rotation mechanism, this is exclusive with `coa-key` flag. | +| `--coa-resource-create` | `false` | Auto-create the COA resource in the Flow COA account provided if one doesn't exist. | +| `--log-level` | `debug` | Define verbosity of the log output ('debug', 'info', 'error') | +| `--stream-limit` | 10 | Rate-limits the events sent to the client within one second | +| `--stream-timeout` | 3sec | Defines the timeout in seconds the server waits for the event to be sent to the client | +| `--filter-expiry` | `5m` | Filter defines the time it takes for an idle filter to expire | ## Getting Started diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index fe9690913..c81b4e7e7 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -5,6 +5,13 @@ import ( "errors" "fmt" + "github.com/onflow/flow-go-sdk/access" + "github.com/onflow/flow-go-sdk/access/grpc" + "github.com/onflow/flow-go-sdk/crypto" + broadcast "github.com/onflow/flow-go/engine" + "github.com/onflow/go-ethereum/rpc" + "github.com/rs/zerolog" + "github.com/onflow/flow-evm-gateway/api" "github.com/onflow/flow-evm-gateway/config" "github.com/onflow/flow-evm-gateway/models" @@ -13,11 +20,6 @@ import ( "github.com/onflow/flow-evm-gateway/storage" storageErrs "github.com/onflow/flow-evm-gateway/storage/errors" "github.com/onflow/flow-evm-gateway/storage/pebble" - "github.com/onflow/flow-go-sdk/access/grpc" - "github.com/onflow/flow-go-sdk/crypto" - broadcast "github.com/onflow/flow-go/engine" - "github.com/onflow/go-ethereum/rpc" - "github.com/rs/zerolog" ) func Start(ctx context.Context, cfg *config.Config) error { @@ -107,7 +109,23 @@ func startIngestion( ) error { logger.Info().Msg("starting up event ingestion") - client, err := grpc.NewClient(cfg.AccessNodeGRPCHost) + currentSporkClient, err := grpc.NewClient(cfg.AccessNodeHost) + if err != nil { + return fmt.Errorf("failed to create client connection for host: %s, with error: %w", cfg.AccessNodeHost, err) + } + + // if we provided access node previous spork hosts add them to the client + pastSporkClients := make([]access.Client, len(cfg.AccessNodePreviousSporkHosts)) + for i, host := range cfg.AccessNodePreviousSporkHosts { + grpcClient, err := grpc.NewClient(host) + if err != nil { + return fmt.Errorf("failed to create client connection for host: %s, with error: %w", host, err) + } + + pastSporkClients[i] = grpcClient + } + + client, err := requester.NewCrossSporkClient(currentSporkClient, pastSporkClients, logger) if err != nil { return err } @@ -123,7 +141,7 @@ func startIngestion( } // make sure the provided block to start the indexing can be loaded - _, err = client.GetBlockByHeight(context.Background(), latestCadenceHeight) + _, err = client.GetBlockHeaderByHeight(context.Background(), latestCadenceHeight) if err != nil { return fmt.Errorf("failed to get provided cadence height: %w", err) } @@ -134,7 +152,7 @@ func startIngestion( Uint64("missed-heights", blk.Height-latestCadenceHeight). Msg("indexing cadence height information") - subscriber := ingestion.NewRPCSubscriber(client, cfg.FlowNetworkID) + subscriber := ingestion.NewRPCSubscriber(client, cfg.FlowNetworkID, logger) engine := ingestion.NewEventIngestionEngine( subscriber, blocks, @@ -180,7 +198,7 @@ func startServer( srv := api.NewHTTPServer(l, rpc.DefaultHTTPTimeouts) - client, err := grpc.NewClient(cfg.AccessNodeGRPCHost) + client, err := grpc.NewClient(cfg.AccessNodeHost) if err != nil { return err } diff --git a/config/config.go b/config/config.go index f1fad47e2..43e6ad5d3 100644 --- a/config/config.go +++ b/config/config.go @@ -6,6 +6,7 @@ import ( "io" "math/big" "os" + "strings" "time" "github.com/goccy/go-json" @@ -29,8 +30,10 @@ const LiveNetworkInitCadenceHeght = uint64(1) type Config struct { // DatabaseDir is where the database should be stored. DatabaseDir string - // AccessNodeGRPCHost defines the Flow network AN host. - AccessNodeGRPCHost string + // AccessNodeHost defines the current spork Flow network AN host. + AccessNodeHost string + // AccessNodePreviousSporkHosts contains a list of the ANs hosts for each spork + AccessNodePreviousSporkHosts []string // GRPCPort for the RPC API server RPCPort int // GRPCHost for the RPC API server @@ -71,7 +74,7 @@ type Config struct { func FromFlags() (*Config, error) { cfg := &Config{} - var evmNetwork, coinbase, gas, coa, key, keysPath, flowNetwork, logLevel, filterExpiry string + var evmNetwork, coinbase, gas, coa, key, keysPath, flowNetwork, logLevel, filterExpiry, accessSporkHosts string var streamTimeout int var initHeight, forceStartHeight uint64 @@ -79,7 +82,8 @@ func FromFlags() (*Config, error) { flag.StringVar(&cfg.DatabaseDir, "database-dir", "./db", "Path to the directory for the database") flag.StringVar(&cfg.RPCHost, "rpc-host", "", "Host for the RPC API server") flag.IntVar(&cfg.RPCPort, "rpc-port", 8545, "Port for the RPC API server") - flag.StringVar(&cfg.AccessNodeGRPCHost, "access-node-grpc-host", "localhost:3569", "Host to the flow access node gRPC API") + flag.StringVar(&cfg.AccessNodeHost, "access-node-grpc-host", "localhost:3569", "Host to the flow access node gRPC API") + flag.StringVar(&accessSporkHosts, "access-node-spork-hosts", "", `Previous spork AN hosts, defined following the schema: {host1},{host2} as a comma separated list (e.g. "host-1.com,host2.com")`) flag.StringVar(&evmNetwork, "evm-network-id", "previewnet", "EVM network ID (previewnet, testnet, mainnet)") flag.StringVar(&flowNetwork, "flow-network-id", "flow-emulator", "Flow network ID (flow-emulator, flow-previewnet)") flag.StringVar(&coinbase, "coinbase", "", "Coinbase address to use for fee collection") @@ -184,6 +188,11 @@ func FromFlags() (*Config, error) { } cfg.FilterExpiry = exp + if accessSporkHosts != "" { + heightHosts := strings.Split(accessSporkHosts, ",") + cfg.AccessNodePreviousSporkHosts = append(cfg.AccessNodePreviousSporkHosts, heightHosts...) + } + if forceStartHeight != 0 { cfg.ForceStartCadenceHeight = forceStartHeight } diff --git a/models/events.go b/models/events.go index 942d4bb45..48b843076 100644 --- a/models/events.go +++ b/models/events.go @@ -107,3 +107,21 @@ func (c *CadenceEvents) CadenceHeight() uint64 { func (c *CadenceEvents) Length() int { return len(c.events.Events) } + +// BlockEvents is a wrapper around events streamed, and it also contains an error +type BlockEvents struct { + Events *CadenceEvents + Err error +} + +func NewBlockEvents(events flow.BlockEvents) BlockEvents { + return BlockEvents{ + Events: NewCadenceEvents(events), + } +} + +func NewBlockEventsError(err error) BlockEvents { + return BlockEvents{ + Err: err, + } +} diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index 6af993057..57f285b59 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -2,12 +2,10 @@ package ingestion import ( "context" - "errors" "fmt" "github.com/onflow/flow-evm-gateway/models" "github.com/onflow/flow-evm-gateway/storage" - "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/fvm/evm/types" gethTypes "github.com/onflow/go-ethereum/core/types" @@ -96,45 +94,21 @@ func (e *Engine) Run(ctx context.Context) error { e.log.Info().Uint64("start-cadence-height", latestCadence).Msg("starting ingestion") - events, errs, err := e.subscriber.Subscribe(ctx, latestCadence) - if err != nil { - return fmt.Errorf("failed to subscribe to events: %w", err) - } - e.status.MarkReady() - for { - select { - case <-ctx.Done(): - e.log.Info().Msg("event ingestion received done signal") - return nil - - case blockEvents, ok := <-events: - if !ok { - if ctx.Err() != nil { - return ctx.Err() - } - return models.ErrDisconnected - } - - err = e.processEvents(blockEvents) - if err != nil { - e.log.Error().Err(err).Msg("failed to process EVM events") - return err - } - - case err, ok := <-errs: - if !ok { - if ctx.Err() != nil { - return ctx.Err() - } - - return models.ErrDisconnected - } - - return errors.Join(err, models.ErrDisconnected) + for events := range e.subscriber.Subscribe(ctx, latestCadence) { + if events.Err != nil { + return fmt.Errorf("failure in event subscription: %w", events.Err) + } + + err = e.processEvents(events.Events) + if err != nil { + e.log.Error().Err(err).Msg("failed to process EVM events") + return err } } + + return nil } // processEvents converts the events to block and transactions and indexes them. @@ -149,14 +123,12 @@ func (e *Engine) Run(ctx context.Context) error { // https://github.com/onflow/flow-go/blob/master/fvm/evm/types/events.go // // Any error is unexpected and fatal. -func (e *Engine) processEvents(blockEvents flow.BlockEvents) error { +func (e *Engine) processEvents(events *models.CadenceEvents) error { e.log.Debug(). - Uint64("cadence-height", blockEvents.Height). - Int("cadence-event-length", len(blockEvents.Events)). + Uint64("cadence-height", events.CadenceHeight()). + Int("cadence-event-length", events.Length()). Msg("received new cadence evm events") - events := models.NewCadenceEvents(blockEvents) - // if heartbeat interval with no data still update the cadence height if events.Empty() { if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight()); err != nil { diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index 1ec0d99e6..23273663b 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -3,13 +3,14 @@ package ingestion import ( "context" "encoding/hex" + "github.com/onflow/flow-evm-gateway/services/ingestion/mocks" "math/big" "testing" "github.com/onflow/cadence" "github.com/onflow/cadence/runtime/common" "github.com/onflow/flow-evm-gateway/models" - "github.com/onflow/flow-evm-gateway/services/ingestion/mocks" + storageMock "github.com/onflow/flow-evm-gateway/storage/mocks" "github.com/onflow/flow-go-sdk" broadcast "github.com/onflow/flow-go/engine" @@ -41,12 +42,13 @@ func TestSerialBlockIngestion(t *testing.T) { On("Update"). Return(func() error { return nil }) - eventsChan := make(chan flow.BlockEvents) - subscriber := &mocks.Subscriber{} + eventsChan := make(chan models.BlockEvents) + + subscriber := &mocks.EventSubscriber{} subscriber. On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")). - Return(func(ctx context.Context, latest uint64) (<-chan flow.BlockEvents, <-chan error, error) { - return eventsChan, make(<-chan error), nil + Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents { + return eventsChan }) engine := NewEventIngestionEngine( @@ -64,7 +66,7 @@ func TestSerialBlockIngestion(t *testing.T) { done := make(chan struct{}) go func() { err := engine.Run(context.Background()) - assert.ErrorIs(t, err, models.ErrDisconnected) // we disconnect at the end + assert.NoError(t, err) close(done) }() @@ -85,13 +87,13 @@ func TestSerialBlockIngestion(t *testing.T) { }). Once() - eventsChan <- flow.BlockEvents{ + eventsChan <- models.NewBlockEvents(flow.BlockEvents{ Events: []flow.Event{{ Type: string(blockEvent.Etype), Value: blockCdc, }}, Height: cadenceHeight, - } + }) } close(eventsChan) @@ -117,12 +119,12 @@ func TestSerialBlockIngestion(t *testing.T) { On("Update", mock.Anything, mock.Anything). Return(func(t models.TransactionCall, r *gethTypes.Receipt) error { return nil }) - eventsChan := make(chan flow.BlockEvents) - subscriber := &mocks.Subscriber{} + eventsChan := make(chan models.BlockEvents) + subscriber := &mocks.EventSubscriber{} subscriber. On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")). - Return(func(ctx context.Context, latest uint64) (<-chan flow.BlockEvents, <-chan error, error) { - return eventsChan, make(<-chan error), nil + Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents { + return eventsChan }) engine := NewEventIngestionEngine( @@ -160,24 +162,28 @@ func TestSerialBlockIngestion(t *testing.T) { }). Once() // this should only be called for first valid block - eventsChan <- flow.BlockEvents{ - Events: []flow.Event{{ - Type: string(blockEvent.Etype), - Value: blockCdc, - }}, - Height: cadenceHeight, + eventsChan <- models.BlockEvents{ + Events: models.NewCadenceEvents(flow.BlockEvents{ + Events: []flow.Event{{ + Type: string(blockEvent.Etype), + Value: blockCdc, + }}, + Height: cadenceHeight, + }), } // fail with next block height being incorrect blockCdc, _, blockEvent, err = newBlock(latestHeight + 10) // not sequential next block height require.NoError(t, err) - eventsChan <- flow.BlockEvents{ - Events: []flow.Event{{ - Type: string(blockEvent.Etype), - Value: blockCdc, - }}, - Height: cadenceHeight + 1, + eventsChan <- models.BlockEvents{ + Events: models.NewCadenceEvents(flow.BlockEvents{ + Events: []flow.Event{{ + Type: string(blockEvent.Etype), + Value: blockCdc, + }}, + Height: cadenceHeight + 1, + }), } close(eventsChan) @@ -213,12 +219,12 @@ func TestBlockAndTransactionIngestion(t *testing.T) { On("Update", mock.AnythingOfType("models.TransactionCall"), mock.AnythingOfType("*types.Receipt")). Return(func(tx models.Transaction, receipt *gethTypes.Receipt) error { return nil }) - eventsChan := make(chan flow.BlockEvents) - subscriber := &mocks.Subscriber{} + eventsChan := make(chan models.BlockEvents) + subscriber := &mocks.EventSubscriber{} subscriber. On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")). - Return(func(ctx context.Context, latest uint64) (<-chan flow.BlockEvents, <-chan error, error) { - return eventsChan, make(<-chan error), nil + Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents { + return eventsChan }) txCdc, txEvent, transaction, result, err := newTransaction() @@ -241,7 +247,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { done := make(chan struct{}) go func() { err := engine.Run(context.Background()) - assert.ErrorIs(t, err, models.ErrDisconnected) // we disconnect at the end + assert.NoError(t, err) close(done) }() @@ -275,7 +281,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { }). Once() - eventsChan <- flow.BlockEvents{ + eventsChan <- models.NewBlockEvents(flow.BlockEvents{ Events: []flow.Event{{ Type: string(blockEvent.Etype), Value: blockCdc, @@ -284,7 +290,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { Value: txCdc, }}, Height: nextHeight, - } + }) close(eventsChan) <-done @@ -310,12 +316,12 @@ func TestBlockAndTransactionIngestion(t *testing.T) { On("Update", mock.AnythingOfType("models.TransactionCall"), mock.AnythingOfType("*types.Receipt")). Return(func(tx models.Transaction, receipt *gethTypes.Receipt) error { return nil }) - eventsChan := make(chan flow.BlockEvents) - subscriber := &mocks.Subscriber{} + eventsChan := make(chan models.BlockEvents) + subscriber := &mocks.EventSubscriber{} subscriber. On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")). - Return(func(ctx context.Context, latest uint64) (<-chan flow.BlockEvents, <-chan error, error) { - return eventsChan, make(<-chan error), nil + Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents { + return eventsChan }) txCdc, txEvent, _, _, err := newTransaction() @@ -338,7 +344,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { done := make(chan struct{}) go func() { err := engine.Run(context.Background()) - assert.ErrorIs(t, err, models.ErrDisconnected) // we disconnect at the end + assert.NoError(t, err) close(done) }() @@ -367,7 +373,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { }). Once() - eventsChan <- flow.BlockEvents{ + eventsChan <- models.NewBlockEvents(flow.BlockEvents{ Events: []flow.Event{ // first transaction { @@ -380,7 +386,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { Value: blockCdc, }}, Height: nextHeight, - } + }) close(eventsChan) <-done @@ -404,13 +410,13 @@ func TestBlockAndTransactionIngestion(t *testing.T) { On("Update", mock.Anything, mock.AnythingOfType("*types.Receipt")). Return(func(t models.Transaction, r *gethTypes.Receipt) error { return nil }) - eventsChan := make(chan flow.BlockEvents) - subscriber := &mocks.Subscriber{} + eventsChan := make(chan models.BlockEvents) + subscriber := &mocks.EventSubscriber{} subscriber. On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")). - Return(func(ctx context.Context, latest uint64) (<-chan flow.BlockEvents, <-chan error, error) { + Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents { assert.Equal(t, latestCadenceHeight, latest) - return eventsChan, make(<-chan error), nil + return eventsChan }). Once() @@ -429,7 +435,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { done := make(chan struct{}) go func() { err := engine.Run(context.Background()) - assert.ErrorIs(t, err, models.ErrDisconnected) // we disconnect at the end + assert.NoError(t, err) close(done) }() @@ -496,10 +502,10 @@ func TestBlockAndTransactionIngestion(t *testing.T) { // and it will make the first block be swapped with second block out-of-order events[1], events[2] = events[2], events[1] - eventsChan <- flow.BlockEvents{ + eventsChan <- models.NewBlockEvents(flow.BlockEvents{ Events: events, Height: latestCadenceHeight + 1, - } + }) close(eventsChan) <-done diff --git a/services/ingestion/mocks/EventSubscriber.go b/services/ingestion/mocks/EventSubscriber.go new file mode 100644 index 000000000..6a42a2e7f --- /dev/null +++ b/services/ingestion/mocks/EventSubscriber.go @@ -0,0 +1,50 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + models "github.com/onflow/flow-evm-gateway/models" +) + +// EventSubscriber is an autogenerated mock type for the EventSubscriber type +type EventSubscriber struct { + mock.Mock +} + +// Subscribe provides a mock function with given fields: ctx, height +func (_m *EventSubscriber) Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents { + ret := _m.Called(ctx, height) + + if len(ret) == 0 { + panic("no return value specified for Subscribe") + } + + var r0 <-chan models.BlockEvents + if rf, ok := ret.Get(0).(func(context.Context, uint64) <-chan models.BlockEvents); ok { + r0 = rf(ctx, height) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan models.BlockEvents) + } + } + + return r0 +} + +// NewEventSubscriber creates a new instance of EventSubscriber. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewEventSubscriber(t interface { + mock.TestingT + Cleanup(func()) +}) *EventSubscriber { + mock := &EventSubscriber{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/services/ingestion/mocks/Subscriber.go b/services/ingestion/mocks/Subscriber.go deleted file mode 100644 index 03f31cdd5..000000000 --- a/services/ingestion/mocks/Subscriber.go +++ /dev/null @@ -1,66 +0,0 @@ -// Code generated by mockery v2.21.4. DO NOT EDIT. - -package mocks - -import ( - context "context" - - flow "github.com/onflow/flow-go-sdk" - - mock "github.com/stretchr/testify/mock" -) - -// Subscriber is an autogenerated mock type for the Subscriber type -type Subscriber struct { - mock.Mock -} - -// Subscribe provides a mock function with given fields: ctx, height -func (_m *Subscriber) Subscribe(ctx context.Context, height uint64) (<-chan flow.BlockEvents, <-chan error, error) { - ret := _m.Called(ctx, height) - - var r0 <-chan flow.BlockEvents - var r1 <-chan error - var r2 error - if rf, ok := ret.Get(0).(func(context.Context, uint64) (<-chan flow.BlockEvents, <-chan error, error)); ok { - return rf(ctx, height) - } - if rf, ok := ret.Get(0).(func(context.Context, uint64) <-chan flow.BlockEvents); ok { - r0 = rf(ctx, height) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan flow.BlockEvents) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, uint64) <-chan error); ok { - r1 = rf(ctx, height) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(<-chan error) - } - } - - if rf, ok := ret.Get(2).(func(context.Context, uint64) error); ok { - r2 = rf(ctx, height) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -type mockConstructorTestingTNewSubscriber interface { - mock.TestingT - Cleanup(func()) -} - -// NewSubscriber creates a new instance of Subscriber. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewSubscriber(t mockConstructorTestingTNewSubscriber) *Subscriber { - mock := &Subscriber{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/services/ingestion/subscriber.go b/services/ingestion/subscriber.go index 088452d29..48fb2d325 100644 --- a/services/ingestion/subscriber.go +++ b/services/ingestion/subscriber.go @@ -2,55 +2,240 @@ package ingestion import ( "context" + "errors" "fmt" + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-evm-gateway/models" + "github.com/onflow/flow-evm-gateway/services/requester" + "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go-sdk/access" "github.com/onflow/flow-go/fvm/evm/types" "github.com/onflow/flow-go/fvm/systemcontracts" flowGo "github.com/onflow/flow-go/model/flow" + "github.com/rs/zerolog" ) type EventSubscriber interface { - // Subscribe to relevant events from the provided block height. - // Returns a channel with block events and errors, - // if subscription fails returns an error as the third value. - Subscribe(ctx context.Context, height uint64) (<-chan flow.BlockEvents, <-chan error, error) + // Subscribe to EVM events from the provided height, and return a chanel with the events. + // + // The BlockEvents type will contain an optional error in case + // the error happens, the consumer of the chanel should handle it. + Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents } +var _ EventSubscriber = &RPCSubscriber{} + type RPCSubscriber struct { - client access.Client + client *requester.CrossSporkClient chain flowGo.ChainID + logger zerolog.Logger } -func NewRPCSubscriber(client access.Client, chainID flowGo.ChainID) *RPCSubscriber { - return &RPCSubscriber{client: client, chain: chainID} +func NewRPCSubscriber(client *requester.CrossSporkClient, chainID flowGo.ChainID, logger zerolog.Logger) *RPCSubscriber { + logger = logger.With().Str("component", "subscriber").Logger() + return &RPCSubscriber{client: client, chain: chainID, logger: logger} } -func (r *RPCSubscriber) Subscribe(ctx context.Context, height uint64) (<-chan flow.BlockEvents, <-chan error, error) { - // define events we subscribe to: A.{evm}.EVM.BlockExecuted and A.{evm}.EVM.TransactionExecuted, - // where {evm} is EVM deployed contract address, which depends on the chain ID we configure - evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address) - blockExecutedEvent := common.NewAddressLocation(nil, evmAddress, string(types.EventTypeBlockExecuted)).ID() - transactionExecutedEvent := common.NewAddressLocation(nil, evmAddress, string(types.EventTypeTransactionExecuted)).ID() +// Subscribe will retrieve all the events from the provided height. If the height is from previous +// sporks, it will first backfill all the events in all the previous sporks, and then continue +// to listen all new events in the current spork. +// +// If error is encountered during backfill the subscription will end and the response chanel will be closed. +func (r *RPCSubscriber) Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents { + events := make(chan models.BlockEvents) - filter := flow.EventFilter{ - EventTypes: []string{ - blockExecutedEvent, - transactionExecutedEvent, - }, - } + go func() { + defer func() { + close(events) + }() + + // if the height is from the previous spork, backfill all the events from previous sporks first + if r.client.IsPastSpork(height) { + r.logger.Info(). + Uint64("height", height). + Msg("height found in previous spork, starting to backfill") + + // backfill all the missed events, handling of context cancellation is done by the producer + for ev := range r.backfill(ctx, height) { + events <- ev + + if ev.Err != nil { + return + } - _, err := r.client.GetBlockByHeight(ctx, height) + // keep updating height, so after we are done back-filling + // it will be at the first height in the current spork + height = ev.Events.CadenceHeight() + } + + // after back-filling is done, increment height by one, + // so we start with the height in the current spork + height = height + 1 + } + + r.logger.Info(). + Uint64("next-height", height). + Msg("backfilling done, subscribe for live data") + + // subscribe in the current spork, handling of context cancellation is done by the producer + for ev := range r.subscribe(ctx, height) { + events <- ev + } + + r.logger.Warn().Msg("ended subscription for events") + }() + + return events +} + +// subscribe to events by the provided height and handle any errors. +// +// Subscribing to EVM specific events and handle any disconnection errors +// as well as context cancellations. +func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...access.SubscribeOption) <-chan models.BlockEvents { + events := make(chan models.BlockEvents) + + _, err := r.client.GetBlockHeaderByHeight(ctx, height) if err != nil { - return nil, nil, fmt.Errorf("failed to subscribe for events, the block height %d doesn't exist: %w", height, err) + err = fmt.Errorf("failed to subscribe for events, the block height %d doesn't exist: %w", height, err) + events <- models.NewBlockEventsError(err) + return events } - // todo revisit if we should use custom heartbeat interval grpc.WithHeartbeatInterval(1) - evs, errs, err := r.client.SubscribeEventsByBlockHeight(ctx, height, filter) + evs, errs, err := r.client.SubscribeEventsByBlockHeight(ctx, height, r.blocksFilter(), opts...) if err != nil { - return nil, nil, fmt.Errorf("failed to subscribe to events by block height: %w", err) + events <- models.NewBlockEventsError(fmt.Errorf("failed to subscribe to events by block height: %w", err)) + return events } - return evs, errs, nil + go func() { + defer func() { + close(events) + }() + + for ctx.Err() == nil { + select { + case <-ctx.Done(): + r.logger.Info().Msg("event ingestion received done signal") + return + + case blockEvents, ok := <-evs: + if !ok { + var err error + err = models.ErrDisconnected + if ctx.Err() != nil { + err = ctx.Err() + } + events <- models.NewBlockEventsError(err) + return + } + + events <- models.NewBlockEvents(blockEvents) + + case err, ok := <-errs: + if !ok { + var err error + err = models.ErrDisconnected + if ctx.Err() != nil { + err = ctx.Err() + } + events <- models.NewBlockEventsError(err) + return + } + + events <- models.NewBlockEventsError(errors.Join(err, models.ErrDisconnected)) + return + } + } + }() + + return events +} + +// backfill will use the provided height and with the client for the provided spork will start backfilling +// events. Before subscribing, it will check what is the latest block in the current spork (defined by height) +// and check for each event it receives whether we reached the end, if we reach the end it will increase +// the height by one (next height), and check if we are still in previous sporks, if so repeat everything, +// otherwise return. +func (r *RPCSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents { + events := make(chan models.BlockEvents) + + go func() { + defer func() { + close(events) + }() + + for { + // check if the current height is still in past sporks, and if not return since we are done with backfilling + if !r.client.IsPastSpork(height) { + r.logger.Info(). + Uint64("height", height). + Msg("completed backfilling") + + return + } + + latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height) + if err != nil { + events <- models.NewBlockEventsError(err) + return + } + + r.logger.Info(). + Uint64("start-height", height). + Uint64("last-spork-height", latestHeight). + Msg("backfilling spork") + + for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(1)) { + events <- ev + + if ev.Err != nil { + return + } + + r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", ev.Events.CadenceHeight(), latestHeight)) + + if ev.Events != nil && ev.Events.CadenceHeight() == latestHeight { + height = ev.Events.CadenceHeight() + 1 // go to next height in the next spork + + r.logger.Info(). + Uint64("next-height", height). + Msg("reached the end of spork, checking next spork") + + break + } + } + } + }() + + return events +} + +// blockFilter define events we subscribe to: +// A.{evm}.EVM.BlockExecuted and A.{evm}.EVM.TransactionExecuted, +// where {evm} is EVM deployed contract address, which depends on the chain ID we configure. +func (r *RPCSubscriber) blocksFilter() flow.EventFilter { + evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address) + + blockExecutedEvent := common.NewAddressLocation( + nil, + evmAddress, + string(types.EventTypeBlockExecuted), + ).ID() + + transactionExecutedEvent := common.NewAddressLocation( + nil, + evmAddress, + string(types.EventTypeTransactionExecuted), + ).ID() + + return flow.EventFilter{ + EventTypes: []string{ + blockExecutedEvent, + transactionExecutedEvent, + }, + } } diff --git a/services/ingestion/subscriber_test.go b/services/ingestion/subscriber_test.go new file mode 100644 index 000000000..c55163748 --- /dev/null +++ b/services/ingestion/subscriber_test.go @@ -0,0 +1,122 @@ +package ingestion + +import ( + "context" + "testing" + + "github.com/onflow/flow-go-sdk/access" + + "github.com/onflow/flow-evm-gateway/models" + "github.com/onflow/flow-evm-gateway/services/requester" + + "github.com/onflow/flow-go-sdk" + "github.com/onflow/flow-go-sdk/access/mocks" + flowGo "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +type mockClient struct { + *mocks.Client + getLatestBlockHeaderFunc func(context.Context, bool) (*flow.BlockHeader, error) + getBlockHeaderByHeightFunc func(context.Context, uint64) (*flow.BlockHeader, error) + subscribeEventsByBlockHeightFunc func(context.Context, uint64, flow.EventFilter, ...access.SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error) +} + +func (c *mockClient) GetBlockHeaderByHeight(ctx context.Context, height uint64) (*flow.BlockHeader, error) { + return c.getBlockHeaderByHeightFunc(ctx, height) +} + +func (c *mockClient) GetLatestBlockHeader(ctx context.Context, sealed bool) (*flow.BlockHeader, error) { + return c.getLatestBlockHeaderFunc(ctx, sealed) +} + +func (c *mockClient) SubscribeEventsByBlockHeight( + ctx context.Context, + startHeight uint64, + filter flow.EventFilter, + opts ...access.SubscribeOption, +) (<-chan flow.BlockEvents, <-chan error, error) { + return c.subscribeEventsByBlockHeightFunc(ctx, startHeight, filter, opts...) +} + +func setupClient(startHeight uint64, endHeight uint64) access.Client { + return &mockClient{ + Client: &mocks.Client{}, + getLatestBlockHeaderFunc: func(ctx context.Context, sealed bool) (*flow.BlockHeader, error) { + return &flow.BlockHeader{ + Height: endHeight, + }, nil + }, + getBlockHeaderByHeightFunc: func(ctx context.Context, height uint64) (*flow.BlockHeader, error) { + if height < startHeight || height > endHeight { + return nil, storage.ErrNotFound + } + + return &flow.BlockHeader{ + Height: height, + }, nil + }, + subscribeEventsByBlockHeightFunc: func( + ctx context.Context, + startHeight uint64, + filter flow.EventFilter, + opts ...access.SubscribeOption, + ) (<-chan flow.BlockEvents, <-chan error, error) { + events := make(chan flow.BlockEvents) + + go func() { + defer close(events) + + for i := startHeight; i <= endHeight; i++ { + events <- flow.BlockEvents{ + Height: i, + } + } + }() + + return events, make(chan error), nil + }, + } +} + +// this test simulates two previous sporks and current spork +// the subscriber should start with spork1Client then proceed to +// spork2Client and end with currentClient. +// All event heights should be emitted in sequence. +func Test_Subscribing(t *testing.T) { + + const endHeight = 50 + sporkClients := []access.Client{ + setupClient(1, 10), + setupClient(11, 20), + } + currentClient := setupClient(21, endHeight) + + client, err := requester.NewCrossSporkClient(currentClient, sporkClients, zerolog.Nop()) + require.NoError(t, err) + + subscriber := NewRPCSubscriber(client, flowGo.Emulator, zerolog.Nop()) + + events := subscriber.Subscribe(context.Background(), 1) + + var prevHeight uint64 + + for ev := range events { + if prevHeight == endHeight { + require.ErrorIs(t, ev.Err, models.ErrDisconnected) + break + } + + require.NoError(t, ev.Err) + + // this makes sure all the event heights are sequential + eventHeight := ev.Events.CadenceHeight() + require.Equal(t, prevHeight+1, eventHeight) + prevHeight = eventHeight + } + + // this makes sure we indexed all the events + require.Equal(t, uint64(endHeight), prevHeight) +} diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go new file mode 100644 index 000000000..f33093247 --- /dev/null +++ b/services/requester/cross-spork_client.go @@ -0,0 +1,155 @@ +package requester + +import ( + "context" + "fmt" + + "github.com/onflow/cadence" + "github.com/onflow/flow-go-sdk" + "github.com/onflow/flow-go-sdk/access" + "github.com/rs/zerolog" + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" +) + +// CrossSporkClient is a wrapper around the Flow AN client that can +// access different AN APIs based on the height boundaries of the sporks. +// +// Each spork is defined with the last height included in that spork, +// based on the list we know which AN client to use when requesting the data. +// +// Any API that supports cross-spork access must have a defined function +// that shadows the original access Client function. +type CrossSporkClient struct { + logger zerolog.Logger + // this map holds the last heights and clients for each spork + sporkClients map[uint64]access.Client + sporkBoundaries []uint64 + access.Client +} + +// NewCrossSporkClient creates a new instance of the multi-spork client. It requires +// the current spork client and a slice of past spork clients. +func NewCrossSporkClient( + currentSpork access.Client, + pastSporks []access.Client, + logger zerolog.Logger, +) (*CrossSporkClient, error) { + client := &CrossSporkClient{ + logger: logger, + sporkClients: make(map[uint64]access.Client), + Client: currentSpork, + } + + for _, sporkClient := range pastSporks { + if err := client.addSpork(sporkClient); err != nil { + return nil, err + } + } + + // create a descending list of block heights that represent boundaries + // of each spork, after crossing each height, we use a different client + heights := maps.Keys(client.sporkClients) + slices.Sort(heights) + slices.Reverse(heights) // make it descending + client.sporkBoundaries = heights + + return client, nil +} + +// addSpork will add a new spork host defined by the last height boundary in that spork. +func (c *CrossSporkClient) addSpork(client access.Client) error { + header, err := client.GetLatestBlockHeader(context.Background(), true) + if err != nil { + return fmt.Errorf("could not get latest height using the spork client: %w", err) + } + + lastHeight := header.Height + + if _, ok := c.sporkClients[lastHeight]; ok { + return fmt.Errorf("provided last height already exists") + } + + c.sporkClients[lastHeight] = client + + c.logger.Info(). + Uint64("spork-boundary", lastHeight). + Msg("added spork specific client") + + return nil +} + +// IsPastSpork will check if the provided height is contained in the previous sporks. +func (c *CrossSporkClient) IsPastSpork(height uint64) bool { + return len(c.sporkBoundaries) > 0 && height <= c.sporkBoundaries[0] +} + +// getClientForHeight returns the client for the given height. It starts by using the current spork client, +// then iteratively checks the upper height boundaries in descending order and returns the last client +// that still contains the given height within its upper height limit. If no client is found, it returns +// the current spork client. +// Please note that even if a client for provided height is found we don't guarantee the data being available +// because it still might not have access to the height provided, because there might be other sporks with +// lower height boundaries that we didn't configure for. +// This would result in the error when using the client to access such data. +func (c *CrossSporkClient) getClientForHeight(height uint64) access.Client { + + // start by using the current spork client, then iterate all the upper height boundaries + // and find the last client that still contains the height in its upper height limit + client := c.Client + for _, upperBound := range c.sporkBoundaries { + if upperBound >= height { + client = c.sporkClients[upperBound] + + c.logger.Debug(). + Uint64("spork-boundary", upperBound). + Msg("using previous spork client") + } + } + + return client +} + +// GetLatestHeightForSpork will determine the spork client in which the provided height is contained +// and then find the latest height in that spork. +func (c *CrossSporkClient) GetLatestHeightForSpork(ctx context.Context, height uint64) (uint64, error) { + block, err := c. + getClientForHeight(height). + GetLatestBlockHeader(ctx, true) + if err != nil { + return 0, err + } + + return block.Height, nil +} + +func (c *CrossSporkClient) GetBlockHeaderByHeight( + ctx context.Context, + height uint64, +) (*flow.BlockHeader, error) { + return c. + getClientForHeight(height). + GetBlockHeaderByHeight(ctx, height) +} + +func (c *CrossSporkClient) ExecuteScriptAtBlockHeight( + ctx context.Context, + height uint64, + script []byte, + arguments []cadence.Value, +) (cadence.Value, error) { + return c. + getClientForHeight(height). + ExecuteScriptAtBlockHeight(ctx, height, script, arguments) +} + +func (c *CrossSporkClient) SubscribeEventsByBlockHeight( + ctx context.Context, + startHeight uint64, + filter flow.EventFilter, + opts ...access.SubscribeOption, +) (<-chan flow.BlockEvents, <-chan error, error) { + return c. + getClientForHeight(startHeight). + SubscribeEventsByBlockHeight(ctx, startHeight, filter, opts...) +} diff --git a/tests/helpers.go b/tests/helpers.go index a1d169d5e..8238d254e 100644 --- a/tests/helpers.go +++ b/tests/helpers.go @@ -129,21 +129,21 @@ func servicesSetup(t *testing.T) (emulator.Emulator, func()) { // default config cfg := &config.Config{ - DatabaseDir: t.TempDir(), - AccessNodeGRPCHost: "localhost:3569", // emulator - RPCPort: 8545, - RPCHost: "127.0.0.1", - FlowNetworkID: "flow-emulator", - EVMNetworkID: evmTypes.FlowEVMPreviewNetChainID, - Coinbase: common.HexToAddress(eoaTestAddress), - COAAddress: service.Address, - COAKey: service.PrivateKey, - CreateCOAResource: false, - GasPrice: new(big.Int).SetUint64(0), - LogLevel: zerolog.DebugLevel, - LogWriter: zerolog.NewConsoleWriter(), - StreamTimeout: time.Second * 30, - StreamLimit: 10, + DatabaseDir: t.TempDir(), + AccessNodeHost: "localhost:3569", // emulator + RPCPort: 8545, + RPCHost: "127.0.0.1", + FlowNetworkID: "flow-emulator", + EVMNetworkID: evmTypes.FlowEVMPreviewNetChainID, + Coinbase: common.HexToAddress(eoaTestAddress), + COAAddress: service.Address, + COAKey: service.PrivateKey, + CreateCOAResource: false, + GasPrice: new(big.Int).SetUint64(0), + LogLevel: zerolog.DebugLevel, + LogWriter: zerolog.NewConsoleWriter(), + StreamTimeout: time.Second * 30, + StreamLimit: 10, } if !logOutput { diff --git a/tests/integration_test.go b/tests/integration_test.go index 1607e3b26..5277db43a 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -56,19 +56,19 @@ func Test_ConcurrentTransactionSubmission(t *testing.T) { require.NoError(t, err) cfg := &config.Config{ - DatabaseDir: t.TempDir(), - AccessNodeGRPCHost: grpcHost, - RPCPort: 8545, - RPCHost: "127.0.0.1", - FlowNetworkID: "flow-emulator", - EVMNetworkID: types.FlowEVMTestNetChainID, - Coinbase: eoaTestAccount, - COAAddress: *createdAddr, - COAKeys: keys, - CreateCOAResource: true, - GasPrice: new(big.Int).SetUint64(0), - LogLevel: zerolog.DebugLevel, - LogWriter: os.Stdout, + DatabaseDir: t.TempDir(), + AccessNodeHost: grpcHost, + RPCPort: 8545, + RPCHost: "127.0.0.1", + FlowNetworkID: "flow-emulator", + EVMNetworkID: types.FlowEVMTestNetChainID, + Coinbase: eoaTestAccount, + COAAddress: *createdAddr, + COAKeys: keys, + CreateCOAResource: true, + GasPrice: new(big.Int).SetUint64(0), + LogLevel: zerolog.DebugLevel, + LogWriter: os.Stdout, } // todo change this test to use ingestion and emulator directly so we can completely remove