diff --git a/eth/stagedsync/stage_finish.go b/eth/stagedsync/stage_finish.go index 39560402703..61aaaa59e04 100644 --- a/eth/stagedsync/stage_finish.go +++ b/eth/stagedsync/stage_finish.go @@ -1,14 +1,18 @@ package stagedsync import ( + "bytes" "context" - "fmt" + "encoding/binary" + "reflect" "github.com/ledgerwatch/erigon/params" + "github.com/ledgerwatch/erigon/rlp" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/ledgerwatch/log/v3" ) @@ -124,29 +128,46 @@ func PruneFinish(u *PruneState, tx kv.RwTx, cfg FinishCfg, ctx context.Context) return nil } -func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, unwindTo *uint64, notifier ChainEventNotifier, tx kv.Tx) error { - notifyTo, err := stages.GetStageProgress(tx, stages.Finish) // because later stages can be disabled - if err != nil { - return err - } - notifyFrom := finishStageBeforeSync - if unwindTo != nil && *unwindTo != 0 && (*unwindTo) < finishStageBeforeSync { - notifyFrom = *unwindTo + 1 - } +func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishStageAfterSync uint64, unwindTo *uint64, notifier ChainEventNotifier, tx kv.Tx) error { + if notifier == nil { - log.Warn("rpc notifier is not set, rpc daemon won't be updated about headers") + log.Trace("RPC Daemon notification channel not set. No headers notifications will be sent") return nil } - for i := notifyFrom; i <= notifyTo; i++ { - header := rawdb.ReadHeaderByNumber(tx, i) - if header == nil { - return fmt.Errorf("could not find canonical header for number: %d", i) + // Notify all headers we have (either canonical or not) in a maximum range span of 1024 + var notifyFrom uint64 + if unwindTo != nil && *unwindTo != 0 && (*unwindTo) < finishStageBeforeSync { + notifyFrom = *unwindTo + } else { + heightSpan := finishStageAfterSync - finishStageBeforeSync + if heightSpan > 1024 { + heightSpan = 1024 } - notifier.OnNewHeader(header) + notifyFrom = finishStageAfterSync - heightSpan } - log.Info("Updated current block for the RPC API", "from", notifyFrom, "to", notifyTo) + startKey := make([]byte, reflect.TypeOf(notifyFrom).Size()+32) + var notifyTo uint64 + binary.BigEndian.PutUint64(startKey, notifyFrom) + if err := tx.ForEach(kv.Headers, startKey, func(k, headerRLP []byte) error { + if len(headerRLP) == 0 { + return nil + } + header := new(types.Header) + if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil { + log.Error("Invalid block header RLP", "err", err) + return err + } + notifyTo = header.Number.Uint64() + notifier.OnNewHeader(header) + return libcommon.Stopped(ctx.Done()) + }); err != nil { + log.Error("RPC Daemon notification failed", "error", err) + return err + } + log.Info("RPC Daemon notified of new headers", "from", notifyFrom, "to", notifyTo) return nil + } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 7cac20c053b..9e7f71a2300 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -172,20 +172,18 @@ func StageLoopStep( } updateHead(ctx, head, headHash, headTd256) - if notifications.Accumulator != nil { + if notifications != nil && notifications.Accumulator != nil { if err := db.View(ctx, func(tx kv.Tx) error { header := rawdb.ReadCurrentHeader(tx) if header == nil { return nil } + pendingBaseFee := misc.CalcBaseFee(notifications.Accumulator.ChainConfig(), header) notifications.Accumulator.SendAndReset(ctx, notifications.StateChangesConsumer, pendingBaseFee.Uint64()) - err = stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, sync.PrevUnwindPoint(), notifications.Events, tx) - if err != nil { - return err - } - return nil + return stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, head, sync.PrevUnwindPoint(), notifications.Events, tx) + }); err != nil { return err }