Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd, core, eth, light, trie: dump clean cache periodically #20391

Merged
merged 6 commits into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ var (
utils.CacheFlag,
utils.CacheDatabaseFlag,
utils.CacheTrieFlag,
utils.CacheTrieJournalFlag,
utils.CacheTrieRejournalFlag,
utils.CacheGCFlag,
utils.CacheSnapshotFlag,
utils.CacheNoPrefetchFlag,
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.CacheFlag,
utils.CacheDatabaseFlag,
utils.CacheTrieFlag,
utils.CacheTrieJournalFlag,
utils.CacheTrieRejournalFlag,
utils.CacheGCFlag,
utils.CacheSnapshotFlag,
utils.CacheNoPrefetchFlag,
Expand Down
16 changes: 16 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,16 @@ var (
Usage: "Percentage of cache memory allowance to use for trie caching (default = 15% full mode, 30% archive mode)",
Value: 15,
}
CacheTrieJournalFlag = cli.StringFlag{
Name: "cache.trie.journal",
Usage: "Disk journal directory for trie cache to survive node restarts",
Value: eth.DefaultConfig.TrieCleanCacheJournal,
}
CacheTrieRejournalFlag = cli.DurationFlag{
Name: "cache.trie.rejournal",
Usage: "Time interval to regenerate the trie cache journal",
Value: eth.DefaultConfig.TrieCleanCacheRejournal,
}
CacheGCFlag = cli.IntFlag{
Name: "cache.gc",
Usage: "Percentage of cache memory allowance to use for trie pruning (default = 25% full mode, 0% archive mode)",
Expand Down Expand Up @@ -1537,6 +1547,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) {
cfg.TrieCleanCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheTrieFlag.Name) / 100
}
if ctx.GlobalIsSet(CacheTrieJournalFlag.Name) {
cfg.TrieCleanCacheJournal = ctx.GlobalString(CacheTrieJournalFlag.Name)
}
if ctx.GlobalIsSet(CacheTrieRejournalFlag.Name) {
cfg.TrieCleanCacheRejournal = ctx.GlobalDuration(CacheTrieRejournalFlag.Name)
}
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheGCFlag.Name) {
cfg.TrieDirtyCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100
}
Expand Down
23 changes: 22 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ const (
// that's resident in a blockchain.
type CacheConfig struct {
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieCleanJournal string // Disk journal for saving clean cache entries.
TrieCleanRejournal time.Duration // Time interval to dump clean cache to disk periodically
TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
Expand Down Expand Up @@ -220,7 +222,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit, cacheConfig.TrieCleanJournal),
quit: make(chan struct{}),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
Expand Down Expand Up @@ -328,6 +330,19 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bc.txLookupLimit = *txLookupLimit
go bc.maintainTxIndex(txIndexBlock)
}
// If periodic cache journal is required, spin it up.
if bc.cacheConfig.TrieCleanRejournal > 0 {
if bc.cacheConfig.TrieCleanRejournal < time.Minute {
log.Warn("Sanitizing invalid trie cache journal time", "provided", bc.cacheConfig.TrieCleanRejournal, "updated", time.Minute)
bc.cacheConfig.TrieCleanRejournal = time.Minute
}
triedb := bc.stateCache.TrieDB()
bc.wg.Add(1)
go func() {
defer bc.wg.Done()
triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit)
}()
}
return bc, nil
}

Expand Down Expand Up @@ -919,6 +934,12 @@ func (bc *BlockChain) Stop() {
log.Error("Dangling trie nodes after full cleanup")
}
}
// Ensure all live cached entries be saved into disk, so that we can skip
// cache warmup when node restarts.
if bc.cacheConfig.TrieCleanJournal != "" {
triedb := bc.stateCache.TrieDB()
triedb.SaveCache(bc.cacheConfig.TrieCleanJournal)
}
log.Info("Blockchain stopped")
}

Expand Down
2 changes: 1 addition & 1 deletion core/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func SetupGenesisBlock(db ethdb.Database, genesis *Genesis) (*params.ChainConfig
// We have the genesis block in database(perhaps in ancient database)
// but the corresponding state is missing.
header := rawdb.ReadHeader(db, stored, 0)
if _, err := state.New(header.Root, state.NewDatabaseWithCache(db, 0), nil); err != nil {
if _, err := state.New(header.Root, state.NewDatabaseWithCache(db, 0, ""), nil); err != nil {
if genesis == nil {
genesis = DefaultGenesisBlock()
}
Expand Down
6 changes: 3 additions & 3 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,16 @@ type Trie interface {
// concurrent use, but does not retain any recent trie nodes in memory. To keep some
// historical state in memory, use the NewDatabaseWithCache constructor.
func NewDatabase(db ethdb.Database) Database {
return NewDatabaseWithCache(db, 0)
return NewDatabaseWithCache(db, 0, "")
}

// NewDatabaseWithCache creates a backing store for state. The returned database
// is safe for concurrent use and retains a lot of collapsed RLP trie nodes in a
// large memory cache.
func NewDatabaseWithCache(db ethdb.Database, cache int) Database {
func NewDatabaseWithCache(db ethdb.Database, cache int, journal string) Database {
csc, _ := lru.New(codeSizeCacheSize)
return &cachingDB{
db: trie.NewDatabaseWithCache(db, cache),
db: trie.NewDatabaseWithCache(db, cache, journal),
codeSizeCache: csc,
}
}
Expand Down
4 changes: 2 additions & 2 deletions eth/api_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl

// Ensure we have a valid starting state before doing any work
origin := start.NumberU64()
database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16) // Chain tracing will probably start at genesis
database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16, "") // Chain tracing will probably start at genesis

if number := start.NumberU64(); number > 0 {
start = api.eth.blockchain.GetBlock(start.ParentHash(), start.NumberU64()-1)
Expand Down Expand Up @@ -641,7 +641,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
}
// Otherwise try to reexec blocks until we find a state or reach our limit
origin := block.NumberU64()
database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16)
database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16, "")

for i := uint64(0); i < reexec; i++ {
block = api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
Expand Down
2 changes: 2 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
TrieCleanJournal: ctx.ResolvePath(config.TrieCleanCacheJournal),
TrieCleanRejournal: config.TrieCleanCacheRejournal,
TrieCleanNoPrefetch: config.NoPrefetch,
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
Expand Down
28 changes: 16 additions & 12 deletions eth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@ var DefaultConfig = Config{
DatasetsOnDisk: 2,
DatasetsLockMmap: false,
},
NetworkId: 1,
LightPeers: 100,
UltraLightFraction: 75,
DatabaseCache: 512,
TrieCleanCache: 256,
TrieDirtyCache: 256,
TrieTimeout: 60 * time.Minute,
SnapshotCache: 256,
NetworkId: 1,
LightPeers: 100,
UltraLightFraction: 75,
DatabaseCache: 512,
TrieCleanCache: 256,
TrieCleanCacheJournal: "triecache",
TrieCleanCacheRejournal: 60 * time.Minute,
TrieDirtyCache: 256,
TrieTimeout: 60 * time.Minute,
SnapshotCache: 256,
Miner: miner.Config{
GasFloor: 8000000,
GasCeil: 8000000,
Expand Down Expand Up @@ -139,10 +141,12 @@ type Config struct {
DatabaseCache int
DatabaseFreezer string

TrieCleanCache int
TrieDirtyCache int
TrieTimeout time.Duration
SnapshotCache int
TrieCleanCache int
TrieCleanCacheJournal string `toml:",omitempty"` // Disk journal directory for trie cache to survive node restarts
TrieCleanCacheRejournal time.Duration `toml:",omitempty"` // Time interval to regenerate the journal for clean cache
TrieDirtyCache int
TrieTimeout time.Duration
SnapshotCache int

// Mining options
Miner miner.Config
Expand Down
12 changes: 12 additions & 0 deletions eth/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions light/postprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64, dis
diskdb: db,
odr: odr,
trieTable: trieTable,
triedb: trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down
triedb: trie.NewDatabaseWithCache(trieTable, 1, ""), // Use a tiny cache only to keep memory down
trieset: mapset.NewSet(),
sectionSize: size,
disablePruning: disablePruning,
Expand Down Expand Up @@ -340,7 +340,7 @@ func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uin
diskdb: db,
odr: odr,
trieTable: trieTable,
triedb: trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down
triedb: trie.NewDatabaseWithCache(trieTable, 1, ""), // Use a tiny cache only to keep memory down
trieset: mapset.NewSet(),
parentSize: parentSize,
size: size,
Expand Down
51 changes: 48 additions & 3 deletions trie/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"reflect"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -278,16 +279,20 @@ func expandNode(hash hashNode, n node) node {
// its written out to disk or garbage collected. No read cache is created, so all
// data retrievals will hit the underlying disk database.
func NewDatabase(diskdb ethdb.KeyValueStore) *Database {
return NewDatabaseWithCache(diskdb, 0)
return NewDatabaseWithCache(diskdb, 0, "")
}

// NewDatabaseWithCache creates a new trie database to store ephemeral trie content
// before its written out to disk or garbage collected. It also acts as a read cache
// for nodes loaded from disk.
func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database {
func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int, journal string) *Database {
var cleans *fastcache.Cache
if cache > 0 {
cleans = fastcache.New(cache * 1024 * 1024)
if journal == "" {
cleans = fastcache.New(cache * 1024 * 1024)
} else {
cleans = fastcache.LoadFromFileOrNew(journal, cache*1024*1024)
}
}
return &Database{
diskdb: diskdb,
Expand Down Expand Up @@ -867,3 +872,43 @@ func (db *Database) Size() (common.StorageSize, common.StorageSize) {
var metarootRefs = common.StorageSize(len(db.dirties[common.Hash{}].children) * (common.HashLength + 2))
return db.dirtiesSize + db.childrenSize + metadataSize - metarootRefs, db.preimagesSize
}

// saveCache saves clean state cache to given directory path
// using specified CPU cores.
func (db *Database) saveCache(dir string, threads int) error {
if db.cleans == nil {
return nil
}
log.Info("Writing clean trie cache to disk", "path", dir, "threads", threads)

start := time.Now()
err := db.cleans.SaveToFileConcurrent(dir, threads)
if err != nil {
log.Error("Failed to persist clean trie cache", "error", err)
return err
}
log.Info("Persisted the clean trie cache", "path", dir, "elapsed", common.PrettyDuration(time.Since(start)))
return nil
}

// SaveCache atomically saves fast cache data to the given dir using all
// available CPU cores.
func (db *Database) SaveCache(dir string) error {
return db.saveCache(dir, runtime.GOMAXPROCS(0))
}

// SaveCachePeriodically atomically saves fast cache data to the given dir with
// the specified interval. All dump operation will only use a single CPU core.
func (db *Database) SaveCachePeriodically(dir string, interval time.Duration, stopCh <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
db.saveCache(dir, 1)
case <-stopCh:
return
}
}
}