Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid duplicate keysign for outTx already pending #1518

Merged
merged 8 commits into from
Jan 4, 2024
Next Next commit
avoid duplicate keysign for pending outTx
  • Loading branch information
ws4charlie committed Jan 3, 2024
commit 05859b473c15f254ebdb281ed363e12e3f4a2ba5
132 changes: 86 additions & 46 deletions zetaclient/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type EVMChainClient struct {
txWatchList map[ethcommon.Hash]string
Mu *sync.Mutex
db *gorm.DB
outTxPendingTransaction map[string]*ethtypes.Transaction
outTXConfirmedReceipts map[string]*ethtypes.Receipt
outTXConfirmedTransaction map[string]*ethtypes.Transaction
MinNonce int64
Expand Down Expand Up @@ -121,6 +122,7 @@ func NewEVMChainClient(
ob.zetaClient = bridge
ob.txWatchList = make(map[ethcommon.Hash]string)
ob.Tss = tss
ob.outTxPendingTransaction = make(map[string]*ethtypes.Transaction)
ob.outTXConfirmedReceipts = make(map[string]*ethtypes.Receipt)
ob.outTXConfirmedTransaction = make(map[string]*ethtypes.Transaction)
ob.OutTxChan = make(chan OutTx, 100)
Expand Down Expand Up @@ -293,15 +295,11 @@ func (ob *EVMChainClient) Stop() {
// returns: isIncluded, isConfirmed, Error
// If isConfirmed, it also post to ZetaCore
func (ob *EVMChainClient) IsSendOutTxProcessed(sendHash string, nonce uint64, cointype common.CoinType, logger zerolog.Logger) (bool, bool, error) {
ob.Mu.Lock()
params := ob.params
receipt, found1 := ob.outTXConfirmedReceipts[ob.GetTxID(nonce)]
transaction, found2 := ob.outTXConfirmedTransaction[ob.GetTxID(nonce)]
ob.Mu.Unlock()
found := found1 && found2
if !found {
if !ob.hasTxConfirmed(nonce) {
return false, false, nil
}
params := ob.GetCoreParams()
receipt, transaction := ob.GetTxNReceipt(nonce)

sendID := fmt.Sprintf("%s-%d", ob.chain.String(), nonce)
logger = logger.With().Str("sendID", sendID).Logger()
Expand Down Expand Up @@ -588,10 +586,7 @@ func (ob *EVMChainClient) observeOutTx() {
if nonceInt < lowestOutTxNonceToObserve[ob.chain.ChainId] {
continue
}
ob.Mu.Lock()
_, found := ob.outTXConfirmedReceipts[ob.GetTxID(nonceInt)]
ob.Mu.Unlock()
if found { // Go to next tracker if this one has already been confirmed
if ob.hasTxConfirmed(nonceInt) { // Go to next tracker if this one already has a confirmed tx
continue
}
for _, txHash := range tracker.HashList {
Expand All @@ -600,20 +595,13 @@ func (ob *EVMChainClient) observeOutTx() {
ob.logger.ObserveOutTx.Warn().Msgf("observeOutTx timeout on chain %d nonce %d", ob.chain.ChainId, nonceInt)
break TRACKERLOOP
default:
receipt, transaction, err := ob.queryTxByHash(txHash.TxHash, nonceInt)
err := ob.confirmTxByHash(txHash.TxHash, nonceInt)
time.Sleep(time.Duration(rpcRestTime) * time.Millisecond)
if err == nil && receipt != nil { // confirmed
ob.Mu.Lock()
ob.outTXConfirmedReceipts[ob.GetTxID(nonceInt)] = receipt
ob.outTXConfirmedTransaction[ob.GetTxID(nonceInt)] = transaction
ob.Mu.Unlock()
if err == nil { // confirmed
ob.logger.ObserveOutTx.Info().Msgf("observeOutTx confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt)

break
}
if err != nil {
ob.logger.ObserveOutTx.Debug().Err(err).Msgf("error queryTxByHash: chain %s hash %s", ob.chain.String(), txHash.TxHash)
}
ob.logger.ObserveOutTx.Debug().Err(err).Msgf("error confirmTxByHash: chain %s hash %s", ob.chain.String(), txHash.TxHash)
}
}
}
Expand All @@ -625,47 +613,99 @@ func (ob *EVMChainClient) observeOutTx() {
}
}

// return the status of txHash
// receipt nil, err non-nil: txHash not found
// receipt nil, err nil: txHash receipt recorded, but may not be confirmed
// receipt non-nil, err nil: txHash confirmed
func (ob *EVMChainClient) queryTxByHash(txHash string, nonce uint64) (*ethtypes.Receipt, *ethtypes.Transaction, error) {
logger := ob.logger.ObserveOutTx.With().Str("txHash", txHash).Uint64("nonce", nonce).Logger()
if ob.outTXConfirmedReceipts[ob.GetTxID(nonce)] != nil && ob.outTXConfirmedTransaction[ob.GetTxID(nonce)] != nil {
return nil, nil, fmt.Errorf("queryTxByHash: txHash %s receipts already recorded", txHash)
}
// SetPendingTx sets the pending transaction in memory
func (ob *EVMChainClient) SetPendingTx(nonce uint64, transaction *ethtypes.Transaction) {
ob.Mu.Lock()
ob.outTxPendingTransaction[ob.GetTxID(nonce)] = transaction
ob.Mu.Unlock()
}

// GetPendingTx gets the pending transaction from memory
func (ob *EVMChainClient) GetPendingTx(nonce uint64) *ethtypes.Transaction {
ob.Mu.Lock()
transaction := ob.outTxPendingTransaction[ob.GetTxID(nonce)]
ob.Mu.Unlock()
return transaction
}

// SetTxNReceipt sets the receipt and transaction in memory
func (ob *EVMChainClient) SetTxNReceipt(nonce uint64, receipt *ethtypes.Receipt, transaction *ethtypes.Transaction) {
ob.Mu.Lock()
delete(ob.outTxPendingTransaction, ob.GetTxID(nonce)) // remove pending transaction, if any
ob.outTXConfirmedReceipts[ob.GetTxID(nonce)] = receipt
ob.outTXConfirmedTransaction[ob.GetTxID(nonce)] = transaction
ob.Mu.Unlock()
}

// getTxNReceipt gets the receipt and transaction from memory
func (ob *EVMChainClient) GetTxNReceipt(nonce uint64) (*ethtypes.Receipt, *ethtypes.Transaction) {
ob.Mu.Lock()
receipt := ob.outTXConfirmedReceipts[ob.GetTxID(nonce)]
transaction := ob.outTXConfirmedTransaction[ob.GetTxID(nonce)]
ob.Mu.Unlock()
return receipt, transaction
}

// hasTxConfirmed returns true if there is a confirmed tx for 'nonce'
func (ob *EVMChainClient) hasTxConfirmed(nonce uint64) bool {
ob.Mu.Lock()
confirmed := ob.outTXConfirmedReceipts[ob.GetTxID(nonce)] != nil && ob.outTXConfirmedTransaction[ob.GetTxID(nonce)] != nil
ob.Mu.Unlock()
return confirmed
}

// confirmTxByHash checks if a txHash is confirmed and saves transaction and receipt in memory
// returns nil if confirmed or error otherwise
func (ob *EVMChainClient) confirmTxByHash(txHash string, nonce uint64) error {
ctxt, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

// query transaction
transaction, isPending, err := ob.evmClient.TransactionByHash(ctxt, ethcommon.HexToHash(txHash))
if err != nil {
return err
}
if transaction == nil { // should not happen
log.Error().Msgf("confirmTxByHash: transaction is nil for txHash %s nonce %d", txHash, nonce)
return fmt.Errorf("transaction is nil")
}
if isPending { // save pending transaction
ob.SetPendingTx(nonce, transaction)
return fmt.Errorf("confirmTxByHash: txHash %s nonce %d is pending", txHash, nonce)
}

// query receipt
receipt, err := ob.evmClient.TransactionReceipt(ctxt, ethcommon.HexToHash(txHash))
if err != nil {
if err != ethereum.NotFound {
logger.Warn().Err(err).Msgf("queryTxByHash: TransactionReceipt/TransactionByHash error, txHash %s nonce %d", txHash, nonce)
log.Warn().Err(err).Msgf("confirmTxByHash: TransactionReceipt error, txHash %s nonce %d", txHash, nonce)
}
return nil, nil, err
return err
}
transaction, isPending, err := ob.evmClient.TransactionByHash(ctxt, ethcommon.HexToHash(txHash))
if err != nil {
return nil, nil, err
if receipt == nil { // should not happen
log.Error().Msgf("confirmTxByHash: receipt is nil for txHash %s nonce %d", txHash, nonce)
return fmt.Errorf("receipt is nil")
}

// check nonce and confirmations
if transaction.Nonce() != nonce {
return nil, nil, fmt.Errorf("queryTxByHash: txHash %s nonce mismatch: wanted %d, got tx nonce %d", txHash, nonce, transaction.Nonce())
log.Error().Msgf("confirmTxByHash: txHash %s nonce mismatch: wanted %d, got tx nonce %d", txHash, nonce, transaction.Nonce())
return fmt.Errorf("outtx nonce mismatch")
}
confHeight := receipt.BlockNumber.Uint64() + ob.GetCoreParams().ConfirmationCount
if confHeight >= math.MaxInt64 {
return nil, nil, fmt.Errorf("queryTxByHash: confHeight is out of range")
return fmt.Errorf("confirmTxByHash: confHeight is out of range")
}

if confHeight > ob.GetLastBlockHeight() {
log.Info().Msgf("queryTxByHash: txHash %s nonce %d included but not confirmed: receipt block %d, current block %d", txHash, nonce, receipt.BlockNumber, ob.GetLastBlockHeight())
return nil, nil, fmt.Errorf("included but not confirmed")
}
// transaction must NOT be pending
if isPending {
log.Error().Msgf("queryTxByHash: confirmed but still pending: txHash %s nonce %d receipt block %d", txHash, nonce, receipt.BlockNumber)
return nil, nil, fmt.Errorf("confirmed but still pending")
log.Info().Msgf("confirmTxByHash: txHash %s nonce %d included but not confirmed: receipt block %d, current block %d",
txHash, nonce, receipt.BlockNumber, ob.GetLastBlockHeight())
return fmt.Errorf("included but not confirmed")
}
return receipt, transaction, nil

// confirmed, save receipt and transaction
ob.SetTxNReceipt(nonce, receipt, transaction)

return nil
}

// SetLastBlockHeightScanned set last block height scanned (not necessarily caught up with external block; could be slow/paused)
Expand Down
39 changes: 18 additions & 21 deletions zetaclient/evm_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (signer *EVMSigner) TryProcessOutTx(
send *types.CrossChainTx,
outTxMan *OutTxProcessorManager,
outTxID string,
evmClient ChainClient,
chainclient ChainClient,
zetaBridge ZetaCoreBridger,
height uint64,
) {
Expand All @@ -327,7 +327,6 @@ func (signer *EVMSigner) TryProcessOutTx(
myID := zetaBridge.GetKeys().GetOperatorAddress()

var to ethcommon.Address
var err error
var toChain *common.Chain
if send.CctxStatus.Status == types.CctxStatus_PendingRevert {
to = ethcommon.HexToAddress(send.InboundTxParams.Sender)
Expand All @@ -348,13 +347,15 @@ func (signer *EVMSigner) TryProcessOutTx(
logger.Info().Msgf("Transaction doesn't need to be processed status: %d", send.CctxStatus.Status)
return
}
if err != nil {
logger.Error().Err(err).Msg("ParseChain fail; skip")
evmClient, ok := chainclient.(*EVMChainClient)
if !ok {
logger.Error().Msgf("chain client is not an EVMChainClient")
return
}

// Early return if the cctx is already processed
included, confirmed, err := evmClient.IsSendOutTxProcessed(send.Index, send.GetCurrentOutTxParam().OutboundTxTssNonce, send.GetCurrentOutTxParam().CoinType, logger)
nonce := send.GetCurrentOutTxParam().OutboundTxTssNonce
included, confirmed, err := evmClient.IsSendOutTxProcessed(send.Index, nonce, send.GetCurrentOutTxParam().CoinType, logger)
if err != nil {
logger.Error().Err(err).Msg("IsSendOutTxProcessed failed")
}
Expand All @@ -381,7 +382,7 @@ func (signer *EVMSigner) TryProcessOutTx(
logger.Warn().Msgf("gasLimit %d is too high; set to %d", send.GetCurrentOutTxParam().OutboundTxGasLimit, gasLimit)
}

logger.Info().Msgf("chain %s minting %d to %s, nonce %d, finalized zeta bn %d", toChain, send.InboundTxParams.Amount, to.Hex(), send.GetCurrentOutTxParam().OutboundTxTssNonce, send.InboundTxParams.InboundTxFinalizedZetaHeight)
logger.Info().Msgf("chain %s minting %d to %s, nonce %d, finalized zeta bn %d", toChain, send.InboundTxParams.Amount, to.Hex(), nonce, send.InboundTxParams.InboundTxFinalizedZetaHeight)
sendHash, err := hex.DecodeString(send.Index[2:]) // remove the leading 0x
if err != nil || len(sendHash) != 32 {
logger.Error().Err(err).Msgf("decode CCTX %s error", send.Index)
Expand Down Expand Up @@ -412,21 +413,17 @@ func (signer *EVMSigner) TryProcessOutTx(
} else {
gasprice = specified
}
//if common.IsEthereumChain(toChain.ChainId) {
// suggested, err := signer.client.SuggestGasPrice(context.Background())
// if err != nil {
// logger.Error().Err(err).Msgf("cannot get gas price from chain %s ", toChain)
// return
// }
// gasprice = roundUpToNearestGwei(suggested)
//} else {
// specified, ok := new(big.Int).SetString(send.GetCurrentOutTxParam().OutboundTxGasPrice, 10)
// if !ok {
// logger.Error().Err(err).Msgf("cannot convert gas price %s ", send.GetCurrentOutTxParam().OutboundTxGasPrice)
// return
// }
// gasprice = specified
//}

// In case there is a pending transaction, make sure this keysign is a transaction replacement
pendingTx := evmClient.GetPendingTx(nonce)
if pendingTx != nil {
if gasprice.Cmp(pendingTx.GasPrice()) > 0 {
logger.Info().Msgf("replace pending outTx %s nonce %d using gas price %d", pendingTx.Hash().Hex(), nonce, gasprice)
} else {
logger.Info().Msgf("please wait for pending outTx %s nonce %d to be included", pendingTx.Hash().Hex(), nonce)
return
}
}

flags, err := zetaBridge.GetCrosschainFlags()
if err != nil {
Expand Down
11 changes: 0 additions & 11 deletions zetaclient/zetacore_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,17 +250,6 @@ func (co *CoreObserver) scheduleCctxEVM(
break
}

// try confirming the outtx
included, _, err := ob.IsSendOutTxProcessed(cctx.Index, params.OutboundTxTssNonce, params.CoinType, co.logger.ZetaChainWatcher)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("scheduleCctxEVM: IsSendOutTxProcessed faild for chain %d", chainID)
continue
}
if included {
co.logger.ZetaChainWatcher.Info().Msgf("scheduleCctxEVM: outtx %s already included; do not schedule keysign", outTxID)
continue
}

// #nosec G701 positive
interval := uint64(ob.GetCoreParams().OutboundTxScheduleInterval)
lookahead := ob.GetCoreParams().OutboundTxScheduleLookahead
Expand Down