Skip to content

Commit

Permalink
remove heartbeat interval setting
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Oct 30, 2024
1 parent 197617c commit d3996e4
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 26 deletions.
7 changes: 6 additions & 1 deletion bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
Msg("indexing cadence height information")

// create event subscriber
subscriber := ingestion.NewRPCEventSubscriber(b.logger, b.client, b.config.FlowNetworkID, latestCadenceHeight, b.config.HeartbeatInterval)
subscriber := ingestion.NewRPCEventSubscriber(
b.logger,
b.client,
b.config.FlowNetworkID,
latestCadenceHeight,
)

// initialize event ingestion engine
b.events = ingestion.NewEventIngestionEngine(
Expand Down
1 change: 0 additions & 1 deletion cmd/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ func init() {
Cmd.Flags().Float64Var(&cfg.StreamLimit, "stream-limit", 10, "Rate-limits the events sent to the client within one second")
Cmd.Flags().Uint64Var(&cfg.RateLimit, "rate-limit", 50, "Rate-limit requests per second made by the client over any protocol (ws/http)")
Cmd.Flags().StringVar(&cfg.AddressHeader, "address-header", "", "Address header that contains the client IP, this is useful when the server is behind a proxy that sets the source IP of the client. Leave empty if no proxy is used.")
Cmd.Flags().Uint64Var(&cfg.HeartbeatInterval, "heartbeat-interval", 100, "Heartbeat interval for AN event subscription")
Cmd.Flags().UintVar(&cfg.CacheSize, "script-cache-size", 10000, "Cache size used for script execution in items kept in cache")
Cmd.Flags().IntVar(&streamTimeout, "stream-timeout", 3, "Defines the timeout in seconds the server waits for the event to be sent to the client")
Cmd.Flags().Uint64Var(&forceStartHeight, "force-start-height", 0, "Force set starting Cadence height. WARNING: This should only be used locally or for testing, never in production.")
Expand Down
2 changes: 0 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ type Config struct {
FilterExpiry time.Duration
// ForceStartCadenceHeight will force set the starting Cadence height, this should be only used for testing or locally.
ForceStartCadenceHeight uint64
// HeartbeatInterval sets custom heartbeat interval for events
HeartbeatInterval uint64
// TracesBucketName sets the GCP bucket name where transaction traces are being stored.
TracesBucketName string
// TracesEnabled sets whether the node is supporting transaction traces.
Expand Down
28 changes: 11 additions & 17 deletions services/ingestion/event_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,7 @@ type EventSubscriber interface {

var _ EventSubscriber = &RPCEventSubscriber{}

type RPCEventSubscriberConfig struct {
HeartbeatInterval uint64
}

type RPCEventSubscriber struct {
RPCEventSubscriberConfig

logger zerolog.Logger

client *requester.CrossSporkClient
Expand All @@ -51,14 +45,9 @@ func NewRPCEventSubscriber(
client *requester.CrossSporkClient,
chainID flowGo.ChainID,
startHeight uint64,
heartbeatInterval uint64,
) *RPCEventSubscriber {
logger = logger.With().Str("component", "subscriber").Logger()
return &RPCEventSubscriber{
RPCEventSubscriberConfig: RPCEventSubscriberConfig{
HeartbeatInterval: heartbeatInterval,
},

logger: logger,

client: client,
Expand Down Expand Up @@ -86,7 +75,7 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockE
Uint64("height", r.height).
Msg("height found in previous spork, starting to backfill")

// backfill all the missed eventsChan, handling of context cancellation is done by the producer
// backfill all the missed events, handling of context cancellation is done by the producer
for ev := range r.backfill(ctx, r.height) {
eventsChan <- ev

Expand All @@ -109,8 +98,7 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockE
Msg("backfilling done, subscribe for live data")

// subscribe in the current spork, handling of context cancellation is done by the producer
// TODO(JanezP): I think the heartbeat interval should always be 1 here
for ev := range r.subscribe(ctx, r.height, access.WithHeartbeatInterval(r.HeartbeatInterval)) {
for ev := range r.subscribe(ctx, r.height) {
eventsChan <- ev
}

Expand All @@ -124,7 +112,7 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockE
//
// Subscribing to EVM specific events and handle any disconnection errors
// as well as context cancellations.
func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64, opts ...access.SubscribeOption) <-chan models.BlockEvents {
func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

_, err := r.client.GetBlockHeaderByHeight(ctx, height)
Expand All @@ -134,7 +122,13 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64, opts
return eventsChan
}

eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(ctx, height, blocksFilter(r.chain), opts...)
// we always use heartbeat interval of 1 to have the least amount of delay from the access node
eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(
ctx,
height,
blocksFilter(r.chain),
access.WithHeartbeatInterval(1),
)
if err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf("failed to subscribe to events by block height: %d, with: %w", height, err),
Expand Down Expand Up @@ -230,7 +224,7 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan
Uint64("last-spork-height", latestHeight).
Msg("backfilling spork")

for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(1)) {
for ev := range r.subscribe(ctx, height) {
eventsChan <- ev

if ev.Err != nil {
Expand Down
10 changes: 5 additions & 5 deletions services/ingestion/event_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_Subscribing(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100)
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1)

events := subscriber.Subscribe(context.Background())

Expand Down Expand Up @@ -83,7 +83,7 @@ func Test_MissingBlockEvent(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100)
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1)

events := subscriber.Subscribe(context.Background())

Expand Down Expand Up @@ -185,7 +185,7 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100)
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1)

events := subscriber.Subscribe(context.Background())

Expand Down Expand Up @@ -248,7 +248,7 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100)
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1)

events := subscriber.Subscribe(context.Background())

Expand Down Expand Up @@ -310,7 +310,7 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100)
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1)

events := subscriber.Subscribe(context.Background())

Expand Down

0 comments on commit d3996e4

Please sign in to comment.