Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: rework tx indexer #25723

Merged
merged 4 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 55 additions & 76 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,22 +290,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.currentFinalizedBlock.Store(nilBlock)
bc.currentSafeBlock.Store(nilBlock)

// Initialize the chain with ancient data if it isn't empty.
var txIndexBlock uint64

// If Geth is initialized with an external ancient store, re-initialize the
// missing chain indexes and chain flags. This procedure can survive crash
// and can be resumed in next restart since chain flags are updated in last step.
if bc.empty() {
rawdb.InitDatabaseFromFreezer(bc.db)
// If ancient database is not empty, reconstruct all missing
// indices in the background.
frozen, _ := bc.db.Ancients()
if frozen > 0 {
txIndexBlock = frozen
}
}
// Load blockchain states from disk
if err := bc.loadLastState(); err != nil {
return nil, err
}

// Make sure the state associated with the block is available
head := bc.CurrentBlock()
if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
Expand Down Expand Up @@ -413,14 +407,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.wg.Add(1)
go bc.updateFutureBlocks()

// Start tx indexer/unindexer.
if txLookupLimit != nil {
bc.txLookupLimit = *txLookupLimit

bc.wg.Add(1)
go bc.maintainTxIndex(txIndexBlock)
}

// If periodic cache journal is required, spin it up.
if bc.cacheConfig.TrieCleanRejournal > 0 {
if bc.cacheConfig.TrieCleanRejournal < time.Minute {
Expand All @@ -440,6 +426,13 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.SetHead(compat.RewindTo)
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}
// Start tx indexer/unindexer if required.
if txLookupLimit != nil {
bc.txLookupLimit = *txLookupLimit

bc.wg.Add(1)
go bc.maintainTxIndex()
}
return bc, nil
}

Expand Down Expand Up @@ -2286,72 +2279,58 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
return false
}

// indexBlocks reindexes or unindexes transactions depending on user configuration
func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) {
defer func() { close(done) }()

// The tail flag is not existent, it means the node is just initialized
// and all blocks(may from ancient store) are not indexed yet.
if tail == nil {
from := uint64(0)
if bc.txLookupLimit != 0 && head >= bc.txLookupLimit {
from = head - bc.txLookupLimit + 1
}
rawdb.IndexTransactions(bc.db, from, head+1, bc.quit)
return
}
// The tail flag is existent, but the whole chain is required to be indexed.
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
if *tail > 0 {
// It can happen when chain is rewound to a historical point which
// is even lower than the indexes tail, recap the indexing target
// to new head to avoid reading non-existent block bodies.
end := *tail
if end > head+1 {
end = head + 1
}
rawdb.IndexTransactions(bc.db, 0, end, bc.quit)
}
return
}
// Update the transaction index to the new chain state
if head-bc.txLookupLimit+1 < *tail {
// Reindex a part of missing indices and rewind index tail to HEAD-limit
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit)
} else {
// Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit)
}
}

// maintainTxIndex is responsible for the construction and deletion of the
// transaction index.
//
// User can use flag `txlookuplimit` to specify a "recentness" block, below
// which ancient tx indices get deleted. If `txlookuplimit` is 0, it means
// all tx indices will be reserved.
//
// The user can adjust the txlookuplimit value for each launch after fast
// sync, Geth will automatically construct the missing indices and delete
// the extra indices.
func (bc *BlockChain) maintainTxIndex(ancients uint64) {
// The user can adjust the txlookuplimit value for each launch after sync,
// Geth will automatically construct the missing indices or delete the extra
// indices.
func (bc *BlockChain) maintainTxIndex() {
defer bc.wg.Done()

// Before starting the actual maintenance, we need to handle a special case,
// where user might init Geth with an external ancient database. If so, we
// need to reindex all necessary transactions before starting to process any
// pruning requests.
if ancients > 0 {
var from = uint64(0)
if bc.txLookupLimit != 0 && ancients > bc.txLookupLimit {
from = ancients - bc.txLookupLimit
}
rawdb.IndexTransactions(bc.db, from, ancients, bc.quit)
}

// indexBlocks reindexes or unindexes transactions depending on user configuration
indexBlocks := func(tail *uint64, head uint64, done chan struct{}) {
defer func() { done <- struct{}{} }()

// If the user just upgraded Geth to a new version which supports transaction
// index pruning, write the new tail and remove anything older.
if tail == nil {
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
// Nothing to delete, write the tail and return
rawdb.WriteTxIndexTail(bc.db, 0)
} else {
// Prune all stale tx indices and record the tx index tail
rawdb.UnindexTransactions(bc.db, 0, head-bc.txLookupLimit+1, bc.quit)
}
return
}
// If a previous indexing existed, make sure that we fill in any missing entries
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
if *tail > 0 {
// It can happen when chain is rewound to a historical point which
// is even lower than the indexes tail, recap the indexing target
// to new head to avoid reading non-existent block bodies.
end := *tail
if end > head+1 {
end = head + 1
}
rawdb.IndexTransactions(bc.db, 0, end, bc.quit)
}
return
}
// Update the transaction index to the new chain state
if head-bc.txLookupLimit+1 < *tail {
// Reindex a part of missing indices and rewind index tail to HEAD-limit
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit)
} else {
// Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit)
}
}

// Any reindexing done, start listening to chain events and moving the index window
// Listening to chain events and manipulate the transaction indexes.
var (
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
Expand All @@ -2367,7 +2346,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) {
case head := <-headCh:
if done == nil {
done = make(chan struct{})
go indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
}
case <-done:
done = nil
Expand Down
Loading