Skip to content

Commit

Permalink
handle syncing more graciously
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jul 12, 2022
1 parent 7d3b818 commit 2a76821
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 0 deletions.
16 changes: 16 additions & 0 deletions pkg/exporter/consensus/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func (n *node) Start(ctx context.Context) error {
return err
}

if err := n.fetchSyncStatus(ctx); err != nil {
return err
}

s := gocron.NewScheduler(time.Local)

if _, err := s.Every("15s").Do(func() {
Expand Down Expand Up @@ -254,6 +258,10 @@ func (n *node) fetchPeers(ctx context.Context) error {
func (n *node) subscribeToSelf(ctx context.Context) error {
// Listen for beacon block events and insert them in to our state
if _, err := n.OnBlock(ctx, func(ctx context.Context, ev *v1.BlockEvent) error {
if n.syncing.IsSyncing {
return nil
}

start := time.Now()

// Grab the entire block from the beacon node
Expand Down Expand Up @@ -316,6 +324,10 @@ func (n *node) handleDownstreamBlockInserted(ctx context.Context, epoch phase0.E
}

func (n *node) handleDownstreamEmptySlot(ctx context.Context, epoch phase0.Epoch, slot state.Slot) error {
if n.syncing.IsSyncing {
return nil
}

if err := n.publishEmptySlot(ctx, slot.Number()); err != nil {
return err
}
Expand All @@ -339,6 +351,10 @@ func (n *node) handleStateEpochSlotChanged(ctx context.Context, epochNumber phas
continue
}

if n.syncing.IsSyncing {
continue
}

if err := n.fetchEpochProposerDuties(ctx, i); err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/exporter/consensus/beacon/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func (n *node) ensureBeaconSubscription(ctx context.Context) error {
continue
}

// Only resubscribe if we haven't received an event after 5 minutes.
if time.Since(n.lastEventTime) < (5 * time.Minute) {
continue
}
Expand Down Expand Up @@ -76,6 +77,13 @@ func (n *node) handleEvent(ctx context.Context, event *v1.Event) error {
n.log.WithError(err).Error("Failed to publish raw event")
}

// If we are syncing, only forward on "head" and "block" events
if n.syncing.IsSyncing {
if event.Topic != topicBlock && event.Topic != topicHead {
return nil
}
}

switch event.Topic {
case topicAttestation:
return n.handleAttestation(ctx, event)
Expand Down
1 change: 1 addition & 0 deletions pkg/exporter/consensus/jobs/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ func (b *Beacon) handleSingleBlock(blockID string, block *spec.VersionedSignedBe
b.Attestations.Reset()
b.Deposits.Reset()
b.VoluntaryExits.Reset()
b.Slot.Reset()

b.currentVersion = block.Version.String()
}
Expand Down

0 comments on commit 2a76821

Please sign in to comment.