diff --git a/config/default.go b/config/default.go index a0ff2e2941..e2c3dc1dc6 100644 --- a/config/default.go +++ b/config/default.go @@ -162,6 +162,7 @@ StateConsistencyCheckInterval = "5s" Filename = "" Version = 0 Enabled = false +LoadPendingTxsLimit = 0 [SequenceSender] WaitPeriodSendSequence = "5s" diff --git a/db/migrations/pool/0014.sql b/db/migrations/pool/0014.sql new file mode 100644 index 0000000000..4b2bdb0de2 --- /dev/null +++ b/db/migrations/pool/0014.sql @@ -0,0 +1,5 @@ +-- +migrate Up +CREATE INDEX IF NOT EXISTS idx_transaction_gas_price ON pool.transaction (gas_price); + +-- +migrate Down +DROP INDEX IF EXISTS pool.idx_transaction_gas_price; \ No newline at end of file diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index 955862bf70..a9706b2431 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -54,7 +54,7 @@
"300ms"
HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches.
The Sequencer will halt after it closes the batch equal to this number
SequentialBatchSanityCheck indicates if the reprocess of a closed batch (sanity check) must be done in a
sequential way (instead than in parallel)
SequentialProcessL2Block indicates if the processing of a L2 Block must be done in the same finalizer go func instead
in the processPendingL2Blocks go func
Interval is the interval of time to calculate sequencer metrics
"1m"
"300ms"
-
EnableLog is a flag to enable/disable metrics logs
Port to listen on
Filename of the binary data file
Version of the binary data file
ChainID is the chain ID
Enabled is a flag to enable/disable the data streamer
UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1
"1m"
+
EnableLog is a flag to enable/disable metrics logs
Port to listen on
Filename of the binary data file
Version of the binary data file
ChainID is the chain ID
Enabled is a flag to enable/disable the data streamer
UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
LoadPendingTxsLimit is used to limit amount txs from the db
WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1
"1m"
"300ms"
LastBatchVirtualizationTimeMaxWaitPeriod is time since sequences should be sent
"1m"
"300ms"
diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md
index 94765fcf6c..d10585d0b2 100644
--- a/docs/config-file/node-config-doc.md
+++ b/docs/config-file/node-config-doc.md
@@ -1937,6 +1937,7 @@ DataSourcePriority=["local", "trusted", "external"]
| - [StateConsistencyCheckInterval](#Sequencer_StateConsistencyCheckInterval ) | No | string | No | - | Duration |
| - [Finalizer](#Sequencer_Finalizer ) | No | object | No | - | Finalizer's specific config properties |
| - [StreamServer](#Sequencer_StreamServer ) | No | object | No | - | StreamServerCfg is the config for the stream server |
+| - [LoadPendingTxsLimit](#Sequencer_LoadPendingTxsLimit ) | No | integer | No | - | LoadPendingTxsLimit is used to limit amount txs from the db |
### 10.1. `Sequencer.DeletePoolTxsL1BlockConfirmations`
@@ -2575,6 +2576,20 @@ Must be one of:
UpgradeEtrogBatchNumber=0
```
+### 10.9. `Sequencer.LoadPendingTxsLimit`
+
+**Type:** : `integer`
+
+**Default:** `0`
+
+**Description:** LoadPendingTxsLimit is used to limit amount txs from the db
+
+**Example setting the default value** (0):
+```
+[Sequencer]
+LoadPendingTxsLimit=0
+```
+
## 11. `[SequenceSender]`
**Type:** : `object`
diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json
index a67ec40bd8..5e9fe9e25b 100644
--- a/docs/config-file/node-config-schema.json
+++ b/docs/config-file/node-config-schema.json
@@ -986,6 +986,11 @@
"additionalProperties": false,
"type": "object",
"description": "StreamServerCfg is the config for the stream server"
+ },
+ "LoadPendingTxsLimit": {
+ "type": "integer",
+ "description": "LoadPendingTxsLimit is used to limit amount txs from the db",
+ "default": 0
}
},
"additionalProperties": false,
diff --git a/pool/interfaces.go b/pool/interfaces.go
index 4a2e9bd992..20db0171c4 100644
--- a/pool/interfaces.go
+++ b/pool/interfaces.go
@@ -21,7 +21,7 @@ type storage interface {
GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]common.Hash, error)
GetTxsByFromAndNonce(ctx context.Context, from common.Address, nonce uint64) ([]Transaction, error)
GetTxsByStatus(ctx context.Context, state TxStatus, limit uint64) ([]Transaction, error)
- GetNonWIPPendingTxs(ctx context.Context) ([]Transaction, error)
+ GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]Transaction, error)
IsTxPending(ctx context.Context, hash common.Hash) (bool, error)
SetGasPrices(ctx context.Context, l2GasPrice uint64, l1GasPrice uint64) error
DeleteGasPricesHistoryOlderThan(ctx context.Context, date time.Time) error
diff --git a/pool/pgpoolstorage/pgpoolstorage.go b/pool/pgpoolstorage/pgpoolstorage.go
index fbc0aaea62..baab0ce14d 100644
--- a/pool/pgpoolstorage/pgpoolstorage.go
+++ b/pool/pgpoolstorage/pgpoolstorage.go
@@ -172,16 +172,24 @@ func (p *PostgresPoolStorage) GetTxsByStatus(ctx context.Context, status pool.Tx
}
// GetNonWIPPendingTxs returns an array of transactions
-func (p *PostgresPoolStorage) GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error) {
+// limit parameter is used to limit amount txs from the db,
+// if limit = 0, then there is no limit
+func (p *PostgresPoolStorage) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error) {
var (
rows pgx.Rows
err error
sql string
)
- sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
- used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1`
- rows, err = p.db.Query(ctx, sql, pool.TxStatusPending)
+ if limit == 0 {
+ sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
+ used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC`
+ rows, err = p.db.Query(ctx, sql, pool.TxStatusPending)
+ } else {
+ sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
+ used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC LIMIT $2`
+ rows, err = p.db.Query(ctx, sql, pool.TxStatusPending, limit)
+ }
if err != nil {
return nil, err
diff --git a/pool/pool.go b/pool/pool.go
index e9c3e9e4da..193103d89f 100644
--- a/pool/pool.go
+++ b/pool/pool.go
@@ -361,8 +361,8 @@ func (p *Pool) GetPendingTxs(ctx context.Context, limit uint64) ([]Transaction,
}
// GetNonWIPPendingTxs from the pool
-func (p *Pool) GetNonWIPPendingTxs(ctx context.Context) ([]Transaction, error) {
- return p.storage.GetNonWIPPendingTxs(ctx)
+func (p *Pool) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]Transaction, error) {
+ return p.storage.GetNonWIPPendingTxs(ctx, limit)
}
// GetSelectedTxs gets selected txs from the pool db
diff --git a/sequencer/config.go b/sequencer/config.go
index 8b813c52db..780467b129 100644
--- a/sequencer/config.go
+++ b/sequencer/config.go
@@ -30,6 +30,9 @@ type Config struct {
// StreamServerCfg is the config for the stream server
StreamServer StreamServerCfg `mapstructure:"StreamServer"`
+
+ // LoadPendingTxsLimit is used to limit amount txs from the db
+ LoadPendingTxsLimit uint64 `mapstructure:"LoadPendingTxsLimit"`
}
// StreamServerCfg contains the data streamer's configuration properties
diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go
index c92f502e10..e6530c1b3e 100644
--- a/sequencer/interfaces.go
+++ b/sequencer/interfaces.go
@@ -21,7 +21,7 @@ type txPool interface {
DeleteFailedTransactionsOlderThan(ctx context.Context, date time.Time) error
DeleteTransactionByHash(ctx context.Context, hash common.Hash) error
MarkWIPTxsAsPending(ctx context.Context) error
- GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error)
+ GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error)
UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus, isWIP bool, failedReason *string) error
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, *state.ZKCounters, error)
UpdateTxWIPStatus(ctx context.Context, hash common.Hash, isWIP bool) error
diff --git a/sequencer/mock_pool.go b/sequencer/mock_pool.go
index 00bc480699..22d9b44056 100644
--- a/sequencer/mock_pool.go
+++ b/sequencer/mock_pool.go
@@ -180,7 +180,7 @@ func (_m *PoolMock) GetL1AndL2GasPrice() (uint64, uint64) {
}
// GetNonWIPPendingTxs provides a mock function with given fields: ctx
-func (_m *PoolMock) GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error) {
+func (_m *PoolMock) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go
index b79ad26c17..3ff3c9407b 100644
--- a/sequencer/sequencer.go
+++ b/sequencer/sequencer.go
@@ -209,7 +209,7 @@ func (s *Sequencer) loadFromPool(ctx context.Context) {
return
}
- poolTransactions, err := s.pool.GetNonWIPPendingTxs(ctx)
+ poolTransactions, err := s.pool.GetNonWIPPendingTxs(ctx, s.cfg.LoadPendingTxsLimit)
if err != nil && err != pool.ErrNotFound {
log.Errorf("error loading txs from pool, error: %v", err)
}