diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index df8e53a7..27bc648c 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -117,7 +117,12 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error { Msg("indexing cadence height information") // create event subscriber - subscriber := ingestion.NewRPCEventSubscriber(b.logger, b.client, b.config.FlowNetworkID, latestCadenceHeight, b.config.HeartbeatInterval) + subscriber := ingestion.NewRPCEventSubscriber( + b.logger, + b.client, + b.config.FlowNetworkID, + latestCadenceHeight, + ) // initialize event ingestion engine b.events = ingestion.NewEventIngestionEngine( diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index aa90aed2..2d9182c4 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -268,7 +268,6 @@ func init() { Cmd.Flags().Float64Var(&cfg.StreamLimit, "stream-limit", 10, "Rate-limits the events sent to the client within one second") Cmd.Flags().Uint64Var(&cfg.RateLimit, "rate-limit", 50, "Rate-limit requests per second made by the client over any protocol (ws/http)") Cmd.Flags().StringVar(&cfg.AddressHeader, "address-header", "", "Address header that contains the client IP, this is useful when the server is behind a proxy that sets the source IP of the client. Leave empty if no proxy is used.") - Cmd.Flags().Uint64Var(&cfg.HeartbeatInterval, "heartbeat-interval", 100, "Heartbeat interval for AN event subscription") Cmd.Flags().UintVar(&cfg.CacheSize, "script-cache-size", 10000, "Cache size used for script execution in items kept in cache") Cmd.Flags().IntVar(&streamTimeout, "stream-timeout", 3, "Defines the timeout in seconds the server waits for the event to be sent to the client") Cmd.Flags().Uint64Var(&forceStartHeight, "force-start-height", 0, "Force set starting Cadence height. WARNING: This should only be used locally or for testing, never in production.") diff --git a/config/config.go b/config/config.go index 0c8b6559..13f57207 100644 --- a/config/config.go +++ b/config/config.go @@ -74,8 +74,6 @@ type Config struct { FilterExpiry time.Duration // ForceStartCadenceHeight will force set the starting Cadence height, this should be only used for testing or locally. ForceStartCadenceHeight uint64 - // HeartbeatInterval sets custom heartbeat interval for events - HeartbeatInterval uint64 // TracesBucketName sets the GCP bucket name where transaction traces are being stored. TracesBucketName string // TracesEnabled sets whether the node is supporting transaction traces. diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index d8508e68..47da8972 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -29,13 +29,7 @@ type EventSubscriber interface { var _ EventSubscriber = &RPCEventSubscriber{} -type RPCEventSubscriberConfig struct { - HeartbeatInterval uint64 -} - type RPCEventSubscriber struct { - RPCEventSubscriberConfig - logger zerolog.Logger client *requester.CrossSporkClient @@ -51,14 +45,9 @@ func NewRPCEventSubscriber( client *requester.CrossSporkClient, chainID flowGo.ChainID, startHeight uint64, - heartbeatInterval uint64, ) *RPCEventSubscriber { logger = logger.With().Str("component", "subscriber").Logger() return &RPCEventSubscriber{ - RPCEventSubscriberConfig: RPCEventSubscriberConfig{ - HeartbeatInterval: heartbeatInterval, - }, - logger: logger, client: client, @@ -86,7 +75,7 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockE Uint64("height", r.height). Msg("height found in previous spork, starting to backfill") - // backfill all the missed eventsChan, handling of context cancellation is done by the producer + // backfill all the missed events, handling of context cancellation is done by the producer for ev := range r.backfill(ctx, r.height) { eventsChan <- ev @@ -109,8 +98,7 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockE Msg("backfilling done, subscribe for live data") // subscribe in the current spork, handling of context cancellation is done by the producer - // TODO(JanezP): I think the heartbeat interval should always be 1 here - for ev := range r.subscribe(ctx, r.height, access.WithHeartbeatInterval(r.HeartbeatInterval)) { + for ev := range r.subscribe(ctx, r.height) { eventsChan <- ev } @@ -124,7 +112,7 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockE // // Subscribing to EVM specific events and handle any disconnection errors // as well as context cancellations. -func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64, opts ...access.SubscribeOption) <-chan models.BlockEvents { +func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents { eventsChan := make(chan models.BlockEvents) _, err := r.client.GetBlockHeaderByHeight(ctx, height) @@ -134,7 +122,13 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64, opts return eventsChan } - eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(ctx, height, blocksFilter(r.chain), opts...) + // we always use heartbeat interval of 1 to have the least amount of delay from the access node + eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight( + ctx, + height, + blocksFilter(r.chain), + access.WithHeartbeatInterval(1), + ) if err != nil { eventsChan <- models.NewBlockEventsError( fmt.Errorf("failed to subscribe to events by block height: %d, with: %w", height, err), @@ -230,7 +224,7 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan Uint64("last-spork-height", latestHeight). Msg("backfilling spork") - for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(1)) { + for ev := range r.subscribe(ctx, height) { eventsChan <- ev if ev.Err != nil { diff --git a/services/ingestion/event_subscriber_test.go b/services/ingestion/event_subscriber_test.go index 19151ea2..04626af2 100644 --- a/services/ingestion/event_subscriber_test.go +++ b/services/ingestion/event_subscriber_test.go @@ -43,7 +43,7 @@ func Test_Subscribing(t *testing.T) { ) require.NoError(t, err) - subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100) + subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1) events := subscriber.Subscribe(context.Background()) @@ -83,7 +83,7 @@ func Test_MissingBlockEvent(t *testing.T) { ) require.NoError(t, err) - subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100) + subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1) events := subscriber.Subscribe(context.Background()) @@ -185,7 +185,7 @@ func Test_SubscribingWithRetryOnError(t *testing.T) { ) require.NoError(t, err) - subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100) + subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1) events := subscriber.Subscribe(context.Background()) @@ -248,7 +248,7 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { ) require.NoError(t, err) - subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100) + subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1) events := subscriber.Subscribe(context.Background()) @@ -310,7 +310,7 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { ) require.NoError(t, err) - subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100) + subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1) events := subscriber.Subscribe(context.Background())