Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/runtime/client: Add the GetUnconfirmedTransactions method #5589

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changelog/5589.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/runtime/client: Add the GetUnconfirmedTransactions method

Similarly to GetUnconfirmedTransactions in the consensus API, this
new method returns the currently pending runtime transactions from
the runtime transaction pool.
4 changes: 4 additions & 0 deletions go/runtime/client/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type RuntimeClient interface {
// its results (outputs and emitted events).
GetTransactionsWithResults(ctx context.Context, request *GetTransactionsRequest) ([]*TransactionWithResults, error)

// GetUnconfirmedTransactions fetches all unconfirmed runtime transactions
// that are currently pending to be included in a block.
GetUnconfirmedTransactions(ctx context.Context, runtimeID common.Namespace) ([][]byte, error)

// GetEvents returns all events emitted in a given block.
GetEvents(ctx context.Context, request *GetEventsRequest) ([]*Event, error)

Expand Down
37 changes: 37 additions & 0 deletions go/runtime/client/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
methodGetTransactions = serviceName.NewMethod("GetTransactions", GetTransactionsRequest{})
// methodGetTransactionsWithResults is the GetTransactionsWithResults method.
methodGetTransactionsWithResults = serviceName.NewMethod("GetTransactionsWithResults", GetTransactionsRequest{})
// methodGetUnconfirmedTransactions is the GetUnconfirmedTransactions method.
methodGetUnconfirmedTransactions = serviceName.NewMethod("GetUnconfirmedTransactions", common.Namespace{})
// methodGetEvents is the GetEvents method.
methodGetEvents = serviceName.NewMethod("GetEvents", GetEventsRequest{})
// methodQuery is the Query method.
Expand Down Expand Up @@ -86,6 +88,10 @@
MethodName: methodGetTransactionsWithResults.ShortName(),
Handler: handlerGetTransactionsWithResults,
},
{
MethodName: methodGetUnconfirmedTransactions.ShortName(),
Handler: handlerGetUnconfirmedTransactions,
},
{
MethodName: methodGetEvents.ShortName(),
Handler: handlerGetEvents,
Expand Down Expand Up @@ -345,6 +351,29 @@
return interceptor(ctx, &rq, info, handler)
}

func handlerGetUnconfirmedTransactions(
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var runtimeID common.Namespace
if err := dec(&runtimeID); err != nil {
return nil, err

Check warning on line 362 in go/runtime/client/api/grpc.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/client/api/grpc.go#L362

Added line #L362 was not covered by tests
}
if interceptor == nil {
return srv.(RuntimeClient).GetUnconfirmedTransactions(ctx, runtimeID)

Check warning on line 365 in go/runtime/client/api/grpc.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/client/api/grpc.go#L365

Added line #L365 was not covered by tests
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodGetUnconfirmedTransactions.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RuntimeClient).GetUnconfirmedTransactions(ctx, req.(common.Namespace))
}
return interceptor(ctx, runtimeID, info, handler)
}

func handlerGetEvents(
srv interface{},
ctx context.Context,
Expand Down Expand Up @@ -495,6 +524,14 @@
return rsp, nil
}

func (c *runtimeClient) GetUnconfirmedTransactions(ctx context.Context, runtimeID common.Namespace) ([][]byte, error) {
var rsp [][]byte
if err := c.conn.Invoke(ctx, methodGetUnconfirmedTransactions.FullName(), runtimeID, &rsp); err != nil {
return nil, err

Check warning on line 530 in go/runtime/client/api/grpc.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/client/api/grpc.go#L530

Added line #L530 was not covered by tests
}
return rsp, nil
}

func (c *runtimeClient) GetEvents(ctx context.Context, request *GetEventsRequest) ([]*Event, error) {
var rsp []*Event
if err := c.conn.Invoke(ctx, methodGetEvents.FullName(), request, &rsp); err != nil {
Expand Down
7 changes: 6 additions & 1 deletion go/runtime/client/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func testSubmitTransaction(
c api.RuntimeClient,
input string,
) {
testInput := []byte(input)
// Submit a test transaction.
testInput := []byte(input)
resp, err := c.SubmitTxMeta(ctx, &api.SubmitTxRequest{Data: testInput, RuntimeID: runtimeID})

// Check if everything is in order.
Expand Down Expand Up @@ -229,6 +229,11 @@ func testQuery(
Data: []byte("test checktx request"),
})
require.NoError(t, err, "CheckTx")

// Get the number of unconfirmed transactions.
utxs, err := c.GetUnconfirmedTransactions(ctx, runtimeID)
require.NoError(t, err, "GetUnconfirmedTransactions")
require.True(t, len(utxs) == 0)
}

func testSubmitTransactionNoWait(
Expand Down
10 changes: 7 additions & 3 deletions go/runtime/txpool/local_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) {
lq.txs = keptTxs
}

func (lq *localQueue) PeekAll() []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
return append(make([]*TxQueueMeta, 0, len(lq.txs)), lq.txs...)
}

func (lq *localQueue) TakeAll() []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
Expand All @@ -87,9 +93,7 @@ func (lq *localQueue) OfferChecked(tx *TxQueueMeta, _ *protocol.CheckTxMetadata)
}

func (lq *localQueue) GetTxsToPublish() []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
return append([]*TxQueueMeta(nil), lq.txs...)
return lq.PeekAll()
}

func (lq *localQueue) size() int {
Expand Down
22 changes: 13 additions & 9 deletions go/runtime/txpool/main_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@

func (mq *mainQueue) GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta {
txMetas := mq.inner.getPrioritizedBatch(nil, countHint)
var txs []*TxQueueMeta
txs := make([]*TxQueueMeta, 0, len(txMetas))
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec
}
Expand All @@ -107,17 +107,26 @@

func (mq *mainQueue) GetSchedulingExtra(offset *hash.Hash, limit uint32) []*TxQueueMeta {
txMetas := mq.inner.getPrioritizedBatch(offset, limit)
var txs []*TxQueueMeta
txs := make([]*TxQueueMeta, 0, len(txMetas))

Check warning on line 110 in go/runtime/txpool/main_queue.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/txpool/main_queue.go#L110

Added line #L110 was not covered by tests
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec
}
return txs
}

func (mq *mainQueue) PeekAll() []*TxQueueMeta {
allTxs := mq.inner.getAll()
txs := make([]*TxQueueMeta, 0, len(allTxs))
for _, tx := range allTxs {
txs = append(txs, &tx.TxQueueMeta) //nolint:gosec
}
return txs
}

func (mq *mainQueue) TakeAll() []*TxQueueMeta {
txMetas := mq.inner.getAll()
mq.inner.clear()
var txs []*TxQueueMeta
txs := make([]*TxQueueMeta, 0, len(txMetas))
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec
}
Expand All @@ -132,10 +141,5 @@
}

func (mq *mainQueue) GetTxsToPublish() []*TxQueueMeta {
txMetas := mq.inner.getAll()
var txs []*TxQueueMeta
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec
}
return txs
return mq.PeekAll()
}
2 changes: 2 additions & 0 deletions go/runtime/txpool/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type UsableTransactionSource interface {
// HandleTxsUsed is a callback to indicate that the scheduler is done with a set of txs, by hash. For most
// implementations, remove it from internal storage.
HandleTxsUsed(hashes []hash.Hash)
// PeekAll returns all transactions without removing them.
PeekAll() []*TxQueueMeta
}

// RecheckableTransactionStore provides methods for rechecking.
Expand Down
11 changes: 11 additions & 0 deletions go/runtime/txpool/rim_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@
// The roothash module manages the incoming message queue on its own, so we don't do anything here.
}

func (rq *rimQueue) PeekAll() []*TxQueueMeta {
rq.l.RLock()
defer rq.l.RUnlock()

txs := make([]*TxQueueMeta, 0, len(rq.txs))
for _, tx := range rq.txs {
txs = append(txs, tx)

Check warning on line 45 in go/runtime/txpool/rim_queue.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/txpool/rim_queue.go#L45

Added line #L45 was not covered by tests
}
return txs
}

// Load loads transactions from roothash incoming messages.
func (rq *rimQueue) Load(inMsgs []*message.IncomingMessage) {
newTxs := map[hash.Hash]*TxQueueMeta{}
Expand Down
14 changes: 14 additions & 0 deletions go/runtime/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ type TransactionPool interface {

// PendingCheckSize returns the number of transactions currently pending to be checked.
PendingCheckSize() int

// GetTxs returns all transactions currently queued in the transaction pool.
GetTxs() []*TxQueueMeta
}

// RuntimeHostProvisioner is a runtime host provisioner.
Expand Down Expand Up @@ -425,6 +428,17 @@ func (t *txPool) PendingCheckSize() int {
return t.checkTxQueue.size()
}

func (t *txPool) GetTxs() []*TxQueueMeta {
t.drainLock.Lock()
defer t.drainLock.Unlock()

var txs []*TxQueueMeta
for _, q := range t.usableSources {
txs = append(txs, q.PeekAll()...)
}
return txs
}

func (t *txPool) getCurrentBlockInfo() (*runtime.BlockInfo, time.Time, error) {
t.blockInfoLock.Lock()
defer t.blockInfoLock.Unlock()
Expand Down
18 changes: 18 additions & 0 deletions go/worker/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,24 @@
return results, nil
}

// Implements api.RuntimeClient.
func (s *service) GetUnconfirmedTransactions(_ context.Context, runtimeID common.Namespace) ([][]byte, error) {
rt := s.w.commonWorker.GetRuntime(runtimeID)
if rt == nil {
return nil, api.ErrNotFound

Check warning on line 245 in go/worker/client/service.go

View check run for this annotation

Codecov / codecov/patch

go/worker/client/service.go#L245

Added line #L245 was not covered by tests
}

// Get currently pending transactions from the runtime's transaction pool.
pendingTxs := rt.TxPool.GetTxs()

// Copy the raw transactions to the output slice.
out := make([][]byte, 0, len(pendingTxs))
for _, tx := range pendingTxs {
out = append(out, tx.Raw())

Check warning on line 254 in go/worker/client/service.go

View check run for this annotation

Codecov / codecov/patch

go/worker/client/service.go#L254

Added line #L254 was not covered by tests
}
return out, nil
}

// Implements api.RuntimeClient.
func (s *service) GetEvents(ctx context.Context, request *api.GetEventsRequest) ([]*api.Event, error) {
rt, err := s.w.commonWorker.RuntimeRegistry.GetRuntime(request.RuntimeID)
Expand Down
Loading