Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support blob storage & miscs; #2229

Merged
merged 12 commits into from
Mar 4, 2024
1 change: 1 addition & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,5 @@ type PoSA interface {
GetFinalizedHeader(chain ChainHeaderReader, header *types.Header) *types.Header
VerifyVote(chain ChainHeaderReader, vote *types.VoteEnvelope) error
IsActiveValidatorAt(chain ChainHeaderReader, header *types.Header, checkVoteKeyFn func(bLSPublicKey *types.BLSPublicKey) bool) bool
IsDataAvailable(chain ChainHeaderReader, block *types.Block, blobs types.BlobTxSidecars) error
}
31 changes: 24 additions & 7 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ func (p *Parlia) VerifyHeaders(chain consensus.ChainHeaderReader, headers []*typ
return abort, results
}

// IsDataAvailable checks that the blob-carrying transactions in the given
// block have their blob data available, i.e. the supplied sidecars match the
// block's blob transactions.
// TODO(GalaIO): implement it later — currently a stub that always reports
// success, so no data-availability verification is performed yet.
func (p *Parlia) IsDataAvailable(chain consensus.ChainHeaderReader, block *types.Block, blobs types.BlobTxSidecars) error {
	return nil
}

// getValidatorBytesFromHeader returns the validators bytes extracted from the header's extra field if exists.
// The validators bytes would be contained only in the epoch block's header, and its each validator bytes length is fixed.
// On luban fork, we introduce vote attestation into the header's extra field, so extra format is different from before.
Expand Down Expand Up @@ -589,13 +595,24 @@ func (p *Parlia) verifyHeader(chain consensus.ChainHeaderReader, header *types.H
}
// Verify the existence / non-existence of excessBlobGas
cancun := chain.Config().IsCancun(header.Number, header.Time)
if !cancun && header.ExcessBlobGas != nil {
return fmt.Errorf("invalid excessBlobGas: have %d, expected nil", header.ExcessBlobGas)
}
if !cancun && header.BlobGasUsed != nil {
return fmt.Errorf("invalid blobGasUsed: have %d, expected nil", header.BlobGasUsed)
}
if cancun {
if !cancun {
switch {
case header.ExcessBlobGas != nil:
return fmt.Errorf("invalid excessBlobGas: have %d, expected nil", header.ExcessBlobGas)
case header.BlobGasUsed != nil:
return fmt.Errorf("invalid blobGasUsed: have %d, expected nil", header.BlobGasUsed)
case header.ParentBeaconRoot != nil:
return fmt.Errorf("invalid parentBeaconRoot, have %#x, expected nil", header.ParentBeaconRoot)
case header.WithdrawalsHash != nil:
return fmt.Errorf("invalid WithdrawalsHash, have %#x, expected nil", header.WithdrawalsHash)
}
} else {
switch {
case header.ParentBeaconRoot != nil:
return fmt.Errorf("invalid parentBeaconRoot, have %#x, expected nil", header.ParentBeaconRoot)
case *header.WithdrawalsHash != common.Hash{}:
return errors.New("header has wrong WithdrawalsHash")
}
if err := eip4844.VerifyEIP4844Header(parent, header); err != nil {
return err
}
Expand Down
50 changes: 50 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ const (
blockCacheLimit = 256
diffLayerCacheLimit = 1024
receiptsCacheLimit = 10000
blobsCacheLimit = 10000
txLookupCacheLimit = 1024
maxBadBlockLimit = 16
maxFutureBlocks = 256
Expand Down Expand Up @@ -277,12 +278,16 @@ type BlockChain struct {
receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
blockCache *lru.Cache[common.Hash, *types.Block]
txLookupCache *lru.Cache[common.Hash, txLookup]
blobsCache *lru.Cache[common.Hash, types.BlobTxSidecars]

// future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block]
// Cache for the blocks that failed to pass MPT root verification
badBlockCache *lru.Cache[common.Hash, time.Time]

// blobs
receivedBlobsCache sync.Map // it saves received blobs for validation & storage
galaio marked this conversation as resolved.
Show resolved Hide resolved

// trusted diff layers
diffLayerCache *exlru.Cache // Cache for the diffLayers
diffLayerChanCache *exlru.Cache // Cache for the difflayer channel
Expand Down Expand Up @@ -358,6 +363,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blobsCache: lru.NewCache[common.Hash, types.BlobTxSidecars](blobsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
Expand Down Expand Up @@ -645,6 +651,11 @@ func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer, diffLayerCh cha

// cacheBlock stores the block in the block cache and, when matching blob
// sidecars have already been received for this hash, mirrors those into the
// blobs cache as well.
func (bc *BlockChain) cacheBlock(hash common.Hash, block *types.Block) {
	bc.blockCache.Add(hash, block)
	// If sidecars for this block were received, cache them alongside it.
	if sidecars, ok := bc.receivedBlobsCache.Load(hash); ok {
		bc.blobsCache.Add(hash, sidecars.(types.BlobTxSidecars))
	}
}

// empty returns an indicator whether the blockchain is empty.
Expand Down Expand Up @@ -989,6 +1000,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
bc.receiptsCache.Purge()
bc.blobsCache.Purge()
bc.blockCache.Purge()
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()
Expand Down Expand Up @@ -1376,6 +1388,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [

// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
// TODO(GalaIO): when sync the history block, it needs store blobs too.
//if isCancun() {
// writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db, blockChain, receiptChain, td, blobs)
//}
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
Expand Down Expand Up @@ -1454,6 +1470,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
// TODO(GalaIO): if enable cancun, need write blobs
//if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
// rawdb.WriteBlobs(batch, block.Hash(), block.NumberU64(), blobs)
//}

// Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts)
Expand Down Expand Up @@ -1523,6 +1543,12 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
batch := bc.db.NewBatch()
rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
rawdb.WriteBlock(batch, block)
// if enable cancun, it needs to write blobs too
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
blobs, _ := bc.receivedBlobsCache.Load(block.Hash())
rawdb.WriteBlobs(batch, block.Hash(), block.NumberU64(), blobs.(types.BlobTxSidecars))
bc.receivedBlobsCache.Delete(block.Hash())
}
if err := batch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}
Expand Down Expand Up @@ -1565,6 +1591,16 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
rawdb.WriteBlock(blockBatch, block)
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
// if enable cancun, it needs to write blobs too
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
blobs, exist := bc.receivedBlobsCache.Load(block.Hash())
if exist {
rawdb.WriteBlobs(blockBatch, block.Hash(), block.NumberU64(), blobs.(types.BlobTxSidecars))
} else {
rawdb.WriteBlobs(blockBatch, block.Hash(), block.NumberU64(), nil)
bc.receivedBlobsCache.Delete(block.Hash())
}
}
rawdb.WritePreimages(blockBatch, state.Preimages())
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
Expand Down Expand Up @@ -1802,6 +1838,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// racey behaviour. If a sidechain import is in progress, and the historic state
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
// TODO(GalaIO): if enable cancun, it must set received blob cache for check, remove cache when failed
func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) {
// If the chain is terminating, don't even bother starting up.
if bc.insertStopped() {
Expand Down Expand Up @@ -1928,6 +1965,19 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}

for ; block != nil && err == nil || errors.Is(err, ErrKnownBlock); block, err = it.next() {
// TODO(GalaIO): check blob data available first
//if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
// if posa, ok := bc.engine.(consensus.PoSA); ok {
// blobs, exist := bc.receivedBlobsCache.Load(block.Hash())
// if !exist {
// return it.index, fmt.Errorf("cannot find the target block's blob info, block: %v, hash: %v", block.NumberU64(), block.Hash())
// }
// if err = posa.IsDataAvailable(bc, block, blobs.(types.BlobTxSidecars)); err != nil {
// return it.index, err
// }
// }
//}

// If the chain is terminating, stop processing blocks
if bc.insertStopped() {
log.Debug("Abort during block processing")
Expand Down
17 changes: 17 additions & 0 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,23 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
return receipts
}

// GetBlobsByHash retrieves the blob sidecars for all transactions in the
// block with the given hash, consulting the in-memory cache before falling
// back to the database. It returns nil when either the block number or its
// blob data is unknown.
func (bc *BlockChain) GetBlobsByHash(hash common.Hash) types.BlobTxSidecars {
	// Fast path: serve straight from the LRU cache.
	if cached, hit := bc.blobsCache.Get(hash); hit {
		return cached
	}
	// Slow path: resolve the block number, then load the blobs from disk.
	number := rawdb.ReadHeaderNumber(bc.db, hash)
	if number == nil {
		return nil
	}
	sidecars := rawdb.ReadRawBlobs(bc.db, hash, *number)
	if sidecars == nil {
		return nil
	}
	// Populate the cache for subsequent lookups.
	bc.blobsCache.Add(hash, sidecars)
	return sidecars
}

// GetUnclesInChain retrieves all the uncles from a given block backwards until
// a specific distance is reached.
func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {
Expand Down
3 changes: 3 additions & 0 deletions core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,9 @@ func (cm *chainMaker) makeHeader(parent *types.Block, state *state.StateDB, engi
header.ExcessBlobGas = &excessBlobGas
header.BlobGasUsed = new(uint64)
header.ParentBeaconRoot = new(common.Hash)
if cm.config.Parlia != nil {
header.WithdrawalsHash = new(common.Hash)
}
}
return header
}
Expand Down
3 changes: 3 additions & 0 deletions core/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,9 @@ func (g *Genesis) ToBlock() *types.Block {
if head.BlobGasUsed == nil {
head.BlobGasUsed = new(uint64)
}
if conf.Parlia != nil {
head.WithdrawalsHash = new(common.Hash)
}
}
}
return types.NewBlock(head, nil, nil, nil, trie.NewStackTrie(nil)).WithWithdrawals(withdrawals)
Expand Down
128 changes: 128 additions & 0 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,20 @@ func ReadHeader(db ethdb.Reader, hash common.Hash, number uint64) *types.Header
return header
}

// ReadHeaderAndRaw retrieves the block header corresponding to the hash,
// returning both the decoded header and its raw RLP encoding. It returns
// (nil, nil) when the header is missing or its stored RLP is corrupt.
func ReadHeaderAndRaw(db ethdb.Reader, hash common.Hash, number uint64) (*types.Header, rlp.RawValue) {
	raw := ReadHeaderRLP(db, hash, number)
	if len(raw) == 0 {
		return nil, nil
	}
	header := new(types.Header)
	if err := rlp.DecodeBytes(raw, header); err != nil {
		log.Error("Invalid block header RLP", "hash", hash, "err", err)
		return nil, nil
	}
	return header, raw
}

// WriteHeader stores a block header into the database and also stores the hash-
// to-number mapping.
func WriteHeader(db ethdb.KeyValueWriter, header *types.Header) {
Expand Down Expand Up @@ -809,6 +823,98 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts
})
}

// WriteAncientBlocksWithBlobs writes entire block data into ancient store and returns the total written size.
// Attention: The caller must set blobs after cancun
func WriteAncientBlocksWithBlobs(db ethdb.AncientStore, blocks []*types.Block, receipts []types.Receipts, td *big.Int, blobs []types.BlobTxSidecars) (int64, error) {
if len(blocks) == 0 {
return 0, nil
}

// do some sanity check
if len(blocks) != len(blobs) {
return 0, fmt.Errorf("the blobs len is different with blobks, %v:%v", len(blobs), len(blocks))
galaio marked this conversation as resolved.
Show resolved Hide resolved
}
if len(blocks) != len(receipts) {
return 0, fmt.Errorf("the receipts len is different with blobks, %v:%v", len(receipts), len(blocks))
}
// try reset empty blob ancient table
if err := ResetEmptyBlobAncientTable(db, blocks[0].NumberU64()); err != nil {
return 0, err
}

var (
tdSum = new(big.Int).Set(td)
stReceipts []*types.ReceiptForStorage
)
return db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i, block := range blocks {
// Convert receipts to storage format and sum up total difficulty.
stReceipts = stReceipts[:0]
for _, receipt := range receipts[i] {
stReceipts = append(stReceipts, (*types.ReceiptForStorage)(receipt))
}
header := block.Header()
if i > 0 {
tdSum.Add(tdSum, header.Difficulty)
}
if err := writeAncientBlockWithBlob(op, block, header, stReceipts, tdSum, blobs[i]); err != nil {
return err
}
}
return nil
})
}

// ReadBlobsRLP retrieves all the transaction blobs belonging to a block in RLP encoding.
func ReadBlobsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
var data []byte
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(ChainFreezerBlobTable, number)
return nil
}
// If not, try reading from leveldb
data, _ = db.Get(blockBlobsKey(number, hash))
emailtovamos marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
return data
}

// ReadRawBlobs retrieves all the transaction blob sidecars belonging to a
// block, decoded from their RLP representation. It returns nil when the
// blobs are missing or the stored RLP is corrupt.
func ReadRawBlobs(db ethdb.Reader, hash common.Hash, number uint64) types.BlobTxSidecars {
	encoded := ReadBlobsRLP(db, hash, number)
	if len(encoded) == 0 {
		return nil
	}
	var sidecars types.BlobTxSidecars
	if err := rlp.DecodeBytes(encoded, &sidecars); err != nil {
		log.Error("Invalid blob array RLP", "hash", hash, "err", err)
		return nil
	}
	return sidecars
}

// WriteBlobs stores all the transaction blobs belonging to a block.
// It could input nil for empty blobs.
func WriteBlobs(db ethdb.KeyValueWriter, hash common.Hash, number uint64, blobs types.BlobTxSidecars) {
emailtovamos marked this conversation as resolved.
Show resolved Hide resolved
data, err := rlp.EncodeToBytes(blobs)
if err != nil {
emailtovamos marked this conversation as resolved.
Show resolved Hide resolved
log.Crit("Failed to encode block blobs", "err", err)
}
// Store the flattened receipt slice
if err := db.Put(blockBlobsKey(number, hash), data); err != nil {
log.Crit("Failed to store block blobs", "err", err)
}
}

// DeleteBlobs removes all blob data associated with a block hash.
func DeleteBlobs(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(blockBlobsKey(number, hash)); err != nil {
log.Crit("Failed to delete block blobs", "err", err)
emailtovamos marked this conversation as resolved.
Show resolved Hide resolved
}
}

func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error {
num := block.NumberU64()
if err := op.AppendRaw(ChainFreezerHashTable, num, block.Hash().Bytes()); err != nil {
Expand All @@ -829,12 +935,33 @@ func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *type
return nil
}

// writeAncientBlob appends a single block's blob sidecars to the freezer's
// blob table.
func writeAncientBlob(op ethdb.AncientWriteOp, num uint64, blobs types.BlobTxSidecars) error {
	err := op.Append(ChainFreezerBlobTable, num, blobs)
	if err != nil {
		return fmt.Errorf("can't append block %d blobs: %v", num, err)
	}
	return nil
}

// writeAncientBlockWithBlob writes one block's full data (header, body,
// receipts, td) plus its blob sidecars into the ancient store.
// Attention: The caller must set blobs after cancun.
func writeAncientBlockWithBlob(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int, blobs types.BlobTxSidecars) error {
	// Write the regular block tables first, then the blob table, so a failure
	// surfaces before partial blob data is appended.
	if err := writeAncientBlock(op, block, header, receipts, td); err != nil {
		return err
	}
	return writeAncientBlob(op, block.NumberU64(), blobs)
}

// DeleteBlock removes all block data associated with a hash: receipts,
// header, body, total difficulty and blob sidecars.
func DeleteBlock(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
	DeleteReceipts(db, hash, number)
	DeleteHeader(db, hash, number)
	DeleteBody(db, hash, number)
	DeleteTd(db, hash, number)
	// Blob deletion is unconditional for pre-cancun blocks as well.
	DeleteBlobs(db, hash, number) // it is safe to delete non-exist blob
}

// DeleteBlockWithoutNumber removes all block data associated with a hash, except
Expand All @@ -844,6 +971,7 @@ func DeleteBlockWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number
deleteHeaderWithoutNumber(db, hash, number)
DeleteBody(db, hash, number)
DeleteTd(db, hash, number)
DeleteBlobs(db, hash, number)
}

const badBlockToKeep = 10
Expand Down
Loading
Loading