Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix replaced tx as failed when duplicated nonce #2308

Merged
merged 2 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions sequencer/addrqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,48 +32,48 @@ func newAddrQueue(addr common.Address, nonce uint64, balance *big.Int) *addrQueu
}
}

// addTx adds a tx to the addrQueue and updates the ready a notReady Txs. Also if this tx matches
// with an existing tx with the same nonce, we return in the replacedTx the one with lower gasPrice.
// The replacedTx will be later set as failed in the pool
func (a *addrQueue) addTx(tx *TxTracker) (newReadyTx, prevReadyTx, discardedTx *TxTracker, dropReason error) {
var disTx *TxTracker
// addTx adds a tx to the addrQueue and updates the ready a notReady Txs. Also if the new tx matches
// an existing tx with the same nonce but the new tx has better or equal gasPrice, we will return in the replacedTx
// the existing tx with lower gasPrice (the replacedTx will be later set as failed in the pool).
// If the existing tx has better gasPrice then we will drop the new tx (dropReason = ErrDuplicatedNonce)
func (a *addrQueue) addTx(tx *TxTracker) (newReadyTx, prevReadyTx, replacedTx *TxTracker, dropReason error) {
var repTx *TxTracker

if a.currentNonce == tx.Nonce { // Is a possible readyTx
// We set the tx as readyTx if we do not have one assigned or if the gasPrice is better or equal than the current readyTx
if a.readyTx == nil || ((a.readyTx != nil) && (tx.GasPrice.Cmp(a.readyTx.GasPrice) >= 0)) {
oldReadyTx := a.readyTx
if (oldReadyTx != nil) && (oldReadyTx.HashStr != tx.HashStr) {
// if it is a different tx then we need to return the replaced tx to set as failed in the pool
disTx = oldReadyTx
repTx = oldReadyTx
}
if a.currentBalance.Cmp(tx.Cost) >= 0 { //
if a.currentBalance.Cmp(tx.Cost) >= 0 {
a.readyTx = tx
return tx, oldReadyTx, disTx, nil
return tx, oldReadyTx, repTx, nil
} else { // If there is not enough balance we set the new tx as notReadyTxs
a.readyTx = nil
a.notReadyTxs[tx.Nonce] = tx
return nil, oldReadyTx, disTx, nil
return nil, oldReadyTx, repTx, nil
}
} else { // We have an already readytx with the same nonce and better gas price, we discard the new tx
disTx = tx
return nil, nil, disTx, nil
return nil, nil, nil, ErrDuplicatedNonce
}
} else if a.currentNonce > tx.Nonce {
return nil, nil, nil, runtime.ErrIntrinsicInvalidNonce
}

nrTx, found := a.notReadyTxs[tx.Nonce]
if !found || ((found) && (tx.GasPrice.Cmp(nrTx.GasPrice) >= 0)) {
a.notReadyTxs[tx.Nonce] = tx
if (found) && (nrTx.HashStr != tx.HashStr) {
// if it is a different tx then we need to return the replaced tx to set as failed in the pool
disTx = nrTx
repTx = nrTx
}
a.notReadyTxs[tx.Nonce] = tx
return nil, nil, repTx, nil
} else {
disTx = tx
// We have an already notReadytx with the same nonce and better gas price, we discard the new tx
return nil, nil, nil, ErrDuplicatedNonce
}

return nil, nil, disTx, nil
}

// ExpireTransactions removes the txs that have been in the queue for more than maxTime
Expand Down
46 changes: 29 additions & 17 deletions sequencer/addrqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ type notReadyTx struct {
}

type addrQueueAddTxTestCase struct {
name string
hash common.Hash
nonce uint64
gasPrice *big.Int
cost *big.Int
expectedReadyTx common.Hash
expectedNotReadyTx []notReadyTx
expectedDiscardedTx common.Hash
name string
hash common.Hash
nonce uint64
gasPrice *big.Int
cost *big.Int
expectedReadyTx common.Hash
expectedNotReadyTx []notReadyTx
expectedReplacedTx common.Hash
err error
}

var addr addrQueue
Expand All @@ -36,7 +37,7 @@ func processAddTxTestCases(t *testing.T, testCases []addrQueueAddTxTestCase) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tx := newTestTxTracker(tc.hash, tc.nonce, tc.gasPrice, tc.cost)
newReadyTx, _, replacedTx, _ := addr.addTx(tx)
newReadyTx, _, replacedTx, err := addr.addTx(tx)
if tc.expectedReadyTx.String() == emptyHash.String() {
if !(addr.readyTx == nil) {
t.Fatalf("Error readyTx. Expected=nil, Actual=%s", addr.readyTx.HashStr)
Expand All @@ -60,17 +61,27 @@ func processAddTxTestCases(t *testing.T, testCases []addrQueueAddTxTestCase) {
}
}

if tc.expectedDiscardedTx.String() == emptyHash.String() {
if tc.expectedReplacedTx.String() == emptyHash.String() {
if !(replacedTx == nil) {
t.Fatalf("Error discardedTx. Expected=%s, Actual=%s", tc.expectedDiscardedTx, replacedTx.HashStr)
t.Fatalf("Error replacedTx. Expected=%s, Actual=%s", tc.expectedReplacedTx, replacedTx.HashStr)
}
} else {
if (replacedTx == nil) || ((replacedTx != nil) && !(replacedTx.Hash == tc.expectedDiscardedTx)) {
if (replacedTx == nil) || ((replacedTx != nil) && !(replacedTx.Hash == tc.expectedReplacedTx)) {
replacedTxStr := "nil"
if replacedTx != nil {
replacedTxStr = replacedTx.HashStr
}
t.Fatalf("Error discardedTx. Expected=%s, Actual=%s", tc.expectedDiscardedTx, replacedTxStr)
t.Fatalf("Error replacedTx. Expected=%s, Actual=%s", tc.expectedReplacedTx, replacedTxStr)
}
}

if tc.err == nil {
if err != nil {
t.Fatalf("Error returned error. Expected=nil, Actual=%s", err)
}
} else {
if tc.err != err {
t.Fatalf("Error returned error. Expected=%s, Actual=%s", tc.err, err)
}
}
})
Expand Down Expand Up @@ -101,7 +112,7 @@ func TestAddrQueue(t *testing.T) {
expectedNotReadyTx: []notReadyTx{
{nonce: 2, hash: common.Hash{0x2}},
},
expectedDiscardedTx: common.Hash{0x11},
expectedReplacedTx: common.Hash{0x11},
},
{
name: "Replace readyTx for the same tx 0x1 with best gasPrice", hash: common.Hash{0x1}, nonce: 1, gasPrice: new(big.Int).SetInt64(8), cost: new(big.Int).SetInt64(5),
Expand All @@ -125,7 +136,7 @@ func TestAddrQueue(t *testing.T) {
{nonce: 2, hash: common.Hash{0x2}},
{nonce: 4, hash: common.Hash{0x44}},
},
expectedDiscardedTx: common.Hash{0x4},
expectedReplacedTx: common.Hash{0x4},
},
{
name: "Replace tx with nonce 4 for the same tx 0x44 with best GasPrice", hash: common.Hash{0x44}, nonce: 4, gasPrice: new(big.Int).SetInt64(4), cost: new(big.Int).SetInt64(5),
Expand All @@ -134,7 +145,7 @@ func TestAddrQueue(t *testing.T) {
{nonce: 2, hash: common.Hash{0x2}},
{nonce: 4, hash: common.Hash{0x44}},
},
expectedDiscardedTx: common.Hash{},
expectedReplacedTx: common.Hash{},
},
{
name: "Add tx 0x22 with nonce 2 with lower GasPrice than 0x2", hash: common.Hash{0x22}, nonce: 2, gasPrice: new(big.Int).SetInt64(1), cost: new(big.Int).SetInt64(5),
Expand All @@ -143,7 +154,8 @@ func TestAddrQueue(t *testing.T) {
{nonce: 2, hash: common.Hash{0x2}},
{nonce: 4, hash: common.Hash{0x44}},
},
expectedDiscardedTx: common.Hash{0x22},
expectedReplacedTx: common.Hash{},
err: ErrDuplicatedNonce,
},
}

Expand Down
15 changes: 7 additions & 8 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,20 @@ func (d *dbManager) addTxToWorker(tx pool.Transaction) error {
if err != nil {
return err
}
replacedTx, dropReason, isWIP := d.worker.AddTxTracker(d.ctx, txTracker)
replacedTx, dropReason := d.worker.AddTxTracker(d.ctx, txTracker)
if dropReason != nil {
failedReason := dropReason.Error()
return d.txPool.UpdateTxStatus(d.ctx, txTracker.Hash, pool.TxStatusFailed, false, &failedReason)
} else {
if isWIP {
return d.txPool.UpdateTxWIPStatus(d.ctx, tx.Hash(), true)
}
if replacedTx != nil {
failedReason := "duplicated nonce"
return d.txPool.UpdateTxStatus(d.ctx, txTracker.Hash, pool.TxStatusFailed, false, &failedReason)
failedReason := ErrReplacedTransaction.Error()
error := d.txPool.UpdateTxStatus(d.ctx, replacedTx.Hash, pool.TxStatusFailed, false, &failedReason)
if error != nil {
log.Warnf("error when setting as failed replacedTx(%s)", replacedTx.HashStr)
}
}
return d.txPool.UpdateTxWIPStatus(d.ctx, tx.Hash(), true)
}

return nil
}

// BeginStateTransaction starts a db transaction in the state
Expand Down
5 changes: 5 additions & 0 deletions sequencer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,9 @@ var (
ErrEffectiveGasPriceReprocess = errors.New("effective gas price requires reprocessing the transaction")
// ErrZeroL1GasPrice is returned if the L1 gas price is 0.
ErrZeroL1GasPrice = errors.New("L1 gas price 0")
// ErrDuplicatedNonce is returned when adding a new tx to the worker and there is an existing tx
// with the same nonce and higher gasPrice (in this case we keep the existing tx)
ErrDuplicatedNonce = errors.New("duplicated nonce")
// ErrReplacedTransaction is returned when an existing tx is replaced by a new tx with the same nonce and higher gasPrice
ErrReplacedTransaction = errors.New("replaced transaction")
)
2 changes: 1 addition & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type workerInterface interface {
GetBestFittingTx(resources state.BatchResources) *TxTracker
UpdateAfterSingleSuccessfulTxExecution(from common.Address, touchedAddresses map[common.Address]*state.InfoReadWrite) []*TxTracker
UpdateTx(txHash common.Hash, from common.Address, ZKCounters state.ZKCounters)
AddTxTracker(ctx context.Context, txTracker *TxTracker) (replacedTx *TxTracker, dropReason error, isWIP bool)
AddTxTracker(ctx context.Context, txTracker *TxTracker) (replacedTx *TxTracker, dropReason error)
MoveTxToNotReady(txHash common.Hash, from common.Address, actualNonce *uint64, actualBalance *big.Int) []*TxTracker
DeleteTx(txHash common.Hash, from common.Address)
HandleL2Reorg(txHashes []common.Hash)
Expand Down
13 changes: 3 additions & 10 deletions sequencer/mock_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions sequencer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (w *Worker) NewTxTracker(tx types.Transaction, counters state.ZKCounters, i
}

// AddTxTracker adds a new Tx to the Worker
func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *TxTracker, dropReason error, isWIP bool) {
func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *TxTracker, dropReason error) {
w.workerMutex.Lock()
defer w.workerMutex.Unlock()

Expand All @@ -59,19 +59,19 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T
if err != nil {
dropReason = fmt.Errorf("AddTx GetLastStateRoot error: %v", err)
log.Error(dropReason)
return nil, dropReason, false
return nil, dropReason
}
nonce, err := w.state.GetNonceByStateRoot(ctx, tx.From, root)
if err != nil {
dropReason = fmt.Errorf("AddTx GetNonceByStateRoot error: %v", err)
log.Error(dropReason)
return nil, dropReason, false
return nil, dropReason
}
balance, err := w.state.GetBalanceByStateRoot(ctx, tx.From, root)
if err != nil {
dropReason = fmt.Errorf("AddTx GetBalanceByStateRoot error: %v", err)
log.Error(dropReason)
return nil, dropReason, false
return nil, dropReason
}

addr = newAddrQueue(tx.From, nonce.Uint64(), balance)
Expand All @@ -88,8 +88,8 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T
var newReadyTx, prevReadyTx, repTx *TxTracker
newReadyTx, prevReadyTx, repTx, dropReason = addr.addTx(tx)
if dropReason != nil {
log.Infof("AddTx tx(%s) dropped from addrQueue(%s)", tx.HashStr, tx.FromStr)
return repTx, dropReason, false
log.Infof("AddTx tx(%s) dropped from addrQueue(%s), reason: %s", tx.HashStr, tx.FromStr, dropReason.Error())
return repTx, dropReason
}

// Update the EfficiencyList (if needed)
Expand All @@ -106,7 +106,7 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T
log.Infof("AddTx replacedTx(%s) nonce(%d) cost(%s) has been replaced", repTx.HashStr, repTx.Nonce, repTx.Cost.String())
}

return repTx, nil, true
return repTx, nil
}

func (w *Worker) applyAddressUpdate(from common.Address, fromNonce *uint64, fromBalance *big.Int) (*TxTracker, *TxTracker, []*TxTracker) {
Expand Down
2 changes: 1 addition & 1 deletion sequencer/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func processWorkerAddTxTestCases(t *testing.T, worker *Worker, testCases []worke
tx.updateZKCounters(testCase.counters, worker.batchConstraints, worker.batchResourceWeights)
t.Logf("%s=%s", testCase.name, fmt.Sprintf("%.2f", tx.Efficiency))

_, err, _ := worker.AddTxTracker(ctx, &tx)
_, err := worker.AddTxTracker(ctx, &tx)
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion synchronizer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.