Skip to content

Commit

Permalink
core, light: write headerchains in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
holiman committed Nov 25, 2020
1 parent 7ac6513 commit b58fe2d
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 130 deletions.
7 changes: 1 addition & 6 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2438,12 +2438,7 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i

bc.wg.Add(1)
defer bc.wg.Done()

whFunc := func(header *types.Header) error {
_, err := bc.hc.WriteHeader(header)
return err
}
return bc.hc.InsertHeaderChain(chain, whFunc, start)
return bc.hc.InsertHeaderChain(chain, nil, start)
}

// CurrentHeader retrieves the current head header of the canonical chain. The
Expand Down
269 changes: 161 additions & 108 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,107 +129,182 @@ func (hc *HeaderChain) GetBlockNumber(hash common.Hash) *uint64 {
return number
}

// WriteHeader writes a header into the local chain, given that its parent is
// already known. If the total difficulty of the newly inserted header becomes
// greater than the current known TD, the canonical chain is re-routed.
type headerWriteResult struct {
ignored int
imported int
status WriteStatus
lastHash common.Hash
lastHeader *types.Header
}

// WriteHeaders writes a chain of headers into the local chain, given that the parents
// are already known. If the total difficulty of the newly inserted chain becomes
// greater than the current known TD, the canonical chain is reorged.
//
// Note: This method is not concurrent-safe with inserting blocks simultaneously
// into the chain, as side effects caused by reorganisations cannot be emulated
// without the real blocks. Hence, writing headers directly should only be done
// in two scenarios: pure-header mode of operation (light clients), or properly
// separated header/block phases (non-archive clients).
func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, err error) {
// Cache some values to prevent constant recalculation
func (hc *HeaderChain) WriteHeaders(headers []*types.Header, postWriteFn PostWriteCallback) (result *headerWriteResult, err error) {
if len(headers) == 0 {
return &headerWriteResult{}, nil
}
ptd := hc.GetTd(headers[0].ParentHash, headers[0].Number.Uint64()-1)
if ptd == nil {
return &headerWriteResult{}, consensus.ErrUnknownAncestor
}
var (
hash = header.Hash()
number = header.Number.Uint64()
lastHeader *types.Header // Last successfully imported header
lastNumber uint64 // Last successfully imported number
lastHash common.Hash
externTd *big.Int // TD of successfully imported chain
inserted []numberHash // Ephemeral lookup of number/hash for the chain
firstInsertedIndex = -1 // Index of the first non-ignored header
)
// Calculate the total difficulty of the header
ptd := hc.GetTd(header.ParentHash, number-1)
if ptd == nil {
return NonStatTy, consensus.ErrUnknownAncestor
}
head := hc.CurrentHeader().Number.Uint64()
localTd := hc.GetTd(hc.currentHeaderHash, head)
externTd := new(big.Int).Add(header.Difficulty, ptd)

// Irrelevant of the canonical status, write the td and header to the database
//
// Note all the components of header(td, hash->number index and header) should
// be written atomically.
headerBatch := hc.chainDb.NewBatch()
rawdb.WriteTd(headerBatch, hash, number, externTd)
rawdb.WriteHeader(headerBatch, header)
if err := headerBatch.Write(); err != nil {
log.Crit("Failed to write header into disk", "err", err)
lastHash, lastNumber = headers[0].ParentHash, headers[0].Number.Uint64()-1 // Already validated above
batch := hc.chainDb.NewBatch()
for i, header := range headers {
// Short circuit insertion if shutting down
if hc.procInterrupt() {
log.Debug("Premature abort during headers import")
// if we haven't done anything yet, we can return
if i == 0 {
return &headerWriteResult{}, errors.New("aborted")
}
// We only 'break' here - since we want to try and keep the
// db consistent
break
}
hash, number := header.Hash(), header.Number.Uint64()
if header.ParentHash != lastHash {
log.Warn("Non-contiguous header insertion", "header.parent", header.ParentHash, "expected", hash, "number", number)
break
}
externTd = new(big.Int).Add(header.Difficulty, ptd)

// If the header is already known, skip it, otherwise store
if !hc.HasHeader(hash, number) {
// Irrelevant of the canonical status, write the td and header to the database
rawdb.WriteTd(batch, hash, number, externTd)
hc.tdCache.Add(hash, new(big.Int).Set(externTd))

rawdb.WriteHeader(batch, header)
inserted = append(inserted, numberHash{number, hash})
hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number)
if firstInsertedIndex < 0 {
firstInsertedIndex = i
}
}
lastHeader, lastHash, lastNumber, ptd = header, hash, number, externTd
}
batch.Write()
batch.Reset()
var (
head = hc.CurrentHeader().Number.Uint64()
localTd = hc.GetTd(hc.currentHeaderHash, head)
status = SideStatTy
)
// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
reorg := externTd.Cmp(localTd) > 0
if !reorg && externTd.Cmp(localTd) == 0 {
if header.Number.Uint64() < head {
if lastNumber < head {
reorg = true
} else if header.Number.Uint64() == head {
} else if lastNumber == head {
reorg = mrand.Float64() < 0.5
}
}
// If the parent of the (first) block is already the canon header,
// we don't have to go backwards to delete canon blocks, but
// simply pile them onto the existing chain
chainAlreadyCanon := headers[0].ParentHash == hc.currentHeaderHash
if reorg {
// If the header can be added into canonical chain, adjust the
// header chain markers(canonical indexes and head header flag).
//
// Note all markers should be written atomically.

// Delete any canonical number assignments above the new head
markerBatch := hc.chainDb.NewBatch()
for i := number + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
if hash == (common.Hash{}) {
break
markerBatch := batch // we can reuse the batch to keep allocs down
if !chainAlreadyCanon {
// Delete any canonical number assignments above the new head
for i := lastNumber + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
if hash == (common.Hash{}) {
break
}
rawdb.DeleteCanonicalHash(markerBatch, i)
}
// Overwrite any stale canonical number assignments, going
// backwards from the first header in this import
var (
headHash = headers[0].ParentHash // inserted[0].parent?
headNumber = headers[0].Number.Uint64() - 1 // inserted[0].num-1 ?
headHeader = hc.GetHeader(headHash, headNumber)
)
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber)
headHash = headHeader.ParentHash
headNumber = headHeader.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber)
}
// If some of the older headers were already known, but obtained canon-status
// during this import batch, then we need to write that now
// Further down, we continue writing the status for the ones that
// were not already known
for i := 0; i < firstInsertedIndex; i++ {
hash := headers[i].Hash()
num := headers[i].Number.Uint64()
rawdb.WriteCanonicalHash(markerBatch, hash, num)
rawdb.WriteHeadHeaderHash(markerBatch, hash)
}
rawdb.DeleteCanonicalHash(markerBatch, i)
}

// Overwrite any stale canonical number assignments
var (
headHash = header.ParentHash
headNumber = header.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber)
)
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber)

headHash = headHeader.ParentHash
headNumber = headHeader.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber)
// Extend the canonical chain with the new headers
for _, hn := range inserted {
rawdb.WriteCanonicalHash(markerBatch, hn.hash, hn.number)
rawdb.WriteHeadHeaderHash(markerBatch, hn.hash)
}
// Extend the canonical chain with the new header
rawdb.WriteCanonicalHash(markerBatch, hash, number)
rawdb.WriteHeadHeaderHash(markerBatch, hash)
if err := markerBatch.Write(); err != nil {
log.Crit("Failed to write header markers into disk", "err", err)
}
markerBatch.Reset()
// Last step update all in-memory head header markers
hc.currentHeaderHash = hash
hc.currentHeader.Store(types.CopyHeader(header))
headHeaderGauge.Update(header.Number.Int64())
hc.currentHeaderHash = lastHash
hc.currentHeader.Store(types.CopyHeader(lastHeader))
headHeaderGauge.Update(lastHeader.Number.Int64())

status = CanonStatTy
} else {
status = SideStatTy
}
hc.tdCache.Add(hash, externTd)
hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number)
return
// Execute any post-write callback function
// - unless we're exiting
// - and unless we ignored everything
if postWriteFn != nil && !hc.procInterrupt() && firstInsertedIndex >= 0 {
// TODO: Is it really necessary to invoke this N times, instead of just
// invoking it for the last header?
// It is only used for lightchain event aggregation
for _, header := range headers[firstInsertedIndex:] {
if header.Number.Uint64() > lastNumber {
break
}
postWriteFn(header, status)
}
}
return &headerWriteResult{
ignored: len(headers) - len(inserted),
imported: len(inserted),
status: status,
lastHash: lastHash,
lastHeader: lastHeader,
}, nil
}

// WhCallback is a callback function for inserting individual headers.
// A callback is used for two reasons: first, in a LightChain, status should be
// processed and light chain events sent, while in a BlockChain this is not
// necessary since chain events are sent after inserting blocks. Second, the
// header writes should be protected by the parent chain mutex individually.
type WhCallback func(*types.Header) error
// PostWriteCallback is a callback function for inserting headers,
// which is called after each header is inserted.
// The reason for having it is:
// In light-chain mode, status should be processed and light chain events sent,
// whereas in a non-light mode this is not necessary since chain events are sent after inserting blocks.
type PostWriteCallback func(header *types.Header, status WriteStatus)

func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
// Do a sanity check that the provided chain is actually ordered and linked
Expand Down Expand Up @@ -282,55 +357,33 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int)
return 0, nil
}

// InsertHeaderChain attempts to insert the given header chain in to the local
// chain, possibly creating a reorg. If an error is returned, it will return the
// index number of the failing header as well an error describing what went wrong.
//
// The verify parameter can be used to fine tune whether nonce verification
// should be done or not. The reason behind the optional check is because some
// of the header retrieval mechanisms already need to verify nonces, as well as
// because nonces can be verified sparsely, not needing to check each.
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCallback, start time.Time) (int, error) {
// Collect some import statistics to report on
stats := struct{ processed, ignored int }{}
// All headers passed verification, import them into the database
for i, header := range chain {
// Short circuit insertion if shutting down
if hc.procInterrupt() {
log.Debug("Premature abort during headers import")
return i, errors.New("aborted")
}
// If the header's already known, skip it, otherwise store
hash := header.Hash()
if hc.HasHeader(hash, header.Number.Uint64()) {
externTd := hc.GetTd(hash, header.Number.Uint64())
localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64())
if externTd == nil || externTd.Cmp(localTd) <= 0 {
stats.ignored++
continue
}
}
if err := writeHeader(header); err != nil {
return i, err
}
stats.processed++
// InsertHeaderChain inserts the given headers, and returns the
// index at which something went wrong, and the error (if any)
// The caller should hold applicable mutexes
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, postWriteFn PostWriteCallback, start time.Time) (int, error) {
if hc.procInterrupt() {
return 0, errors.New("aborted")
}
res, err := hc.WriteHeaders(chain, postWriteFn)
// Report some public statistics so the user has a clue what's going on
last := chain[len(chain)-1]

context := []interface{}{
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
"number", last.Number, "hash", last.Hash(),
"count", res.imported,
"elapsed", common.PrettyDuration(time.Since(start)),
}
if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
if err != nil {
context = append(context, "err", err)
}
if last := res.lastHeader; last != nil {
context = append(context, "number", last.Number, "hash", res.lastHash)
if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
}
if stats.ignored > 0 {
context = append(context, []interface{}{"ignored", stats.ignored}...)
if res.ignored > 0 {
context = append(context, []interface{}{"ignored", res.ignored}...)
}
log.Info("Imported new block headers", context...)

return 0, nil
return 0, err
}

// GetBlockHashesFromHash retrieves a number of block hashes starting at a given
Expand Down
10 changes: 2 additions & 8 deletions core/headerchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,14 @@ func testInsert(t *testing.T, hc *HeaderChain, chain []*types.Header, expInsert,
t.Helper()
gotInsert, gotCanon, gotSide := 0, 0, 0

_, err := hc.InsertHeaderChain(chain, func(header *types.Header) error {
status, err := hc.WriteHeader(header)
if err != nil{
return err
}
_, err := hc.InsertHeaderChain(chain, func(header *types.Header, status WriteStatus) {
gotInsert++
switch status {
case CanonStatTy:
gotCanon++
default:
gotSide++
}
return nil

}, time.Now())

if gotInsert != expInsert {
Expand Down Expand Up @@ -109,7 +103,7 @@ func TestHeaderInsertion(t *testing.T) {
t.Fatal(err)
}

// Inserting 64 indentical headers, expecting
// Inserting 64 identical headers, expecting
// 0 callbacks, 0 canon-status, 0 sidestatus,
if err := testInsert(t, hc, chainA[:64], 0, 0, 0); err != nil {
t.Fatal(err)
Expand Down
14 changes: 6 additions & 8 deletions light/lightchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,21 +397,19 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
defer lc.wg.Done()

var events []interface{}
whFunc := func(header *types.Header) error {
status, err := lc.hc.WriteHeader(header)

postWriteCallback := func(header *types.Header, status core.WriteStatus) {
hash := header.Hash()
switch status {
case core.CanonStatTy:
log.Debug("Inserted new header", "number", header.Number, "hash", header.Hash())
events = append(events, core.ChainEvent{Block: types.NewBlockWithHeader(header), Hash: header.Hash()})
log.Debug("Inserted new header", "number", header.Number, "hash", hash)
events = append(events, core.ChainEvent{Block: types.NewBlockWithHeader(header), Hash: hash})

case core.SideStatTy:
log.Debug("Inserted forked header", "number", header.Number, "hash", header.Hash())
log.Debug("Inserted forked header", "number", header.Number, "hash", hash)
events = append(events, core.ChainSideEvent{Block: types.NewBlockWithHeader(header)})
}
return err
}
i, err := lc.hc.InsertHeaderChain(chain, whFunc, start)
i, err := lc.hc.InsertHeaderChain(chain, postWriteCallback, start)
lc.postChainEvents(events)
return i, err
}
Expand Down

0 comments on commit b58fe2d

Please sign in to comment.