From ac53abd237a7c41dcf6713e162edc8f01f7d796a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Thu, 5 Oct 2023 09:23:23 +0200 Subject: [PATCH] Populate stream file on sequencer startup (#2613) (#2614) * populates stream file on startup * fix test --- .gitignore | 2 + go.mod | 2 +- go.sum | 4 +- sequencer/datastream.go | 63 ----- sequencer/datastream_test.go | 36 --- sequencer/dbmanager.go | 50 +++- sequencer/interfaces.go | 3 + sequencer/mock_state.go | 78 ++++++ sequencer/sequencer.go | 186 +++++++++++++- state/datastream.go | 93 +++++++ state/pgstatestorage.go | 114 +++++++++ state/test/datastream_test.go | 55 +++++ tools/datastreamer/Makefile | 23 ++ tools/datastreamer/config/config.go | 86 +++++++ tools/datastreamer/config/default.go | 21 ++ tools/datastreamer/config/tool.config.toml | 17 ++ tools/datastreamer/db/config.go | 19 ++ tools/datastreamer/db/db.go | 154 ++++++++++++ tools/datastreamer/db/logger.go | 27 +++ tools/datastreamer/main.go | 268 +++++++++++++++++++++ 20 files changed, 1177 insertions(+), 124 deletions(-) delete mode 100644 sequencer/datastream.go delete mode 100644 sequencer/datastream_test.go create mode 100644 state/datastream.go create mode 100644 state/test/datastream_test.go create mode 100644 tools/datastreamer/Makefile create mode 100644 tools/datastreamer/config/config.go create mode 100644 tools/datastreamer/config/default.go create mode 100644 tools/datastreamer/config/tool.config.toml create mode 100644 tools/datastreamer/db/config.go create mode 100644 tools/datastreamer/db/db.go create mode 100644 tools/datastreamer/db/logger.go create mode 100644 tools/datastreamer/main.go diff --git a/.gitignore b/.gitignore index e460c1f4b5..c21fbd68d2 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,8 @@ /test/contracts/bin/**/*.bin /test/contracts/bin/**/*.abi +/tools/datastreamer/*.bin + **/.DS_Store .vscode .idea/ diff --git a/go.mod b/go.mod index 9003197104..8ac396390e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node go 1.19 require ( - github.com/0xPolygonHermez/zkevm-data-streamer v0.0.6 + github.com/0xPolygonHermez/zkevm-data-streamer v0.0.8 github.com/didip/tollbooth/v6 v6.1.2 github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127 github.com/ethereum/go-ethereum v1.13.2 diff --git a/go.sum b/go.sum index a18c621ee5..cb3af9eec8 100644 --- a/go.sum +++ b/go.sum @@ -42,8 +42,8 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/0xPolygonHermez/zkevm-data-streamer v0.0.6 h1:jdK1BMWftw0vHFlxFc+pi202rH+3av1x/bQ20NXFINk= -github.com/0xPolygonHermez/zkevm-data-streamer v0.0.6/go.mod h1:PNLzimb4objx43SXmVlzEjp2lUFxeeq+WpQumm1PUY0= +github.com/0xPolygonHermez/zkevm-data-streamer v0.0.8 h1:hOByFEvUC8hJnfbINMFzXxBru07AQLEhN50afow6Eu8= +github.com/0xPolygonHermez/zkevm-data-streamer v0.0.8/go.mod h1:UqLxA+/R20fm63Mp+J7wYMfh6WoE+6vBj6rOmFGuRm4= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= diff --git a/sequencer/datastream.go b/sequencer/datastream.go deleted file mode 100644 index 6e0cbe0069..0000000000 --- a/sequencer/datastream.go +++ /dev/null @@ -1,63 +0,0 @@ -package sequencer - -import ( - "encoding/binary" - - "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" - "github.com/ethereum/go-ethereum/common" -) - -const ( - // StreamTypeSequencer represents a Sequencer stream - StreamTypeSequencer datastreamer.StreamType = 1 - // EntryTypeL2Block represents a L2 block - EntryTypeL2Block datastreamer.EntryType = 1 - // EntryTypeL2Tx represents a L2 transaction - EntryTypeL2Tx datastreamer.EntryType = 2 -) - -// DSL2FullBlock represents a data stream L2 full block and its transactions -type DSL2FullBlock struct { - L2Block DSL2Block - Txs []DSL2Transaction -} - -// DSL2Block represents a data stream L2 block -type DSL2Block struct { - BatchNumber uint64 - L2BlockNumber uint64 - Timestamp uint64 - GlobalExitRoot common.Hash - Coinbase common.Address -} - -// Encode returns the encoded L2Block as a byte slice -func (b DSL2Block) Encode() []byte { - bytes := make([]byte, 0) - bytes = binary.LittleEndian.AppendUint64(bytes, b.BatchNumber) - bytes = binary.LittleEndian.AppendUint64(bytes, b.L2BlockNumber) - bytes = binary.LittleEndian.AppendUint64(bytes, uint64(b.Timestamp)) - bytes = append(bytes, b.GlobalExitRoot.Bytes()...) - bytes = append(bytes, b.Coinbase.Bytes()...) - return bytes -} - -// DSL2Transaction represents a data stream L2 transaction -type DSL2Transaction struct { - BatchNumber uint64 - EffectiveGasPricePercentage uint8 - IsValid uint8 - EncodedLength uint32 - Encoded []byte -} - -// Encode returns the encoded L2Transaction as a byte slice -func (l DSL2Transaction) Encode() []byte { - bytes := make([]byte, 0) - bytes = binary.LittleEndian.AppendUint64(bytes, l.BatchNumber) - bytes = append(bytes, byte(l.EffectiveGasPricePercentage)) - bytes = append(bytes, byte(l.IsValid)) - bytes = binary.LittleEndian.AppendUint32(bytes, l.EncodedLength) - bytes = append(bytes, l.Encoded...) 
- return bytes -} diff --git a/sequencer/datastream_test.go b/sequencer/datastream_test.go deleted file mode 100644 index 971525ba51..0000000000 --- a/sequencer/datastream_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package sequencer - -import ( - "testing" - - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/assert" -) - -func TestL2BlockEncode(t *testing.T) { - l2Block := DSL2Block{ - BatchNumber: 1, // 8 bytes - L2BlockNumber: 2, // 8 bytes - Timestamp: 3, // 8 bytes - GlobalExitRoot: common.HexToHash("0x04"), // 32 bytes - Coinbase: common.HexToAddress("0x05"), // 20 bytes - } - - encoded := l2Block.Encode() - assert.Equal(t, []byte{1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5}, encoded) -} - -func TestL2TransactionEncode(t *testing.T) { - l2Transaction := DSL2Transaction{ - BatchNumber: 1, // 8 bytes - EffectiveGasPricePercentage: 128, // 1 byte - IsValid: 1, // 1 byte - EncodedLength: 5, // 4 bytes - Encoded: []byte{1, 2, 3, 4, 5}, // 5 bytes - } - - encoded := l2Transaction.Encode() - assert.Equal(t, []byte{1, 0, 0, 0, 0, 0, 0, 0, 128, 1, 5, 0, 0, 0, 1, 2, 3, 4, 5}, encoded) -} diff --git a/sequencer/dbmanager.go b/sequencer/dbmanager.go index c30daec18d..9de7800e09 100644 --- a/sequencer/dbmanager.go +++ b/sequencer/dbmanager.go @@ -29,7 +29,7 @@ type dbManager struct { batchConstraints state.BatchConstraintsCfg numberOfReorgs uint64 streamServer *datastreamer.StreamServer - dataToStream chan DSL2FullBlock + dataToStream chan state.DSL2FullBlock } func (d *dbManager) GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error) { @@ -48,16 +48,16 @@ type ClosingBatchParameters struct { EffectivePercentages []uint8 } -func newDBManager(ctx context.Context, config DBManagerCfg, txPool txPool, state stateInterface, worker *Worker, closingSignalCh ClosingSignalCh, batchConstraints state.BatchConstraintsCfg) *dbManager { - numberOfReorgs, err := state.CountReorgs(ctx, nil) +func newDBManager(ctx context.Context, config DBManagerCfg, txPool txPool, stateInterface stateInterface, worker *Worker, closingSignalCh ClosingSignalCh, batchConstraints state.BatchConstraintsCfg) *dbManager { + numberOfReorgs, err := stateInterface.CountReorgs(ctx, nil) if err != nil { log.Error("failed to get number of reorgs: %v", err) } return &dbManager{ctx: ctx, cfg: config, txPool: txPool, - state: state, worker: worker, l2ReorgCh: closingSignalCh.L2ReorgCh, + state: stateInterface, worker: worker, l2ReorgCh: closingSignalCh.L2ReorgCh, batchConstraints: batchConstraints, numberOfReorgs: numberOfReorgs, - dataToStream: make(chan DSL2FullBlock, batchConstraints.MaxTxsPerBatch*datastreamChannelMultiplier)} + dataToStream: make(chan state.DSL2FullBlock, batchConstraints.MaxTxsPerBatch*datastreamChannelMultiplier)} } // Start stars the dbManager routines @@ -174,20 +174,40 @@ func (d *dbManager) sendDataToStreamer() { continue } - _, err = d.streamServer.AddStreamEntry(EntryTypeL2Block, l2Block.Encode()) + blockStart := state.DSL2BlockStart{ + BatchNumber: l2Block.BatchNumber, + L2BlockNumber: l2Block.L2BlockNumber, + Timestamp: l2Block.Timestamp, + GlobalExitRoot: l2Block.GlobalExitRoot, + Coinbase: l2Block.Coinbase, + ForkID: l2Block.ForkID, + } + + _, err = d.streamServer.AddStreamEntry(state.EntryTypeL2BlockStart, blockStart.Encode()) if err != nil { log.Errorf("failed to add 
stream entry for l2block %v: %v", l2Block.L2BlockNumber, err) continue } for _, l2Transaction := range l2Transactions { - _, err = d.streamServer.AddStreamEntry(EntryTypeL2Tx, l2Transaction.Encode()) + _, err = d.streamServer.AddStreamEntry(state.EntryTypeL2Tx, l2Transaction.Encode()) if err != nil { log.Errorf("failed to add l2tx stream entry for l2block %v: %v", l2Block.L2BlockNumber, err) continue } } + blockEnd := state.DSL2BlockEnd{ + L2BlockNumber: l2Block.L2BlockNumber, + BlockHash: l2Block.BlockHash, + StateRoot: l2Block.StateRoot, + } + + _, err = d.streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, blockEnd.Encode()) + if err != nil { + log.Fatal(err) + } + err = d.streamServer.CommitAtomicOp() if err != nil { log.Errorf("failed to commit atomic op for l2block %v: %v ", l2Block.L2BlockNumber, err) @@ -286,25 +306,29 @@ func (d *dbManager) StoreProcessedTxAndDeleteFromPool(ctx context.Context, tx tr // Send data to streamer if d.streamServer != nil { - l2Block := DSL2Block{ + forkID := d.state.GetForkIDByBatchNumber(tx.batchNumber) + + l2Block := state.DSL2Block{ BatchNumber: tx.batchNumber, L2BlockNumber: l2BlockHeader.Number.Uint64(), - Timestamp: uint64(tx.timestamp.Unix()), + Timestamp: tx.timestamp.Unix(), GlobalExitRoot: batch.GlobalExitRoot, Coinbase: tx.coinbase, + ForkID: uint16(forkID), + BlockHash: l2BlockHeader.Hash(), + StateRoot: l2BlockHeader.Root, } - l2Transaction := DSL2Transaction{ - BatchNumber: batch.BatchNumber, + l2Transaction := state.DSL2Transaction{ EffectiveGasPricePercentage: uint8(tx.response.EffectivePercentage), IsValid: 1, EncodedLength: uint32(len(txData)), Encoded: txData, } - d.dataToStream <- DSL2FullBlock{ + d.dataToStream <- state.DSL2FullBlock{ L2Block: l2Block, - Txs: []DSL2Transaction{l2Transaction}, + Txs: []state.DSL2Transaction{l2Transaction}, } } diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index a1dc84a5cd..32bf0c1ce7 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -81,6 +81,9 @@ type stateInterface interface { FlushMerkleTree(ctx context.Context) error GetStoredFlushID(ctx context.Context) (uint64, string, error) GetForkIDByBatchNumber(batchNumber uint64) uint64 + GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*state.DSL2Block, error) + GetDSL2Blocks(ctx context.Context, limit, offset uint64, dbTx pgx.Tx) ([]*state.DSL2Block, error) + GetDSL2Transactions(ctx context.Context, minL2Block, maxL2Block uint64, dbTx pgx.Tx) ([]*state.DSL2Transaction, error) } type workerInterface interface { diff --git a/sequencer/mock_state.go b/sequencer/mock_state.go index 190fe4958c..caba444074 100644 --- a/sequencer/mock_state.go +++ b/sequencer/mock_state.go @@ -210,6 +210,84 @@ func (_m *StateMock) GetBatchByNumber(ctx context.Context, batchNumber uint64, d return r0, r1 } +// GetDSGenesisBlock provides a mock function with given fields: ctx, dbTx +func (_m *StateMock) GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*state.DSL2Block, error) { + ret := _m.Called(ctx, dbTx) + + var r0 *state.DSL2Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, pgx.Tx) (*state.DSL2Block, error)); ok { + return rf(ctx, dbTx) + } + if rf, ok := ret.Get(0).(func(context.Context, pgx.Tx) *state.DSL2Block); ok { + r0 = rf(ctx, dbTx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*state.DSL2Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, pgx.Tx) error); ok { + r1 = rf(ctx, dbTx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetDSL2Blocks provides a mock 
function with given fields: ctx, limit, offset, dbTx +func (_m *StateMock) GetDSL2Blocks(ctx context.Context, limit uint64, offset uint64, dbTx pgx.Tx) ([]*state.DSL2Block, error) { + ret := _m.Called(ctx, limit, offset, dbTx) + + var r0 []*state.DSL2Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, pgx.Tx) ([]*state.DSL2Block, error)); ok { + return rf(ctx, limit, offset, dbTx) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, pgx.Tx) []*state.DSL2Block); ok { + r0 = rf(ctx, limit, offset, dbTx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*state.DSL2Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64, uint64, pgx.Tx) error); ok { + r1 = rf(ctx, limit, offset, dbTx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetDSL2Transactions provides a mock function with given fields: ctx, minL2Block, maxL2Block, dbTx +func (_m *StateMock) GetDSL2Transactions(ctx context.Context, minL2Block uint64, maxL2Block uint64, dbTx pgx.Tx) ([]*state.DSL2Transaction, error) { + ret := _m.Called(ctx, minL2Block, maxL2Block, dbTx) + + var r0 []*state.DSL2Transaction + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, pgx.Tx) ([]*state.DSL2Transaction, error)); ok { + return rf(ctx, minL2Block, maxL2Block, dbTx) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, pgx.Tx) []*state.DSL2Transaction); ok { + r0 = rf(ctx, minL2Block, maxL2Block, dbTx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*state.DSL2Transaction) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64, uint64, pgx.Tx) error); ok { + r1 = rf(ctx, minL2Block, maxL2Block, dbTx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetForcedBatch provides a mock function with given fields: ctx, forcedBatchNumber, dbTx func (_m *StateMock) GetForcedBatch(ctx context.Context, forcedBatchNumber uint64, dbTx pgx.Tx) (*state.ForcedBatch, error) { ret := _m.Called(ctx, forcedBatchNumber, dbTx) diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go index 0227045125..4538754b7a 100644 --- a/sequencer/sequencer.go +++ b/sequencer/sequencer.go @@ -2,6 +2,7 @@ package sequencer import ( "context" + "encoding/binary" "errors" "fmt" "reflect" @@ -88,26 +89,33 @@ func (s *Sequencer) Start(ctx context.Context) { // Start stream server if enabled if s.cfg.StreamServer.Enabled { - streamServer, err := datastreamer.New(s.cfg.StreamServer.Port, StreamTypeSequencer, s.cfg.StreamServer.Filename, &s.cfg.StreamServer.Log) + streamServer, err := datastreamer.New(s.cfg.StreamServer.Port, state.StreamTypeSequencer, s.cfg.StreamServer.Filename, &s.cfg.StreamServer.Log) if err != nil { log.Fatalf("failed to create stream server, err: %v", err) } // Set entities definition entriesDefinition := map[datastreamer.EntryType]datastreamer.EntityDefinition{ - EntryTypeL2Block: { - Name: "L2Block", - StreamType: StreamTypeSequencer, - Definition: reflect.TypeOf(DSL2Block{}), + state.EntryTypeL2BlockStart: { + Name: "L2BlockStart", + StreamType: state.StreamTypeSequencer, + Definition: reflect.TypeOf(state.DSL2BlockStart{}), }, - EntryTypeL2Tx: { + state.EntryTypeL2Tx: { Name: "L2Transaction", - StreamType: StreamTypeSequencer, - Definition: reflect.TypeOf(DSL2Transaction{}), + StreamType: state.StreamTypeSequencer, + Definition: reflect.TypeOf(state.DSL2Transaction{}), + }, + state.EntryTypeL2BlockEnd: { + Name: "L2BlockEnd", + StreamType: state.StreamTypeSequencer, + Definition: 
reflect.TypeOf(state.DSL2BlockEnd{}), }, } - streamServer.SetEntriesDefinition(entriesDefinition) + streamServer.SetEntriesDef(entriesDefinition) + + s.updateDataStreamerFile(ctx, &streamServer) dbManager.streamServer = &streamServer err = dbManager.streamServer.Start() @@ -150,6 +158,166 @@ func (s *Sequencer) Start(ctx context.Context) { <-ctx.Done() } +func (s *Sequencer) updateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer) { + var currentL2Block uint64 + var currentTxIndex uint64 + var err error + + header := streamServer.GetHeader() + + if header.TotalEntries == 0 { + // Get Genesis block + genesisL2Block, err := s.state.GetDSGenesisBlock(ctx, nil) + if err != nil { + log.Fatal(err) + } + + err = streamServer.StartAtomicOp() + if err != nil { + log.Fatal(err) + } + + genesisBlock := state.DSL2BlockStart{ + BatchNumber: genesisL2Block.BatchNumber, + L2BlockNumber: genesisL2Block.L2BlockNumber, + Timestamp: genesisL2Block.Timestamp, + GlobalExitRoot: genesisL2Block.GlobalExitRoot, + Coinbase: genesisL2Block.Coinbase, + ForkID: genesisL2Block.ForkID, + } + + log.Infof("Genesis block: %+v", genesisBlock) + + _, err = streamServer.AddStreamEntry(1, genesisBlock.Encode()) + if err != nil { + log.Fatal(err) + } + + genesisBlockEnd := state.DSL2BlockEnd{ + L2BlockNumber: genesisL2Block.L2BlockNumber, + BlockHash: genesisL2Block.BlockHash, + StateRoot: genesisL2Block.StateRoot, + } + + _, err = streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, genesisBlockEnd.Encode()) + if err != nil { + log.Fatal(err) + } + + err = streamServer.CommitAtomicOp() + if err != nil { + log.Fatal(err) + } + } else { + latestEntry, err := streamServer.GetEntry(header.TotalEntries - 1) + if err != nil { + log.Fatal(err) + } + + log.Infof("Latest entry: %+v", latestEntry) + + switch latestEntry.EntryType { + case state.EntryTypeL2BlockStart: + log.Info("Latest entry type is L2BlockStart") + currentL2Block = binary.LittleEndian.Uint64(latestEntry.Data[8:16]) + case state.EntryTypeL2Tx: + log.Info("Latest entry type is L2Tx") + for latestEntry.EntryType == state.EntryTypeL2Tx { + currentTxIndex++ + latestEntry, err = streamServer.GetEntry(header.TotalEntries - currentTxIndex) + if err != nil { + log.Fatal(err) + } + } + if latestEntry.EntryType != state.EntryTypeL2BlockStart { + log.Fatal("Latest entry is not a L2BlockStart") + } + currentL2Block = binary.LittleEndian.Uint64(latestEntry.Data[8:16]) + case state.EntryTypeL2BlockEnd: + log.Info("Latest entry type is L2BlockEnd") + currentL2Block = binary.LittleEndian.Uint64(latestEntry.Data[0:8]) + } + } + + log.Infof("Current transaction index: %d", currentTxIndex) + if currentTxIndex == 0 { + currentL2Block++ + } + log.Infof("Current L2 block number: %d", currentL2Block) + + var limit uint64 = 1000 + var offset uint64 = currentL2Block + var entry uint64 = header.TotalEntries + var l2blocks []*state.DSL2Block + + if entry > 0 { + entry-- + } + + for err == nil { + log.Infof("Current entry number: %d", entry) + + l2blocks, err = s.state.GetDSL2Blocks(ctx, limit, offset, nil) + offset += limit + if len(l2blocks) == 0 { + break + } + // Get transactions for all the retrieved l2 blocks + l2Transactions, err := s.state.GetDSL2Transactions(ctx, l2blocks[0].L2BlockNumber, l2blocks[len(l2blocks)-1].L2BlockNumber, nil) + if err != nil { + log.Fatal(err) + } + + err = streamServer.StartAtomicOp() + if err != nil { + log.Fatal(err) + } + + for x, l2block := range l2blocks { + if currentTxIndex > 0 { + x += int(currentTxIndex) + currentTxIndex = 
0 + } + + blockStart := state.DSL2BlockStart{ + BatchNumber: l2block.BatchNumber, + L2BlockNumber: l2block.L2BlockNumber, + Timestamp: l2block.Timestamp, + GlobalExitRoot: l2block.GlobalExitRoot, + Coinbase: l2block.Coinbase, + ForkID: l2block.ForkID, + } + + _, err = streamServer.AddStreamEntry(state.EntryTypeL2BlockStart, blockStart.Encode()) + if err != nil { + log.Fatal(err) + } + + entry, err = streamServer.AddStreamEntry(state.EntryTypeL2Tx, l2Transactions[x].Encode()) + if err != nil { + log.Fatal(err) + } + + blockEnd := state.DSL2BlockEnd{ + L2BlockNumber: l2block.L2BlockNumber, + BlockHash: l2block.BlockHash, + StateRoot: l2block.StateRoot, + } + + _, err = streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, blockEnd.Encode()) + if err != nil { + log.Fatal(err) + } + } + err = streamServer.CommitAtomicOp() + if err != nil { + log.Fatal(err) + } + } + + log.Info("Data streamer file updated") +} + func (s *Sequencer) bootstrap(ctx context.Context, dbManager *dbManager, finalizer *finalizer) (*WipBatch, *state.ProcessRequest) { var ( currBatch *WipBatch diff --git a/state/datastream.go b/state/datastream.go new file mode 100644 index 0000000000..427be2e784 --- /dev/null +++ b/state/datastream.go @@ -0,0 +1,93 @@ +package state + +import ( + "encoding/binary" + + "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" + "github.com/ethereum/go-ethereum/common" +) + +const ( + // StreamTypeSequencer represents a Sequencer stream + StreamTypeSequencer datastreamer.StreamType = 1 + // EntryTypeL2BlockStart represents a L2 block start + EntryTypeL2BlockStart datastreamer.EntryType = 1 + // EntryTypeL2Tx represents a L2 transaction + EntryTypeL2Tx datastreamer.EntryType = 2 + // EntryTypeL2BlockEnd represents a L2 block end + EntryTypeL2BlockEnd datastreamer.EntryType = 3 +) + +// DSL2FullBlock represents a data stream L2 full block and its transactions +type DSL2FullBlock struct { + L2Block DSL2Block + Txs []DSL2Transaction +} + +// DSL2Block is a full l2 block +type DSL2Block struct { + BatchNumber uint64 // 8 bytes + L2BlockNumber uint64 // 8 bytes + Timestamp int64 // 8 bytes + GlobalExitRoot common.Hash // 32 bytes + Coinbase common.Address // 20 bytes + ForkID uint16 // 2 bytes + BlockHash common.Hash // 32 bytes + StateRoot common.Hash // 32 bytes +} + +// DSL2BlockStart represents a data stream L2 block start +type DSL2BlockStart struct { + BatchNumber uint64 // 8 bytes + L2BlockNumber uint64 // 8 bytes + Timestamp int64 // 8 bytes + GlobalExitRoot common.Hash // 32 bytes + Coinbase common.Address // 20 bytes + ForkID uint16 // 2 bytes +} + +// Encode returns the encoded DSL2BlockStart as a byte slice +func (b DSL2BlockStart) Encode() []byte { + bytes := make([]byte, 0) + bytes = binary.LittleEndian.AppendUint64(bytes, b.BatchNumber) + bytes = binary.LittleEndian.AppendUint64(bytes, b.L2BlockNumber) + bytes = binary.LittleEndian.AppendUint64(bytes, uint64(b.Timestamp)) + bytes = append(bytes, b.GlobalExitRoot.Bytes()...) + bytes = append(bytes, b.Coinbase.Bytes()...) 
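+    // ForkID goes last as 2 little-endian bytes, so an encoded block-start record is a fixed 78 bytes (8+8+8+32+20+2)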
+ bytes = binary.LittleEndian.AppendUint16(bytes, b.ForkID) + return bytes +} + +// DSL2Transaction represents a data stream L2 transaction +type DSL2Transaction struct { + EffectiveGasPricePercentage uint8 // 1 byte + IsValid uint8 // 1 byte + EncodedLength uint32 // 4 bytes + Encoded []byte +} + +// Encode returns the encoded DSL2Transaction as a byte slice +func (l DSL2Transaction) Encode() []byte { + bytes := make([]byte, 0) + bytes = append(bytes, byte(l.EffectiveGasPricePercentage)) + bytes = append(bytes, byte(l.IsValid)) + bytes = binary.LittleEndian.AppendUint32(bytes, l.EncodedLength) + bytes = append(bytes, l.Encoded...) + return bytes +} + +// DSL2BlockEnd represents a L2 block end +type DSL2BlockEnd struct { + L2BlockNumber uint64 // 8 bytes + BlockHash common.Hash // 32 bytes + StateRoot common.Hash // 32 bytes +} + +// Encode returns the encoded DSL2BlockEnd as a byte slice +func (b DSL2BlockEnd) Encode() []byte { + bytes := make([]byte, 0) + bytes = binary.LittleEndian.AppendUint64(bytes, b.L2BlockNumber) + bytes = append(bytes, b.BlockHash.Bytes()...) + bytes = append(bytes, b.StateRoot.Bytes()...) + return bytes +} diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index a97c0c9e7e..5e5ef76ade 100644 --- a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -2549,3 +2549,117 @@ func (p *PostgresStorage) UpdateForkID(ctx context.Context, forkID ForkIDInterva } return nil } + +// GetDSGenesisBlock returns the genesis block +func (p *PostgresStorage) GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*DSL2Block, error) { + const genesisL2BlockSQL = `SELECT 0 as batch_num, l2b.block_num, l2b.created_at, '0x0000000000000000000000000000000000000000' as global_exit_root, l2b.header->>'miner' AS coinbase, 0 as fork_id, l2b.block_hash, l2b.state_root + FROM state.l2block l2b + WHERE l2b.block_num = 0` + + e := p.getExecQuerier(dbTx) + + row := e.QueryRow(ctx, genesisL2BlockSQL) + + l2block, err := scanL2Block(row) + if err != nil { + return nil, err + } + + return l2block, nil +} + +// GetDSL2Blocks returns the L2 blocks +func (p *PostgresStorage) GetDSL2Blocks(ctx context.Context, limit, offset uint64, dbTx pgx.Tx) ([]*DSL2Block, error) { + const l2BlockSQL = `SELECT l2b.batch_num, l2b.block_num, l2b.created_at, b.global_exit_root, l2b.header->>'miner' AS coinbase, f.fork_id, l2b.block_hash, l2b.state_root + FROM state.l2block l2b, state.batch b, state.fork_id f + WHERE l2b.batch_num = b.batch_num AND l2b.batch_num between f.from_batch_num AND f.to_batch_num + ORDER BY l2b.block_num ASC limit $1 offset $2` + e := p.getExecQuerier(dbTx) + rows, err := e.Query(ctx, l2BlockSQL, limit, offset) + if err != nil { + return nil, err + } + defer rows.Close() + + l2blocks := make([]*DSL2Block, 0, len(rows.RawValues())) + + for rows.Next() { + l2block, err := scanL2Block(rows) + if err != nil { + return nil, err + } + l2blocks = append(l2blocks, l2block) + } + + return l2blocks, nil +} + +func scanL2Block(row pgx.Row) (*DSL2Block, error) { + l2Block := DSL2Block{} + var ( + gerStr string + coinbaseStr string + timestamp time.Time + blockHashStr string + stateRootStr string + ) + if err := row.Scan( + &l2Block.BatchNumber, + &l2Block.L2BlockNumber, + ×tamp, + &gerStr, + &coinbaseStr, + &l2Block.ForkID, + &blockHashStr, + &stateRootStr, + ); err != nil { + return &l2Block, err + } + l2Block.GlobalExitRoot = common.HexToHash(gerStr) + l2Block.Coinbase = common.HexToAddress(coinbaseStr) + l2Block.Timestamp = timestamp.Unix() + l2Block.BlockHash = 
common.HexToHash(blockHashStr) + l2Block.StateRoot = common.HexToHash(stateRootStr) + + return &l2Block, nil +} + +// GetDSL2Transactions returns the L2 transactions +func (p *PostgresStorage) GetDSL2Transactions(ctx context.Context, minL2Block, maxL2Block uint64, dbTx pgx.Tx) ([]*DSL2Transaction, error) { + const l2TxSQL = `SELECT t.effective_percentage, LENGTH(t.encoded), t.encoded + FROM state.transaction t + WHERE l2_block_num BETWEEN $1 AND $2 + ORDER BY t.l2_block_num ASC` + + e := p.getExecQuerier(dbTx) + rows, err := e.Query(ctx, l2TxSQL, minL2Block, maxL2Block) + if err != nil { + return nil, err + } + defer rows.Close() + + l2Txs := make([]*DSL2Transaction, 0, len(rows.RawValues())) + + for rows.Next() { + l2Tx, err := scanL2Transaction(rows) + if err != nil { + return nil, err + } + l2Txs = append(l2Txs, l2Tx) + } + + return l2Txs, nil +} + +func scanL2Transaction(row pgx.Row) (*DSL2Transaction, error) { + l2Transaction := DSL2Transaction{} + if err := row.Scan( + &l2Transaction.EffectiveGasPricePercentage, + &l2Transaction.EncodedLength, + &l2Transaction.Encoded, + ); err != nil { + return &l2Transaction, err + } + l2Transaction.IsValid = 1 + return &l2Transaction, nil +} diff --git a/state/test/datastream_test.go b/state/test/datastream_test.go new file mode 100644 index 0000000000..9c2002b842 --- /dev/null +++ b/state/test/datastream_test.go @@ -0,0 +1,55 @@ +package test + +import ( + "testing" + + "github.com/0xPolygonHermez/zkevm-node/state" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" +) + +func TestL2BlockStartEncode(t *testing.T) { + l2BlockStart := state.DSL2BlockStart{ + BatchNumber: 1, // 8 bytes + L2BlockNumber: 2, // 8 bytes + Timestamp: 3, // 8 bytes + GlobalExitRoot: common.HexToHash("0x04"), // 32 bytes + Coinbase: common.HexToAddress("0x05"), // 20 bytes + ForkID: 5, + } + + encoded := l2BlockStart.Encode() + expected := []byte{1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 5, 0} + + assert.Equal(t, expected, encoded) +} + +func TestL2TransactionEncode(t *testing.T) { + l2Transaction := state.DSL2Transaction{ + EffectiveGasPricePercentage: 128, // 1 byte + IsValid: 1, // 1 byte + EncodedLength: 5, // 4 bytes + Encoded: []byte{1, 2, 3, 4, 5}, // 5 bytes + } + + encoded := l2Transaction.Encode() + expected := []byte{128, 1, 5, 0, 0, 0, 1, 2, 3, 4, 5} + assert.Equal(t, expected, encoded) +} + +func TestL2BlockEndEncode(t *testing.T) { + l2BlockEnd := state.DSL2BlockEnd{ + L2BlockNumber: 1, // 8 bytes + BlockHash: common.HexToHash("0x02"), // 32 bytes + StateRoot: common.HexToHash("0x03"), // 32 bytes + } + + encoded := l2BlockEnd.Encode() + expected := []byte{1, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3} + + assert.Equal(t, expected, encoded) +} diff --git a/tools/datastreamer/Makefile b/tools/datastreamer/Makefile new file mode 100644 index 0000000000..46af8cf882 --- /dev/null +++ b/tools/datastreamer/Makefile @@ -0,0 +1,23 @@ +# Check dependencies +# Check for Go +.PHONY: check-go +check-go: + @which go > /dev/null || (echo "Error: Go is not installed" && exit 1) + +# Targets that require the checks +run-tool: check-go + +.PHONY: run-tool +run-tool: ## Runs the tool to 
populate the binary file + go run main.go run -cfg config/tool.config.toml + +## Help display. +## Pulls comments from beside commands and prints a nicely formatted +## display with the commands and their usage information. +.DEFAULT_GOAL := help + +.PHONY: help +help: ## Prints this help + @grep -h -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) \ + | sort \ + | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' diff --git a/tools/datastreamer/config/config.go b/tools/datastreamer/config/config.go new file mode 100644 index 0000000000..ff869dba79 --- /dev/null +++ b/tools/datastreamer/config/config.go @@ -0,0 +1,86 @@ +package config + +import ( + "bytes" + "path/filepath" + "strings" + + "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" + "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/0xPolygonHermez/zkevm-node/tools/datastreamer/db" + "github.com/mitchellh/mapstructure" + "github.com/spf13/viper" + "github.com/urfave/cli/v2" +) + +const ( + // FlagCfg is the flag for cfg + FlagCfg = "cfg" +) + +// Config is the configuration for the tool +type Config struct { + StreamServer datastreamer.Config `mapstructure:"StreamServer"` + StateDB db.Config `mapstructure:"StateDB"` +} + +// Default parses the default configuration values. +func Default() (*Config, error) { + var cfg Config + viper.SetConfigType("toml") + + err := viper.ReadConfig(bytes.NewBuffer([]byte(DefaultValues))) + if err != nil { + return nil, err + } + err = viper.Unmarshal(&cfg, viper.DecodeHook(mapstructure.TextUnmarshallerHookFunc())) + if err != nil { + return nil, err + } + return &cfg, nil +} + +// Load parses the configuration values from the config file and environment variables +func Load(ctx *cli.Context) (*Config, error) { + cfg, err := Default() + if err != nil { + return nil, err + } + configFilePath := ctx.String(FlagCfg) + if configFilePath != "" { + dirName, fileName := filepath.Split(configFilePath) + + fileExtension := strings.TrimPrefix(filepath.Ext(fileName), ".") + fileNameWithoutExtension := strings.TrimSuffix(fileName, "."+fileExtension) + + viper.AddConfigPath(dirName) + viper.SetConfigName(fileNameWithoutExtension) + viper.SetConfigType(fileExtension) + } + viper.AutomaticEnv() + replacer := strings.NewReplacer(".", "_") + viper.SetEnvKeyReplacer(replacer) + viper.SetEnvPrefix("ZKEVM_DATA_STREAMER") + err = viper.ReadInConfig() + if err != nil { + _, ok := err.(viper.ConfigFileNotFoundError) + if ok { + log.Infof("config file not found") + } else { + log.Infof("error reading config file: ", err) + return nil, err + } + } + + decodeHooks := []viper.DecoderConfigOption{ + // this allows arrays to be decoded from env var separated by ",", example: MY_VAR="value1,value2,value3" + viper.DecodeHook(mapstructure.ComposeDecodeHookFunc(mapstructure.TextUnmarshallerHookFunc(), mapstructure.StringToSliceHookFunc(","))), + } + + err = viper.Unmarshal(&cfg, decodeHooks...) 
+ if err != nil { + return nil, err + } + + return cfg, nil +} diff --git a/tools/datastreamer/config/default.go b/tools/datastreamer/config/default.go new file mode 100644 index 0000000000..9ff7dcbd2f --- /dev/null +++ b/tools/datastreamer/config/default.go @@ -0,0 +1,21 @@ +package config + +// DefaultValues is the default configuration +const DefaultValues = ` +[StreamServer] +Port = 8080 +Filename = "datastreamer.bin" + [Log] + Environment = "development" # "production" or "development" + Level = "info" + Outputs = ["stderr"] + +[StateDB] +User = "state_user" +Password = "state_password" +Name = "state_db" +Host = "localhost" +Port = "5432" +EnableLog = false +MaxConns = 200 +` diff --git a/tools/datastreamer/config/tool.config.toml b/tools/datastreamer/config/tool.config.toml new file mode 100644 index 0000000000..f3179c1a0a --- /dev/null +++ b/tools/datastreamer/config/tool.config.toml @@ -0,0 +1,17 @@ + +[StreamServer] +Port = 6900 +Filename = "streamfile.bin" + [StreamServer.Log] + Environment = "development" + Level = "debug" + Outputs = ["stdout"] + +[StateDB] +User = "state_user" +Password = "state_password" +Name = "state_db" +Host = "localhost" +Port = "5432" +EnableLog = false +MaxConns = 200 diff --git a/tools/datastreamer/db/config.go b/tools/datastreamer/db/config.go new file mode 100644 index 0000000000..434f77f1f5 --- /dev/null +++ b/tools/datastreamer/db/config.go @@ -0,0 +1,19 @@ +package db + +// Config is the configuration for the database +type Config struct { + // Database name + Name string `mapstructure:"Name"` + // Database User name + User string `mapstructure:"User"` + // Database Password of the user + Password string `mapstructure:"Password"` + // Host address of database + Host string `mapstructure:"Host"` + // Port Number of database + Port string `mapstructure:"Port"` + // EnableLog + EnableLog bool `mapstructure:"EnableLog"` + // MaxConns is the maximum number of connections in the pool. 
+ MaxConns int `mapstructure:"MaxConns"` +} diff --git a/tools/datastreamer/db/db.go b/tools/datastreamer/db/db.go new file mode 100644 index 0000000000..3bfa1ebab7 --- /dev/null +++ b/tools/datastreamer/db/db.go @@ -0,0 +1,154 @@ +package db + +import ( + "context" + "fmt" + "time" + + "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/0xPolygonHermez/zkevm-node/state" + "github.com/ethereum/go-ethereum/common" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" +) + +// StateDB implements the StateDB interface +type StateDB struct { + *pgxpool.Pool +} + +// NewStateDB creates a new StateDB +func NewStateDB(db *pgxpool.Pool) *StateDB { + return &StateDB{ + db, + } +} + +// NewSQLDB creates a new SQL DB +func NewSQLDB(cfg Config) (*pgxpool.Pool, error) { + config, err := pgxpool.ParseConfig(fmt.Sprintf("postgres://%s:%s@%s:%s/%s?pool_max_conns=%d", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Name, cfg.MaxConns)) + if err != nil { + log.Errorf("Unable to parse DB config: %v\n", err) + return nil, err + } + if cfg.EnableLog { + config.ConnConfig.Logger = logger{} + } + conn, err := pgxpool.ConnectConfig(context.Background(), config) + if err != nil { + log.Errorf("Unable to connect to database: %v\n", err) + return nil, err + } + return conn, nil +} + +// GetGenesisBlock returns the genesis block +func (db *StateDB) GetGenesisBlock(ctx context.Context) (*state.DSL2Block, error) { + const genesisL2BlockSQL = `SELECT 0 as batch_num, l2b.block_num, l2b.created_at, '0x0000000000000000000000000000000000000000' as global_exit_root, l2b.header->>'miner' AS coinbase, 0 as fork_id, l2b.block_hash, l2b.state_root + FROM state.l2block l2b + WHERE l2b.block_num = 0` + + row := db.QueryRow(ctx, genesisL2BlockSQL) + + l2block, err := scanL2Block(row) + if err != nil { + return nil, err + } + + return l2block, nil +} + +// GetL2Blocks returns the L2 blocks +func (db *StateDB) GetL2Blocks(ctx context.Context, limit, offset uint64) ([]*state.DSL2Block, error) { + const l2BlockSQL = `SELECT l2b.batch_num, l2b.block_num, l2b.created_at, b.global_exit_root, l2b.header->>'miner' AS coinbase, f.fork_id, l2b.block_hash, l2b.state_root + FROM state.l2block l2b, state.batch b, state.fork_id f + WHERE l2b.batch_num = b.batch_num AND l2b.batch_num between f.from_batch_num AND f.to_batch_num + ORDER BY l2b.block_num ASC limit $1 offset $2` + + rows, err := db.Query(ctx, l2BlockSQL, limit, offset) + if err != nil { + return nil, err + } + defer rows.Close() + + l2blocks := make([]*state.DSL2Block, 0, len(rows.RawValues())) + + for rows.Next() { + l2block, err := scanL2Block(rows) + if err != nil { + return nil, err + } + l2blocks = append(l2blocks, l2block) + } + + return l2blocks, nil +} + +func scanL2Block(row pgx.Row) (*state.DSL2Block, error) { + l2Block := state.DSL2Block{} + var ( + gerStr string + coinbaseStr string + timestamp time.Time + blockHashStr string + stateRootStr string + ) + if err := row.Scan( + &l2Block.BatchNumber, + &l2Block.L2BlockNumber, + ×tamp, + &gerStr, + &coinbaseStr, + &l2Block.ForkID, + &blockHashStr, + &stateRootStr, + ); err != nil { + return &l2Block, err + } + l2Block.GlobalExitRoot = common.HexToHash(gerStr) + l2Block.Coinbase = common.HexToAddress(coinbaseStr) + l2Block.Timestamp = timestamp.Unix() + l2Block.BlockHash = common.HexToHash(blockHashStr) + l2Block.StateRoot = common.HexToHash(stateRootStr) + + return &l2Block, nil +} + +// GetL2Transactions returns the L2 transactions +func (db *StateDB) GetL2Transactions(ctx context.Context, minL2Block, 
maxL2Block uint64) ([]*state.DSL2Transaction, error) { + const l2TxSQL = `SELECT t.effective_percentage, LENGTH(t.encoded), t.encoded + FROM state.transaction t + WHERE l2_block_num BETWEEN $1 AND $2 + ORDER BY t.l2_block_num ASC` + + rows, err := db.Query(ctx, l2TxSQL, minL2Block, maxL2Block) + if err != nil { + return nil, err + } + defer rows.Close() + + l2Txs := make([]*state.DSL2Transaction, 0, len(rows.RawValues())) + + for rows.Next() { + l2Tx, err := scanL2Transaction(rows) + if err != nil { + return nil, err + } + l2Txs = append(l2Txs, l2Tx) + } + + return l2Txs, nil +} + +func scanL2Transaction(row pgx.Row) (*state.DSL2Transaction, error) { + l2Transaction := state.DSL2Transaction{} + if err := row.Scan( + &l2Transaction.EffectiveGasPricePercentage, + &l2Transaction.EncodedLength, + &l2Transaction.Encoded, + ); err != nil { + return &l2Transaction, err + } + l2Transaction.IsValid = 1 + return &l2Transaction, nil +} diff --git a/tools/datastreamer/db/logger.go b/tools/datastreamer/db/logger.go new file mode 100644 index 0000000000..06177005ae --- /dev/null +++ b/tools/datastreamer/db/logger.go @@ -0,0 +1,27 @@ +package db + +import ( + "context" + "fmt" + + "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/jackc/pgx/v4" +) + +type logger struct{} + +func (l logger) Log(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) { + m := fmt.Sprintf("%s %v", msg, data) + + switch level { + case pgx.LogLevelInfo: + log.Info(m) + case pgx.LogLevelWarn: + log.Warn(m) + case pgx.LogLevelError: + log.Error(m) + default: + m = fmt.Sprintf("%s %s %v", level.String(), msg, data) + log.Debug(m) + } +} diff --git a/tools/datastreamer/main.go b/tools/datastreamer/main.go new file mode 100644 index 0000000000..29f530c356 --- /dev/null +++ b/tools/datastreamer/main.go @@ -0,0 +1,268 @@ +package main + +import ( + "encoding/binary" + "os" + "reflect" + + "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" + "github.com/0xPolygonHermez/zkevm-data-streamer/log" + "github.com/0xPolygonHermez/zkevm-node/state" + "github.com/0xPolygonHermez/zkevm-node/tools/datastreamer/config" + "github.com/0xPolygonHermez/zkevm-node/tools/datastreamer/db" + "github.com/urfave/cli/v2" +) + +const appName = "zkevm-data-streamer-tool" + +var ( + configFileFlag = cli.StringFlag{ + Name: config.FlagCfg, + Aliases: []string{"c"}, + Usage: "Configuration `FILE`", + Required: false, + } +) + +func main() { + app := cli.NewApp() + app.Name = appName + + flags := []cli.Flag{ + &configFileFlag, + } + + app.Commands = []*cli.Command{ + { + Name: "run", + Aliases: []string{}, + Usage: "Run the tool", + Action: start, + Flags: flags, + }, + } + + err := app.Run(os.Args) + if err != nil { + log.Fatal(err) + os.Exit(1) + } +} + +func start(cliCtx *cli.Context) error { + c, err := config.Load(cliCtx) + if err != nil { + return err + } + log.Infof("Loaded configuration: %+v", c) + + // Init logger + log.Init(c.StreamServer.Log) + log.Info("Starting tool") + + // Create a stream server + streamServer, err := datastreamer.New(c.StreamServer.Port, state.StreamTypeSequencer, c.StreamServer.Filename, &c.StreamServer.Log) + if err != nil { + log.Fatal(err) + } + + // Set entities definition + entriesDefinition := map[datastreamer.EntryType]datastreamer.EntityDefinition{ + state.EntryTypeL2BlockStart: { + Name: "L2BlockStart", + StreamType: state.StreamTypeSequencer, + Definition: reflect.TypeOf(state.DSL2BlockStart{}), + }, + state.EntryTypeL2Tx: { + Name: "L2Transaction", + StreamType: 
state.StreamTypeSequencer, + Definition: reflect.TypeOf(state.DSL2Transaction{}), + }, + state.EntryTypeL2BlockEnd: { + Name: "L2BlockEnd", + StreamType: state.StreamTypeSequencer, + Definition: reflect.TypeOf(state.DSL2BlockEnd{}), + }, + } + + streamServer.SetEntriesDef(entriesDefinition) + err = streamServer.Start() + if err != nil { + log.Fatal(err) + } + + // Connect to the database + stateSqlDB, err := db.NewSQLDB(c.StateDB) + if err != nil { + log.Fatal(err) + } + defer stateSqlDB.Close() + stateDB := db.NewStateDB(stateSqlDB) + log.Info("Connected to the database") + + header := streamServer.GetHeader() + + var currentL2Block uint64 + var currentTxIndex uint64 + + if header.TotalEntries == 0 { + // Get Genesis block + genesisL2Block, err := stateDB.GetGenesisBlock(cliCtx.Context) + if err != nil { + log.Fatal(err) + } + + err = streamServer.StartAtomicOp() + if err != nil { + log.Fatal(err) + } + + genesisBlock := state.DSL2BlockStart{ + BatchNumber: genesisL2Block.BatchNumber, + L2BlockNumber: genesisL2Block.L2BlockNumber, + Timestamp: genesisL2Block.Timestamp, + GlobalExitRoot: genesisL2Block.GlobalExitRoot, + Coinbase: genesisL2Block.Coinbase, + ForkID: genesisL2Block.ForkID, + } + + log.Infof("Genesis block: %+v", genesisBlock) + + _, err = streamServer.AddStreamEntry(1, genesisBlock.Encode()) + if err != nil { + log.Fatal(err) + } + + genesisBlockEnd := state.DSL2BlockEnd{ + L2BlockNumber: genesisL2Block.L2BlockNumber, + BlockHash: genesisL2Block.BlockHash, + StateRoot: genesisL2Block.StateRoot, + } + + _, err = streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, genesisBlockEnd.Encode()) + if err != nil { + log.Fatal(err) + } + + err = streamServer.CommitAtomicOp() + if err != nil { + log.Fatal(err) + } + } else { + latestEntry, err := streamServer.GetEntry(header.TotalEntries - 1) + if err != nil { + log.Fatal(err) + } + + log.Infof("Latest entry: %+v", latestEntry) + + switch latestEntry.EntryType { + case state.EntryTypeL2BlockStart: + log.Info("Latest entry type is L2BlockStart") + currentL2Block = binary.LittleEndian.Uint64(latestEntry.Data[8:16]) + case state.EntryTypeL2Tx: + log.Info("Latest entry type is L2Tx") + + for latestEntry.EntryType == state.EntryTypeL2Tx { + currentTxIndex++ + latestEntry, err = streamServer.GetEntry(header.TotalEntries - currentTxIndex) + if err != nil { + log.Fatal(err) + } + } + + if latestEntry.EntryType != state.EntryTypeL2BlockStart { + log.Fatal("Latest entry is not a L2BlockStart") + } + currentL2Block = binary.LittleEndian.Uint64(latestEntry.Data[8:16]) + + case state.EntryTypeL2BlockEnd: + log.Info("Latest entry type is L2BlockEnd") + currentL2Block = binary.LittleEndian.Uint64(latestEntry.Data[0:8]) + } + } + + log.Infof("Current transaction index: %d", currentTxIndex) + if currentTxIndex == 0 { + currentL2Block++ + } + log.Infof("Current L2 block number: %d", currentL2Block) + + var limit uint64 = 1000 + var offset uint64 = currentL2Block + var entry uint64 = header.TotalEntries + var l2blocks []*state.DSL2Block + + if entry > 0 { + entry-- + } + + for err == nil { + log.Infof("Current entry number: %d", entry) + + l2blocks, err = stateDB.GetL2Blocks(cliCtx.Context, limit, offset) + offset += limit + if len(l2blocks) == 0 { + break + } + // Get transactions for all the retrieved l2 blocks + l2Transactions, err := stateDB.GetL2Transactions(cliCtx.Context, l2blocks[0].L2BlockNumber, l2blocks[len(l2blocks)-1].L2BlockNumber) + if err != nil { + log.Fatal(err) + } + + err = streamServer.StartAtomicOp() + if err != nil { + 
log.Fatal(err) + } + + for x, l2block := range l2blocks { + if currentTxIndex > 0 { + x += int(currentTxIndex) + currentTxIndex = 0 + } + + blockStart := state.DSL2BlockStart{ + BatchNumber: l2block.BatchNumber, + L2BlockNumber: l2block.L2BlockNumber, + Timestamp: l2block.Timestamp, + GlobalExitRoot: l2block.GlobalExitRoot, + Coinbase: l2block.Coinbase, + ForkID: l2block.ForkID, + } + + _, err = streamServer.AddStreamEntry(state.EntryTypeL2BlockStart, blockStart.Encode()) + if err != nil { + log.Fatal(err) + } + + entry, err = streamServer.AddStreamEntry(state.EntryTypeL2Tx, l2Transactions[x].Encode()) + if err != nil { + log.Fatal(err) + } + + blockEnd := state.DSL2BlockEnd{ + L2BlockNumber: l2block.L2BlockNumber, + BlockHash: l2block.BlockHash, + StateRoot: l2block.StateRoot, + } + + _, err = streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, blockEnd.Encode()) + if err != nil { + log.Fatal(err) + } + } + err = streamServer.CommitAtomicOp() + if err != nil { + log.Fatal(err) + } + } + + if err != nil { + log.Fatal(err) + } + + log.Info("Finished tool") + + return nil +}
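Note (not part of the patch): the resume logic above relies on the fixed layout produced by DSL2BlockStart.Encode — it reads Data[8:16] for the L2 block number of a block-start entry and Data[0:8] for a block-end entry. Below is a minimal sketch of a reader that decodes a block-start entry back into a state.DSL2BlockStart using those same offsets; decodeL2BlockStart is an illustrative helper name, not an API introduced by this change, and the example simply round-trips the test vector from state/test/datastream_test.go.

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/0xPolygonHermez/zkevm-node/state"
	"github.com/ethereum/go-ethereum/common"
)

// decodeL2BlockStart mirrors DSL2BlockStart.Encode: 8-byte batch number,
// 8-byte L2 block number, 8-byte timestamp, 32-byte GER, 20-byte coinbase,
// 2-byte fork ID, all integers little-endian.
func decodeL2BlockStart(data []byte) state.DSL2BlockStart {
	return state.DSL2BlockStart{
		BatchNumber:    binary.LittleEndian.Uint64(data[0:8]),
		L2BlockNumber:  binary.LittleEndian.Uint64(data[8:16]),
		Timestamp:      int64(binary.LittleEndian.Uint64(data[16:24])),
		GlobalExitRoot: common.BytesToHash(data[24:56]),
		Coinbase:       common.BytesToAddress(data[56:76]),
		ForkID:         binary.LittleEndian.Uint16(data[76:78]),
	}
}

func main() {
	// Round-trip the vector used in TestL2BlockStartEncode.
	start := state.DSL2BlockStart{
		BatchNumber:    1,
		L2BlockNumber:  2,
		Timestamp:      3,
		GlobalExitRoot: common.HexToHash("0x04"),
		Coinbase:       common.HexToAddress("0x05"),
		ForkID:         5,
	}
	decoded := decodeL2BlockStart(start.Encode())
	fmt.Printf("decoded: %+v\n", decoded)
}

To regenerate a stream file offline with the new tool, the Makefile target added in tools/datastreamer runs it against the state database configured in config/tool.config.toml: go run main.go run -cfg config/tool.config.toml.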