Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor data stream tool (#2653) #2657

Merged
merged 3 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func Warnf(template string, args ...interface{}) {
// Fatalf calls log.Fatalf on the root Logger.
func Fatalf(template string, args ...interface{}) {
args = appendStackTraceMaybeArgs(args)
getDefaultLog().Fatalf(template+" %s", args...)
getDefaultLog().Fatalf(template, args...)
}

// Errorf calls log.Errorf on the root logger and stores the error message into
Expand Down
24 changes: 18 additions & 6 deletions state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2598,7 +2598,7 @@ func (p *PostgresStorage) UpdateForkID(ctx context.Context, forkID ForkIDInterva

// GetDSGenesisBlock returns the genesis block
func (p *PostgresStorage) GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*DSL2Block, error) {
const genesisL2BlockSQL = `SELECT 0 as batch_num, l2b.block_num, l2b.created_at, '0x0000000000000000000000000000000000000000' as global_exit_root, l2b.header->>'miner' AS coinbase, 0 as fork_id, l2b.block_hash, l2b.state_root
const genesisL2BlockSQL = `SELECT 0 as batch_num, l2b.block_num, l2b.received_at, '0x0000000000000000000000000000000000000000' as global_exit_root, l2b.header->>'miner' AS coinbase, 0 as fork_id, l2b.block_hash, l2b.state_root
FROM state.l2block l2b
WHERE l2b.block_num = 0`

Expand All @@ -2616,7 +2616,7 @@ func (p *PostgresStorage) GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*

// GetDSL2Blocks returns the L2 blocks
func (p *PostgresStorage) GetDSL2Blocks(ctx context.Context, limit, offset uint64, dbTx pgx.Tx) ([]*DSL2Block, error) {
const l2BlockSQL = `SELECT l2b.batch_num, l2b.block_num, l2b.created_at, b.global_exit_root, l2b.header->>'miner' AS coinbase, f.fork_id, l2b.block_hash, l2b.state_root
const l2BlockSQL = `SELECT l2b.batch_num, l2b.block_num, l2b.received_at, b.global_exit_root, l2b.header->>'miner' AS coinbase, f.fork_id, l2b.block_hash, l2b.state_root
FROM state.l2block l2b, state.batch b, state.fork_id f
WHERE l2b.batch_num = b.batch_num AND l2b.batch_num between f.from_batch_num AND f.to_batch_num
ORDER BY l2b.block_num ASC limit $1 offset $2`
Expand Down Expand Up @@ -2672,7 +2672,7 @@ func scanL2Block(row pgx.Row) (*DSL2Block, error) {

// GetDSL2Transactions returns the L2 transactions
func (p *PostgresStorage) GetDSL2Transactions(ctx context.Context, minL2Block, maxL2Block uint64, dbTx pgx.Tx) ([]*DSL2Transaction, error) {
const l2TxSQL = `SELECT t.effective_percentage, LENGTH(t.encoded), t.encoded
const l2TxSQL = `SELECT t.effective_percentage, t.encoded
FROM state.transaction t
WHERE l2_block_num BETWEEN $1 AND $2
ORDER BY t.l2_block_num ASC`
Expand All @@ -2699,13 +2699,25 @@ func (p *PostgresStorage) GetDSL2Transactions(ctx context.Context, minL2Block, m

func scanL2Transaction(row pgx.Row) (*DSL2Transaction, error) {
l2Transaction := DSL2Transaction{}
encoded := []byte{}
if err := row.Scan(
&l2Transaction.EffectiveGasPricePercentage,
&l2Transaction.EncodedLength,
&l2Transaction.Encoded,
&encoded,
); err != nil {
return &l2Transaction, err
return nil, err
}
tx, err := DecodeTx(string(encoded))
if err != nil {
return nil, err
}

binaryTxData, err := tx.MarshalBinary()
if err != nil {
return nil, err
}

l2Transaction.Encoded = binaryTxData
l2Transaction.EncodedLength = uint32(len(l2Transaction.Encoded))
l2Transaction.IsValid = 1
return &l2Transaction, nil
}
1 change: 1 addition & 0 deletions tools/datastreamer/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ check-go:

# Targets that require the checks
generate-file: check-go
validate-file: check-go
rebuild: check-go
decode-entry: check-go
decode-l2block: check-go
Expand Down
3 changes: 2 additions & 1 deletion tools/datastreamer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"strings"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/0xPolygonHermez/zkevm-node/tools/datastreamer/db"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"github.com/urfave/cli/v2"
Expand All @@ -21,6 +21,7 @@ const (

// Config is the configuration for the tool
type Config struct {
QuerySize uint64 `mapstructure:"QuerySize"`
StreamServer datastreamer.Config `mapstructure:"StreamServer"`
StateDB db.Config `mapstructure:"StateDB"`
Executor executor.Config `mapstructure:"Executor"`
Expand Down
3 changes: 2 additions & 1 deletion tools/datastreamer/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package config

// DefaultValues is the default configuration
const DefaultValues = `
QuerySize = 1000
[StreamServer]
Port = 8080
Port = 6901
Filename = "datastreamer.bin"
[Log]
Environment = "development" # "production" or "development"
Expand Down
1 change: 1 addition & 0 deletions tools/datastreamer/config/tool.config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
QuerySize = 10000

[StreamServer]
Port = 6901
Expand Down
19 changes: 0 additions & 19 deletions tools/datastreamer/db/config.go

This file was deleted.

166 changes: 0 additions & 166 deletions tools/datastreamer/db/db.go

This file was deleted.

27 changes: 0 additions & 27 deletions tools/datastreamer/db/logger.go

This file was deleted.

10 changes: 8 additions & 2 deletions tools/datastreamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-data-streamer/log"
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/0xPolygonHermez/zkevm-node/tools/datastreamer/config"
"github.com/0xPolygonHermez/zkevm-node/tools/datastreamer/db"
"github.com/ethereum/go-ethereum/common"
"github.com/fatih/color"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -125,6 +125,12 @@ func initializeStreamServer(c *config.Config) (*datastreamer.StreamServer, error
}

streamServer.SetEntriesDef(entriesDefinition)

err = streamServer.Start()
if err != nil {
return nil, err
}

return &streamServer, nil
}

Expand Down Expand Up @@ -250,7 +256,7 @@ func generate(cliCtx *cli.Context) error {
log.Infof("Current transaction index: %d", currentTxIndex)
log.Infof("Current L2 block number: %d", currentL2Block)

var limit uint64 = 1000
var limit uint64 = c.QuerySize
var offset uint64 = currentL2Block
var entry uint64 = header.TotalEntries
var l2blocks []*state.DSL2Block
Expand Down