Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State trie garbage collection #15903

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions cmd/faucet/faucet.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ import (
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/ethstats"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -237,20 +235,10 @@ func newFaucet(genesis *core.Genesis, port int, enodes []*discv5.Node, network u
cfg.SyncMode = downloader.LightSync
cfg.NetworkId = network
cfg.Genesis = genesis
return les.New(ctx, &cfg)
return nil, err //les.New(ctx, &cfg)
}); err != nil {
return nil, err
}
// Assemble the ethstats monitoring and reporting service
if stats != "" {
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
var serv *les.LightEthereum
ctx.Service(&serv)
return ethstats.New(stats, nil, serv)
}); err != nil {
return nil, err
}
}
// Boot up the client and ensure it connects to bootnodes
if err := stack.Start(); err != nil {
return nil, err
Expand Down
32 changes: 12 additions & 20 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ import (
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethstats"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node"
Expand Down Expand Up @@ -1083,20 +1081,20 @@ func SetDashboardConfig(ctx *cli.Context, cfg *dashboard.Config) {
// RegisterEthService adds an Ethereum client to the stack.
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
var err error
if cfg.SyncMode == downloader.LightSync {
/* if cfg.SyncMode == downloader.LightSync {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return les.New(ctx, cfg)
})
} else {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
fullNode, err := eth.New(ctx, cfg)
if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
fullNode.AddLesServer(ls)
}
return fullNode, err
})
}
} else {*/
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
fullNode, err := eth.New(ctx, cfg)
/*if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
fullNode.AddLesServer(ls)
}*/
return fullNode, err
})
//}
if err != nil {
Fatalf("Failed to register the Ethereum service: %v", err)
}
Expand All @@ -1123,13 +1121,7 @@ func RegisterShhService(stack *node.Node, cfg *whisper.Config) {
func RegisterEthStatsService(stack *node.Node, url string) {
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
// Retrieve both eth and les services
var ethServ *eth.Ethereum
ctx.Service(&ethServ)

var lesServ *les.LightEthereum
ctx.Service(&lesServ)

return ethstats.New(url, ethServ, lesServ)
return nil, nil
}); err != nil {
Fatalf("Failed to register the Ethereum Stats service: %v", err)
}
Expand Down
10 changes: 2 additions & 8 deletions contracts/release/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -66,12 +65,7 @@ func NewReleaseService(ctx *node.ServiceContext, config Config) (node.Service, e
if err := ctx.Service(&ethereum); err == nil {
apiBackend = ethereum.ApiBackend
} else {
var ethereum *les.LightEthereum
if err := ctx.Service(&ethereum); err == nil {
apiBackend = ethereum.ApiBackend
} else {
return nil, err
}
return nil, err
}
// Construct the release service
contract, err := NewReleaseOracle(config.Oracle, eth.NewContractBackend(apiBackend))
Expand Down Expand Up @@ -137,7 +131,7 @@ func (r *ReleaseService) checkVersion() {
if err != nil {
if err == bind.ErrNoCode {
log.Debug("Release oracle not found", "contract", r.config.Oracle)
} else if err != les.ErrNoPeers {
} else {
log.Error("Failed to retrieve current release", "err", err)
}
return
Expand Down
35 changes: 33 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/hashtree"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
Expand Down Expand Up @@ -105,14 +106,18 @@ type BlockChain struct {
quit chan struct{} // blockchain quit channel
running int32 // running must be called atomically
// procInterrupt must be atomically called
processing int32
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup // chain processing wait group for shutting down
writeCounter uint64

engine consensus.Engine
processor Processor // block processor interface
validator Validator // block and state validator interface
vmConfig vm.Config

gc *hashtree.GarbageCollector

badBlocks *lru.Cache // Bad block cache
}

Expand All @@ -126,6 +131,7 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co
futureBlocks, _ := lru.New(maxFutureBlocks)
badBlocks, _ := lru.New(badBlockLimit)

//hashtree.Print(chainDb, []byte(state.DbPrefix))
bc := &BlockChain{
config: config,
chainDb: chainDb,
Expand Down Expand Up @@ -167,11 +173,29 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co
}
}
}

bc.gc = hashtree.NewGarbageCollector(chainDb, []byte(state.DbPrefix), bc.hasDataCallback)

/*headBlock := bc.currentBlock.NumberU64()
if headBlock > 1000 {
bc.gc.FullGC(headBlock - 1000)
}*/

bc.gc.BackgroundGC(bc.CurrentBlock, &bc.processing, &bc.procInterrupt, &bc.wg)

// Take ownership of this particular state
go bc.update()
return bc, nil
}

func (bc *BlockChain) hasDataCallback(version uint64) func(position, hash []byte) bool {
header := bc.GetHeaderByNumber(version)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I don't yet understand the full context of how this method is used, I do find it a bit odd that a block number is used to resolve the header, since a number can be ambiguous.

So what I'm wondering is whether whatever uses this method handles reorgs without breaking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely not. This function verifies that a piece of data (identified by its hash) is present at the given position in a certain block. This block is the "GC block", which is the oldest block whose state we still want to remember. This block is also the earliest one we can roll back to. If a longer reorg happens, we should resync the entire chain from the beginning (or do a fast sync). Handling this corner case is not implemented yet.

if header == nil {
return nil
}
return state.HasDataCallback(header.Root, bc.chainDb)
}

// getProcInterrupt reports whether the procInterrupt flag is currently set
// to 1. The field is read with an atomic load.
func (bc *BlockChain) getProcInterrupt() bool {
	flag := atomic.LoadInt32(&bc.procInterrupt)
	return flag == 1
}
Expand Down Expand Up @@ -292,7 +316,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
if block == nil {
return fmt.Errorf("non existent block [%x…]", hash[:4])
}
if _, err := trie.NewSecure(block.Root(), bc.chainDb, 0); err != nil {
if _, err := trie.NewSecure(block.Root(), hashtree.NewReader(bc.chainDb, state.DbPrefix), 0); err != nil {
return err
}
// If all checks out, manually set the head block
Expand Down Expand Up @@ -808,7 +832,7 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R
if err := WriteBlock(batch, block); err != nil {
return NonStatTy, err
}
if _, err := state.CommitTo(batch, bc.config.IsEIP158(block.Number())); err != nil {
if _, err := state.CommitTo(batch, block.NumberU64(), bc.gc, bc.config.IsEIP158(block.Number())); err != nil {
return NonStatTy, err
}
if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
Expand Down Expand Up @@ -842,9 +866,13 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R
} else {
status = SideStatTy
}

bc.gc.LockWrite()
if err := batch.Write(); err != nil {
bc.gc.UnlockWrite()
return NonStatTy, err
}
bc.gc.UnlockWrite()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is playing with fire a bit: holding on to one mutex (bc.mu) while obtaining another mutex could lead to race conditions.

For example, bc.mu.RLock() is called in CurrentBlock, which is called from BackgroundGC, which runs on a separate thread.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean deadlock? It cannot cause a deadlock because the only thing we do under this lock is write or delete state data. The chain mutex is never used while holding this one. Still, I know that using such a lock in Blockchain is critical, but so is the GC. If we keep using it like this, we should document very clearly what it does and why. We should somehow avoid deleting trie nodes that reappeared just when the GC removed the old entry. I am open to other suggestions though.

Note: my original proposal had an inherently safe db structure:
https://github.com/zsfelfoldi/ethereum-docs/blob/master/geth/gc_proposal.md
Unfortunately this also means that we don't know the exact key when reading and we would need a db iterator for reading every trie node. So far I could not find a db format that is both concurrency-safe and provides good performance. Using this mutex and not starting a GC while processing a block ensures that they usually don't collide and if they do anyway, it won't cause any trouble. From a practical point of view this seems to be a good solution to me if it is properly documented.


// Set new head.
if status == CanonStatTy {
Expand Down Expand Up @@ -888,6 +916,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

atomic.StoreInt32(&bc.processing, 1)
defer atomic.StoreInt32(&bc.processing, 0)

// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
// acquiring.
Expand Down
2 changes: 1 addition & 1 deletion core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
if b.engine != nil {
block, _ := b.engine.Finalize(b.chainReader, b.header, statedb, b.txs, b.uncles, b.receipts)
// Write state changes to db
_, err := statedb.CommitTo(db, config.IsEIP158(b.header.Number))
_, err := statedb.CommitTo(db, b.header.Number.Uint64(), nil, config.IsEIP158(b.header.Number))
if err != nil {
panic(fmt.Sprintf("state write error: %v", err))
}
Expand Down
17 changes: 9 additions & 8 deletions core/database_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)

// DatabaseReader wraps the Get method of a backing data store.
Expand Down Expand Up @@ -58,8 +60,7 @@ var (
lookupPrefix = []byte("l") // lookupPrefix + hash -> transaction/receipt lookup metadata
bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits

preimagePrefix = "secure-key-" // preimagePrefix + hash -> preimage
configPrefix = []byte("ethereum-config-") // config prefix for the db
configPrefix = []byte("ethereum-config-") // config prefix for the db

// Chain index prefixes (use `i` + single byte to avoid mixing data types).
BloomBitsIndexPrefix = []byte("iB") // BloomBitsIndexPrefix is the data table of a chain indexer to track its progress
Expand Down Expand Up @@ -532,20 +533,20 @@ func DeleteTxLookupEntry(db DatabaseDeleter, hash common.Hash) {
db.Delete(append(lookupPrefix, hash.Bytes()...))
}

// PreimageTable returns a Database instance with the key prefix for preimage entries.
func PreimageTable(db ethdb.Database) ethdb.Database {
return ethdb.NewTable(db, preimagePrefix)
// GetPreimage reads the preimage recorded for the given hash from db. The
// lookup key is state.DbPrefix followed by the position that
// trie.SecHashTreePos derives from the hash bytes; the value and error from
// db.Get are returned unchanged.
func GetPreimage(db ethdb.Database, hash common.Hash) ([]byte, error) {
	// Converting the prefix to a byte slice yields a fresh slice, so the append
	// below never writes into shared storage.
	key := []byte(state.DbPrefix)
	key = append(key, trie.SecHashTreePos(hash.Bytes())...)
	return db.Get(key)
}

// WritePreimages writes the provided set of preimages to the database. `number` is the
// current block number, and is used for debug messages only.
func WritePreimages(db ethdb.Database, number uint64, preimages map[common.Hash][]byte) error {
table := PreimageTable(db)
table := ethdb.NewTable(db, state.DbPrefix)
batch := table.NewBatch()
hitCount := 0
for hash, preimage := range preimages {
if _, err := table.Get(hash.Bytes()); err != nil {
batch.Put(hash.Bytes(), preimage)
key := trie.SecHashTreePos(hash.Bytes())
if _, err := table.Get(key); err != nil {
batch.Put(key, preimage)
hitCount++
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (g *Genesis) Commit(db ethdb.Database) (*types.Block, error) {
if block.Number().Sign() != 0 {
return nil, fmt.Errorf("can't commit genesis block with number > 0")
}
if _, err := statedb.CommitTo(db, false); err != nil {
if _, err := statedb.CommitTo(db, 0, nil, false); err != nil {
return nil, fmt.Errorf("cannot write state: %v", err)
}
if err := WriteTd(db, block.Hash(), block.NumberU64(), g.Difficulty); err != nil {
Expand Down
Loading