From c03da8fb8e5137384e596c79be08bb2542636fd4 Mon Sep 17 00:00:00 2001
From: b00ris
Date: Fri, 19 Mar 2021 15:54:47 +0300
Subject: [PATCH] Split header prefix bucket to headers, td and canonical (#1556)

* split headers prefix

* migration

* fix downloader bug

* test for migration

* fix lint

* uncomment t.Parallel

* fix postprocessing test
---
 cmd/hack/hack.go | 7 +-
 cmd/integration/commands/snapshot_check.go | 2 +-
 cmd/pics/state.go | 6 +-
 .../generator/commands/copy_from_state.go | 5 +-
 .../commands/generate_header_snapshot.go | 5 +-
 .../commands/generate_state_snapshot.go | 5 +-
 .../commands/verify_state_snapshot.go | 7 +-
 cmd/state/generate/regenerate_tx_lookup.go | 2 +-
 cmd/state/stateless/state.go | 5 +-
 cmd/state/verify/verify_headers_snapshot.go | 4 +-
 common/dbutils/bucket.go | 18 +-
 common/dbutils/composite_keys.go | 33 ----
 common/dbutils/composite_keys_test.go | 34 ----
 core/rawdb/accessors_chain.go | 34 ++--
 eth/downloader/downloader_stagedsync.go | 48 ++---
 eth/stagedsync/stage_blockhashes.go | 5 +-
 eth/stagedsync/stage_headers.go | 4 +-
 eth/stagedsync/stage_senders.go | 7 +-
 eth/stagedsync/stage_txlookup.go | 9 +-
 eth/stagedsync/stage_txpool.go | 14 +-
 ethdb/kv_migrator_test.go | 6 +-
 ethdb/kv_snapshot_test.go | 28 +--
 migrations/header_prefix.go | 169 ++++++++++++++++++
 migrations/header_prefix_test.go | 110 ++++++++++++
 migrations/migrations.go | 1 +
 turbo/snapshotsync/postprocessing.go | 28 ++-
 turbo/snapshotsync/postprocessing_test.go | 4 +-
 turbo/snapshotsync/wrapdb.go | 4 +-
 turbo/stages/headerdownload/header_algos.go | 4 +-
 29 files changed, 404 insertions(+), 204 deletions(-)
 create mode 100644 migrations/header_prefix.go
 create mode 100644 migrations/header_prefix_test.go

diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go
index dcaf9d3d1b3..8c69dcb09bc 100644
--- a/cmd/hack/hack.go
+++ b/cmd/hack/hack.go
@@ -1574,16 +1574,11 @@ func mint(chaindata string, block uint64) error {
 	blockEncoded := dbutils.EncodeBlockNumber(block)
 	canonical := make(map[common.Hash]struct{})
 	if err1 := db.KV().View(context.Background(), func(tx ethdb.Tx) error {
-		c := tx.Cursor(dbutils.HeaderPrefix)
-		// This is a mapping of contractAddress + incarnation => CodeHash
+		c := tx.Cursor(dbutils.HeaderCanonicalBucket)
 		for k, v, err := c.Seek(blockEncoded); k != nil; k, v, err = c.Next() {
 			if err != nil {
 				return err
 			}
-			// Skip non relevant records
-			if !dbutils.CheckCanonicalKey(k) {
-				continue
-			}
 			canonical[common.BytesToHash(v)] = struct{}{}
 			if len(canonical)%100_000 == 0 {
 				log.Info("Read canonical hashes", "count", len(canonical))
diff --git a/cmd/integration/commands/snapshot_check.go b/cmd/integration/commands/snapshot_check.go
index 459f3054be7..7efa11babda 100644
--- a/cmd/integration/commands/snapshot_check.go
+++ b/cmd/integration/commands/snapshot_check.go
@@ -82,7 +82,7 @@ var cmdSnapshotCheck = &cobra.Command{
 		kv := ethdb.NewSnapshot2KV().
 			DB(tmpDb).
-			SnapshotDB([]string{dbutils.HeaderPrefix, dbutils.BlockBodyPrefix, dbutils.Senders, dbutils.HeadBlockKey, dbutils.HeaderNumberPrefix}, mainDB.KV()).
+			SnapshotDB([]string{dbutils.HeadersBucket, dbutils.HeaderCanonicalBucket, dbutils.HeaderTDBucket, dbutils.BlockBodyPrefix, dbutils.Senders, dbutils.HeadBlockKey, dbutils.HeaderNumberBucket}, mainDB.KV()).
 			SnapshotDB([]string{dbutils.PlainStateBucket, dbutils.CodeBucket, dbutils.PlainContractCodeBucket}, stateSnapshot).
MustOpen() diff --git a/cmd/pics/state.go b/cmd/pics/state.go index 448ee24ec3c..dedbfd35a69 100644 --- a/cmd/pics/state.go +++ b/cmd/pics/state.go @@ -120,9 +120,11 @@ var bucketLabels = map[string]string{ dbutils.Log: "Event Logs", dbutils.AccountsHistoryBucket: "History Of Accounts", dbutils.StorageHistoryBucket: "History Of Storage", - dbutils.HeaderPrefix: "Headers", + dbutils.HeadersBucket: "Headers", + dbutils.HeaderCanonicalBucket: "Canonical headers", + dbutils.HeaderTDBucket: "Headers TD", dbutils.BlockBodyPrefix: "Block Bodies", - dbutils.HeaderNumberPrefix: "Header Numbers", + dbutils.HeaderNumberBucket: "Header Numbers", dbutils.TxLookupPrefix: "Transaction Index", dbutils.CodeBucket: "Code Of Contracts", dbutils.Senders: "Senders", diff --git a/cmd/snapshots/generator/commands/copy_from_state.go b/cmd/snapshots/generator/commands/copy_from_state.go index 2d10f813cee..4dc4833717d 100644 --- a/cmd/snapshots/generator/commands/copy_from_state.go +++ b/cmd/snapshots/generator/commands/copy_from_state.go @@ -3,13 +3,14 @@ package commands import ( "context" "fmt" + "os" + "time" + "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/turbo/snapshotsync" "github.com/spf13/cobra" - "os" - "time" ) func init() { diff --git a/cmd/snapshots/generator/commands/generate_header_snapshot.go b/cmd/snapshots/generator/commands/generate_header_snapshot.go index c7dbcc11604..0966db473da 100644 --- a/cmd/snapshots/generator/commands/generate_header_snapshot.go +++ b/cmd/snapshots/generator/commands/generate_header_snapshot.go @@ -61,8 +61,7 @@ func HeaderSnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock ui } snKV := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ - dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, - dbutils.HeadersSnapshotInfoBucket: dbutils.BucketConfigItem{}, + dbutils.HeadersBucket: dbutils.BucketConfigItem{}, } }).Path(snapshotPath).MustOpen() @@ -87,7 +86,7 @@ func HeaderSnapshot(ctx context.Context, dbPath, snapshotPath string, toBlock ui if len(header) == 0 { return fmt.Errorf("empty header: %v", i) } - tuples = append(tuples, []byte(dbutils.HeaderPrefix), dbutils.HeaderKey(i, hash), header) + tuples = append(tuples, []byte(dbutils.HeadersBucket), dbutils.HeaderKey(i, hash), header) if len(tuples) >= chunkFile { log.Info("Committed", "block", i) _, err = snDB.MultiPut(tuples...) 
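A note on the new layout, for orientation between the file diffs. The old schema multiplexed three record types into the single `h` bucket and distinguished them by key shape: `num+hash` for headers, `num+hash+"t"` for total difficulty, and `num+"n"` for canonical hashes, which is why every iterator had to filter with CheckCanonicalKey and friends. After the split, each bucket holds exactly one record type. A minimal, self-contained sketch of the resulting key shapes follows; the two helpers are local stand-ins for dbutils.EncodeBlockNumber and dbutils.HeaderKey, not the real functions:

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeBlockNumber mirrors dbutils.EncodeBlockNumber: uint64 as 8-byte big endian.
func encodeBlockNumber(number uint64) []byte {
	enc := make([]byte, 8)
	binary.BigEndian.PutUint64(enc, number)
	return enc
}

// headerKey mirrors dbutils.HeaderKey: 8-byte block number followed by the 32-byte hash.
func headerKey(number uint64, hash [32]byte) []byte {
	return append(encodeBlockNumber(number), hash[:]...)
}

func main() {
	var hash [32]byte
	hash[0] = 0x01

	// Old layout, single bucket "h", record type encoded in the key:
	//   num+hash     -> header
	//   num+hash+"t" -> total difficulty
	//   num+"n"      -> canonical hash
	// New layout, one record type per bucket:
	fmt.Printf("canonical_headers: %x -> header hash\n", encodeBlockNumber(1))
	fmt.Printf("headers:           %x -> rlp(header)\n", headerKey(1, hash))
	fmt.Printf("header_to_td:      %x -> rlp(td)\n", headerKey(1, hash))
}

Because canonical_headers is keyed by the block number alone, walking the canonical chain becomes a plain range scan; the downloader, senders, txpool and txlookup changes below all rely on that.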
diff --git a/cmd/snapshots/generator/commands/generate_state_snapshot.go b/cmd/snapshots/generator/commands/generate_state_snapshot.go index 93318f67183..058301760f8 100644 --- a/cmd/snapshots/generator/commands/generate_state_snapshot.go +++ b/cmd/snapshots/generator/commands/generate_state_snapshot.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "os" + "time" + "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/core/state" @@ -12,8 +15,6 @@ import ( "github.com/ledgerwatch/turbo-geth/turbo/snapshotsync" "github.com/ledgerwatch/turbo-geth/turbo/trie" "github.com/spf13/cobra" - "os" - "time" ) func init() { diff --git a/cmd/snapshots/generator/commands/verify_state_snapshot.go b/cmd/snapshots/generator/commands/verify_state_snapshot.go index c17119cd2f6..9b0cb41fe70 100644 --- a/cmd/snapshots/generator/commands/verify_state_snapshot.go +++ b/cmd/snapshots/generator/commands/verify_state_snapshot.go @@ -3,6 +3,10 @@ package commands import ( "context" "fmt" + "io/ioutil" + "os" + "time" + "github.com/ledgerwatch/lmdb-go/lmdb" "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/core/rawdb" @@ -10,9 +14,6 @@ import ( "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/turbo/snapshotsync" "github.com/spf13/cobra" - "io/ioutil" - "os" - "time" ) func init() { diff --git a/cmd/state/generate/regenerate_tx_lookup.go b/cmd/state/generate/regenerate_tx_lookup.go index 81133bdf360..1df0be9bd74 100644 --- a/cmd/state/generate/regenerate_tx_lookup.go +++ b/cmd/state/generate/regenerate_tx_lookup.go @@ -33,7 +33,7 @@ func RegenerateTxLookup(chaindata string) error { log.Error("Cant get last executed block", "err", err) } log.Info("TxLookup generation started", "start time", startTime) - err = stagedsync.TxLookupTransform("txlookup", db, dbutils.HeaderHashKey(0), dbutils.HeaderHashKey(lastExecutedBlock), quitCh, os.TempDir()) + err = stagedsync.TxLookupTransform("txlookup", db, dbutils.EncodeBlockNumber(0), dbutils.EncodeBlockNumber(lastExecutedBlock+1), quitCh, os.TempDir()) if err != nil { return err } diff --git a/cmd/state/stateless/state.go b/cmd/state/stateless/state.go index 012a2f327aa..3295c5bae76 100644 --- a/cmd/state/stateless/state.go +++ b/cmd/state/stateless/state.go @@ -491,11 +491,8 @@ func (r *GasLimitReporter) GasLimits(ctx context.Context) { var blockNum uint64 = 0 if err := r.remoteDB.View(ctx, func(tx ethdb.Tx) error { - c := tx.Cursor(dbutils.HeaderPrefix) + c := tx.Cursor(dbutils.HeadersBucket) if err := ethdb.ForEach(c, func(k, v []byte) (bool, error) { - if len(k) != 40 { - return true, nil - } header := new(types.Header) if err := rlp.Decode(bytes.NewReader(v), header); err != nil { return false, err diff --git a/cmd/state/verify/verify_headers_snapshot.go b/cmd/state/verify/verify_headers_snapshot.go index 7c6c53f6d5c..5af1adcdd38 100644 --- a/cmd/state/verify/verify_headers_snapshot.go +++ b/cmd/state/verify/verify_headers_snapshot.go @@ -15,13 +15,13 @@ import ( func HeadersSnapshot(snapshotPath string) error { snKV := ethdb.NewLMDB().Path(snapshotPath).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ - dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, + dbutils.HeadersBucket: dbutils.BucketConfigItem{}, dbutils.HeadersSnapshotInfoBucket: dbutils.BucketConfigItem{}, } }).MustOpen() var prevHeader *types.Header err 
:= snKV.View(context.Background(), func(tx ethdb.Tx) error {
-		c := tx.Cursor(dbutils.HeaderPrefix)
+		c := tx.Cursor(dbutils.HeadersBucket)
 		k, v, innerErr := c.First()
 		for {
 			if len(k) == 0 && len(v) == 0 {
diff --git a/common/dbutils/bucket.go b/common/dbutils/bucket.go
index d280ce530f6..9691efd580b 100644
--- a/common/dbutils/bucket.go
+++ b/common/dbutils/bucket.go
@@ -147,10 +147,12 @@ var (
 	DatabaseVerisionKey = "DatabaseVersion"

 	// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
-	HeaderPrefix       = "h"         // block_num_u64 + hash -> header
-	HeaderTDSuffix     = []byte("t") // block_num_u64 + hash + headerTDSuffix -> td
-	HeaderHashSuffix   = []byte("n") // block_num_u64 + headerHashSuffix -> hash
-	HeaderNumberPrefix = "H"         // headerNumberPrefix + hash -> num (uint64 big endian)
+	HeaderPrefixOld    = "h" // block_num_u64 + hash -> header
+	HeaderNumberBucket = "H" // header_hash -> num (uint64 big endian)
+
+	HeaderCanonicalBucket = "canonical_headers" // block_num_u64 -> header hash
+	HeadersBucket         = "headers"           // block_num_u64 + hash -> header
+	HeaderTDBucket        = "header_to_td"      // block_num_u64 + hash -> td

 	BlockBodyPrefix = "b"      // block_num_u64 + hash -> block body
 	EthTx           = "eth_tx" // tbl_sequence_u64 -> rlp(tx)
@@ -254,8 +256,7 @@ var Buckets = []string{
 	CodeBucket,
 	ContractCodeBucket,
 	DatabaseVerisionKey,
-	HeaderPrefix,
-	HeaderNumberPrefix,
+	HeaderNumberBucket,
 	BlockBodyPrefix,
 	BlockReceiptsPrefix,
 	TxLookupPrefix,
@@ -294,6 +295,10 @@ var Buckets = []string{
 	HashedAccountsBucket,
 	HashedStorageBucket,
 	IntermediateTrieHashBucketOld2,
+
+	HeaderCanonicalBucket,
+	HeadersBucket,
+	HeaderTDBucket,
 }

 // DeprecatedBuckets - list of buckets which can be programmatically deleted - for example after migration
@@ -303,6 +308,7 @@ var DeprecatedBuckets = []string{
 	CurrentStateBucketOld1,
 	PlainStateBucketOld1,
 	IntermediateTrieHashBucketOld1,
+	HeaderPrefixOld,
 }

 type CustomComparator string
diff --git a/common/dbutils/composite_keys.go b/common/dbutils/composite_keys.go
index efeba42733d..e1ccca25cfb 100644
--- a/common/dbutils/composite_keys.go
+++ b/common/dbutils/composite_keys.go
@@ -1,7 +1,6 @@
 package dbutils

 import (
-	"bytes"
 	"encoding/binary"

 	"github.com/ledgerwatch/turbo-geth/common"
@@ -19,38 +18,6 @@ func HeaderKey(number uint64, hash common.Hash) []byte {
 	return append(EncodeBlockNumber(number), hash.Bytes()...)
 }

-func IsHeaderKey(k []byte) bool {
-	l := common.BlockNumberLength + common.HashLength
-	if len(k) != l {
-		return false
-	}
-
-	return !IsHeaderHashKey(k) && !IsHeaderTDKey(k)
-}
-
-// headerTDKey = headerPrefix + num (uint64 big endian) + hash + headerTDSuffix
-func HeaderTDKey(number uint64, hash common.Hash) []byte {
-	return append(HeaderKey(number, hash), HeaderTDSuffix...)
-}
-
-func IsHeaderTDKey(k []byte) bool {
-	l := common.BlockNumberLength + common.HashLength + 1
-	return len(k) == l && bytes.Equal(k[l-1:], HeaderTDSuffix)
-}
-
-// headerHashKey = headerPrefix + num (uint64 big endian) + headerHashSuffix
-func HeaderHashKey(number uint64) []byte {
-	return append(EncodeBlockNumber(number), HeaderHashSuffix...)
-} - -func CheckCanonicalKey(k []byte) bool { - return len(k) == 8+len(HeaderHashSuffix) && bytes.Equal(k[8:], HeaderHashSuffix) -} - -func IsHeaderHashKey(k []byte) bool { - l := common.BlockNumberLength + 1 - return len(k) == l && bytes.Equal(k[l-1:], HeaderHashSuffix) -} // blockBodyKey = blockBodyPrefix + num (uint64 big endian) + hash func BlockBodyKey(number uint64, hash common.Hash) []byte { diff --git a/common/dbutils/composite_keys_test.go b/common/dbutils/composite_keys_test.go index 297d63464c7..df282b7d42e 100644 --- a/common/dbutils/composite_keys_test.go +++ b/common/dbutils/composite_keys_test.go @@ -7,41 +7,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestHeaderTypeDetection(t *testing.T) { - - // good input - headerHashKey := common.Hex2Bytes("00000000000000006e") - assert.False(t, IsHeaderKey(headerHashKey)) - assert.False(t, IsHeaderTDKey(headerHashKey)) - assert.True(t, IsHeaderHashKey(headerHashKey)) - - headerKey := common.Hex2Bytes("0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd") - assert.True(t, IsHeaderKey(headerKey)) - assert.False(t, IsHeaderTDKey(headerKey)) - assert.False(t, IsHeaderHashKey(headerKey)) - - headerTdKey := common.Hex2Bytes("0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd74") - assert.False(t, IsHeaderKey(headerTdKey)) - assert.True(t, IsHeaderTDKey(headerTdKey)) - assert.False(t, IsHeaderHashKey(headerTdKey)) - - // bad input - emptyKey := common.Hex2Bytes("") - assert.False(t, IsHeaderKey(emptyKey)) - assert.False(t, IsHeaderTDKey(emptyKey)) - assert.False(t, IsHeaderHashKey(emptyKey)) - - tooLongKey := common.Hex2Bytes("0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd") - assert.False(t, IsHeaderKey(tooLongKey)) - assert.False(t, IsHeaderTDKey(tooLongKey)) - assert.False(t, IsHeaderHashKey(tooLongKey)) - - notRelatedInput := common.Hex2Bytes("alex") - assert.False(t, IsHeaderKey(notRelatedInput)) - assert.False(t, IsHeaderTDKey(notRelatedInput)) - assert.False(t, IsHeaderHashKey(notRelatedInput)) -} func TestPlainParseStoragePrefix(t *testing.T) { expectedAddr := common.HexToAddress("0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c") diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 2daeb37da5a..82084627f9e 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -36,7 +36,7 @@ import ( // ReadCanonicalHash retrieves the hash assigned to a canonical block number. func ReadCanonicalHash(db databaseReader, number uint64) (common.Hash, error) { - data, err := db.Get(dbutils.HeaderPrefix, dbutils.HeaderHashKey(number)) + data, err := db.Get(dbutils.HeaderCanonicalBucket, dbutils.EncodeBlockNumber(number)) if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { return common.Hash{}, fmt.Errorf("failed ReadCanonicalHash: %w, number=%d", err, number) } @@ -48,7 +48,7 @@ func ReadCanonicalHash(db databaseReader, number uint64) (common.Hash, error) { // WriteCanonicalHash stores the hash assigned to a canonical block number. 
func WriteCanonicalHash(db DatabaseWriter, hash common.Hash, number uint64) error { - if err := db.Put(dbutils.HeaderPrefix, dbutils.HeaderHashKey(number), hash.Bytes()); err != nil { + if err := db.Put(dbutils.HeaderCanonicalBucket, dbutils.EncodeBlockNumber(number), hash.Bytes()); err != nil { return fmt.Errorf("failed to store number to hash mapping: %w", err) } return nil @@ -56,7 +56,7 @@ func WriteCanonicalHash(db DatabaseWriter, hash common.Hash, number uint64) erro // DeleteCanonicalHash removes the number to hash canonical mapping. func DeleteCanonicalHash(db DatabaseDeleter, number uint64) error { - if err := db.Delete(dbutils.HeaderPrefix, dbutils.HeaderHashKey(number), nil); err != nil { + if err := db.Delete(dbutils.HeaderCanonicalBucket, dbutils.EncodeBlockNumber(number), nil); err != nil { return fmt.Errorf("failed to delete number to hash mapping: %w", err) } return nil @@ -83,7 +83,7 @@ func ReadAllHashes(db databaseReader, number uint64) []common.Hash { // ReadHeaderNumber returns the header number assigned to a hash. func ReadHeaderNumber(db databaseReader, hash common.Hash) *uint64 { - data, err := db.Get(dbutils.HeaderNumberPrefix, hash.Bytes()) + data, err := db.Get(dbutils.HeaderNumberBucket, hash.Bytes()) if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { log.Error("ReadHeaderNumber failed", "err", err) } @@ -101,14 +101,14 @@ func ReadHeaderNumber(db databaseReader, hash common.Hash) *uint64 { // WriteHeaderNumber stores the hash->number mapping. func WriteHeaderNumber(db DatabaseWriter, hash common.Hash, number uint64) { enc := dbutils.EncodeBlockNumber(number) - if err := db.Put(dbutils.HeaderNumberPrefix, hash[:], enc); err != nil { + if err := db.Put(dbutils.HeaderNumberBucket, hash[:], enc); err != nil { log.Crit("Failed to store hash to number mapping", "err", err) } } // DeleteHeaderNumber removes hash->number mapping. func DeleteHeaderNumber(db DatabaseDeleter, hash common.Hash) { - if err := db.Delete(dbutils.HeaderNumberPrefix, hash[:], nil); err != nil { + if err := db.Delete(dbutils.HeaderNumberBucket, hash[:], nil); err != nil { log.Crit("Failed to delete hash to number mapping", "err", err) } } @@ -194,7 +194,7 @@ func WriteFastTrieProgress(db DatabaseWriter, count uint64) { // ReadHeaderRLP retrieves a block header in its raw RLP database encoding. func ReadHeaderRLP(db databaseReader, hash common.Hash, number uint64) rlp.RawValue { - data, err := db.Get(dbutils.HeaderPrefix, dbutils.HeaderKey(number, hash)) + data, err := db.Get(dbutils.HeadersBucket, dbutils.HeaderKey(number, hash)) if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { log.Error("ReadHeaderRLP failed", "err", err) } @@ -203,7 +203,7 @@ func ReadHeaderRLP(db databaseReader, hash common.Hash, number uint64) rlp.RawVa // HasHeader verifies the existence of a block header corresponding to the hash. 
func HasHeader(db databaseReader, hash common.Hash, number uint64) bool { - if has, err := db.Has(dbutils.HeaderPrefix, dbutils.HeaderKey(number, hash)); !has || err != nil { + if has, err := db.Has(dbutils.HeadersBucket, dbutils.HeaderKey(number, hash)); !has || err != nil { return false } return true @@ -234,7 +234,7 @@ func WriteHeader(ctx context.Context, db DatabaseWriter, header *types.Header) { if common.IsCanceled(ctx) { return } - if err := db.Put(dbutils.HeaderNumberPrefix, hash[:], encoded); err != nil { + if err := db.Put(dbutils.HeaderNumberBucket, hash[:], encoded); err != nil { log.Crit("Failed to store hash to number mapping", "err", err) } // Write the encoded header @@ -242,17 +242,17 @@ func WriteHeader(ctx context.Context, db DatabaseWriter, header *types.Header) { if err != nil { log.Crit("Failed to RLP encode header", "err", err) } - if err := db.Put(dbutils.HeaderPrefix, dbutils.HeaderKey(number, hash), data); err != nil { + if err := db.Put(dbutils.HeadersBucket, dbutils.HeaderKey(number, hash), data); err != nil { log.Crit("Failed to store header", "err", err) } } // DeleteHeader removes all block header data associated with a hash. func DeleteHeader(db DatabaseDeleter, hash common.Hash, number uint64) { - if err := db.Delete(dbutils.HeaderPrefix, dbutils.HeaderKey(number, hash), nil); err != nil { + if err := db.Delete(dbutils.HeadersBucket, dbutils.HeaderKey(number, hash), nil); err != nil { log.Crit("Failed to delete header", "err", err) } - if err := db.Delete(dbutils.HeaderNumberPrefix, hash.Bytes(), nil); err != nil { + if err := db.Delete(dbutils.HeaderNumberBucket, hash.Bytes(), nil); err != nil { log.Crit("Failed to delete hash to number mapping", "err", err) } } @@ -260,7 +260,7 @@ func DeleteHeader(db DatabaseDeleter, hash common.Hash, number uint64) { // deleteHeaderWithoutNumber removes only the block header but does not remove // the hash to number mapping. func deleteHeaderWithoutNumber(db DatabaseDeleter, hash common.Hash, number uint64) { - if err := db.Delete(dbutils.HeaderPrefix, dbutils.HeaderKey(number, hash), nil); err != nil { + if err := db.Delete(dbutils.HeadersBucket, dbutils.HeaderKey(number, hash), nil); err != nil { log.Crit("Failed to delete header", "err", err) } } @@ -432,7 +432,7 @@ func ReadTdRLP(db databaseReader, hash common.Hash, number uint64) rlp.RawValue //data, _ := db.Ancient(freezerDifficultyTable, number) data := []byte{} if len(data) == 0 { - data, _ = db.Get(dbutils.HeaderPrefix, dbutils.HeaderTDKey(number, hash)) + data, _ = db.Get(dbutils.HeaderTDBucket, dbutils.HeaderKey(number, hash)) // In the background freezer is moving data from leveldb to flatten files. // So during the first check for ancient db, the data is not yet in there, // but when we reach into leveldb, the data was already moved. That would @@ -446,7 +446,7 @@ func ReadTdRLP(db databaseReader, hash common.Hash, number uint64) rlp.RawValue // ReadTd retrieves a block's total difficulty corresponding to the hash. 
func ReadTd(db databaseReader, hash common.Hash, number uint64) (*big.Int, error) { - data, err := db.Get(dbutils.HeaderPrefix, dbutils.HeaderTDKey(number, hash)) + data, err := db.Get(dbutils.HeaderTDBucket, dbutils.HeaderKey(number, hash)) if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { return nil, fmt.Errorf("failed ReadTd: %w", err) } @@ -466,7 +466,7 @@ func WriteTd(db DatabaseWriter, hash common.Hash, number uint64, td *big.Int) er if err != nil { return fmt.Errorf("failed to RLP encode block total difficulty: %w", err) } - if err := db.Put(dbutils.HeaderPrefix, dbutils.HeaderTDKey(number, hash), data); err != nil { + if err := db.Put(dbutils.HeaderTDBucket, dbutils.HeaderKey(number, hash), data); err != nil { return fmt.Errorf("failed to store block total difficulty: %w", err) } return nil @@ -474,7 +474,7 @@ func WriteTd(db DatabaseWriter, hash common.Hash, number uint64, td *big.Int) er // DeleteTd removes all block total difficulty data associated with a hash. func DeleteTd(db DatabaseDeleter, hash common.Hash, number uint64) error { - if err := db.Delete(dbutils.HeaderPrefix, dbutils.HeaderTDKey(number, hash), nil); err != nil { + if err := db.Delete(dbutils.HeaderTDBucket, dbutils.HeaderKey(number, hash), nil); err != nil { return fmt.Errorf("failed to delete block total difficulty: %w", err) } return nil diff --git a/eth/downloader/downloader_stagedsync.go b/eth/downloader/downloader_stagedsync.go index 11009790a96..d09086bb532 100644 --- a/eth/downloader/downloader_stagedsync.go +++ b/eth/downloader/downloader_stagedsync.go @@ -44,40 +44,44 @@ func (d *Downloader) SpawnBodyDownloadStage( var hashes [N]common.Hash // Canonical hashes of the blocks var headers = make(map[common.Hash]*types.Header) // We use map because there might be more than one header by block number var hashCount = 0 - err := d.stateDB.Walk(dbutils.HeaderPrefix, dbutils.EncodeBlockNumber(currentNumber), 0, func(k, v []byte) (bool, error) { + startKey := dbutils.EncodeBlockNumber(currentNumber) + err := d.stateDB.Walk(dbutils.HeaderCanonicalBucket, startKey, 0, func(k, v []byte) (bool, error) { + // This is how we learn about canonical chain + blockNumber := binary.BigEndian.Uint64(k[:8]) + if blockNumber != currentNumber { + log.Warn(fmt.Sprintf("[%s] Canonical hash is missing", logPrefix), "number", currentNumber, "got", blockNumber) + missingHeader = currentNumber + return false, nil + } + currentNumber++ + if hashCount < len(hashes) { + copy(hashes[hashCount][:], v) + hashCount++ + } else { + return false, nil + } + return true, nil + + }) + if err != nil { + return false, fmt.Errorf("%s: walking over canonical hashes: %w", logPrefix, err) + } + + err = d.stateDB.Walk(dbutils.HeadersBucket, startKey, 0, func(k, v []byte) (bool, error) { if err := common.Stopped(d.quitCh); err != nil { return false, err } - // Skip non relevant records - if dbutils.CheckCanonicalKey(k) { - // This is how we learn about canonical chain - blockNumber := binary.BigEndian.Uint64(k[:8]) - if blockNumber != currentNumber { - log.Warn(fmt.Sprintf("[%s] Canonical hash is missing", logPrefix), "number", currentNumber, "got", blockNumber) - missingHeader = currentNumber - return false, nil - } - currentNumber++ - if hashCount < len(hashes) { - copy(hashes[hashCount][:], v) - hashCount++ - } - return true, nil - } - if len(k) != 8+common.HashLength { - return true, nil - } header := new(types.Header) if err1 := rlp.Decode(bytes.NewReader(v), header); err1 != nil { log.Error(fmt.Sprintf("[%s] Invalid block header RLP", 
logPrefix), "hash", k[8:], "err", err1) return false, err1 } headers[common.BytesToHash(k[8:])] = header - return hashCount < len(hashes), nil + return currentNumber > binary.BigEndian.Uint64(k[:8]), nil }) if err != nil { - return false, fmt.Errorf("%s: walking over canonical hashes: %w", logPrefix, err) + return false, fmt.Errorf("%s: walking over headers: %w", logPrefix, err) } if missingHeader != 0 { if err1 := u.UnwindTo(missingHeader, d.stateDB); err1 != nil { diff --git a/eth/stagedsync/stage_blockhashes.go b/eth/stagedsync/stage_blockhashes.go index ad5a3b9c16b..0f7259fc88e 100644 --- a/eth/stagedsync/stage_blockhashes.go +++ b/eth/stagedsync/stage_blockhashes.go @@ -49,12 +49,13 @@ func SpawnBlockHashStage(s *StageState, db ethdb.Database, tmpdir string, quit < binary.BigEndian.PutUint64(startKey, s.BlockNumber) endKey := dbutils.HeaderKey(headNumber, headHash) // Make sure we stop at head + //todo do we need non canonical headers ? logPrefix := s.state.LogPrefix() if err := etl.Transform( logPrefix, tx, - dbutils.HeaderPrefix, - dbutils.HeaderNumberPrefix, + dbutils.HeadersBucket, + dbutils.HeaderNumberBucket, tmpdir, extractHeaders, etl.IdentityLoadFunc, diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 6136c407ee5..40535cb0d33 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -219,7 +219,7 @@ Error: %v if err := rawdb.WriteTd(batch, header.Hash(), header.Number.Uint64(), td); err != nil { return false, false, 0, fmt.Errorf("[%s] Failed to WriteTd: %w", logPrefix, err) } - if err := batch.Put(dbutils.HeaderPrefix, dbutils.HeaderKey(number, header.Hash()), data); err != nil { + if err := batch.Put(dbutils.HeadersBucket, dbutils.HeaderKey(number, header.Hash()), data); err != nil { return false, false, 0, fmt.Errorf("[%s] Failed to store header: %w", logPrefix, err) } } @@ -261,7 +261,7 @@ Error: %v if newCanonical { encoded := dbutils.EncodeBlockNumber(lastHeader.Number.Uint64()) - if err := batch.Put(dbutils.HeaderNumberPrefix, lastHeader.Hash().Bytes(), encoded); err != nil { + if err := batch.Put(dbutils.HeaderNumberBucket, lastHeader.Hash().Bytes(), encoded); err != nil { return false, false, 0, fmt.Errorf("[%s] failed to store hash to number mapping: %w", logPrefix, err) } if err := rawdb.WriteHeadHeaderHash(batch, lastHeader.Hash()); err != nil { diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index 2679ff01f12..74cffe14757 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -53,16 +53,11 @@ func SpawnRecoverSendersStage(cfg Stage3Config, s *StageState, db ethdb.Database canonical := make([]common.Hash, to-s.BlockNumber) currentHeaderIdx := uint64(0) - if err := db.Walk(dbutils.HeaderPrefix, dbutils.EncodeBlockNumber(s.BlockNumber+1), 0, func(k, v []byte) (bool, error) { + if err := db.Walk(dbutils.HeaderCanonicalBucket, dbutils.EncodeBlockNumber(s.BlockNumber+1), 0, func(k, v []byte) (bool, error) { if err := common.Stopped(quitCh); err != nil { return false, err } - // Skip non relevant records - if !dbutils.CheckCanonicalKey(k) { - return true, nil - } - if currentHeaderIdx >= to-s.BlockNumber { // if header stage is ehead of body stage return false, nil } diff --git a/eth/stagedsync/stage_txlookup.go b/eth/stagedsync/stage_txlookup.go index d43e78c1760..2be64e82960 100644 --- a/eth/stagedsync/stage_txlookup.go +++ b/eth/stagedsync/stage_txlookup.go @@ -29,8 +29,8 @@ func SpawnTxLookup(s *StageState, db ethdb.Database, tmpdir string, 
quitCh <-cha } logPrefix := s.state.LogPrefix() - startKey = dbutils.HeaderHashKey(blockNum) - if err = TxLookupTransform(logPrefix, db, startKey, dbutils.HeaderHashKey(syncHeadNumber), quitCh, tmpdir); err != nil { + startKey = dbutils.EncodeBlockNumber(blockNum) + if err = TxLookupTransform(logPrefix, db, startKey, dbutils.EncodeBlockNumber(syncHeadNumber), quitCh, tmpdir); err != nil { return err } @@ -38,10 +38,7 @@ func SpawnTxLookup(s *StageState, db ethdb.Database, tmpdir string, quitCh <-cha } func TxLookupTransform(logPrefix string, db ethdb.Database, startKey, endKey []byte, quitCh <-chan struct{}, tmpdir string) error { - return etl.Transform(logPrefix, db, dbutils.HeaderPrefix, dbutils.TxLookupPrefix, tmpdir, func(k []byte, v []byte, next etl.ExtractNextFunc) error { - if !dbutils.CheckCanonicalKey(k) { - return nil - } + return etl.Transform(logPrefix, db, dbutils.HeaderCanonicalBucket, dbutils.TxLookupPrefix, tmpdir, func(k []byte, v []byte, next etl.ExtractNextFunc) error { blocknum := binary.BigEndian.Uint64(k) blockHash := common.BytesToHash(v) body := rawdb.ReadBody(db, blockHash, blocknum) diff --git a/eth/stagedsync/stage_txpool.go b/eth/stagedsync/stage_txpool.go index 73db270c408..975c27f8933 100644 --- a/eth/stagedsync/stage_txpool.go +++ b/eth/stagedsync/stage_txpool.go @@ -57,16 +57,11 @@ func incrementalTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPoo canonical := make([]common.Hash, to-from) currentHeaderIdx := uint64(0) - if err := db.Walk(dbutils.HeaderPrefix, dbutils.EncodeBlockNumber(from+1), 0, func(k, v []byte) (bool, error) { + if err := db.Walk(dbutils.HeaderCanonicalBucket, dbutils.EncodeBlockNumber(from+1), 0, func(k, v []byte) (bool, error) { if err := common.Stopped(quitCh); err != nil { return false, err } - // Skip non relevant records - if !dbutils.CheckCanonicalKey(k) { - return true, nil - } - if currentHeaderIdx >= to-from { // if header stage is ahead of body stage return false, nil } @@ -135,15 +130,10 @@ func unwindTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPool, db pool.ResetHead(headHeader.GasLimit, from) canonical := make([]common.Hash, to-from) - if err := db.Walk(dbutils.HeaderPrefix, dbutils.EncodeBlockNumber(from+1), 0, func(k, v []byte) (bool, error) { + if err := db.Walk(dbutils.HeaderCanonicalBucket, dbutils.EncodeBlockNumber(from+1), 0, func(k, v []byte) (bool, error) { if err := common.Stopped(quitCh); err != nil { return false, err } - - // Skip non relevant records - if !dbutils.CheckCanonicalKey(k) { - return true, nil - } blockNumber := binary.BigEndian.Uint64(k[:8]) if blockNumber > to { diff --git a/ethdb/kv_migrator_test.go b/ethdb/kv_migrator_test.go index 137832fa109..3638a9fd3bf 100644 --- a/ethdb/kv_migrator_test.go +++ b/ethdb/kv_migrator_test.go @@ -83,14 +83,14 @@ func TestReadOnlyMode(t *testing.T) { } db1 := NewLMDB().Path(path).WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ - dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, + dbutils.HeadersBucket: dbutils.BucketConfigItem{}, } }).MustOpen() db1.Close() db2 := NewLMDB().Flags(func(flags uint) uint { return flags | lmdb.Readonly }).Path(path).WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ - dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, + dbutils.HeadersBucket: dbutils.BucketConfigItem{}, } }).MustOpen() @@ -99,7 +99,7 @@ func TestReadOnlyMode(t *testing.T) { t.Fatal(err) } - c := tx.Cursor(dbutils.HeaderPrefix) 
+ c := tx.Cursor(dbutils.HeadersBucket) _, _, err = c.Seek([]byte("some prefix")) if err != nil { t.Fatal(err) diff --git a/ethdb/kv_snapshot_test.go b/ethdb/kv_snapshot_test.go index 903c977abc7..31590eb87ad 100644 --- a/ethdb/kv_snapshot_test.go +++ b/ethdb/kv_snapshot_test.go @@ -331,11 +331,11 @@ import ( func TestSnapshot2Get(t *testing.T) { sn1 := NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ - dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, + dbutils.HeadersBucket: dbutils.BucketConfigItem{}, } }).InMem().MustOpen() err := sn1.Update(context.Background(), func(tx Tx) error { - bucket := tx.Cursor(dbutils.HeaderPrefix) + bucket := tx.Cursor(dbutils.HeadersBucket) innerErr := bucket.Put(dbutils.HeaderKey(1, common.Hash{1}), []byte{1}) if innerErr != nil { return innerErr @@ -375,7 +375,7 @@ func TestSnapshot2Get(t *testing.T) { mainDB := NewLMDB().InMem().MustOpen() err = mainDB.Update(context.Background(), func(tx Tx) error { - bucket := tx.Cursor(dbutils.HeaderPrefix) + bucket := tx.Cursor(dbutils.HeadersBucket) innerErr := bucket.Put(dbutils.HeaderKey(2, common.Hash{2}), []byte{22}) if innerErr != nil { return innerErr @@ -401,7 +401,7 @@ func TestSnapshot2Get(t *testing.T) { t.Fatal(err) } - kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.HeaderPrefix}, sn1). + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.HeadersBucket}, sn1). SnapshotDB([]string{dbutils.BlockBodyPrefix}, sn2).MustOpen() tx, err := kv.Begin(context.Background(), RO) @@ -409,7 +409,7 @@ func TestSnapshot2Get(t *testing.T) { t.Fatal(err) } - v, err := tx.GetOne(dbutils.HeaderPrefix, dbutils.HeaderKey(1, common.Hash{1})) + v, err := tx.GetOne(dbutils.HeadersBucket, dbutils.HeaderKey(1, common.Hash{1})) if err != nil { t.Fatal(err) } @@ -417,7 +417,7 @@ func TestSnapshot2Get(t *testing.T) { t.Fatal(v) } - v, err = tx.GetOne(dbutils.HeaderPrefix, dbutils.HeaderKey(2, common.Hash{2})) + v, err = tx.GetOne(dbutils.HeadersBucket, dbutils.HeaderKey(2, common.Hash{2})) if err != nil { t.Fatal(err) } @@ -425,7 +425,7 @@ func TestSnapshot2Get(t *testing.T) { t.Fatal(v) } - v, err = tx.GetOne(dbutils.HeaderPrefix, dbutils.HeaderKey(3, common.Hash{3})) + v, err = tx.GetOne(dbutils.HeadersBucket, dbutils.HeaderKey(3, common.Hash{3})) if err != nil { t.Fatal(err) } @@ -457,7 +457,7 @@ func TestSnapshot2Get(t *testing.T) { t.Fatal(v) } - headerCursor := tx.Cursor(dbutils.HeaderPrefix) + headerCursor := tx.Cursor(dbutils.HeadersBucket) k, v, err := headerCursor.Last() if err != nil { t.Fatal(err) @@ -504,11 +504,11 @@ func TestSnapshot2Get(t *testing.T) { func TestSnapshot2WritableTxAndGet(t *testing.T) { sn1 := NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ - dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, + dbutils.HeadersBucket: dbutils.BucketConfigItem{}, } }).InMem().MustOpen() err := sn1.Update(context.Background(), func(tx Tx) error { - bucket := tx.Cursor(dbutils.HeaderPrefix) + bucket := tx.Cursor(dbutils.HeadersBucket) innerErr := bucket.Put(dbutils.HeaderKey(1, common.Hash{1}), []byte{1}) if innerErr != nil { return innerErr @@ -548,14 +548,14 @@ func TestSnapshot2WritableTxAndGet(t *testing.T) { mainDB := NewLMDB().InMem().MustOpen() - kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.HeaderPrefix}, sn1). + kv := NewSnapshot2KV().DB(mainDB).SnapshotDB([]string{dbutils.HeadersBucket}, sn1). 
SnapshotDB([]string{dbutils.BlockBodyPrefix}, sn2).MustOpen()

 	tx, err := kv.Begin(context.Background(), RW)
 	if err != nil {
 		t.Fatal(err)
 	}

-	v, err := tx.GetOne(dbutils.HeaderPrefix, dbutils.HeaderKey(1, common.Hash{1}))
+	v, err := tx.GetOne(dbutils.HeadersBucket, dbutils.HeaderKey(1, common.Hash{1}))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -575,7 +575,7 @@ func TestSnapshot2WritableTxAndGet(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	err = tx.Cursor(dbutils.HeaderPrefix).Put(dbutils.HeaderKey(4, common.Hash{4}), []byte{4})
+	err = tx.Cursor(dbutils.HeadersBucket).Put(dbutils.HeaderKey(4, common.Hash{4}), []byte{4})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -587,7 +587,7 @@ func TestSnapshot2WritableTxAndGet(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	c := tx.Cursor(dbutils.HeaderPrefix)
+	c := tx.Cursor(dbutils.HeadersBucket)
 	k, v, err := c.First()
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/migrations/header_prefix.go b/migrations/header_prefix.go
new file mode 100644
index 00000000000..7590a10306d
--- /dev/null
+++ b/migrations/header_prefix.go
@@ -0,0 +1,169 @@
+package migrations
+
+import (
+	"bytes"
+	"fmt"
+	"github.com/ledgerwatch/turbo-geth/common"
+	"github.com/ledgerwatch/turbo-geth/common/dbutils"
+	"github.com/ledgerwatch/turbo-geth/common/etl"
+	"github.com/ledgerwatch/turbo-geth/ethdb"
+)
+
+var headerPrefixToSeparateBuckets = Migration{
+	Name: "header_prefix_to_separate_buckets",
+	Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) (err error) {
+		exists, err := db.(ethdb.BucketsMigrator).BucketExists(dbutils.HeaderPrefixOld)
+		if err != nil {
+			return err
+		}
+		if !exists {
+			return CommitProgress(db, nil, true)
+		}
+
+		if err = db.(ethdb.BucketsMigrator).ClearBuckets(dbutils.HeaderCanonicalBucket, dbutils.HeadersBucket, dbutils.HeaderTDBucket); err != nil {
+			return err
+		}
+		logPrefix := "split_header_prefix_bucket"
+		const loadStep = "load"
+
+		canonicalCollector, err := etl.NewCollectorFromFiles(tmpdir + "canonical")
+		if err != nil {
+			return err
+		}
+		tdCollector, err := etl.NewCollectorFromFiles(tmpdir + "td")
+		if err != nil {
+			return err
+		}
+		headersCollector, err := etl.NewCollectorFromFiles(tmpdir + "headers")
+		if err != nil {
+			return err
+		}
+
+		switch string(progress) {
+		case "":
+			// can't use files if progress field not set, clear them
+			if canonicalCollector != nil {
+				canonicalCollector.Close(logPrefix)
+				canonicalCollector = nil
+			}
+
+			if tdCollector != nil {
+				tdCollector.Close(logPrefix)
+				tdCollector = nil
+			}
+			if headersCollector != nil {
+				headersCollector.Close(logPrefix)
+				headersCollector = nil
+			}
+		case loadStep:
+			if headersCollector == nil || canonicalCollector == nil || tdCollector == nil {
+				return ErrMigrationETLFilesDeleted
+			}
+			defer func() {
+				// don't clean if error or panic happened
+				if err != nil {
+					return
+				}
+				if rec := recover(); rec != nil {
+					panic(rec)
+				}
+				canonicalCollector.Close(logPrefix)
+				tdCollector.Close(logPrefix)
+				headersCollector.Close(logPrefix)
+			}()
+			goto LoadStep
+		}
+
+		canonicalCollector = etl.NewCriticalCollector(tmpdir+"canonical", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
+		tdCollector = etl.NewCriticalCollector(tmpdir+"td", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
+		headersCollector = etl.NewCriticalCollector(tmpdir+"headers", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
+		defer func() {
+			// don't clean if error or panic happened
+			if err != nil {
+				return
+			}
+			if rec := recover(); rec != nil {
+				panic(rec)
+			}
+			canonicalCollector.Close(logPrefix)
+			tdCollector.Close(logPrefix)
+			headersCollector.Close(logPrefix)
+		}()
+
+		err = db.Walk(dbutils.HeaderPrefixOld, []byte{}, 0, func(k, v []byte) (bool, error) {
+			var innerErr error
+			switch {
+			case IsHeaderKey(k):
+				innerErr = headersCollector.Collect(k, v)
+			case IsHeaderTDKey(k):
+				innerErr = tdCollector.Collect(bytes.TrimSuffix(k, HeaderTDSuffix), v)
+			case IsHeaderHashKey(k):
+				innerErr = canonicalCollector.Collect(bytes.TrimSuffix(k, HeaderHashSuffix), v)
+			default:
+				return false, fmt.Errorf("incorrect header prefix key: %v", common.Bytes2Hex(k))
+			}
+			if innerErr != nil {
+				return false, innerErr
+			}
+			return true, nil
+		})
+		if err != nil {
+			return err
+		}
+		if err = db.(ethdb.BucketsMigrator).DropBuckets(dbutils.HeaderPrefixOld); err != nil {
+			return err
+		}
+
+	LoadStep:
+		// Now transaction would have been re-opened, and we should be re-using the space
+		if err = canonicalCollector.Load(logPrefix, db, dbutils.HeaderCanonicalBucket, etl.IdentityLoadFunc, etl.TransformArgs{
+			OnLoadCommit: CommitProgress,
+		}); err != nil {
+			return fmt.Errorf("loading the transformed data back into the canonical headers bucket: %w", err)
+		}
+		if err = tdCollector.Load(logPrefix, db, dbutils.HeaderTDBucket, etl.IdentityLoadFunc, etl.TransformArgs{
+			OnLoadCommit: CommitProgress,
+		}); err != nil {
+			return fmt.Errorf("loading the transformed data back into the header TD bucket: %w", err)
+		}
+		if err = headersCollector.Load(logPrefix, db, dbutils.HeadersBucket, etl.IdentityLoadFunc, etl.TransformArgs{
+			OnLoadCommit: CommitProgress,
+		}); err != nil {
+			return fmt.Errorf("loading the transformed data back into the headers bucket: %w", err)
+		}
+		return nil
+	},
+}
+
+func IsHeaderKey(k []byte) bool {
+	l := common.BlockNumberLength + common.HashLength
+	if len(k) != l {
+		return false
+	}
+
+	return !IsHeaderHashKey(k) && !IsHeaderTDKey(k)
+}
+
+func IsHeaderTDKey(k []byte) bool {
+	l := common.BlockNumberLength + common.HashLength + 1
+	return len(k) == l && bytes.Equal(k[l-1:], HeaderTDSuffix)
+}
+
+// headerHashKey = headerPrefix + num (uint64 big endian) + headerHashSuffix
+func HeaderHashKey(number uint64) []byte {
+	return append(dbutils.EncodeBlockNumber(number), HeaderHashSuffix...)
+}
+
+func CheckCanonicalKey(k []byte) bool {
+	return len(k) == 8+len(HeaderHashSuffix) && bytes.Equal(k[8:], HeaderHashSuffix)
+}
+
+func IsHeaderHashKey(k []byte) bool {
+	l := common.BlockNumberLength + 1
+	return len(k) == l && bytes.Equal(k[l-1:], HeaderHashSuffix)
+}
+
+var (
+	HeaderTDSuffix   = []byte("t") // block_num_u64 + hash + headerTDSuffix -> td
+	HeaderHashSuffix = []byte("n") // block_num_u64 + headerHashSuffix -> hash
+)
diff --git a/migrations/header_prefix_test.go b/migrations/header_prefix_test.go
new file mode 100644
index 00000000000..586bc4e2a24
--- /dev/null
+++ b/migrations/header_prefix_test.go
@@ -0,0 +1,110 @@
+package migrations
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"github.com/ledgerwatch/turbo-geth/common"
+	"github.com/ledgerwatch/turbo-geth/common/dbutils"
+	"github.com/ledgerwatch/turbo-geth/ethdb"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"os"
+	"strconv"
+	"testing"
+)
+
+func TestHeaderPrefix(t *testing.T) {
+	require := require.New(t)
+	db := ethdb.NewMemDatabase()
+
+	err := db.KV().Update(context.Background(), func(tx ethdb.Tx) error {
+		err := tx.(ethdb.BucketMigrator).CreateBucket(dbutils.HeaderPrefixOld)
+		if err != nil {
+			return err
+		}
+		c := tx.Cursor(dbutils.HeaderPrefixOld)
+		for i := uint64(0); i < 10; i++ {
+			//header
+			err = c.Put(dbutils.HeaderKey(i, common.Hash{uint8(i)}), []byte("header "+strconv.Itoa(int(i))))
+			require.NoError(err)
+			//canonical
+			err = c.Put(HeaderHashKey(i), common.Hash{uint8(i)}.Bytes())
+			require.NoError(err)
+			//td
+			err = c.Put(append(dbutils.HeaderKey(i, common.Hash{uint8(i)}), HeaderTDSuffix...), []byte{uint8(i)})
+			require.NoError(err)
+		}
+		return nil
+	})
+	require.NoError(err)
+
+	migrator := NewMigrator()
+	migrator.Migrations = []Migration{headerPrefixToSeparateBuckets}
+	err = migrator.Apply(db, os.TempDir())
+	require.NoError(err)
+
+	num := 0
+	err = db.Walk(dbutils.HeaderCanonicalBucket, []byte{}, 0, func(k, v []byte) (bool, error) {
+		require.Len(k, 8)
+		require.True(bytes.Equal(v, common.Hash{uint8(binary.BigEndian.Uint64(k))}.Bytes()))
+		num++
+		return true, nil
+	})
+	require.NoError(err)
+	require.Equal(num, 10)
+
+	num = 0
+	err = db.Walk(dbutils.HeaderTDBucket, []byte{}, 0, func(k, v []byte) (bool, error) {
+		require.Len(k, 40)
+		require.True(bytes.Equal(v, []byte{uint8(binary.BigEndian.Uint64(k))}))
+		num++
+		return true, nil
+	})
+	require.NoError(err)
+	require.Equal(num, 10)
+
+	num = 0
+	err = db.Walk(dbutils.HeadersBucket, []byte{}, 0, func(k, v []byte) (bool, error) {
+		require.Len(k, 40)
+		require.True(bytes.Equal(v, []byte("header "+strconv.Itoa(int(binary.BigEndian.Uint64(k))))))
+		num++
+		return true, nil
+	})
+	require.NoError(err)
+	require.Equal(num, 10)
+}
+
+func TestHeaderTypeDetection(t *testing.T) {
+	// good input
+	headerHashKey := common.Hex2Bytes("00000000000000006e")
+	assert.False(t, IsHeaderKey(headerHashKey))
+	assert.False(t, IsHeaderTDKey(headerHashKey))
+	assert.True(t, IsHeaderHashKey(headerHashKey))
+
+	headerKey := common.Hex2Bytes("0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd")
+	assert.True(t, IsHeaderKey(headerKey))
+	assert.False(t, IsHeaderTDKey(headerKey))
+	assert.False(t, IsHeaderHashKey(headerKey))
+
+	headerTdKey := common.Hex2Bytes("0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd74")
+	assert.False(t, IsHeaderKey(headerTdKey))
+	assert.True(t, IsHeaderTDKey(headerTdKey))
+	assert.False(t, IsHeaderHashKey(headerTdKey))
+
+	// bad input
+	emptyKey := common.Hex2Bytes("")
+	assert.False(t, IsHeaderKey(emptyKey))
+	assert.False(t, IsHeaderTDKey(emptyKey))
+	assert.False(t, IsHeaderHashKey(emptyKey))
+
+	tooLongKey := common.Hex2Bytes("0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd0000000000004321ed7240d411782ae438adfd85f7edad373cea722318c6e7f5f5b30f9abc9b36fd")
+	assert.False(t, IsHeaderKey(tooLongKey))
+	assert.False(t, IsHeaderTDKey(tooLongKey))
+	assert.False(t, IsHeaderHashKey(tooLongKey))
+
+	notRelatedInput := common.Hex2Bytes("alex")
+	assert.False(t, IsHeaderKey(notRelatedInput))
+	assert.False(t, IsHeaderTDKey(notRelatedInput))
+	assert.False(t, IsHeaderHashKey(notRelatedInput))
+}
diff --git a/migrations/migrations.go b/migrations/migrations.go
index 99ecc585430..f529e1e09f3 100644
--- a/migrations/migrations.go
+++ b/migrations/migrations.go
@@ -74,6 +74,7 @@ var migrations = []Migration{
 	splitHashStateBucket,
 	splitIHBucket,
 	deleteExtensionHashesFromTrieBucket,
+	headerPrefixToSeparateBuckets,
 }

 type Migration struct {
diff --git a/turbo/snapshotsync/postprocessing.go b/turbo/snapshotsync/postprocessing.go
index 35ea8b091c5..3fea2c3a9a6 100644
--- a/turbo/snapshotsync/postprocessing.go
+++ b/turbo/snapshotsync/postprocessing.go
@@ -116,10 +116,7 @@ func GenerateHeaderIndexes(ctx context.Context, db ethdb.Database) error {
 		headNumber := big.NewInt(0).SetBytes(headNumberBytes).Uint64()
 		headHash := common.BytesToHash(headHashBytes)

-		innerErr = etl.Transform("Torrent post-processing 1", db, dbutils.HeaderPrefix, dbutils.HeaderNumberPrefix, os.TempDir(), func(k []byte, v []byte, next etl.ExtractNextFunc) error {
-			if len(k) != 8+common.HashLength {
-				return nil
-			}
+		innerErr = etl.Transform("Torrent post-processing 1", db, dbutils.HeadersBucket, dbutils.HeaderNumberBucket, os.TempDir(), func(k []byte, v []byte, next etl.ExtractNextFunc) error {
 			return next(k, common.CopyBytes(k[8:]), common.CopyBytes(k[:8]))
 		}, etl.IdentityLoadFunc, etl.TransformArgs{
 			Quit: ctx.Done(),
@@ -145,10 +142,7 @@ func GenerateHeaderIndexes(ctx context.Context, db ethdb.Database) error {
 		td := h.Difficulty

-		log.Info("Generate TD index & canonical")
-		err = etl.Transform("Torrent post-processing 2", db, dbutils.HeaderPrefix, dbutils.HeaderPrefix, os.TempDir(), func(k []byte, v []byte, next etl.ExtractNextFunc) error {
-			if len(k) != 8+common.HashLength {
-				return nil
-			}
+		log.Info("Generate TD index")
+		err = etl.Transform("Torrent post-processing 2", db, dbutils.HeadersBucket, dbutils.HeaderTDBucket, os.TempDir(), func(k []byte, v []byte, next etl.ExtractNextFunc) error {
 			header := &types.Header{}
 			innerErr := rlp.DecodeBytes(v, header)
 			if innerErr != nil {
@@ -162,13 +156,16 @@ func GenerateHeaderIndexes(ctx context.Context, db ethdb.Database) error {
 				return innerErr
 			}

-			innerErr = next(k, dbutils.HeaderTDKey(header.Number.Uint64(), header.Hash()), tdBytes)
-			if innerErr != nil {
-				return innerErr
-			}
-
-			//canonical
-			return next(k, dbutils.HeaderHashKey(header.Number.Uint64()), header.Hash().Bytes())
+			return next(k, dbutils.HeaderKey(header.Number.Uint64(), header.Hash()), tdBytes)
+		}, etl.IdentityLoadFunc, etl.TransformArgs{
+			Quit: ctx.Done(),
+		})
+		if err != nil {
+			return err
+		}
+		log.Info("Generate canonical index")
+		err = etl.Transform("Torrent post-processing 3", db, dbutils.HeadersBucket, dbutils.HeaderCanonicalBucket, os.TempDir(), func(k []byte, v []byte, next etl.ExtractNextFunc) error {
return next(k, common.CopyBytes(k[:8]), common.CopyBytes(k[8:])) }, etl.IdentityLoadFunc, etl.TransformArgs{ Quit: ctx.Done(), OnLoadCommit: func(db ethdb.Putter, key []byte, isDone bool) error { @@ -193,6 +190,7 @@ func GenerateHeaderIndexes(ctx context.Context, db ethdb.Database) error { if err != nil { return err } + log.Info("Last processed block", "num", number, "hash", hash.String()) } diff --git a/turbo/snapshotsync/postprocessing_test.go b/turbo/snapshotsync/postprocessing_test.go index 6aa6ee8945b..0dfaf1989fa 100644 --- a/turbo/snapshotsync/postprocessing_test.go +++ b/turbo/snapshotsync/postprocessing_test.go @@ -25,7 +25,7 @@ func TestHeadersGenerateIndex(t *testing.T) { if innerErr != nil { panic(innerErr) } - innerErr = tx.Cursor(dbutils.HeaderPrefix).Put(dbutils.HeaderKey(header.Number.Uint64(), header.Hash()), headerBytes) + innerErr = tx.Cursor(dbutils.HeadersBucket).Put(dbutils.HeaderKey(header.Number.Uint64(), header.Hash()), headerBytes) if innerErr != nil { panic(innerErr) } @@ -55,7 +55,7 @@ func TestHeadersGenerateIndex(t *testing.T) { } snKV := ethdb.NewLMDB().Path(snPath).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).WithBucketsConfig(ethdb.DefaultBucketConfigs).MustOpen() - snKV = ethdb.NewSnapshot2KV().SnapshotDB([]string{dbutils.HeadersSnapshotInfoBucket, dbutils.HeaderPrefix}, snKV).DB(db).MustOpen() + snKV = ethdb.NewSnapshot2KV().SnapshotDB([]string{dbutils.HeadersSnapshotInfoBucket, dbutils.HeadersBucket}, snKV).DB(db).MustOpen() err = GenerateHeaderIndexes(context.Background(), ethdb.NewObjectDatabase(snKV)) if err != nil { t.Fatal(err) diff --git a/turbo/snapshotsync/wrapdb.go b/turbo/snapshotsync/wrapdb.go index b3a502472ad..754f30a802b 100644 --- a/turbo/snapshotsync/wrapdb.go +++ b/turbo/snapshotsync/wrapdb.go @@ -14,7 +14,7 @@ var ( dbutils.BodiesSnapshotInfoBucket: dbutils.BucketConfigItem{}, }, SnapshotType_headers: { - dbutils.HeaderPrefix: dbutils.BucketConfigItem{}, + dbutils.HeadersBucket: dbutils.BucketConfigItem{}, dbutils.HeadersSnapshotInfoBucket: dbutils.BucketConfigItem{}, }, SnapshotType_state: { @@ -55,7 +55,7 @@ func WrapBySnapshotsFromDir(kv ethdb.KV, snapshotDir string, mode SnapshotMode) log.Error("Can't open headers snapshot", "err", err) return nil, err } else { //nolint - snkv.SnapshotDB([]string{dbutils.HeaderPrefix, dbutils.HeadersSnapshotInfoBucket}, snapshotKV) + snkv.SnapshotDB([]string{dbutils.HeadersBucket, dbutils.HeadersSnapshotInfoBucket}, snapshotKV) } } if mode.State { diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 45298286c3b..bc8bb8952f0 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -741,7 +741,7 @@ func ReadFilesAndBuffer(files []string, headerBuf *HeaderBuffer, hf func(header func (hd *HeaderDownload) RecoverFromDb(db ethdb.Database, currentTime uint64) error { var anchor *Anchor err := db.(ethdb.HasKV).KV().View(context.Background(), func(tx ethdb.Tx) error { - c := tx.Cursor(dbutils.HeaderPrefix) + c := tx.Cursor(dbutils.HeadersBucket) var anchorH types.Header // Take first header (with the lowest height) as the anchor for k, v, err := c.First(); k != nil; k, v, err = c.Next() { @@ -1216,7 +1216,7 @@ func (hi *HeaderInserter) FeedHeader(header *types.Header, blockHeight uint64) e if err = rawdb.WriteTd(hi.batch, hash, blockHeight, td); err != nil { return fmt.Errorf("[%s] failed to WriteTd: %w", hi.logPrefix, err) } - if err = hi.batch.Put(dbutils.HeaderPrefix, 
dbutils.HeaderKey(blockHeight, hash), data); err != nil { + if err = hi.batch.Put(dbutils.HeadersBucket, dbutils.HeaderKey(blockHeight, hash), data); err != nil { return fmt.Errorf("[%s] failed to store header: %w", hi.logPrefix, err) } if blockHeight > hi.headerProgress {
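
Taken together, the accessors_chain.go and header_algos.go changes mean a canonical header read is now two point lookups: number to hash in canonical_headers, then number+hash to header RLP in headers, with no suffix-disambiguated probing into `h`. A minimal runnable sketch of that read path; memDB and readCanonicalHeaderRLP are illustrative stand-ins, not the patch's databaseReader or rawdb functions:

package main

import (
	"encoding/binary"
	"fmt"
)

// memDB is a hypothetical in-memory stand-in for the patch's databaseReader.
type memDB map[string]map[string][]byte

func (m memDB) Get(bucket string, key []byte) ([]byte, error) {
	if v, ok := m[bucket][string(key)]; ok {
		return v, nil
	}
	return nil, fmt.Errorf("not found in %q", bucket)
}

// readCanonicalHeaderRLP chains the two lookups that ReadCanonicalHash and
// ReadHeaderRLP perform after this patch.
func readCanonicalHeaderRLP(db memDB, number uint64) ([]byte, error) {
	numKey := make([]byte, 8)
	binary.BigEndian.PutUint64(numKey, number) // as dbutils.EncodeBlockNumber
	// dbutils.HeaderCanonicalBucket: block_num -> hash
	hash, err := db.Get("canonical_headers", numKey)
	if err != nil {
		return nil, err
	}
	// dbutils.HeadersBucket: block_num + hash -> rlp(header), key as dbutils.HeaderKey
	return db.Get("headers", append(numKey, hash...))
}

func main() {
	numKey := []byte{0, 0, 0, 0, 0, 0, 0, 1}
	hash := make([]byte, 32)
	hash[0] = 0x01
	db := memDB{
		"canonical_headers": {string(numKey): hash},
		"headers":           {string(numKey) + string(hash): []byte("rlp(header)")},
	}
	v, err := readCanonicalHeaderRLP(db, 1)
	fmt.Println(string(v), err) // rlp(header) <nil>
}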