Skip to content

Commit

Permalink
feat(rpc): filter unconfirmed txs by tx hash (#1053)
Browse files Browse the repository at this point in the history
* chore: add TxHash field, WIP

* chore: update mockery

* feat(rpc): add new request type

* chore(rpc): unconfirmed tx route added

* test(rpc): test unconfirmed txs

* chore: improvements

* chore: self-review

* chore: unconfirmed_tx is required
  • Loading branch information
lklimek authored Mar 3, 2025
1 parent cc9e15b commit 8bacade
Show file tree
Hide file tree
Showing 19 changed files with 335 additions and 18 deletions.
1 change: 1 addition & 0 deletions internal/consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (emptyMempool) Update(
) error {
return nil
}
func (emptyMempool) GetTxByHash(_ types.TxKey) types.Tx { return nil }
func (emptyMempool) Flush() {}
func (emptyMempool) FlushAppConn(_ctx context.Context) error { return nil }
func (emptyMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
Expand Down
11 changes: 11 additions & 0 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,17 @@ func (txmp *TxMempool) CheckTx(
return txmp.addNewTransaction(wtx, rsp)
}

// GetTxByHash returns transaction by key from the mempool. If the transaction
// does not exist, it returns nil.
func (txmp *TxMempool) GetTxByHash(txHash types.TxKey) types.Tx {
if elt, ok := txmp.txByKey[txHash]; ok {
w := elt.Value.(*WrappedTx)
return w.tx
}

return nil
}

// RemoveTxByKey removes the transaction with the specified key from the
// mempool. It reports an error if no such transaction exists. This operation
// does not remove the transaction from the cache.
Expand Down
48 changes: 48 additions & 0 deletions internal/mempool/mocks/mempool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions internal/mempool/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Mempool interface {
// from the mempool.
RemoveTxByKey(txKey types.TxKey) error

// GetTxByHash returns a transaction from the mempool, if it exists, or nil.
GetTxByHash(txHash types.TxKey) types.Tx

// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
// bytes total with the condition that the total gasWanted must be less than
// maxGas.
Expand Down
1 change: 1 addition & 0 deletions internal/rpc/core/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Available endpoints:
/status
/health
/unconfirmed_txs
/unconfirmed_tx
/unsafe_flush_mempool
/validators
Expand Down
19 changes: 17 additions & 2 deletions internal/rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"time"

abci "github.com/dashpay/tenderdash/abci/types"
"github.com/dashpay/tenderdash/crypto"
"github.com/dashpay/tenderdash/internal/mempool"
"github.com/dashpay/tenderdash/internal/state/indexer"
tmmath "github.com/dashpay/tenderdash/libs/math"
"github.com/dashpay/tenderdash/rpc/coretypes"
"github.com/dashpay/tenderdash/types"
)

//-----------------------------------------------------------------------------
Expand Down Expand Up @@ -171,7 +173,7 @@ func (env *Environment) BroadcastTxCommit(ctx context.Context, req *coretypes.Re
}
}

// UnconfirmedTxs gets unconfirmed transactions from the mempool in order of priority
// UnconfirmedTxs gets unconfirmed transactions from the mempool in order of priority.
// More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs
func (env *Environment) UnconfirmedTxs(_ctx context.Context, req *coretypes.RequestUnconfirmedTxs) (*coretypes.ResultUnconfirmedTxs, error) {
totalCount := env.Mempool.Size()
Expand All @@ -182,7 +184,6 @@ func (env *Environment) UnconfirmedTxs(_ctx context.Context, req *coretypes.Requ
}

skipCount := validateSkipCount(page, perPage)

txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount))
result := txs[skipCount:]

Expand All @@ -194,6 +195,20 @@ func (env *Environment) UnconfirmedTxs(_ctx context.Context, req *coretypes.Requ
}, nil
}

// return single unconfirmed transaction, matching req.TxHash
func (env *Environment) UnconfirmedTx(_ctx context.Context, req *coretypes.RequestUnconfirmedTx) (*coretypes.ResultUnconfirmedTx, error) {
if req == nil || req.TxHash.IsZero() || len(req.TxHash) != crypto.HashSize {
return nil, fmt.Errorf("you must provide a valid %d-byte transaction hash in hash= argument", crypto.HashSize)
}

tx := env.Mempool.GetTxByHash(types.TxKey(req.TxHash))
if tx == nil {
return nil, fmt.Errorf("transaction %X not found", req.TxHash)
}

return &coretypes.ResultUnconfirmedTx{Tx: tx}, nil
}

// NumUnconfirmedTxs gets number of unconfirmed transactions.
// More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs
func (env *Environment) NumUnconfirmedTxs(_ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) {
Expand Down
2 changes: 2 additions & 0 deletions internal/rpc/core/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewRoutesMap(svc RPCService, opts *RouteOptions) RoutesMap {
"consensus_state": rpc.NewRPCFunc(svc.GetConsensusState),
"consensus_params": rpc.NewRPCFunc(svc.ConsensusParams),
"unconfirmed_txs": rpc.NewRPCFunc(svc.UnconfirmedTxs),
"unconfirmed_tx": rpc.NewRPCFunc(svc.UnconfirmedTx),
"num_unconfirmed_txs": rpc.NewRPCFunc(svc.NumUnconfirmedTxs),

// tx broadcast API
Expand Down Expand Up @@ -113,6 +114,7 @@ type RPCService interface {
Tx(ctx context.Context, req *coretypes.RequestTx) (*coretypes.ResultTx, error)
TxSearch(ctx context.Context, req *coretypes.RequestTxSearch) (*coretypes.ResultTxSearch, error)
UnconfirmedTxs(ctx context.Context, req *coretypes.RequestUnconfirmedTxs) (*coretypes.ResultUnconfirmedTxs, error)
UnconfirmedTx(ctx context.Context, req *coretypes.RequestUnconfirmedTx) (*coretypes.ResultUnconfirmedTx, error)
Unsubscribe(ctx context.Context, req *coretypes.RequestUnsubscribe) (*coretypes.ResultUnsubscribe, error)
UnsubscribeAll(ctx context.Context) (*coretypes.ResultUnsubscribe, error)
Validators(ctx context.Context, req *coretypes.RequestValidators) (*coretypes.ResultValidators, error)
Expand Down
4 changes: 4 additions & 0 deletions light/proxy/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func (p proxyService) UnconfirmedTxs(ctx context.Context, req *coretypes.Request
return p.Client.UnconfirmedTxs(ctx, req.Page.IntPtr(), req.PerPage.IntPtr())
}

func (p proxyService) UnconfirmedTx(ctx context.Context, req *coretypes.RequestUnconfirmedTx) (*coretypes.ResultUnconfirmedTx, error) {
return p.Client.UnconfirmedTx(ctx, req.TxHash)
}

func (p proxyService) Unsubscribe(ctx context.Context, req *coretypes.RequestUnsubscribe) (*coretypes.ResultUnsubscribe, error) {
return p.Client.UnsubscribeWS(ctx, req.Query)
}
Expand Down
4 changes: 4 additions & 0 deletions light/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ func (c *Client) UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coret
return c.next.UnconfirmedTxs(ctx, page, perPage)
}

func (c *Client) UnconfirmedTx(ctx context.Context, txHash []byte) (*coretypes.ResultUnconfirmedTx, error) {
return c.next.UnconfirmedTx(ctx, txHash)
}

func (c *Client) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) {
return c.next.NumUnconfirmedTxs(ctx)
}
Expand Down
23 changes: 17 additions & 6 deletions rpc/client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"net/http"
"time"

"github.com/dashpay/tenderdash/libs/bytes"
tmbytes "github.com/dashpay/tenderdash/libs/bytes"
rpcclient "github.com/dashpay/tenderdash/rpc/client"
"github.com/dashpay/tenderdash/rpc/coretypes"
jsonrpcclient "github.com/dashpay/tenderdash/rpc/jsonrpc/client"
Expand Down Expand Up @@ -208,11 +208,11 @@ func (c *baseRPCClient) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo
return result, nil
}

func (c *baseRPCClient) ABCIQuery(ctx context.Context, path string, data bytes.HexBytes) (*coretypes.ResultABCIQuery, error) {
func (c *baseRPCClient) ABCIQuery(ctx context.Context, path string, data tmbytes.HexBytes) (*coretypes.ResultABCIQuery, error) {
return c.ABCIQueryWithOptions(ctx, path, data, rpcclient.DefaultABCIQueryOptions)
}

func (c *baseRPCClient) ABCIQueryWithOptions(ctx context.Context, path string, data bytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) {
func (c *baseRPCClient) ABCIQueryWithOptions(ctx context.Context, path string, data tmbytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) {
result := new(coretypes.ResultABCIQuery)
if err := c.caller.Call(ctx, "abci_query", &coretypes.RequestABCIQuery{
Path: path,
Expand Down Expand Up @@ -267,6 +267,17 @@ func (c *baseRPCClient) UnconfirmedTxs(ctx context.Context, page *int, perPage *
return result, nil
}

func (c *baseRPCClient) UnconfirmedTx(ctx context.Context, txHash []byte) (*coretypes.ResultUnconfirmedTx, error) {
result := new(coretypes.ResultUnconfirmedTx)

if err := c.caller.Call(ctx, "unconfirmed_tx", &coretypes.RequestUnconfirmedTx{
TxHash: tmbytes.HexBytes(txHash),
}, result); err != nil {
return nil, err
}
return result, nil
}

func (c *baseRPCClient) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) {
result := new(coretypes.ResultUnconfirmedTxs)
if err := c.caller.Call(ctx, "num_unconfirmed_txs", nil, result); err != nil {
Expand Down Expand Up @@ -379,7 +390,7 @@ func (c *baseRPCClient) Block(ctx context.Context, height *int64) (*coretypes.Re
return result, nil
}

func (c *baseRPCClient) BlockByHash(ctx context.Context, hash bytes.HexBytes) (*coretypes.ResultBlock, error) {
func (c *baseRPCClient) BlockByHash(ctx context.Context, hash tmbytes.HexBytes) (*coretypes.ResultBlock, error) {
result := new(coretypes.ResultBlock)
if err := c.caller.Call(ctx, "block_by_hash", &coretypes.RequestBlockByHash{Hash: hash}, result); err != nil {
return nil, err
Expand Down Expand Up @@ -407,7 +418,7 @@ func (c *baseRPCClient) Header(ctx context.Context, height *int64) (*coretypes.R
return result, nil
}

func (c *baseRPCClient) HeaderByHash(ctx context.Context, hash bytes.HexBytes) (*coretypes.ResultHeader, error) {
func (c *baseRPCClient) HeaderByHash(ctx context.Context, hash tmbytes.HexBytes) (*coretypes.ResultHeader, error) {
result := new(coretypes.ResultHeader)
if err := c.caller.Call(ctx, "header_by_hash", &coretypes.RequestBlockByHash{
Hash: hash,
Expand All @@ -433,7 +444,7 @@ func (c *baseRPCClient) Commit(ctx context.Context, height *int64) (*coretypes.R
return result, nil
}

func (c *baseRPCClient) Tx(ctx context.Context, hash bytes.HexBytes, prove bool) (*coretypes.ResultTx, error) {
func (c *baseRPCClient) Tx(ctx context.Context, hash tmbytes.HexBytes, prove bool) (*coretypes.ResultTx, error) {
result := new(coretypes.ResultTx)
if err := c.caller.Call(ctx, "tx", &coretypes.RequestTx{Hash: hash, Prove: prove}, result); err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions rpc/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type SubscriptionClient interface {
// MempoolClient shows us data about current mempool state.
type MempoolClient interface {
UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error)
UnconfirmedTx(ctx context.Context, txHash []byte) (*coretypes.ResultUnconfirmedTx, error)
NumUnconfirmedTxs(context.Context) (*coretypes.ResultUnconfirmedTxs, error)
CheckTx(context.Context, types.Tx) (*coretypes.ResultCheckTx, error)
RemoveTx(context.Context, types.TxKey) error
Expand Down
3 changes: 3 additions & 0 deletions rpc/client/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ func (c *Local) UnconfirmedTxs(ctx context.Context, page, perPage *int) (*corety
PerPage: coretypes.Int64Ptr(perPage),
})
}
func (c *Local) UnconfirmedTx(ctx context.Context, txHash []byte) (*coretypes.ResultUnconfirmedTx, error) {
return c.env.UnconfirmedTx(ctx, &coretypes.RequestUnconfirmedTx{TxHash: txHash})
}

func (c *Local) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) {
return c.env.NumUnconfirmedTxs(ctx)
Expand Down
59 changes: 59 additions & 0 deletions rpc/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8bacade

Please sign in to comment.