Skip to content

Commit

Permalink
add im state root to data stream (#2893) (#2895)
Browse files Browse the repository at this point in the history
* add im state root to data stream

* fix test

* remove log

* optimization

* fix config

* init map

* add wg

* add logs

* add logs

* add logs

* add logs

* add logs

* add logs

* fix
  • Loading branch information
ToniRamirezM authored Dec 12, 2023
1 parent 5576e50 commit 9ce36bf
Show file tree
Hide file tree
Showing 18 changed files with 251 additions and 57 deletions.
2 changes: 1 addition & 1 deletion sequencer/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context) (*Batch, error)

batch, err := f.openNewWIPBatch(ctx, lastBatchNumber+1, f.currentGERHash, stateRoot)

// Substract the bytes needed to store the changeL2Block tx into the new batch
// Subtract the bytes needed to store the changeL2Block tx into the new batch
batch.remainingResources.Bytes = batch.remainingResources.Bytes - changeL2BlockSize

return batch, err
Expand Down
2 changes: 2 additions & 0 deletions sequencer/closingsignalsmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (c *closingSignalsManager) Start() {
//go c.checkGERUpdate() //TODO: delete this go func and all GER related data and funcs
}

/*
func (c *closingSignalsManager) checkGERUpdate() {
lastBatch, err := c.dbManager.GetLastBatch(c.ctx)
for err != nil {
Expand Down Expand Up @@ -60,6 +61,7 @@ func (c *closingSignalsManager) checkGERUpdate() {
}
}
}
*/

func (c *closingSignalsManager) checkForcedBatches() {
for {
Expand Down
20 changes: 17 additions & 3 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ func (d *dbManager) sendDataToStreamer() {
}

for _, l2Transaction := range l2Transactions {
// Populate intermediate state root
position := state.GetSystemSCPosition(blockStart.L2BlockNumber)
imStateRoot, err := d.GetStorageAt(context.Background(), common.HexToAddress(state.SystemSC), big.NewInt(0).SetBytes(position), l2Block.StateRoot)
if err != nil {
log.Errorf("failed to get storage at for l2block %v: %v", l2Block.L2BlockNumber, err)
}
l2Transaction.StateRoot = common.BigToHash(imStateRoot)

_, err = d.streamServer.AddStreamEntry(state.EntryTypeL2Tx, l2Transaction.Encode())
if err != nil {
log.Errorf("failed to add l2tx stream entry for l2block %v: %v", l2Block.L2BlockNumber, err)
Expand Down Expand Up @@ -268,11 +276,11 @@ func (d *dbManager) BuildChangeL2Block(deltaTimestamp uint32, l1InfoTreeIndex ui
// changeL2Block transaction mark
changeL2Block = append(changeL2Block, changeL2BlockMark...)
// changeL2Block deltaTimeStamp
deltaTimestampBytes := make([]byte, 4)
deltaTimestampBytes := make([]byte, 4) // nolint:gomnd
binary.BigEndian.PutUint32(deltaTimestampBytes, deltaTimestamp)
changeL2Block = append(changeL2Block, deltaTimestampBytes...)
// changeL2Block l1InfoTreeIndexBytes
l1InfoTreeIndexBytes := make([]byte, 4)
l1InfoTreeIndexBytes := make([]byte, 4) // nolint:gomnd
binary.BigEndian.PutUint32(l1InfoTreeIndexBytes, uint32(l1InfoTreeIndex))
changeL2Block = append(changeL2Block, l1InfoTreeIndexBytes...)

Expand Down Expand Up @@ -321,7 +329,8 @@ func (d *dbManager) StoreL2Block(ctx context.Context, batchNumber uint64, l2Bloc
continue
}

transactions = append(transactions, &txResponse.Tx)
tx := txResponse.Tx
transactions = append(transactions, &tx)

storeTxsEGPData = append(storeTxsEGPData, state.StoreTxEGPData{EGPLog: nil, EffectivePercentage: uint8(txResponse.EffectivePercentage)})
if txsEGPLog != nil {
Expand Down Expand Up @@ -781,3 +790,8 @@ func (d *dbManager) DSSendL2Block(l2Block *L2Block) error {

return nil
}

// GetStorageAt returns the storage at a given address and position
func (d *dbManager) GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error) {
return d.state.GetStorageAt(ctx, address, position, root)
}
40 changes: 21 additions & 19 deletions sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (
"github.com/stretchr/testify/require"
)

/*
const (
forkId5 uint64 = 5
)
*/

var (
f *finalizer
Expand Down Expand Up @@ -90,21 +92,21 @@ var (
},
DefaultMinGasPriceAllowed: 1000000000,
}
chainID = new(big.Int).SetInt64(400)
pvtKey = "0x28b2b0318721be8c8339199172cd7cc8f5e273800a35616ec893083a4b32c02e"
nonce1 = uint64(1)
nonce2 = uint64(2)
seqAddr = common.Address{}
oldHash = common.HexToHash("0x01")
newHash = common.HexToHash("0x02")
newHash2 = common.HexToHash("0x03")
stateRootHashes = []common.Hash{oldHash, newHash, newHash2}
txHash = common.HexToHash("0xf9e4fe4bd2256f782c66cffd76acdb455a76111842bb7e999af2f1b7f4d8d092")
txHash2 = common.HexToHash("0xb281831a3401a04f3afa4ec586ef874f58c61b093643d408ea6aa179903df1a4")
tx = types.NewTransaction(nonce1, receiverAddr, big.NewInt(1), 100000, big.NewInt(1), nil)
senderAddr = common.HexToAddress("0x617b3a3528F9cDd6630fd3301B9c8911F7Bf063D")
receiverAddr = common.HexToAddress("0x1555324")
isSynced = func(ctx context.Context) bool {
// chainID = new(big.Int).SetInt64(400)
// pvtKey = "0x28b2b0318721be8c8339199172cd7cc8f5e273800a35616ec893083a4b32c02e"
nonce1 = uint64(1)
nonce2 = uint64(2)
seqAddr = common.Address{}
oldHash = common.HexToHash("0x01")
newHash = common.HexToHash("0x02")
newHash2 = common.HexToHash("0x03")
// stateRootHashes = []common.Hash{oldHash, newHash, newHash2}
txHash = common.HexToHash("0xf9e4fe4bd2256f782c66cffd76acdb455a76111842bb7e999af2f1b7f4d8d092")
txHash2 = common.HexToHash("0xb281831a3401a04f3afa4ec586ef874f58c61b093643d408ea6aa179903df1a4")
tx = types.NewTransaction(nonce1, receiverAddr, big.NewInt(1), 100000, big.NewInt(1), nil)
senderAddr = common.HexToAddress("0x617b3a3528F9cDd6630fd3301B9c8911F7Bf063D")
receiverAddr = common.HexToAddress("0x1555324")
isSynced = func(ctx context.Context) bool {
return true
}
testErrStr = "some err"
Expand All @@ -113,10 +115,10 @@ var (
cumulativeGasErr = state.GetZKCounterError("CumulativeGasUsed")
testBatchL2DataAsString = "0xee80843b9aca00830186a0944d5cf5032b2a844602278b01199ed191a86c93ff88016345785d8a0000808203e980801186622d03b6b8da7cf111d1ccba5bb185c56deae6a322cebc6dda0556f3cb9700910c26408b64b51c5da36ba2f38ef55ba1cee719d5a6c012259687999074321bff"
decodedBatchL2Data []byte
done chan bool
gasPrice = big.NewInt(1000000)
effectiveGasPrice = big.NewInt(1000000)
l1GasPrice = uint64(1000000)
// done chan bool
// gasPrice = big.NewInt(1000000)
// effectiveGasPrice = big.NewInt(1000000)
// l1GasPrice = uint64(1000000)
)

func testNow() time.Time {
Expand Down
2 changes: 2 additions & 0 deletions sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type stateInterface interface {
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*state.DSBatch, error)
GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSL2Block, error)
GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*state.DSL2Transaction, error)
GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error)
}

type workerInterface interface {
Expand Down Expand Up @@ -143,4 +144,5 @@ type dbManagerInterface interface {
GetForkIDByBatchNumber(batchNumber uint64) uint64
BuildChangeL2Block(deltaTimestamp uint32, l1InfoTreeIndex uint32) []byte
DSSendL2Block(l2Block *L2Block) error
GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error)
}
13 changes: 7 additions & 6 deletions sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {

txsEGPLog := []*state.EffectiveGasPriceLog{}
for _, tx := range l2Block.transactions {
txsEGPLog = append(txsEGPLog, &tx.EGPLog)
egpLog := tx.EGPLog
txsEGPLog = append(txsEGPLog, &egpLog)
}

// Store L2 block in the state
Expand Down Expand Up @@ -393,13 +394,13 @@ func (f *finalizer) closeWIPL2Block(ctx context.Context) {
}

func (f *finalizer) openNewWIPL2Block(ctx context.Context, prevTimestamp *time.Time) {
//TODO: use better f.wipBatch.remainingResources.Sub() instead to substract directly
// Substract the bytes needed to store the changeL2Block of the new L2 block into the WIP batch
//TODO: use better f.wipBatch.remainingResources.Sub() instead to subtract directly
// Subtract the bytes needed to store the changeL2Block of the new L2 block into the WIP batch
f.wipBatch.remainingResources.Bytes = f.wipBatch.remainingResources.Bytes - changeL2BlockSize
// Substract poseidon and arithmetic counters need to calculate the InfoRoot when the L2 block is closed
f.wipBatch.remainingResources.ZKCounters.UsedPoseidonHashes = f.wipBatch.remainingResources.ZKCounters.UsedPoseidonHashes - 256 //TODO: config param
// Subtract poseidon and arithmetic counters need to calculate the InfoRoot when the L2 block is closed
f.wipBatch.remainingResources.ZKCounters.UsedPoseidonHashes = f.wipBatch.remainingResources.ZKCounters.UsedPoseidonHashes - 256 // nolint:gomnd //TODO: config param
f.wipBatch.remainingResources.ZKCounters.UsedArithmetics = f.wipBatch.remainingResources.ZKCounters.UsedArithmetics - 1 //TODO: config param
// After do the substracts we need to check if we have not reached the size limit for the batch
// After do the subtracts we need to check if we have not reached the size limit for the batch
if f.isBatchResourcesExhausted() {
// If we have reached the limit then close the wip batch and create a new one
f.finalizeBatch(ctx)
Expand Down
26 changes: 26 additions & 0 deletions sequencer/mock_db_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions sequencer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *Sequencer) Start(ctx context.Context) {
}

func (s *Sequencer) updateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer) {
err := state.GenerateDataStreamerFile(ctx, streamServer, s.state, true)
err := state.GenerateDataStreamerFile(ctx, streamServer, s.state, true, nil)
if err != nil {
log.Fatalf("failed to generate data streamer file, err: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion state/batchV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *State) ProcessBatchV2(ctx context.Context, request ProcessRequest, upda
}

//forkID := s.GetForkIDByBatchNumber(request.BatchNumber)
forkID := uint64(7)
forkID := uint64(FORKID_ETROG)

// Create Batch
var processBatchRequest = &executor.ProcessBatchRequestV2{
Expand Down
53 changes: 46 additions & 7 deletions state/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package state
import (
"context"
"encoding/binary"
"math/big"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/ethereum/go-ethereum/common"
"github.com/iden3/go-iden3-crypto/keccak256"
"github.com/jackc/pgx/v4"
)

Expand All @@ -25,6 +27,10 @@ const (
EntryTypeUpdateGER datastreamer.EntryType = 4
// BookMarkTypeL2Block represents a L2 block bookmark
BookMarkTypeL2Block byte = 0
// SystemSC is the system smart contract address
SystemSC = "0x000000000000000000000000000000005ca1ab1e"
// posConstant is the constant used to compute the position of the intermediate state root
posConstant = 1
)

// DSBatch represents a data stream batch
Expand Down Expand Up @@ -92,10 +98,11 @@ func (b DSL2BlockStart) Decode(data []byte) DSL2BlockStart {

// DSL2Transaction represents a data stream L2 transaction
type DSL2Transaction struct {
L2BlockNumber uint64 // Not included in the encoded data
EffectiveGasPricePercentage uint8 // 1 byte
IsValid uint8 // 1 byte
EncodedLength uint32 // 4 bytes
L2BlockNumber uint64 // Not included in the encoded data
EffectiveGasPricePercentage uint8 // 1 byte
IsValid uint8 // 1 byte
StateRoot common.Hash // 32 bytes
EncodedLength uint32 // 4 bytes
Encoded []byte
}

Expand All @@ -104,6 +111,7 @@ func (l DSL2Transaction) Encode() []byte {
bytes := make([]byte, 0)
bytes = append(bytes, byte(l.EffectiveGasPricePercentage))
bytes = append(bytes, byte(l.IsValid))
bytes = append(bytes, l.StateRoot[:]...)
bytes = binary.LittleEndian.AppendUint32(bytes, l.EncodedLength)
bytes = append(bytes, l.Encoded...)
return bytes
Expand All @@ -113,8 +121,9 @@ func (l DSL2Transaction) Encode() []byte {
func (l DSL2Transaction) Decode(data []byte) DSL2Transaction {
l.EffectiveGasPricePercentage = uint8(data[0])
l.IsValid = uint8(data[1])
l.EncodedLength = binary.LittleEndian.Uint32(data[2:6])
l.Encoded = data[6:]
l.StateRoot = common.BytesToHash(data[2:34])
l.EncodedLength = binary.LittleEndian.Uint32(data[34:38])
l.Encoded = data[38:]
return l
}

Expand Down Expand Up @@ -202,10 +211,12 @@ type DSState interface {
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*DSBatch, error)
GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSL2Block, error)
GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*DSL2Transaction, error)
GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error)
GetLastL2BlockHeader(ctx context.Context, dbTx pgx.Tx) (*L2Header, error)
}

// GenerateDataStreamerFile generates or resumes a data stream file
func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool) error {
func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool, imStateRoots *map[uint64][]byte) error {
header := streamServer.GetHeader()

var currentBatchNumber uint64 = 0
Expand Down Expand Up @@ -412,6 +423,18 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
}

for _, tx := range l2block.Txs {
// Populate intermediate state root
if imStateRoots == nil || (*imStateRoots)[blockStart.L2BlockNumber] == nil {
position := GetSystemSCPosition(l2block.L2BlockNumber)
imStateRoot, err := stateDB.GetStorageAt(ctx, common.HexToAddress(SystemSC), big.NewInt(0).SetBytes(position), l2block.StateRoot)
if err != nil {
return err
}
tx.StateRoot = common.BigToHash(imStateRoot)
} else {
tx.StateRoot = common.BytesToHash((*imStateRoots)[blockStart.L2BlockNumber])
}

entry, err = streamServer.AddStreamEntry(EntryTypeL2Tx, tx.Encode())
if err != nil {
return err
Expand Down Expand Up @@ -441,6 +464,22 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
return err
}

// GetSystemSCPosition computes the position of the intermediate state root for the system smart contract
func GetSystemSCPosition(blockNumber uint64) []byte {
v1 := big.NewInt(0).SetUint64(blockNumber).Bytes()
v2 := big.NewInt(0).SetUint64(uint64(posConstant)).Bytes()

// Add 0s to make v1 and v2 32 bytes long
for len(v1) < 32 {
v1 = append([]byte{0}, v1...)
}
for len(v2) < 32 {
v2 = append([]byte{0}, v2...)
}

return keccak256.Hash(v1, v2)
}

// computeFullBatches computes the full batches
func computeFullBatches(batches []*DSBatch, l2Blocks []*DSL2Block, l2Txs []*DSL2Transaction) []*DSFullBatch {
currentL2Block := 0
Expand Down
1 change: 1 addition & 0 deletions state/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func DecodeTx(encodedTx string) (*types.Transaction, error) {
return tx, nil
}

// GenerateReceipt generates a receipt from a processed transaction
func GenerateReceipt(blockNumber *big.Int, processedTx *ProcessTransactionResponse) *types.Receipt {
receipt := &types.Receipt{
Type: uint8(processedTx.Type),
Expand Down
Loading

0 comments on commit 9ce36bf

Please sign in to comment.