Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use gas_price index/order/limit when load pending tx from pool #112

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ StateConsistencyCheckInterval = "5s"
Filename = ""
Version = 0
Enabled = false
LoadPendingTxsLimit = 0

[SequenceSender]
WaitPeriodSendSequence = "5s"
Expand Down
5 changes: 5 additions & 0 deletions db/migrations/pool/0014.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- +migrate Up
CREATE INDEX IF NOT EXISTS idx_transaction_gas_price ON pool.transaction (gas_price);

-- +migrate Down
DROP INDEX IF EXISTS pool.idx_transaction_gas_price;
2 changes: 1 addition & 1 deletion docs/config-file/node-config-doc.html

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions docs/config-file/node-config-doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -1937,6 +1937,7 @@ DataSourcePriority=["local", "trusted", "external"]
| - [StateConsistencyCheckInterval](#Sequencer_StateConsistencyCheckInterval ) | No | string | No | - | Duration |
| - [Finalizer](#Sequencer_Finalizer ) | No | object | No | - | Finalizer's specific config properties |
| - [StreamServer](#Sequencer_StreamServer ) | No | object | No | - | StreamServerCfg is the config for the stream server |
| - [LoadPendingTxsLimit](#Sequencer_LoadPendingTxsLimit ) | No | integer | No | - | LoadPendingTxsLimit is used to limit amount txs from the db |

### <a name="Sequencer_DeletePoolTxsL1BlockConfirmations"></a>10.1. `Sequencer.DeletePoolTxsL1BlockConfirmations`

Expand Down Expand Up @@ -2575,6 +2576,20 @@ Must be one of:
UpgradeEtrogBatchNumber=0
```

### <a name="Sequencer_LoadPendingTxsLimit"></a>10.9. `Sequencer.LoadPendingTxsLimit`

**Type:** : `integer`

**Default:** `0`

**Description:** LoadPendingTxsLimit is used to limit amount txs from the db

**Example setting the default value** (0):
```
[Sequencer]
LoadPendingTxsLimit=0
```

## <a name="SequenceSender"></a>11. `[SequenceSender]`

**Type:** : `object`
Expand Down
5 changes: 5 additions & 0 deletions docs/config-file/node-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,11 @@
"additionalProperties": false,
"type": "object",
"description": "StreamServerCfg is the config for the stream server"
},
"LoadPendingTxsLimit": {
"type": "integer",
"description": "LoadPendingTxsLimit is used to limit amount txs from the db",
"default": 0
}
},
"additionalProperties": false,
Expand Down
2 changes: 1 addition & 1 deletion pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type storage interface {
GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]common.Hash, error)
GetTxsByFromAndNonce(ctx context.Context, from common.Address, nonce uint64) ([]Transaction, error)
GetTxsByStatus(ctx context.Context, state TxStatus, limit uint64) ([]Transaction, error)
GetNonWIPPendingTxs(ctx context.Context) ([]Transaction, error)
GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]Transaction, error)
IsTxPending(ctx context.Context, hash common.Hash) (bool, error)
SetGasPrices(ctx context.Context, l2GasPrice uint64, l1GasPrice uint64) error
DeleteGasPricesHistoryOlderThan(ctx context.Context, date time.Time) error
Expand Down
16 changes: 12 additions & 4 deletions pool/pgpoolstorage/pgpoolstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,24 @@ func (p *PostgresPoolStorage) GetTxsByStatus(ctx context.Context, status pool.Tx
}

// GetNonWIPPendingTxs returns an array of transactions
func (p *PostgresPoolStorage) GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error) {
// limit parameter is used to limit amount txs from the db,
// if limit = 0, then there is no limit
func (p *PostgresPoolStorage) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error) {
var (
rows pgx.Rows
err error
sql string
)

sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1`
rows, err = p.db.Query(ctx, sql, pool.TxStatusPending)
if limit == 0 {
sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC`
rows, err = p.db.Query(ctx, sql, pool.TxStatusPending)
} else {
sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC LIMIT $2`
rows, err = p.db.Query(ctx, sql, pool.TxStatusPending, limit)
}

if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,8 @@ func (p *Pool) GetPendingTxs(ctx context.Context, limit uint64) ([]Transaction,
}

// GetNonWIPPendingTxs from the pool
func (p *Pool) GetNonWIPPendingTxs(ctx context.Context) ([]Transaction, error) {
return p.storage.GetNonWIPPendingTxs(ctx)
func (p *Pool) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]Transaction, error) {
return p.storage.GetNonWIPPendingTxs(ctx, limit)
}

// GetSelectedTxs gets selected txs from the pool db
Expand Down
3 changes: 3 additions & 0 deletions sequencer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Config struct {

// StreamServerCfg is the config for the stream server
StreamServer StreamServerCfg `mapstructure:"StreamServer"`

// LoadPendingTxsLimit is used to limit amount txs from the db
LoadPendingTxsLimit uint64 `mapstructure:"LoadPendingTxsLimit"`
}

// StreamServerCfg contains the data streamer's configuration properties
Expand Down
2 changes: 1 addition & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type txPool interface {
DeleteFailedTransactionsOlderThan(ctx context.Context, date time.Time) error
DeleteTransactionByHash(ctx context.Context, hash common.Hash) error
MarkWIPTxsAsPending(ctx context.Context) error
GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error)
GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error)
UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus, isWIP bool, failedReason *string) error
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, *state.ZKCounters, error)
UpdateTxWIPStatus(ctx context.Context, hash common.Hash, isWIP bool) error
Expand Down
2 changes: 1 addition & 1 deletion sequencer/mock_pool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *Sequencer) loadFromPool(ctx context.Context) {
return
}

poolTransactions, err := s.pool.GetNonWIPPendingTxs(ctx)
poolTransactions, err := s.pool.GetNonWIPPendingTxs(ctx, s.cfg.LoadPendingTxsLimit)
if err != nil && err != pool.ErrNotFound {
log.Errorf("error loading txs from pool, error: %v", err)
}
Expand Down