diff --git a/Makefile b/Makefile index 026a75590..9205c2706 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,6 @@ generate: mockery --dir=storage --name=TransactionIndexer --output=storage/mocks mockery --dir=storage --name=AccountIndexer --output=storage/mocks mockery --dir=storage --name=TraceIndexer --output=storage/mocks - mockery --all --dir=services/traces --output=services/traces/mocks mockery --all --dir=services/ingestion --output=services/ingestion/mocks mockery --dir=models --name=Engine --output=models/mocks diff --git a/README.md b/README.md index 393dd6396..8cc4d35e3 100644 --- a/README.md +++ b/README.md @@ -222,14 +222,10 @@ The application can be configured using the following flags at runtime: | `stream-limit` | `10` | Rate-limit for client events sent per second | | `rate-limit` | `50` | Requests per second limit for clients over any protocol (ws/http) | | `address-header` | `""` | Header for client IP when server is behind a proxy | -| `heartbeat-interval` | `100` | Interval for AN event subscription heartbeats | | `stream-timeout` | `3` | Timeout in seconds for sending events to clients | | `force-start-height` | `0` | Force-set starting Cadence height (local/testing use only) | | `wallet-api-key` | `""` | ECDSA private key for wallet APIs (local/testing use only) | | `filter-expiry` | `5m` | Expiry time for idle filters | -| `traces-gcp-bucket` | `""` | GCP bucket name for transaction traces | -| `traces-backfill-start-height` | `0` | Start height for backfilling transaction traces | -| `traces-backfill-end-height` | `0` | End height for backfilling transaction traces | | `index-only` | `false` | Run in index-only mode, allowing state queries and indexing but no transaction sending | | `profiler-enabled` | `false` | Enable the pprof profiler server | | `profiler-host` | `localhost` | Host for the pprof profiler | diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 17da8425c..9398019d8 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -25,7 +25,6 @@ import ( "github.com/onflow/flow-evm-gateway/services/ingestion" "github.com/onflow/flow-evm-gateway/services/replayer" "github.com/onflow/flow-evm-gateway/services/requester" - "github.com/onflow/flow-evm-gateway/services/traces" "github.com/onflow/flow-evm-gateway/storage" "github.com/onflow/flow-evm-gateway/storage/pebble" @@ -64,7 +63,6 @@ type Bootstrap struct { server *api.Server metrics *metrics.Server events *ingestion.Engine - traces *traces.Engine profiler *api.ProfileServer } @@ -169,71 +167,6 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error { return nil } -func (b *Bootstrap) StartTraceDownloader(ctx context.Context) error { - l := b.logger.With().Str("component", "bootstrap-traces").Logger() - l.Info().Msg("starting engine") - - // create gcp downloader - downloader, err := traces.NewGCPDownloader(b.config.TracesBucketName, b.logger) - if err != nil { - return err - } - - // initialize trace downloader engine - b.traces = traces.NewTracesIngestionEngine( - b.publishers.Block, - b.storages.Blocks, - b.storages.Traces, - downloader, - b.logger, - b.collector, - ) - - StartEngine(ctx, b.traces, l) - - if b.config.TracesBackfillStartHeight > 0 { - startHeight := b.config.TracesBackfillStartHeight - if _, err := b.storages.Blocks.GetByHeight(startHeight); err != nil { - return fmt.Errorf("failed to get provided start height %d in db: %w", startHeight, err) - } - - cadenceStartHeight, err := b.storages.Blocks.GetCadenceHeight(startHeight) - if err != nil { - return fmt.Errorf("failed to get cadence height for backfill start height %d: %w", startHeight, err) - } - - if cadenceStartHeight < b.config.InitCadenceHeight { - b.logger.Warn(). - Uint64("evm-start-height", startHeight). - Uint64("cadence-start-height", cadenceStartHeight). - Uint64("init-cadence-height", b.config.InitCadenceHeight). - Msg("backfill start height is before initial cadence height. data may be missing from configured traces bucket") - } - - endHeight := b.config.TracesBackfillEndHeight - if endHeight == 0 { - endHeight, err = b.storages.Blocks.LatestEVMHeight() - if err != nil { - return fmt.Errorf("failed to get latest EVM height: %w", err) - } - } else if _, err := b.storages.Blocks.GetByHeight(endHeight); err != nil { - return fmt.Errorf("failed to get provided end height %d in db: %w", endHeight, err) - } - - go b.traces.Backfill(startHeight, endHeight) - } - - return nil -} - -func (b *Bootstrap) StopTraceDownloader() { - if b.traces == nil { - return - } - b.logger.Warn().Msg("stopping trace downloader engine") - b.traces.Stop() -} - func (b *Bootstrap) StopEventIngestion() { if b.events == nil { return @@ -336,10 +269,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error { ratelimiter, ) - var debugAPI *api.DebugAPI - if b.config.TracesEnabled { - debugAPI = api.NewDebugAPI(b.storages.Traces, b.storages.Blocks, b.logger, b.collector) - } + var debugAPI = api.NewDebugAPI(b.storages.Traces, b.storages.Blocks, b.logger, b.collector) var walletAPI *api.WalletAPI if b.config.WalletEnabled { @@ -562,12 +492,6 @@ func Run(ctx context.Context, cfg *config.Config, ready chan struct{}) error { return err } - if cfg.TracesEnabled { - if err := boot.StartTraceDownloader(ctx); err != nil { - return fmt.Errorf("failed to start trace downloader engine: %w", err) - } - } - if err := boot.StartEventIngestion(ctx); err != nil { return fmt.Errorf("failed to start event ingestion engine: %w", err) } @@ -593,7 +517,6 @@ func Run(ctx context.Context, cfg *config.Config, ready chan struct{}) error { boot.StopEventIngestion() boot.StopMetricsServer() - boot.StopTraceDownloader() boot.StopAPIServer() return nil diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 2d9182c4e..332ae7c01 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -72,7 +72,7 @@ func parseConfigFromFlags() error { if g, ok := new(big.Int).SetString(gas, 10); ok { cfg.GasPrice = g - } else if !ok { + } else { return fmt.Errorf("invalid gas price") } @@ -201,12 +201,6 @@ func parseConfigFromFlags() error { cfg.ForceStartCadenceHeight = forceStartHeight } - cfg.TracesEnabled = cfg.TracesBucketName != "" - - if cfg.TracesBackfillStartHeight > 0 && cfg.TracesBackfillEndHeight > 0 && cfg.TracesBackfillStartHeight > cfg.TracesBackfillEndHeight { - return fmt.Errorf("traces backfill start height must be less than the end height") - } - if walletKey != "" { k, err := gethCrypto.HexToECDSA(walletKey) if err != nil { @@ -272,9 +266,6 @@ func init() { Cmd.Flags().IntVar(&streamTimeout, "stream-timeout", 3, "Defines the timeout in seconds the server waits for the event to be sent to the client") Cmd.Flags().Uint64Var(&forceStartHeight, "force-start-height", 0, "Force set starting Cadence height. WARNING: This should only be used locally or for testing, never in production.") Cmd.Flags().StringVar(&filterExpiry, "filter-expiry", "5m", "Filter defines the time it takes for an idle filter to expire") - Cmd.Flags().StringVar(&cfg.TracesBucketName, "traces-gcp-bucket", "", "GCP bucket name where transaction traces are stored") - Cmd.Flags().Uint64Var(&cfg.TracesBackfillStartHeight, "traces-backfill-start-height", 0, "evm block height from which to start backfilling missing traces.") - Cmd.Flags().Uint64Var(&cfg.TracesBackfillEndHeight, "traces-backfill-end-height", 0, "evm block height until which to backfill missing traces. If 0, backfill until the latest block") Cmd.Flags().StringVar(&cloudKMSProjectID, "coa-cloud-kms-project-id", "", "The project ID containing the KMS keys, e.g. 'flow-evm-gateway'") Cmd.Flags().StringVar(&cloudKMSLocationID, "coa-cloud-kms-location-id", "", "The location ID where the key ring is grouped into, e.g. 'global'") Cmd.Flags().StringVar(&cloudKMSKeyRingID, "coa-cloud-kms-key-ring-id", "", "The key ring ID where the KMS keys exist, e.g. 'tx-signing'") diff --git a/config/config.go b/config/config.go index 13f572070..ef71500be 100644 --- a/config/config.go +++ b/config/config.go @@ -74,14 +74,6 @@ type Config struct { FilterExpiry time.Duration // ForceStartCadenceHeight will force set the starting Cadence height, this should be only used for testing or locally. ForceStartCadenceHeight uint64 - // TracesBucketName sets the GCP bucket name where transaction traces are being stored. - TracesBucketName string - // TracesEnabled sets whether the node is supporting transaction traces. - TracesEnabled bool - // TracesBackfillStartHeight sets the starting block height for backfilling missing traces. - TracesBackfillStartHeight uint64 - // TracesBackfillEndHeight sets the ending block height for backfilling missing traces. - TracesBackfillEndHeight uint64 // WalletEnabled sets whether wallet APIs are enabled WalletEnabled bool // WalletKey used for signing transactions diff --git a/go.mod b/go.mod index 462b19566..8358b3b3a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/onflow/flow-evm-gateway go 1.22 require ( - cloud.google.com/go/storage v1.36.0 github.com/cockroachdb/pebble v1.1.1 github.com/goccy/go-json v0.10.2 github.com/hashicorp/golang-lru/v2 v2.0.7 @@ -23,7 +22,6 @@ require ( github.com/stretchr/testify v1.9.0 golang.org/x/exp v0.0.0-20240119083558-1b970713d09a golang.org/x/sync v0.8.0 - google.golang.org/api v0.162.0 google.golang.org/grpc v1.63.2 ) @@ -33,6 +31,7 @@ require ( cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v1.1.6 // indirect cloud.google.com/go/kms v1.15.7 // indirect + cloud.google.com/go/storage v1.36.0 // indirect github.com/DataDog/zstd v1.5.2 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/SaveTheRbtz/mph v0.1.1-0.20240117162131-4166ec7869bc // indirect @@ -200,6 +199,7 @@ require ( golang.org/x/time v0.5.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect gonum.org/v1/gonum v0.14.0 // indirect + google.golang.org/api v0.162.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect diff --git a/services/traces/downloader.go b/services/traces/downloader.go deleted file mode 100644 index 4647ee594..000000000 --- a/services/traces/downloader.go +++ /dev/null @@ -1,78 +0,0 @@ -package traces - -import ( - "context" - "encoding/json" - "fmt" - "io" - "time" - - "cloud.google.com/go/storage" - "github.com/onflow/flow-go-sdk" - "github.com/onflow/go-ethereum/common" - "github.com/rs/zerolog" - "google.golang.org/api/option" -) - -const downloadTimeout = 60 * time.Minute - -type Downloader interface { - // Download traces or returning an error with the failure - Download(txID common.Hash, blockIO flow.Identifier) (json.RawMessage, error) -} - -var _ Downloader = &GCPDownloader{} - -type GCPDownloader struct { - client *storage.Client - logger zerolog.Logger - bucket *storage.BucketHandle -} - -func NewGCPDownloader(bucketName string, logger zerolog.Logger) (*GCPDownloader, error) { - if bucketName == "" { - return nil, fmt.Errorf("must provide bucket name") - } - - ctx := context.Background() - // we don't require authentication for public bucket - client, err := storage.NewClient(ctx, option.WithoutAuthentication()) - if err != nil { - return nil, fmt.Errorf("failed to create Google Cloud Storage client: %w", err) - } - - return &GCPDownloader{ - client: client, - logger: logger, - bucket: client.Bucket(bucketName), - }, nil -} - -func (g *GCPDownloader) Download(txID common.Hash, blockID flow.Identifier) (json.RawMessage, error) { - l := g.logger.With(). - Str("tx-id", txID.String()). - Str("cadence-block-id", blockID.String()). - Logger() - - l.Debug().Msg("downloading transaction trace") - - ctx, cancel := context.WithTimeout(context.Background(), downloadTimeout) - defer cancel() - - id := fmt.Sprintf("%s-%s", blockID.String(), txID.String()) - - rc, err := g.bucket.Object(id).NewReader(ctx) - if err != nil { - return nil, fmt.Errorf("failed to download object id %s: %w", id, err) - } - defer rc.Close() - - trace, err := io.ReadAll(rc) - if err != nil { - return nil, fmt.Errorf("failed to read trace id %s: %w", id, err) - } - - l.Info().Int("trace-size", len(trace)).Msg("transaction trace downloaded") - - return trace, nil -} diff --git a/services/traces/engine.go b/services/traces/engine.go deleted file mode 100644 index a76830c01..000000000 --- a/services/traces/engine.go +++ /dev/null @@ -1,193 +0,0 @@ -package traces - -import ( - "context" - "sync" - "time" - - "github.com/onflow/flow-go-sdk" - gethCommon "github.com/onflow/go-ethereum/common" - "github.com/rs/zerolog" - "github.com/sethvargo/go-retry" - - "github.com/onflow/flow-evm-gateway/metrics" - "github.com/onflow/flow-evm-gateway/models" - "github.com/onflow/flow-evm-gateway/storage" -) - -var _ models.Engine = &Engine{} - -// Engine is an implementation of the trace downloader engine. -// -// Traces are ethereum transaction execution traces: https://geth.ethereum.org/docs/developers/evm-tracing -// Currently EVM gateway doesn't produce the traces since it doesn't -// execute the transactions and is thus relying on the execution node -// to produce and upload the traces during execution. This engine -// listens for new transaction events and then downloads and index the -// traces from the transaction execution. -type Engine struct { - *models.EngineStatus - - logger zerolog.Logger - blocksPublisher *models.Publisher[*models.Block] - blocks storage.BlockIndexer - traces storage.TraceIndexer - downloader Downloader - collector metrics.Collector -} - -// NewTracesIngestionEngine creates a new instance of the engine. -func NewTracesIngestionEngine( - blocksPublisher *models.Publisher[*models.Block], - blocks storage.BlockIndexer, - traces storage.TraceIndexer, - downloader Downloader, - logger zerolog.Logger, - collector metrics.Collector, -) *Engine { - return &Engine{ - EngineStatus: models.NewEngineStatus(), - - logger: logger.With().Str("component", "trace-ingestion").Logger(), - blocksPublisher: blocksPublisher, - blocks: blocks, - traces: traces, - downloader: downloader, - collector: collector, - } -} - -// Run the engine. -// TODO: use the context to stop the engine. -func (e *Engine) Run(ctx context.Context) error { - // subscribe to new blocks - e.blocksPublisher.Subscribe(e) - - e.MarkReady() - return nil -} - -// Notify is a handler that is being used to subscribe for new EVM block notifications. -// This method should be non-blocking. -func (e *Engine) Notify(block *models.Block) { - // If the block has no transactions, we simply return early - // as there are no transaction traces to index. - if len(block.TransactionHashes) == 0 { - return - } - - l := e.logger.With().Uint64("evm-height", block.Height).Logger() - - cadenceID, err := e.blocks.GetCadenceID(block.Height) - if err != nil { - l.Error().Err(err).Msg("failed to get cadence block ID") - return - } - - go e.indexBlockTraces(block, cadenceID, false) -} - -// indexBlockTraces iterates the block transaction hashes and tries to download the traces -func (e *Engine) indexBlockTraces(evmBlock *models.Block, cadenceBlockID flow.Identifier, skipExisting bool) { - ctx, cancel := context.WithTimeout(context.Background(), downloadTimeout) - defer cancel() - - const maxConcurrentDownloads = 5 // limit number of concurrent downloads - limiter := make(chan struct{}, maxConcurrentDownloads) - - wg := sync.WaitGroup{} - - for _, h := range evmBlock.TransactionHashes { - wg.Add(1) - limiter <- struct{}{} // acquire a slot - - go func(h gethCommon.Hash) { - defer wg.Done() - defer func() { <-limiter }() // release a slot after done - - l := e.logger.With(). - Str("tx-id", h.String()). - Uint64("evm-height", evmBlock.Height). - Str("cadence-block-id", cadenceBlockID.String()). - Logger() - - if skipExisting { - if _, err := e.traces.GetTransaction(h); err == nil { - l.Debug().Msg("trace already downloaded") - return - } - } - - err := retry.Fibonacci(ctx, time.Second*1, func(ctx context.Context) error { - trace, err := e.downloader.Download(h, cadenceBlockID) - if err != nil { - l.Warn().Err(err).Msg("retrying failed download") - return retry.RetryableError(err) - } - - return e.traces.StoreTransaction(h, trace, nil) - }) - if err != nil { - e.collector.TraceDownloadFailed() - l.Error().Err(err).Msg("failed to download trace") - return - } - l.Info().Msg("trace downloaded successfully") - }(h) - } - - wg.Wait() -} - -// Error is required by the publisher, and we just return a nil, -// since the errors are handled gracefully in the indexBlockTraces -func (e *Engine) Error() <-chan error { - return nil -} - -func (e *Engine) Stop() { - e.MarkStopped() -} - -// Backfill redownloads traces for blocks from EVM start to end height. -func (e *Engine) Backfill(start uint64, end uint64) { - select { - case <-e.Ready(): - case <-e.Done(): - return - } - - lg := e.logger.With().Uint64("start", start).Uint64("end", end).Logger() - - lg.Info().Msg("backfilling traces") - for height := start; height <= end; height++ { - select { - case <-e.Done(): - return - case <-e.Stopped(): - return - default: - } - - l := lg.With().Uint64("evm-height", height).Logger() - - block, err := e.blocks.GetByHeight(height) - if err != nil { - l.Error().Err(err).Msg("failed to get block by height") - return - } - - if len(block.TransactionHashes) == 0 { - continue - } - - cadenceID, err := e.blocks.GetCadenceID(block.Height) - if err != nil { - l.Error().Err(err).Msg("failed to get cadence block ID") - return - } - - e.indexBlockTraces(block, cadenceID, true) - } - lg.Info().Msg("done backfilling traces") -} diff --git a/services/traces/engine_test.go b/services/traces/engine_test.go deleted file mode 100644 index 028309918..000000000 --- a/services/traces/engine_test.go +++ /dev/null @@ -1,288 +0,0 @@ -package traces - -import ( - "context" - "encoding/json" - "fmt" - "slices" - "strings" - "testing" - "time" - - pebbleDB "github.com/cockroachdb/pebble" - "github.com/onflow/flow-go-sdk" - gethCommon "github.com/onflow/go-ethereum/common" - "github.com/rs/zerolog" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - "github.com/onflow/flow-evm-gateway/metrics" - "github.com/onflow/flow-evm-gateway/models" - "github.com/onflow/flow-evm-gateway/services/traces/mocks" - storageMock "github.com/onflow/flow-evm-gateway/storage/mocks" -) - -// this test makes sure once a notification for a new block is triggered -// the block transaction hashes are iterated, and for each a trace is -// downloaded and stored. -func TestTraceIngestion(t *testing.T) { - t.Run("successful single block ingestion", func(t *testing.T) { - blockPublisher := models.NewPublisher[*models.Block]() - blocks := &storageMock.BlockIndexer{} - trace := &storageMock.TraceIndexer{} - downloader := &mocks.Downloader{} - - txTrace := func(id gethCommon.Hash) json.RawMessage { - return json.RawMessage(fmt.Sprintf(`{ - "id": "%s", - "from":"0x42fdd562221741a1db62a0f69a5a680367f07e33", - "gas":"0x15f900", - "gasUsed":"0x387dc", - "to":"0xca11bde05977b3631167028862be2a173976ca11" - }`, id.String())) - } - - latestHeight := uint64(0) - blockID := flow.Identifier{0x09} - hashes := []gethCommon.Hash{{0x1}, {0x2}, {0x3}} - block := storageMock.NewBlock(latestHeight + 1) - block.TransactionHashes = hashes - - blocks. - On("GetByHeight", mock.Anything). - Return(func(height uint64) (*models.Block, error) { - require.Equal(t, latestHeight+1, height) // make sure it gets next block - return block, nil - }) - - blocks. - On("GetCadenceID", mock.Anything). - Return(func(height uint64) (flow.Identifier, error) { - require.Equal(t, latestHeight+1, height) - return blockID, nil - }) - - downloader. - On("Download", mock.Anything, mock.Anything). - Return(func(txID gethCommon.Hash, blkID flow.Identifier) (json.RawMessage, error) { - require.Equal(t, blockID, blkID) - time.Sleep(time.Millisecond * 200) // simulate download delay - return txTrace(txID), nil - }) - - stored := make(chan gethCommon.Hash, len(hashes)) - trace. - On("StoreTransaction", mock.Anything, mock.Anything, mock.Anything). - Return(func(ID gethCommon.Hash, trace json.RawMessage, _ *pebbleDB.Batch) error { - require.Equal(t, txTrace(ID), trace) - stored <- ID - return nil - }) - - engine := NewTracesIngestionEngine( - blockPublisher, - blocks, - trace, - downloader, - zerolog.Nop(), - metrics.NopCollector, - ) - - err := engine.Run(context.Background()) - require.NoError(t, err) - - blockPublisher.Publish(block) - - // make sure stored was called as many times as block contained hashes - require.Eventuallyf(t, func() bool { - return len(stored) == len(hashes) - }, time.Second, time.Millisecond*50, "index not run") - - close(stored) - storedHashes := make([]string, 0) - for h := range stored { - storedHashes = append(storedHashes, h.String()) - } - - // make sure we stored all the hashes in the block - for _, h := range hashes { - require.True(t, slices.Contains(storedHashes, h.String())) - } - }) - - t.Run("successful multiple blocks ingestion", func(t *testing.T) { - blocksPublisher := models.NewPublisher[*models.Block]() - blocks := &storageMock.BlockIndexer{} - trace := &storageMock.TraceIndexer{} - downloader := &mocks.Downloader{} - - txTrace := func(id gethCommon.Hash) json.RawMessage { - return json.RawMessage(fmt.Sprintf(`{ - "id": "%s", - "from":"0x42fdd562221741a1db62a0f69a5a680367f07e33", - "gas":"0x15f900", - "gasUsed":"0x387dc", - "to":"0xca11bde05977b3631167028862be2a173976ca11" - }`, id.String())) - } - - latestHeight := uint64(0) - - const blockCount = 10 - const txCount = 50 - - // generate mock blocks, each with mock transactions - mockBlocks := make([]*models.Block, blockCount+1) - mockCadenceIDs := make([]flow.Identifier, blockCount+1) - - for i := range mockBlocks { - b := storageMock.NewBlock(uint64(i)) - cid := flow.Identifier{byte(i + 10)} - - h := make([]gethCommon.Hash, txCount) - for j := range h { - h[j] = gethCommon.Hash{byte(j), byte(i)} - } - - b.TransactionHashes = h - mockBlocks[i] = b - mockCadenceIDs[i] = cid - } - - blocks. - On("GetCadenceID", mock.Anything). - Return(func(height uint64) (flow.Identifier, error) { - latestHeight++ - require.Equal(t, latestHeight, height) - require.Less(t, int(height), len(mockCadenceIDs)) - return mockCadenceIDs[height], nil - }) - - downloadedIDs := make(chan string, blockCount*txCount) - downloader. - On("Download", mock.Anything, mock.Anything). - Return(func(txID gethCommon.Hash, blkID flow.Identifier) (json.RawMessage, error) { - id := fmt.Sprintf("%s-%s", blkID.String(), txID.String()) - downloadedIDs <- id - time.Sleep(time.Millisecond * 200) // simulate download delay - return txTrace(txID), nil - }) - - stored := make(chan gethCommon.Hash, blockCount*txCount) - trace. - On("StoreTransaction", mock.Anything, mock.Anything, mock.Anything). - Return(func(ID gethCommon.Hash, trace json.RawMessage, _ *pebbleDB.Batch) error { - require.Equal(t, txTrace(ID), trace) - stored <- ID - return nil - }) - - engine := NewTracesIngestionEngine( - blocksPublisher, - blocks, - trace, - downloader, - zerolog.Nop(), - metrics.NopCollector, - ) - - err := engine.Run(context.Background()) - require.NoError(t, err) - - for i := 0; i < blockCount; i++ { - blocksPublisher.Publish(mockBlocks[i+1]) - time.Sleep(time.Millisecond * 100) // simulate block delay - } - - // make sure download was called as many times as all blocks times the hashes it contained - require.Eventuallyf(t, func() bool { - return len(downloadedIDs) == blockCount*txCount - }, time.Second*10, time.Millisecond*100, "traces not downloaded") - - close(downloadedIDs) - - // make sure stored was called as many times as all blocks times the hashes it contained - require.Eventuallyf(t, func() bool { - return len(stored) == blockCount*txCount - }, time.Second*10, time.Millisecond*100, "traces not indexed") - - close(stored) - - // make sure we downloaded and indexed all the hashes in the block - for id := range downloadedIDs { - found := false - for _, b := range mockBlocks { - for _, h := range b.TransactionHashes { - txID := strings.Split(id, "-")[1] - if txID == h.String() { - found = true - break - } - } - if found { - break - } - } - require.True(t, found, fmt.Sprintf("id %s not found", id)) - } - }) - - t.Run("failed download retries", func(t *testing.T) { - blockBroadcaster := models.NewPublisher[*models.Block]() - blocks := &storageMock.BlockIndexer{} - downloader := &mocks.Downloader{} - trace := &storageMock.TraceIndexer{} - logger := zerolog.New(zerolog.NewTestWriter(t)) - collector := metrics.NopCollector - - latestHeight := uint64(0) - blockID := flow.Identifier{0x09} - hashes := []gethCommon.Hash{{0x1}} - block := storageMock.NewBlock(latestHeight + 1) - block.TransactionHashes = hashes - - blocks. - On("GetByHeight", mock.Anything). - Return(func(height uint64) (*models.Block, error) { - require.Equal(t, latestHeight+1, height) // make sure it gets next block - return block, nil - }) - - blocks. - On("GetCadenceID", mock.Anything). - Return(func(height uint64) (flow.Identifier, error) { - require.Equal(t, latestHeight+1, height) - return blockID, nil - }) - - const retriesNum = 3 - downloads := make(chan struct{}, retriesNum) - downloader. - On("Download", mock.Anything, mock.Anything). - Return(func(txID gethCommon.Hash, blkID flow.Identifier) (json.RawMessage, error) { - downloads <- struct{}{} - return nil, fmt.Errorf("failed download") - }) - - engine := NewTracesIngestionEngine( - blockBroadcaster, - blocks, - trace, - downloader, - logger, - collector, - ) - - err := engine.Run(context.Background()) - require.NoError(t, err) - - blockBroadcaster.Publish(block) - - // make sure stored was called as many times as block contained hashes - require.Eventuallyf(t, func() bool { - return len(downloads) == retriesNum - }, time.Second*10, time.Millisecond*200, "download not retried") - - close(downloads) - }) -} diff --git a/services/traces/mocks/Downloader.go b/services/traces/mocks/Downloader.go deleted file mode 100644 index 83353acb0..000000000 --- a/services/traces/mocks/Downloader.go +++ /dev/null @@ -1,61 +0,0 @@ -// Code generated by mockery v2.43.2. DO NOT EDIT. - -package mocks - -import ( - flow "github.com/onflow/flow-go-sdk" - common "github.com/onflow/go-ethereum/common" - - json "encoding/json" - - mock "github.com/stretchr/testify/mock" -) - -// Downloader is an autogenerated mock type for the Downloader type -type Downloader struct { - mock.Mock -} - -// Download provides a mock function with given fields: txID, blockIO -func (_m *Downloader) Download(txID common.Hash, blockIO flow.Identifier) (json.RawMessage, error) { - ret := _m.Called(txID, blockIO) - - if len(ret) == 0 { - panic("no return value specified for Download") - } - - var r0 json.RawMessage - var r1 error - if rf, ok := ret.Get(0).(func(common.Hash, flow.Identifier) (json.RawMessage, error)); ok { - return rf(txID, blockIO) - } - if rf, ok := ret.Get(0).(func(common.Hash, flow.Identifier) json.RawMessage); ok { - r0 = rf(txID, blockIO) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(json.RawMessage) - } - } - - if rf, ok := ret.Get(1).(func(common.Hash, flow.Identifier) error); ok { - r1 = rf(txID, blockIO) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// NewDownloader creates a new instance of Downloader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewDownloader(t interface { - mock.TestingT - Cleanup(func()) -}) *Downloader { - mock := &Downloader{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -}