diff --git a/eth/stagedsync/stage_finish.go b/eth/stagedsync/stage_finish.go index 9a96e47f0e0..a90be80bc37 100644 --- a/eth/stagedsync/stage_finish.go +++ b/eth/stagedsync/stage_finish.go @@ -5,9 +5,10 @@ import ( "context" "encoding/binary" "fmt" - "github.com/ledgerwatch/erigon-lib/kv/dbutils" "time" + "github.com/ledgerwatch/erigon-lib/kv/dbutils" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/hexutility" "github.com/ledgerwatch/erigon-lib/gointerfaces" @@ -148,18 +149,18 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS var notifyTo = notifyFrom var notifyToHash libcommon.Hash var headersRlp [][]byte - if err := tx.ForEach(kv.Headers, hexutility.EncodeTs(notifyFrom), func(k, headerRLP []byte) error { - if len(headerRLP) == 0 { + if err := tx.ForEach(kv.HeaderCanonical, hexutility.EncodeTs(notifyFrom), func(k, hash []byte) (err error) { + if len(hash) == 0 { return nil } - notifyTo = binary.BigEndian.Uint64(k) - var err error - if notifyToHash, err = blockReader.CanonicalHash(ctx, tx, notifyTo); err != nil { - logger.Warn("[Finish] failed checking if header is cannonical") + blockNum := binary.BigEndian.Uint64(k) + if blockNum > finishStageAfterSync { //[from,to) + return nil } - - headerHash := libcommon.BytesToHash(k[8:]) - if notifyToHash == headerHash { + notifyTo = blockNum + notifyToHash = libcommon.BytesToHash(hash) + headerRLP := rawdb.ReadHeaderRLP(tx, notifyToHash, notifyTo) + if headerRLP != nil { headersRlp = append(headersRlp, libcommon.CopyBytes(headerRLP)) } @@ -182,7 +183,7 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS notifier.OnLogs(logs) } logTiming := time.Since(t) - logger.Info("RPC Daemon notified of new headers", "from", notifyFrom-1, "to", notifyTo, "hash", notifyToHash, "header sending", headerTiming, "log sending", logTiming) + logger.Info("RPC Daemon notified of new headers", "from", notifyFrom-1, "to", notifyTo, "amount", len(headersRlp), "hash", notifyToHash, "header sending", headerTiming, "log sending", logTiming) } return nil } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index e7173f52493..e9c1ca0f492 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -310,7 +310,11 @@ func (h *Hook) afterRun(tx kv.Tx, finishProgressBefore uint64) error { } if notifications != nil && notifications.Events != nil { - if err = stagedsync.NotifyNewHeaders(h.ctx, finishProgressBefore, head, h.sync.PrevUnwindPoint(), notifications.Events, tx, h.logger, blockReader); err != nil { + finishStageAfterSync, err := stages.GetStageProgress(tx, stages.Finish) + if err != nil { + return err + } + if err = stagedsync.NotifyNewHeaders(h.ctx, finishProgressBefore, finishStageAfterSync, h.sync.PrevUnwindPoint(), notifications.Events, tx, h.logger, blockReader); err != nil { return nil } }