Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add forced batches tx to addrQueue #2398

Merged
merged 46 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
2001932
add forced batches tx to addrQueue
ToniRamirezM Aug 10, 2023
e732e04
fix test
ToniRamirezM Aug 10, 2023
0e35892
fix test
ToniRamirezM Aug 10, 2023
8df9763
fix test
ToniRamirezM Aug 10, 2023
1f4142d
fix test
ToniRamirezM Aug 10, 2023
8761519
fix test
ToniRamirezM Aug 10, 2023
ba82a63
fix test
ToniRamirezM Aug 10, 2023
3c672dc
fix test
ToniRamirezM Aug 10, 2023
4cfbad2
fix test
ToniRamirezM Aug 10, 2023
5c22561
fix test
ToniRamirezM Aug 10, 2023
817e0c7
refactor
ToniRamirezM Aug 10, 2023
fd4c367
fix test
ToniRamirezM Aug 10, 2023
015d78d
fix test
ToniRamirezM Aug 10, 2023
b3f7d38
fix test
ToniRamirezM Aug 10, 2023
68292b6
fix test
ToniRamirezM Aug 10, 2023
1c2241e
fix test
ToniRamirezM Aug 10, 2023
e55f677
fixes
ToniRamirezM Aug 11, 2023
9c171e3
fixes
ToniRamirezM Aug 11, 2023
5fcf980
fixes
ToniRamirezM Aug 11, 2023
d111a6a
fixes
ToniRamirezM Aug 11, 2023
b429a08
fixes
ToniRamirezM Aug 11, 2023
a819e4f
fixes
ToniRamirezM Aug 11, 2023
b6c9dc0
fixes
ToniRamirezM Aug 11, 2023
837a01d
fixes
ToniRamirezM Aug 11, 2023
2382332
fixes
ToniRamirezM Aug 11, 2023
a462d02
fixes
ToniRamirezM Aug 11, 2023
92abf44
fixes
ToniRamirezM Aug 11, 2023
e1258e1
fixe hash and from
ToniRamirezM Aug 11, 2023
f163814
fixe hash and from
ToniRamirezM Aug 11, 2023
80e6dbb
fixe hash and from
ToniRamirezM Aug 11, 2023
f1c3c83
fixe hash and from
ToniRamirezM Aug 11, 2023
7c71dc8
fix test
ToniRamirezM Aug 11, 2023
85ee60f
fix test
ToniRamirezM Aug 11, 2023
fe79112
fix test
ToniRamirezM Aug 11, 2023
9d42dae
fix test
ToniRamirezM Aug 11, 2023
6b7b4ea
fix test
ToniRamirezM Aug 11, 2023
acc9a5b
fix test
ToniRamirezM Aug 11, 2023
b86109d
fix test
ToniRamirezM Aug 11, 2023
b529831
improve tests
ToniRamirezM Aug 16, 2023
77132bd
improve tests
ToniRamirezM Aug 16, 2023
7010992
improve tests
ToniRamirezM Aug 16, 2023
2722c2b
improve tests
ToniRamirezM Aug 16, 2023
fe9cfb6
improve tests
ToniRamirezM Aug 16, 2023
2c63219
refactor
ToniRamirezM Aug 17, 2023
d634b61
refactor
ToniRamirezM Aug 17, 2023
f2d43ec
improve logs
ToniRamirezM Aug 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion sequencer/addrqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type addrQueue struct {
currentBalance *big.Int
readyTx *TxTracker
notReadyTxs map[uint64]*TxTracker
forcedTxs map[common.Hash]struct{}
pendingTxsToStore map[common.Hash]struct{}
}

Expand All @@ -30,6 +31,7 @@ func newAddrQueue(addr common.Address, nonce uint64, balance *big.Int) *addrQueu
currentBalance: balance,
readyTx: nil,
notReadyTxs: make(map[uint64]*TxTracker),
forcedTxs: make(map[common.Hash]struct{}),
pendingTxsToStore: make(map[common.Hash]struct{}),
}
}
Expand Down Expand Up @@ -78,6 +80,11 @@ func (a *addrQueue) addTx(tx *TxTracker) (newReadyTx, prevReadyTx, replacedTx *T
}
}

// addForcedTx adds a forced tx to the list of forced txs
func (a *addrQueue) addForcedTx(txHash common.Hash) {
a.forcedTxs[txHash] = struct{}{}
}

// addPendingTxToStore adds a tx to the list of pending txs to store in the DB (trusted state)
func (a *addrQueue) addPendingTxToStore(txHash common.Hash) {
a.pendingTxsToStore[txHash] = struct{}{}
Expand Down Expand Up @@ -110,7 +117,7 @@ func (a *addrQueue) ExpireTransactions(maxTime time.Duration) ([]*TxTracker, *Tx

// IsEmpty returns true if the addrQueue is empty
func (a *addrQueue) IsEmpty() bool {
return a.readyTx == nil && len(a.notReadyTxs) == 0 && len(a.pendingTxsToStore) == 0
return a.readyTx == nil && len(a.notReadyTxs) == 0 && len(a.forcedTxs) == 0 && len(a.pendingTxsToStore) == 0
}

// deleteTx deletes the tx from the addrQueue
Expand All @@ -133,6 +140,15 @@ func (a *addrQueue) deleteTx(txHash common.Hash) (deletedReadyTx *TxTracker) {
}
}

// deleteForcedTx deletes the tx from the addrQueue
func (a *addrQueue) deleteForcedTx(txHash common.Hash) {
if _, found := a.forcedTxs[txHash]; found {
delete(a.forcedTxs, txHash)
} else {
log.Warnf("tx (%s) not found in forcedTxs list", txHash.String())
}
}

// deletePendingTxToStore delete a tx from the list of pending txs to store in the DB (trusted state)
func (a *addrQueue) deletePendingTxToStore(txHash common.Hash) {
if _, found := a.pendingTxsToStore[txHash]; found {
Expand Down
84 changes: 61 additions & 23 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type finalizer struct {
}

type transactionToStore struct {
txTracker *TxTracker
hash common.Hash
from common.Address
response *state.ProcessTransactionResponse
batchResponse *state.ProcessBatchResponse
batchNumber uint64
Expand Down Expand Up @@ -207,10 +208,8 @@ func (f *finalizer) storePendingTransactions(ctx context.Context) {
// Now f.storedFlushID >= tx.flushId, we can store tx
f.storeProcessedTx(ctx, tx)

if tx.txTracker != nil {
// Delete the txTracker from the pending list in the worker (addrQueue)
f.worker.DeletePendingTxToStore(tx.txTracker.Hash, tx.txTracker.From)
}
// Delete the tx from the pending list in the worker (addrQueue)
f.worker.DeletePendingTxToStore(tx.hash, tx.from)

f.pendingTransactionsToStoreWG.Done()
case <-ctx.Done():
Expand Down Expand Up @@ -264,6 +263,32 @@ func (f *finalizer) listenForClosingSignals(ctx context.Context) {
// ForcedBatch ch
case fb := <-f.closingSignalCh.ForcedBatchCh:
log.Debugf("finalizer received forced batch at block number: %v", fb.BlockNumber)

// Add the forced batch's transactions to the worker
batchNumber, err := f.dbManager.GetLastBatchNumber(ctx)
if err != nil {
// An error accessing the database is fatal
f.halt(ctx, err)
}

// Decode the transactions inside the forced batch
forkID := f.dbManager.GetForkIDByBatchNumber(batchNumber)
txs, _, _, err := state.DecodeTxs(fb.RawTxsData, forkID)
if err != nil {
// A forced batch can contain anything so this may happen
log.Warnf("failed to decode transactions from forced batch, Err: %v", err)
} else {
// If txs could be extracted from the forced batch, add them to the worker
for _, tx := range txs {
sender, err := state.GetSender(tx)
if err != nil {
log.Warnf("failed to get sender from tx, Err: %v", err)
continue
}
f.worker.AddForcedTx(tx.Hash(), sender)
}
}

f.nextForcedBatchesMux.Lock()
f.nextForcedBatches = f.sortForcedBatches(append(f.nextForcedBatches, fb))
if f.nextForcedBatchDeadline == 0 {
Expand Down Expand Up @@ -301,18 +326,16 @@ func (f *finalizer) updateLastPendingFlushID(newFlushID uint64) {
// addPendingTxToStore adds a pending tx that is ready to be stored in the state DB once its flushid has been stored by the executor
func (f *finalizer) addPendingTxToStore(ctx context.Context, txToStore transactionToStore) {
f.pendingTransactionsToStoreWG.Add(1)
if txToStore.txTracker != nil {
f.worker.AddPendingTxToStore(txToStore.txTracker.Hash, txToStore.txTracker.From)
}

f.worker.AddPendingTxToStore(txToStore.hash, txToStore.from)

select {
case f.pendingTransactionsToStore <- txToStore:
case <-ctx.Done():
// If context is cancelled before we can send to the channel, we must decrement the WaitGroup count and
// delete the pending TxToStore added in the worker
f.pendingTransactionsToStoreWG.Done()
if txToStore.txTracker != nil {
f.worker.DeletePendingTxToStore(txToStore.txTracker.Hash, txToStore.txTracker.From)
}
f.worker.DeletePendingTxToStore(txToStore.hash, txToStore.from)
}
}

Expand Down Expand Up @@ -692,7 +715,8 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
}

txToStore := transactionToStore{
txTracker: tx,
hash: tx.Hash,
from: tx.From,
response: result.Responses[0],
batchResponse: result,
batchNumber: f.batch.batchNumber,
Expand All @@ -709,9 +733,7 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx

f.batch.countOfTxs++

if tx != nil {
f.updateWorkerAfterSuccessfulProcessing(ctx, tx, result)
}
f.updateWorkerAfterSuccessfulProcessing(ctx, tx.Hash, tx.From, false, result)

return nil, nil
}
Expand All @@ -730,8 +752,14 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
}
}

from, err := state.GetSender(txResp.Tx)
if err != nil {
log.Errorf("handleForcedTxsProcessResp: failed to get sender: %s", err)
}

txToStore := transactionToStore{
txTracker: nil,
hash: txResp.TxHash,
from: from,
response: txResp,
batchResponse: result,
batchNumber: request.BatchNumber,
Expand All @@ -747,6 +775,10 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
f.updateLastPendingFlushID(result.FlushID)

f.addPendingTxToStore(ctx, txToStore)

if err == nil {
f.updateWorkerAfterSuccessfulProcessing(ctx, txResp.TxHash, from, true, result)
}
}
}

Expand All @@ -765,20 +797,26 @@ func (f *finalizer) storeProcessedTx(ctx context.Context, txToStore transactionT
metrics.TxProcessed(metrics.TxProcessedLabelSuccessful, 1)
}

func (f *finalizer) updateWorkerAfterSuccessfulProcessing(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse) {
// Delete the transaction from the txSorted list
f.worker.DeleteTx(tx.Hash, tx.From)
log.Debug("tx deleted from txSorted list", "txHash", tx.Hash.String(), "from", tx.From.Hex())
func (f *finalizer) updateWorkerAfterSuccessfulProcessing(ctx context.Context, txHash common.Hash, txFrom common.Address, isForced bool, result *state.ProcessBatchResponse) {
// Delete the transaction from the worker
if isForced {
f.worker.DeleteForcedTx(txHash, txFrom)
log.Debug("forced tx deleted from worker", "txHash", txHash.String(), "from", txFrom.Hex())
return
} else {
f.worker.DeleteTx(txHash, txFrom)
log.Debug("tx deleted from worker", "txHash", txHash.String(), "from", txFrom.Hex())
}

start := time.Now()
txsToDelete := f.worker.UpdateAfterSingleSuccessfulTxExecution(tx.From, result.ReadWriteAddresses)
txsToDelete := f.worker.UpdateAfterSingleSuccessfulTxExecution(txFrom, result.ReadWriteAddresses)
for _, txToDelete := range txsToDelete {
err := f.dbManager.UpdateTxStatus(ctx, txToDelete.Hash, pool.TxStatusFailed, false, txToDelete.FailedReason)
if err != nil {
log.Errorf("failed to update status to failed in the pool for tx: %s, err: %s", txToDelete.Hash.String(), err)
} else {
metrics.TxProcessed(metrics.TxProcessedLabelFailed, 1)
continue
}
metrics.TxProcessed(metrics.TxProcessedLabelFailed, 1)
}
metrics.WorkerProcessingTime(time.Since(start))
}
Expand Down
Loading