Skip to content

Commit

Permalink
core: remove callback parameter in InsertHeaderChain
Browse files Browse the repository at this point in the history
The semantics of InsertHeaderChain are now much simpler: it is now an
all-or-nothing operation. The new WriteStatus return value allows
callers to check for the canonicality of the insertion. This change
simplifies use of HeaderChain in package les, where the callback was
previously used to post chain events.
  • Loading branch information
fjl committed Dec 4, 2020
1 parent 8699099 commit cb3429c
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 113 deletions.
3 changes: 2 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2438,7 +2438,8 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i

bc.wg.Add(1)
defer bc.wg.Done()
return bc.hc.InsertHeaderChain(chain, nil, start)
_, err := bc.hc.InsertHeaderChain(chain, start)
return 0, err
}

// CurrentHeader retrieves the current head header of the canonical chain. The
Expand Down
93 changes: 44 additions & 49 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (hc *HeaderChain) GetBlockNumber(hash common.Hash) *uint64 {
}

type headerWriteResult struct {
status WriteStatus
ignored int
imported int
lastHash common.Hash
Expand All @@ -145,7 +146,7 @@ type headerWriteResult struct {
// without the real blocks. Hence, writing headers directly should only be done
// in two scenarios: pure-header mode of operation (light clients), or properly
// separated header/block phases (non-archive clients).
func (hc *HeaderChain) writeHeaders(headers []*types.Header, postWriteFn PostWriteCallback) (result *headerWriteResult, err error) {
func (hc *HeaderChain) writeHeaders(headers []*types.Header) (result *headerWriteResult, err error) {
if len(headers) == 0 {
return &headerWriteResult{}, nil
}
Expand All @@ -154,39 +155,25 @@ func (hc *HeaderChain) writeHeaders(headers []*types.Header, postWriteFn PostWri
return &headerWriteResult{}, consensus.ErrUnknownAncestor
}
var (
lastHeader *types.Header // Last successfully imported header
lastNumber uint64 // Last successfully imported number
lastHash common.Hash
externTd *big.Int // TD of successfully imported chain
lastNumber = headers[0].Number.Uint64() - 1 // Last successfully imported number
lastHash = headers[0].ParentHash // Last imported header hash
newTD = new(big.Int).Set(ptd) // Total difficulty of inserted chain

lastHeader *types.Header
inserted []numberHash // Ephemeral lookup of number/hash for the chain
firstInserted = -1 // Index of the first non-ignored header
)
lastHash, lastNumber = headers[0].ParentHash, headers[0].Number.Uint64()-1 // Already validated above

batch := hc.chainDb.NewBatch()
for i, header := range headers {
// Short circuit insertion if shutting down
if hc.procInterrupt() {
log.Debug("Premature abort during headers import")
// if we haven't done anything yet, we can return
if i == 0 {
return &headerWriteResult{}, errors.New("aborted")
}
// We only 'break' here - since we want to try and keep the
// db consistent
break
}
hash, number := header.Hash(), header.Number.Uint64()
if header.ParentHash != lastHash {
log.Warn("Non-contiguous header insertion", "parent", header.ParentHash, "expected", lastHash, "number", number)
break
}
externTd = new(big.Int).Add(header.Difficulty, ptd)
newTD.Add(newTD, header.Difficulty)

// If the header is already known, skip it, otherwise store
if !hc.HasHeader(hash, number) {
// Irrelevant of the canonical status, write the td and header to the database
rawdb.WriteTd(batch, hash, number, externTd)
hc.tdCache.Add(hash, new(big.Int).Set(externTd))
// Irrelevant of the canonical status, write the TD and header to the database.
rawdb.WriteTd(batch, hash, number, newTD)
hc.tdCache.Add(hash, new(big.Int).Set(newTD))

rawdb.WriteHeader(batch, header)
inserted = append(inserted, numberHash{number, hash})
Expand All @@ -196,22 +183,30 @@ func (hc *HeaderChain) writeHeaders(headers []*types.Header, postWriteFn PostWri
firstInserted = i
}
}
lastHeader, lastHash, lastNumber, ptd = header, hash, number, externTd
lastHeader, lastHash, lastNumber = header, hash, number
}

// Skip the slow disk write of all headers if interrupted.
if hc.procInterrupt() {
log.Debug("Premature abort during headers import")
return &headerWriteResult{}, errors.New("aborted")
}
// Commit to disk!
if err := batch.Write(); err != nil {
log.Crit("Failed to write headers", "error", err)
}
batch.Reset()

var (
head = hc.CurrentHeader().Number.Uint64()
localTd = hc.GetTd(hc.currentHeaderHash, head)
localTD = hc.GetTd(hc.currentHeaderHash, head)
status = SideStatTy
)
// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
reorg := externTd.Cmp(localTd) > 0
if !reorg && externTd.Cmp(localTd) == 0 {
reorg := newTD.Cmp(localTD) > 0
if !reorg && newTD.Cmp(localTD) == 0 {
if lastNumber < head {
reorg = true
} else if lastNumber == head {
Expand Down Expand Up @@ -275,31 +270,23 @@ func (hc *HeaderChain) writeHeaders(headers []*types.Header, postWriteFn PostWri
hc.currentHeader.Store(types.CopyHeader(lastHeader))
headHeaderGauge.Update(lastHeader.Number.Int64())

// Chain status is canonical since this insert was a reorg.
// Note that all inserts which have higher TD than existing are 'reorg'.
status = CanonStatTy
}
// Execute any post-write callback function
// - unless we're exiting
// - and unless we ignored everything
if postWriteFn != nil && !hc.procInterrupt() && len(inserted) > 0 {
// The postwrite function is called only for the last imported header.
// It is only used for lightchain event aggregation.
postWriteFn(lastHeader, status)

if len(inserted) == 0 {
status = NonStatTy
}
return &headerWriteResult{
status: status,
ignored: len(headers) - len(inserted),
imported: len(inserted),
lastHash: lastHash,
lastHeader: lastHeader,
}, nil
}

// PostWriteCallback is a callback function for inserting headers,
// which is called once, with the last successfully imported header in the batch.
// The reason for having it is:
// In light-chain mode, status should be processed and light chain events sent,
// whereas in a non-light mode this is not necessary since chain events are sent after inserting blocks.
type PostWriteCallback func(header *types.Header, status WriteStatus)

func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
Expand Down Expand Up @@ -351,14 +338,22 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int)
return 0, nil
}

// InsertHeaderChain inserts the given headers, and returns the
// index at which something went wrong, and the error (if any)
// The caller should hold applicable mutexes
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, postWriteFn PostWriteCallback, start time.Time) (int, error) {
// InsertHeaderChain inserts the given headers.
//
// The validity of the headers is NOT CHECKED by this method, i.e. they need to be
// validated by ValidateHeaderChain before calling InsertHeaderChain.
//
// This insert is all-or-nothing. If this returns an error, no headers were written,
// otherwise they were all processed successfully.
//
// The returned 'write status' says if the inserted headers are part of the canonical chain
// or a side chain.
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, start time.Time) (WriteStatus, error) {
if hc.procInterrupt() {
return 0, errors.New("aborted")
}
res, err := hc.writeHeaders(chain, postWriteFn)
res, err := hc.writeHeaders(chain)

// Report some public statistics so the user has a clue what's going on
context := []interface{}{
"count", res.imported,
Expand All @@ -377,7 +372,7 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, postWriteFn Post
context = append(context, []interface{}{"ignored", res.ignored}...)
}
log.Info("Imported new block headers", context...)
return 0, err
return res.status, err
}

// GetBlockHashesFromHash retrieves a number of block hashes starting at a given
Expand Down
73 changes: 24 additions & 49 deletions core/headerchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,37 +50,23 @@ func verifyUnbrokenCanonchain(hc *HeaderChain) error {
return nil
}

func testInsert(t *testing.T, hc *HeaderChain, chain []*types.Header, expInsert, expCanon, expSide int) error {
func testInsert(t *testing.T, hc *HeaderChain, chain []*types.Header, wantStatus WriteStatus, wantErr error) {
t.Helper()
gotInsert, gotCanon, gotSide := 0, 0, 0

_, err := hc.InsertHeaderChain(chain, func(header *types.Header, status WriteStatus) {
gotInsert++
switch status {
case CanonStatTy:
gotCanon++
default:
gotSide++
}
}, time.Now())

if gotInsert != expInsert {
t.Errorf("wrong number of callback invocations, got %d, exp %d", gotInsert, expInsert)
}
if gotCanon != expCanon {
t.Errorf("wrong number of canon headers, got %d, exp %d", gotCanon, expCanon)
}
if gotSide != expSide {
t.Errorf("wrong number of side headers, got %d, exp %d", gotSide, expSide)
status, err := hc.InsertHeaderChain(chain, time.Now())
if status != wantStatus {
t.Errorf("wrong write status from InsertHeaderChain: got %v, want %v", status, wantStatus)
}
// Always verify that the header chain is unbroken
if err := verifyUnbrokenCanonchain(hc); err != nil {
t.Fatal(err)
return err
}
return err
if !errors.Is(err, wantErr) {
t.Fatalf("unexpected error from InsertHeaderChain: %v", err)
}
}

// This test checks status reporting of InsertHeaderChain.
func TestHeaderInsertion(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
Expand All @@ -99,42 +85,31 @@ func TestHeaderInsertion(t *testing.T) {

// Inserting 64 headers on an empty chain, expecting
// 1 callbacks, 1 canon-status, 0 sidestatus,
if err := testInsert(t, hc, chainA[:64], 1, 1, 0); err != nil {
t.Fatal(err)
}
testInsert(t, hc, chainA[:64], CanonStatTy, nil)

// Inserting 64 identical headers, expecting
// 0 callbacks, 0 canon-status, 0 sidestatus,
if err := testInsert(t, hc, chainA[:64], 0, 0, 0); err != nil {
t.Fatal(err)
}
testInsert(t, hc, chainA[:64], NonStatTy, nil)

// Inserting the same some old, some new headers
// 1 callbacks, 1 canon, 0 side
if err := testInsert(t, hc, chainA[32:96], 1, 1, 0); err != nil {
t.Fatal(err)
}
testInsert(t, hc, chainA[32:96], CanonStatTy, nil)

// Inserting side blocks, but not overtaking the canon chain
if err := testInsert(t, hc, chainB[0:32], 1, 0, 1); err != nil {
t.Fatal(err)
}
testInsert(t, hc, chainB[0:32], SideStatTy, nil)

// Inserting more side blocks, but we don't have the parent
if err := testInsert(t, hc, chainB[34:36], 0, 0, 0); !errors.Is(err, consensus.ErrUnknownAncestor) {
t.Fatal(fmt.Errorf("Expected %v, got %v", consensus.ErrUnknownAncestor, err))
}
testInsert(t, hc, chainB[34:36], NonStatTy, consensus.ErrUnknownAncestor)

// Inserting more sideblocks, overtaking the canon chain
if err := testInsert(t, hc, chainB[32:97], 1, 1, 0); err != nil {
t.Fatal(err)
}
testInsert(t, hc, chainB[32:97], CanonStatTy, nil)

// Inserting more A-headers, taking back the canonicality
if err := testInsert(t, hc, chainA[90:100], 1, 1, 0); err != nil {
t.Fatal(err)
}
testInsert(t, hc, chainA[90:100], CanonStatTy, nil)

// And B becomes canon again
if err := testInsert(t, hc, chainB[97:107], 1, 1, 0); err != nil {
t.Fatal(err)
}
testInsert(t, hc, chainB[97:107], CanonStatTy, nil)

// And B becomes even longer
if err := testInsert(t, hc, chainB[107:128], 1, 1, 0); err != nil {
t.Fatal(err)
}
testInsert(t, hc, chainB[107:128], CanonStatTy, nil)
}
32 changes: 18 additions & 14 deletions light/lightchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,22 +396,26 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
lc.wg.Add(1)
defer lc.wg.Done()

var events []interface{}
postWriteCallback := func(header *types.Header, status core.WriteStatus) {
hash := header.Hash()
switch status {
case core.CanonStatTy:
log.Debug("Inserted new header", "number", header.Number, "hash", hash)
events = append(events, core.ChainEvent{Block: types.NewBlockWithHeader(header), Hash: hash})

case core.SideStatTy:
log.Debug("Inserted forked header", "number", header.Number, "hash", hash)
events = append(events, core.ChainSideEvent{Block: types.NewBlockWithHeader(header)})
}
status, err := lc.hc.InsertHeaderChain(chain, start)
if err != nil || len(chain) == 0 {
return 0, err
}

// Create chain event for the new head block of this insertion.
var (
events = make([]interface{}, 0, 1)
lastHeader = chain[len(chain)-1]
block = types.NewBlockWithHeader(lastHeader)
)
switch status {
case core.CanonStatTy:
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash()})
case core.SideStatTy:
events = append(events, core.ChainSideEvent{Block: block})
}
i, err := lc.hc.InsertHeaderChain(chain, postWriteCallback, start)
lc.postChainEvents(events)
return i, err

return 0, err
}

// CurrentHeader retrieves the current head header of the canonical chain. The
Expand Down

0 comments on commit cb3429c

Please sign in to comment.