Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/downloader: remove header rollback mechanism #668

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 2 additions & 48 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,39 +1519,13 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
var (
rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
rollbackErr error
mode = d.getMode()
mode = d.getMode()
gotHeaders = false // Wait for batches of headers to process
)
defer func() {
if rollback > 0 {
lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
if mode != LightSync {
lastFastBlock = d.blockchain.CurrentFastBlock().Number()
lastBlock = d.blockchain.CurrentBlock().Number()
}
if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
// We're already unwinding the stack, only print the error to make it more visible
log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
}
curFastBlock, curBlock := common.Big0, common.Big0
if mode != LightSync {
curFastBlock = d.blockchain.CurrentFastBlock().Number()
curBlock = d.blockchain.CurrentBlock().Number()
}
log.Warn("Rolled back chain segment",
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
}
}()
// Wait for batches of headers to process
gotHeaders := false

for {
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled

case headers := <-d.headerProcCh:
Expand Down Expand Up @@ -1595,8 +1569,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
return errStallingPeer
}
}
// Disable any rollback and return
rollback = 0
return nil
}
// Otherwise split the chunk of headers into batches and process them
Expand All @@ -1605,7 +1577,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
default:
}
Expand All @@ -1632,40 +1603,23 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
frequency = 1
}
if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
rollbackErr = err

// If some headers were inserted, track them as uncertain
if (mode == FastSync || frequency > 1) && n > 0 && rollback == 0 {
rollback = chunk[0].Number.Uint64()
}
log.Warn("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
// All verifications passed, track all headers within the alloted limits
if mode == FastSync {
head := chunk[len(chunk)-1].Number.Uint64()
if head-rollback > uint64(fsHeaderSafetyNet) {
rollback = head - uint64(fsHeaderSafetyNet)
} else {
rollback = 1
}
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if mode == FullSync || mode == FastSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
case <-time.After(time.Second):
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunk, origin)
if len(inserts) != len(chunk) {
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
return fmt.Errorf("%w: stale headers", errBadPeer)
}
}
Expand Down
91 changes: 0 additions & 91 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,97 +973,6 @@ func testShiftedHeaderAttack(t *testing.T, protocol uint, mode SyncMode) {
assertOwnChain(t, tester, chain.len())
}

// Tests that upon detecting an invalid header, the recent ones are rolled back
// for various failure scenarios. Afterwards a full sync is attempted to make
// sure no state was corrupted.
func TestInvalidHeaderRollback66Fast(t *testing.T) { testInvalidHeaderRollback(t, eth.ETH66, FastSync) }

func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) {
t.Parallel()

tester := newTester()

// Create a small enough block chain to download
targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
chain := testChainBase.shorten(targetBlocks)

// Attempt to sync with an attacker that feeds junk during the fast sync phase.
// This should result in the last fsHeaderSafetyNet headers being rolled back.
missing := fsHeaderSafetyNet + MaxHeaderFetch + 1
fastAttackChain := chain.shorten(chain.len())
delete(fastAttackChain.headerm, fastAttackChain.chain[missing])
tester.newPeer("fast-attack", protocol, fastAttackChain)

if err := tester.sync("fast-attack", nil, mode); err == nil {
t.Fatalf("succeeded fast attacker synchronisation")
}
if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch {
t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch)
}

// Attempt to sync with an attacker that feeds junk during the block import phase.
// This should result in both the last fsHeaderSafetyNet number of headers being
// rolled back, and also the pivot point being reverted to a non-block status.
missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
blockAttackChain := chain.shorten(chain.len())
delete(fastAttackChain.headerm, fastAttackChain.chain[missing]) // Make sure the fast-attacker doesn't fill in
delete(blockAttackChain.headerm, blockAttackChain.chain[missing])
tester.newPeer("block-attack", protocol, blockAttackChain)

if err := tester.sync("block-attack", nil, mode); err == nil {
t.Fatalf("succeeded block attacker synchronisation")
}
if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
}
if mode == FastSync {
if head := tester.CurrentBlock().NumberU64(); head != 0 {
t.Errorf("fast sync pivot block #%d not rolled back", head)
}
}

// Attempt to sync with an attacker that withholds promised blocks after the
// fast sync pivot point. This could be a trial to leave the node with a bad
// but already imported pivot block.
withholdAttackChain := chain.shorten(chain.len())
tester.newPeer("withhold-attack", protocol, withholdAttackChain)
tester.downloader.syncInitHook = func(uint64, uint64) {
for i := missing; i < withholdAttackChain.len(); i++ {
delete(withholdAttackChain.headerm, withholdAttackChain.chain[i])
}
tester.downloader.syncInitHook = nil
}
if err := tester.sync("withhold-attack", nil, mode); err == nil {
t.Fatalf("succeeded withholding attacker synchronisation")
}
if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
}
if mode == FastSync {
if head := tester.CurrentBlock().NumberU64(); head != 0 {
t.Errorf("fast sync pivot block #%d not rolled back", head)
}
}

// synchronise with the valid peer and make sure sync succeeds. Since the last rollback
// should also disable fast syncing for this process, verify that we did a fresh full
// sync. Note, we can't assert anything about the receipts since we won't purge the
// database of them, hence we can't use assertOwnChain.
tester.newPeer("valid", protocol, chain)
if err := tester.sync("valid", nil, mode); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
if hs := len(tester.ownHeaders); hs != chain.len() {
t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, chain.len())
}
if mode != LightSync {
if bs := len(tester.ownBlocks); bs != chain.len() {
t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, chain.len())
}
}
tester.terminate()
}

// Tests that a peer advertising a high TD doesn't get to stall the downloader
// afterwards by not sending any useful hashes.
func TestHighTDStarvationAttack66Full(t *testing.T) {
Expand Down
Loading