Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add im state root to data stream #2893

Merged
merged 14 commits into from
Dec 11, 2023
13 changes: 13 additions & 0 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ func (d *dbManager) sendDataToStreamer() {
}

for _, l2Transaction := range l2Transactions {
// Populate intermediate state root
position := state.GetSystemSCPosition(blockStart.L2BlockNumber)
imStateRoot, err := d.GetStorageAt(context.Background(), common.HexToAddress(state.SystemSC), big.NewInt(0).SetBytes(position), l2Block.StateRoot)
if err != nil {
log.Errorf("failed to get storage at for l2block %v: %v", l2Block.L2BlockNumber, err)
}
l2Transaction.StateRoot = common.BigToHash(imStateRoot)

_, err = d.streamServer.AddStreamEntry(state.EntryTypeL2Tx, l2Transaction.Encode())
if err != nil {
log.Errorf("failed to add l2tx stream entry for l2block %v: %v", l2Block.L2BlockNumber, err)
Expand Down Expand Up @@ -726,3 +734,8 @@ func (d *dbManager) GetForcedBatch(ctx context.Context, forcedBatchNumber uint64
func (d *dbManager) GetForkIDByBatchNumber(batchNumber uint64) uint64 {
return d.state.GetForkIDByBatchNumber(batchNumber)
}

// GetStorageAt returns the storage at a given address and position
func (d *dbManager) GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error) {
return d.state.GetStorageAt(ctx, address, position, root)
}
2 changes: 2 additions & 0 deletions sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type stateInterface interface {
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*state.DSBatch, error)
GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSL2Block, error)
GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*state.DSL2Transaction, error)
GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error)
}

type workerInterface interface {
Expand Down Expand Up @@ -137,4 +138,5 @@ type dbManagerInterface interface {
StoreProcessedTxAndDeleteFromPool(ctx context.Context, tx transactionToStore) error
GetForcedBatch(ctx context.Context, forcedBatchNumber uint64, dbTx pgx.Tx) (*state.ForcedBatch, error)
GetForkIDByBatchNumber(batchNumber uint64) uint64
GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error)
}
26 changes: 26 additions & 0 deletions sequencer/mock_db_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions sequencer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *Sequencer) Start(ctx context.Context) {
}

func (s *Sequencer) updateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer) {
err := state.GenerateDataStreamerFile(ctx, streamServer, s.state, true)
err := state.GenerateDataStreamerFile(ctx, streamServer, s.state, true, nil)
if err != nil {
log.Fatalf("failed to generate data streamer file, err: %v", err)
}
Expand Down
56 changes: 47 additions & 9 deletions state/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package state
import (
"context"
"encoding/binary"
"math/big"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/iden3/go-iden3-crypto/keccak256"
"github.com/jackc/pgx/v4"
)

Expand All @@ -25,6 +28,10 @@ const (
EntryTypeUpdateGER datastreamer.EntryType = 4
// BookMarkTypeL2Block represents a L2 block bookmark
BookMarkTypeL2Block byte = 0
// SystemSC is the system smart contract address
SystemSC = "0x000000000000000000000000000000005ca1ab1e"
// posConstant is the constant used to compute the position of the intermediate state root
posConstant = 1
)

// DSBatch represents a data stream batch
Expand Down Expand Up @@ -92,10 +99,11 @@ func (b DSL2BlockStart) Decode(data []byte) DSL2BlockStart {

// DSL2Transaction represents a data stream L2 transaction
type DSL2Transaction struct {
L2BlockNumber uint64 // Not included in the encoded data
EffectiveGasPricePercentage uint8 // 1 byte
IsValid uint8 // 1 byte
EncodedLength uint32 // 4 bytes
L2BlockNumber uint64 // Not included in the encoded data
EffectiveGasPricePercentage uint8 // 1 byte
IsValid uint8 // 1 byte
StateRoot common.Hash // 32 bytes
EncodedLength uint32 // 4 bytes
Encoded []byte
}

Expand All @@ -104,6 +112,7 @@ func (l DSL2Transaction) Encode() []byte {
bytes := make([]byte, 0)
bytes = append(bytes, byte(l.EffectiveGasPricePercentage))
bytes = append(bytes, byte(l.IsValid))
bytes = append(bytes, l.StateRoot[:]...)
bytes = binary.LittleEndian.AppendUint32(bytes, l.EncodedLength)
bytes = append(bytes, l.Encoded...)
return bytes
Expand All @@ -113,8 +122,9 @@ func (l DSL2Transaction) Encode() []byte {
func (l DSL2Transaction) Decode(data []byte) DSL2Transaction {
l.EffectiveGasPricePercentage = uint8(data[0])
l.IsValid = uint8(data[1])
l.EncodedLength = binary.LittleEndian.Uint32(data[2:6])
l.Encoded = data[6:]
l.StateRoot = common.BytesToHash(data[2:34])
l.EncodedLength = binary.LittleEndian.Uint32(data[34:38])
l.Encoded = data[38:]
return l
}

Expand Down Expand Up @@ -202,10 +212,12 @@ type DSState interface {
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*DSBatch, error)
GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSL2Block, error)
GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*DSL2Transaction, error)
GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error)
GetLastL2BlockHeader(ctx context.Context, dbTx pgx.Tx) (*types.Header, error)
}

// GenerateDataStreamerFile generates or resumes a data stream file
func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool) error {
func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool, imStateRoots *map[uint64][]byte) error {
header := streamServer.GetHeader()

var currentBatchNumber uint64 = 0
Expand Down Expand Up @@ -345,8 +357,6 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St

// Gererate full batches
fullBatches := computeFullBatches(batches, l2Blocks, l2Txs)
log.Debugf("Full batches: %+v", fullBatches)

currentBatchNumber += limit

for _, batch := range fullBatches {
Expand Down Expand Up @@ -418,6 +428,18 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
}

for _, tx := range l2block.Txs {
// Populate intermediate state root
if imStateRoots == nil || (*imStateRoots)[blockStart.L2BlockNumber] == nil {
position := GetSystemSCPosition(l2block.L2BlockNumber)
imStateRoot, err := stateDB.GetStorageAt(ctx, common.HexToAddress(SystemSC), big.NewInt(0).SetBytes(position), l2block.StateRoot)
if err != nil {
return err
}
tx.StateRoot = common.BigToHash(imStateRoot)
} else {
tx.StateRoot = common.BytesToHash((*imStateRoots)[blockStart.L2BlockNumber])
}

entry, err = streamServer.AddStreamEntry(EntryTypeL2Tx, tx.Encode())
if err != nil {
return err
Expand Down Expand Up @@ -447,6 +469,22 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
return err
}

// GetSystemSCPosition computes the position of the intermediate state root for the system smart contract
func GetSystemSCPosition(blockNumber uint64) []byte {
v1 := big.NewInt(0).SetUint64(blockNumber).Bytes()
v2 := big.NewInt(0).SetUint64(uint64(posConstant)).Bytes()

// Add 0s to make v1 and v2 32 bytes long
for len(v1) < 32 {
v1 = append([]byte{0}, v1...)
}
for len(v2) < 32 {
v2 = append([]byte{0}, v2...)
}

return keccak256.Hash(v1, v2)
}

// computeFullBatches computes the full batches
func computeFullBatches(batches []*DSBatch, l2Blocks []*DSL2Block, l2Txs []*DSL2Transaction) []*DSFullBatch {
currentL2Block := 0
Expand Down
25 changes: 20 additions & 5 deletions state/test/datastream_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package test

import (
"fmt"
"testing"
"time"

"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -28,14 +30,15 @@ func TestL2BlockStartEncode(t *testing.T) {

func TestL2TransactionEncode(t *testing.T) {
l2Transaction := state.DSL2Transaction{
EffectiveGasPricePercentage: 128, // 1 byte
IsValid: 1, // 1 byte
EncodedLength: 5, // 4 bytes
Encoded: []byte{1, 2, 3, 4, 5}, // 5 bytes
EffectiveGasPricePercentage: 128, // 1 byte
IsValid: 1, // 1 byte
StateRoot: common.HexToHash("0x010203"), // 32 bytes
EncodedLength: 5, // 4 bytes
Encoded: []byte{1, 2, 3, 4, 5}, // 5 bytes
}

encoded := l2Transaction.Encode()
expected := []byte{128, 1, 5, 0, 0, 0, 1, 2, 3, 4, 5}
expected := []byte{128, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 5, 0, 0, 0, 1, 2, 3, 4, 5}
assert.Equal(t, expected, encoded)
}

Expand All @@ -53,3 +56,15 @@ func TestL2BlockEndEncode(t *testing.T) {

assert.Equal(t, expected, encoded)
}

func TestCalculateSCPosition(t *testing.T) {
a := time.Now()
blockNumber := uint64(2934867)
expected := common.HexToHash("0xaa93c484856be45716623765b429a967296594ca362e61e91d671fb422e0f744")
position := state.GetSystemSCPosition(blockNumber)
assert.Equal(t, expected, common.BytesToHash(position))
b := time.Now()

c := b.Sub(a)
fmt.Println(c)
}
21 changes: 13 additions & 8 deletions tools/datastreamer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-data-streamer/log"
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/merkletree"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
Expand All @@ -28,15 +27,21 @@ type OnlineConfig struct {
StreamType datastreamer.StreamType `mapstructure:"StreamType"`
}

// MTConfig is the configuration for the merkle tree
type MTConfig struct {
URI string `mapstructure:"URI"`
MaxThreads int `mapstructure:"MaxThreads"`
}

// Config is the configuration for the tool
type Config struct {
ChainID uint64 `mapstructure:"ChainID"`
Online OnlineConfig `mapstructure:"Online"`
Offline datastreamer.Config `mapstructure:"Offline"`
StateDB db.Config `mapstructure:"StateDB"`
Executor executor.Config `mapstructure:"Executor"`
MerkeTree merkletree.Config `mapstructure:"MerkeTree"`
Log log.Config `mapstructure:"Log"`
ChainID uint64 `mapstructure:"ChainID"`
Online OnlineConfig `mapstructure:"Online"`
Offline datastreamer.Config `mapstructure:"Offline"`
StateDB db.Config `mapstructure:"StateDB"`
Executor executor.Config `mapstructure:"Executor"`
MerkleTree MTConfig `mapstructure:"MerkleTree"`
Log log.Config `mapstructure:"Log"`
}

// Default parses the default configuration values.
Expand Down
3 changes: 2 additions & 1 deletion tools/datastreamer/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ MaxConns = 200
URI = "zkevm-prover:50071"
MaxGRPCMessageSize = 100000000

[MerkeTree]
[MerkleTree]
URI = "zkevm-prover:50061"
MaxThreads = 20

[Log]
Environment = "development" # "production" or "development"
Expand Down
3 changes: 2 additions & 1 deletion tools/datastreamer/config/tool.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ MaxConns = 200
URI = "zkevm-prover:50071"
MaxGRPCMessageSize = 100000000

[MerkeTree]
[MerkleTree]
URI = "zkevm-prover:50061"
MaxThreads = 20

[Log]
Environment = "development"
Expand Down
Loading