diff --git a/sequencer/addrqueue.go b/sequencer/addrqueue.go index e6acf90b03..684a060aed 100644 --- a/sequencer/addrqueue.go +++ b/sequencer/addrqueue.go @@ -32,11 +32,12 @@ func newAddrQueue(addr common.Address, nonce uint64, balance *big.Int) *addrQueu } } -// addTx adds a tx to the addrQueue and updates the ready a notReady Txs. Also if this tx matches -// with an existing tx with the same nonce, we return in the replacedTx the one with lower gasPrice. -// The replacedTx will be later set as failed in the pool -func (a *addrQueue) addTx(tx *TxTracker) (newReadyTx, prevReadyTx, discardedTx *TxTracker, dropReason error) { - var disTx *TxTracker +// addTx adds a tx to the addrQueue and updates the ready a notReady Txs. Also if the new tx matches +// an existing tx with the same nonce but the new tx has better or equal gasPrice, we will return in the replacedTx +// the existing tx with lower gasPrice (the replacedTx will be later set as failed in the pool). +// If the existing tx has better gasPrice then we will drop the new tx (dropReason = ErrDuplicatedNonce) +func (a *addrQueue) addTx(tx *TxTracker) (newReadyTx, prevReadyTx, replacedTx *TxTracker, dropReason error) { + var repTx *TxTracker if a.currentNonce == tx.Nonce { // Is a possible readyTx // We set the tx as readyTx if we do not have one assigned or if the gasPrice is better or equal than the current readyTx @@ -44,19 +45,18 @@ func (a *addrQueue) addTx(tx *TxTracker) (newReadyTx, prevReadyTx, discardedTx * oldReadyTx := a.readyTx if (oldReadyTx != nil) && (oldReadyTx.HashStr != tx.HashStr) { // if it is a different tx then we need to return the replaced tx to set as failed in the pool - disTx = oldReadyTx + repTx = oldReadyTx } - if a.currentBalance.Cmp(tx.Cost) >= 0 { // + if a.currentBalance.Cmp(tx.Cost) >= 0 { a.readyTx = tx - return tx, oldReadyTx, disTx, nil + return tx, oldReadyTx, repTx, nil } else { // If there is not enough balance we set the new tx as notReadyTxs a.readyTx = nil a.notReadyTxs[tx.Nonce] = tx - return nil, oldReadyTx, disTx, nil + return nil, oldReadyTx, repTx, nil } } else { // We have an already readytx with the same nonce and better gas price, we discard the new tx - disTx = tx - return nil, nil, disTx, nil + return nil, nil, nil, ErrDuplicatedNonce } } else if a.currentNonce > tx.Nonce { return nil, nil, nil, runtime.ErrIntrinsicInvalidNonce @@ -64,16 +64,16 @@ func (a *addrQueue) addTx(tx *TxTracker) (newReadyTx, prevReadyTx, discardedTx * nrTx, found := a.notReadyTxs[tx.Nonce] if !found || ((found) && (tx.GasPrice.Cmp(nrTx.GasPrice) >= 0)) { + a.notReadyTxs[tx.Nonce] = tx if (found) && (nrTx.HashStr != tx.HashStr) { // if it is a different tx then we need to return the replaced tx to set as failed in the pool - disTx = nrTx + repTx = nrTx } - a.notReadyTxs[tx.Nonce] = tx + return nil, nil, repTx, nil } else { - disTx = tx + // We have an already notReadytx with the same nonce and better gas price, we discard the new tx + return nil, nil, nil, ErrDuplicatedNonce } - - return nil, nil, disTx, nil } // ExpireTransactions removes the txs that have been in the queue for more than maxTime diff --git a/sequencer/addrqueue_test.go b/sequencer/addrqueue_test.go index 9f0a7405b1..d39ce5a356 100644 --- a/sequencer/addrqueue_test.go +++ b/sequencer/addrqueue_test.go @@ -13,14 +13,15 @@ type notReadyTx struct { } type addrQueueAddTxTestCase struct { - name string - hash common.Hash - nonce uint64 - gasPrice *big.Int - cost *big.Int - expectedReadyTx common.Hash - expectedNotReadyTx []notReadyTx - expectedDiscardedTx common.Hash + name string + hash common.Hash + nonce uint64 + gasPrice *big.Int + cost *big.Int + expectedReadyTx common.Hash + expectedNotReadyTx []notReadyTx + expectedReplacedTx common.Hash + err error } var addr addrQueue @@ -36,7 +37,7 @@ func processAddTxTestCases(t *testing.T, testCases []addrQueueAddTxTestCase) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { tx := newTestTxTracker(tc.hash, tc.nonce, tc.gasPrice, tc.cost) - newReadyTx, _, replacedTx, _ := addr.addTx(tx) + newReadyTx, _, replacedTx, err := addr.addTx(tx) if tc.expectedReadyTx.String() == emptyHash.String() { if !(addr.readyTx == nil) { t.Fatalf("Error readyTx. Expected=nil, Actual=%s", addr.readyTx.HashStr) @@ -60,17 +61,27 @@ func processAddTxTestCases(t *testing.T, testCases []addrQueueAddTxTestCase) { } } - if tc.expectedDiscardedTx.String() == emptyHash.String() { + if tc.expectedReplacedTx.String() == emptyHash.String() { if !(replacedTx == nil) { - t.Fatalf("Error discardedTx. Expected=%s, Actual=%s", tc.expectedDiscardedTx, replacedTx.HashStr) + t.Fatalf("Error replacedTx. Expected=%s, Actual=%s", tc.expectedReplacedTx, replacedTx.HashStr) } } else { - if (replacedTx == nil) || ((replacedTx != nil) && !(replacedTx.Hash == tc.expectedDiscardedTx)) { + if (replacedTx == nil) || ((replacedTx != nil) && !(replacedTx.Hash == tc.expectedReplacedTx)) { replacedTxStr := "nil" if replacedTx != nil { replacedTxStr = replacedTx.HashStr } - t.Fatalf("Error discardedTx. Expected=%s, Actual=%s", tc.expectedDiscardedTx, replacedTxStr) + t.Fatalf("Error replacedTx. Expected=%s, Actual=%s", tc.expectedReplacedTx, replacedTxStr) + } + } + + if tc.err == nil { + if err != nil { + t.Fatalf("Error returned error. Expected=nil, Actual=%s", err) + } + } else { + if tc.err != err { + t.Fatalf("Error returned error. Expected=%s, Actual=%s", tc.err, err) } } }) @@ -101,7 +112,7 @@ func TestAddrQueue(t *testing.T) { expectedNotReadyTx: []notReadyTx{ {nonce: 2, hash: common.Hash{0x2}}, }, - expectedDiscardedTx: common.Hash{0x11}, + expectedReplacedTx: common.Hash{0x11}, }, { name: "Replace readyTx for the same tx 0x1 with best gasPrice", hash: common.Hash{0x1}, nonce: 1, gasPrice: new(big.Int).SetInt64(8), cost: new(big.Int).SetInt64(5), @@ -125,7 +136,7 @@ func TestAddrQueue(t *testing.T) { {nonce: 2, hash: common.Hash{0x2}}, {nonce: 4, hash: common.Hash{0x44}}, }, - expectedDiscardedTx: common.Hash{0x4}, + expectedReplacedTx: common.Hash{0x4}, }, { name: "Replace tx with nonce 4 for the same tx 0x44 with best GasPrice", hash: common.Hash{0x44}, nonce: 4, gasPrice: new(big.Int).SetInt64(4), cost: new(big.Int).SetInt64(5), @@ -134,7 +145,7 @@ func TestAddrQueue(t *testing.T) { {nonce: 2, hash: common.Hash{0x2}}, {nonce: 4, hash: common.Hash{0x44}}, }, - expectedDiscardedTx: common.Hash{}, + expectedReplacedTx: common.Hash{}, }, { name: "Add tx 0x22 with nonce 2 with lower GasPrice than 0x2", hash: common.Hash{0x22}, nonce: 2, gasPrice: new(big.Int).SetInt64(1), cost: new(big.Int).SetInt64(5), @@ -143,7 +154,8 @@ func TestAddrQueue(t *testing.T) { {nonce: 2, hash: common.Hash{0x2}}, {nonce: 4, hash: common.Hash{0x44}}, }, - expectedDiscardedTx: common.Hash{0x22}, + expectedReplacedTx: common.Hash{}, + err: ErrDuplicatedNonce, }, } diff --git a/sequencer/dbmanager.go b/sequencer/dbmanager.go index d12baca844..b47de338ef 100644 --- a/sequencer/dbmanager.go +++ b/sequencer/dbmanager.go @@ -139,21 +139,20 @@ func (d *dbManager) addTxToWorker(tx pool.Transaction) error { if err != nil { return err } - replacedTx, dropReason, isWIP := d.worker.AddTxTracker(d.ctx, txTracker) + replacedTx, dropReason := d.worker.AddTxTracker(d.ctx, txTracker) if dropReason != nil { failedReason := dropReason.Error() return d.txPool.UpdateTxStatus(d.ctx, txTracker.Hash, pool.TxStatusFailed, false, &failedReason) } else { - if isWIP { - return d.txPool.UpdateTxWIPStatus(d.ctx, tx.Hash(), true) - } if replacedTx != nil { - failedReason := "duplicated nonce" - return d.txPool.UpdateTxStatus(d.ctx, txTracker.Hash, pool.TxStatusFailed, false, &failedReason) + failedReason := ErrReplacedTransaction.Error() + error := d.txPool.UpdateTxStatus(d.ctx, replacedTx.Hash, pool.TxStatusFailed, false, &failedReason) + if error != nil { + log.Warnf("error when setting as failed replacedTx(%s)", replacedTx.HashStr) + } } + return d.txPool.UpdateTxWIPStatus(d.ctx, tx.Hash(), true) } - - return nil } // BeginStateTransaction starts a db transaction in the state diff --git a/sequencer/errors.go b/sequencer/errors.go index 0dd8547276..afb5efec44 100644 --- a/sequencer/errors.go +++ b/sequencer/errors.go @@ -11,4 +11,9 @@ var ( ErrEffectiveGasPriceReprocess = errors.New("effective gas price requires reprocessing the transaction") // ErrZeroL1GasPrice is returned if the L1 gas price is 0. ErrZeroL1GasPrice = errors.New("L1 gas price 0") + // ErrDuplicatedNonce is returned when adding a new tx to the worker and there is an existing tx + // with the same nonce and higher gasPrice (in this case we keep the existing tx) + ErrDuplicatedNonce = errors.New("duplicated nonce") + // ErrReplacedTransaction is returned when an existing tx is replaced by a new tx with the same nonce and higher gasPrice + ErrReplacedTransaction = errors.New("replaced transaction") ) diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index f5a2bc62de..49ea5a512a 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -87,7 +87,7 @@ type workerInterface interface { GetBestFittingTx(resources state.BatchResources) *TxTracker UpdateAfterSingleSuccessfulTxExecution(from common.Address, touchedAddresses map[common.Address]*state.InfoReadWrite) []*TxTracker UpdateTx(txHash common.Hash, from common.Address, ZKCounters state.ZKCounters) - AddTxTracker(ctx context.Context, txTracker *TxTracker) (replacedTx *TxTracker, dropReason error, isWIP bool) + AddTxTracker(ctx context.Context, txTracker *TxTracker) (replacedTx *TxTracker, dropReason error) MoveTxToNotReady(txHash common.Hash, from common.Address, actualNonce *uint64, actualBalance *big.Int) []*TxTracker DeleteTx(txHash common.Hash, from common.Address) HandleL2Reorg(txHashes []common.Hash) diff --git a/sequencer/mock_worker.go b/sequencer/mock_worker.go index 785addc7ce..1262c09023 100644 --- a/sequencer/mock_worker.go +++ b/sequencer/mock_worker.go @@ -21,13 +21,12 @@ type WorkerMock struct { } // AddTxTracker provides a mock function with given fields: ctx, txTracker -func (_m *WorkerMock) AddTxTracker(ctx context.Context, txTracker *TxTracker) (*TxTracker, error, bool) { +func (_m *WorkerMock) AddTxTracker(ctx context.Context, txTracker *TxTracker) (*TxTracker, error) { ret := _m.Called(ctx, txTracker) var r0 *TxTracker var r1 error - var r2 bool - if rf, ok := ret.Get(0).(func(context.Context, *TxTracker) (*TxTracker, error, bool)); ok { + if rf, ok := ret.Get(0).(func(context.Context, *TxTracker) (*TxTracker, error)); ok { return rf(ctx, txTracker) } if rf, ok := ret.Get(0).(func(context.Context, *TxTracker) *TxTracker); ok { @@ -44,13 +43,7 @@ func (_m *WorkerMock) AddTxTracker(ctx context.Context, txTracker *TxTracker) (* r1 = ret.Error(1) } - if rf, ok := ret.Get(2).(func(context.Context, *TxTracker) bool); ok { - r2 = rf(ctx, txTracker) - } else { - r2 = ret.Get(2).(bool) - } - - return r0, r1, r2 + return r0, r1 } // DeleteTx provides a mock function with given fields: txHash, from diff --git a/sequencer/worker.go b/sequencer/worker.go index fefecf99d6..cba6938f7c 100644 --- a/sequencer/worker.go +++ b/sequencer/worker.go @@ -45,7 +45,7 @@ func (w *Worker) NewTxTracker(tx types.Transaction, counters state.ZKCounters, i } // AddTxTracker adds a new Tx to the Worker -func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *TxTracker, dropReason error, isWIP bool) { +func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *TxTracker, dropReason error) { w.workerMutex.Lock() defer w.workerMutex.Unlock() @@ -59,19 +59,19 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T if err != nil { dropReason = fmt.Errorf("AddTx GetLastStateRoot error: %v", err) log.Error(dropReason) - return nil, dropReason, false + return nil, dropReason } nonce, err := w.state.GetNonceByStateRoot(ctx, tx.From, root) if err != nil { dropReason = fmt.Errorf("AddTx GetNonceByStateRoot error: %v", err) log.Error(dropReason) - return nil, dropReason, false + return nil, dropReason } balance, err := w.state.GetBalanceByStateRoot(ctx, tx.From, root) if err != nil { dropReason = fmt.Errorf("AddTx GetBalanceByStateRoot error: %v", err) log.Error(dropReason) - return nil, dropReason, false + return nil, dropReason } addr = newAddrQueue(tx.From, nonce.Uint64(), balance) @@ -88,8 +88,8 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T var newReadyTx, prevReadyTx, repTx *TxTracker newReadyTx, prevReadyTx, repTx, dropReason = addr.addTx(tx) if dropReason != nil { - log.Infof("AddTx tx(%s) dropped from addrQueue(%s)", tx.HashStr, tx.FromStr) - return repTx, dropReason, false + log.Infof("AddTx tx(%s) dropped from addrQueue(%s), reason: %s", tx.HashStr, tx.FromStr, dropReason.Error()) + return repTx, dropReason } // Update the EfficiencyList (if needed) @@ -106,7 +106,7 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T log.Infof("AddTx replacedTx(%s) nonce(%d) cost(%s) has been replaced", repTx.HashStr, repTx.Nonce, repTx.Cost.String()) } - return repTx, nil, true + return repTx, nil } func (w *Worker) applyAddressUpdate(from common.Address, fromNonce *uint64, fromBalance *big.Int) (*TxTracker, *TxTracker, []*TxTracker) { diff --git a/sequencer/worker_test.go b/sequencer/worker_test.go index 38083fd803..51787f188c 100644 --- a/sequencer/worker_test.go +++ b/sequencer/worker_test.go @@ -61,7 +61,7 @@ func processWorkerAddTxTestCases(t *testing.T, worker *Worker, testCases []worke tx.updateZKCounters(testCase.counters, worker.batchConstraints, worker.batchResourceWeights) t.Logf("%s=%s", testCase.name, fmt.Sprintf("%.2f", tx.Efficiency)) - _, err, _ := worker.AddTxTracker(ctx, &tx) + _, err := worker.AddTxTracker(ctx, &tx) if err != nil { return } diff --git a/synchronizer/mock_state.go b/synchronizer/mock_state.go index 12ec0a0388..a90c4227c2 100644 --- a/synchronizer/mock_state.go +++ b/synchronizer/mock_state.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.28.1. DO NOT EDIT. +// Code generated by mockery v2.22.1. DO NOT EDIT. package synchronizer