Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kan/v0.11 access older results #131

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions access/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ func TransactionResultToMessage(result *TransactionResult) *access.TransactionRe
}
}

func MessageToTransactionResult(message *access.TransactionResultResponse) *TransactionResult {

return &TransactionResult{
Status: flow.TransactionStatus(message.Status),
StatusCode: uint(message.StatusCode),
ErrorMessage: message.ErrorMessage,
Events: convert.MessagesToEvents(message.Events),
}
}

// NetworkParameters contains the network-wide parameters for the Flow blockchain.
type NetworkParameters struct {
ChainID flow.ChainID
Expand Down
22 changes: 22 additions & 0 deletions cmd/access/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func main() {
rpcEng *rpc.Engine
collectionRPC access.AccessAPIClient
executionRPC execution.ExecutionAPIClient
historicalAccessRPCs []access.AccessAPIClient
err error
conCache *buffer.PendingBlocks // pending block cache for follower
transactionTimings *stdmap.TransactionTimings
Expand All @@ -77,6 +78,7 @@ func main() {
flags.StringVarP(&rpcConf.HTTPListenAddr, "http-addr", "h", "localhost:8000", "the address the http proxy server listens on")
flags.StringVarP(&rpcConf.CollectionAddr, "static-collection-ingress-addr", "", "", "the address (of the collection node) to send transactions to")
flags.StringVarP(&rpcConf.ExecutionAddr, "script-addr", "s", "localhost:9000", "the address (of the execution node) forward the script to")
flags.StringVarP(&rpcConf.HistoricalAccessAddrs, "historical-access-addr", "", "", "comma separated rpc addresses for historical access nodes")
flags.BoolVar(&logTxTimeToFinalized, "log-tx-time-to-finalized", false, "log transaction time to finalized")
flags.BoolVar(&logTxTimeToExecuted, "log-tx-time-to-executed", false, "log transaction time to executed")
flags.BoolVar(&logTxTimeToFinalizedExecuted, "log-tx-time-to-finalized-executed", false, "log transaction time to finalized and executed")
Expand Down Expand Up @@ -113,6 +115,25 @@ func main() {
executionRPC = execution.NewExecutionAPIClient(executionRPCConn)
return nil
}).
Module("historical access node clients", func(node *cmd.FlowNodeBuilder) error {
addrs := strings.Split(rpcConf.HistoricalAccessAddrs, ",")
for _, addr := range addrs {
if strings.TrimSpace(addr) == "" {
continue
}
node.Logger.Info().Err(err).Msgf("Historical access node Addr: %s", addr)

historicalAccessRPCConn, err := grpc.Dial(
addr,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcutils.DefaultMaxMsgSize)),
grpc.WithInsecure())
if err != nil {
return err
}
historicalAccessRPCs = append(historicalAccessRPCs, access.NewAccessAPIClient(historicalAccessRPCConn))
}
return nil
}).
Module("block cache", func(node *cmd.FlowNodeBuilder) error {
conCache = buffer.NewPendingBlocks()
return nil
Expand Down Expand Up @@ -156,6 +177,7 @@ func main() {
rpcConf,
executionRPC,
collectionRPC,
historicalAccessRPCs,
node.Storage.Blocks,
node.Storage.Headers,
node.Storage.Collections,
Expand Down
4 changes: 3 additions & 1 deletion engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (suite *Suite) RunTest(
suite.state,
suite.execClient,
suite.collClient,
nil,
blocks,
headers,
collections,
Expand Down Expand Up @@ -280,6 +281,7 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() {
nil, // setting collectionRPC to nil to choose a random collection node for each send tx request
nil,
nil,
nil,
collections,
transactions,
suite.chainID,
Expand Down Expand Up @@ -444,7 +446,7 @@ func (suite *Suite) TestGetSealedTransaction() {
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(suite.T(), err)

rpcEng := rpc.New(suite.log, suite.state, rpc.Config{}, nil, nil, blocks, headers, collections, transactions,
rpcEng := rpc.New(suite.log, suite.state, rpc.Config{}, nil, nil, nil, blocks, headers, collections, transactions,
suite.chainID, metrics, 0, false)

// create the ingest engine
Expand Down
2 changes: 1 addition & 1 deletion engine/access/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (suite *Suite) SetupTest() {
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(suite.T(), err)

rpcEng := rpc.New(log, suite.proto.state, rpc.Config{}, nil, nil, suite.blocks, suite.headers, suite.collections,
rpcEng := rpc.New(log, suite.proto.state, rpc.Config{}, nil, nil, nil, suite.blocks, suite.headers, suite.collections,
suite.transactions, flow.Testnet, metrics.NewNoopCollector(), 0, false)

eng, err := New(log, net, suite.proto.state, suite.me, suite.request, suite.blocks, suite.headers, suite.collections,
Expand Down
9 changes: 9 additions & 0 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func New(
state protocol.State,
executionRPC execproto.ExecutionAPIClient,
collectionRPC accessproto.AccessAPIClient,
historicalAccessNodes []accessproto.AccessAPIClient,
blocks storage.Blocks,
headers storage.Headers,
collections storage.Collections,
Expand Down Expand Up @@ -84,6 +85,7 @@ func New(
retry: retry,
collectionGRPCPort: collectionGRPCPort,
connFactory: connFactory,
previousAccessNodes: historicalAccessNodes,
},
backendEvents: backendEvents{
executionRPC: executionRPC,
Expand Down Expand Up @@ -164,6 +166,13 @@ func (b *Backend) GetNetworkParameters(_ context.Context) access.NetworkParamete
}

func convertStorageError(err error) error {
if err == nil {
return nil
}
if status.Code(err) == codes.NotFound {
// Already converted
return err
}
if errors.Is(err, storage.ErrNotFound) {
return status.Errorf(codes.NotFound, "not found: %v", err)
}
Expand Down
44 changes: 24 additions & 20 deletions engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ type Suite struct {
snapshot *protocol.Snapshot
log zerolog.Logger

blocks *storagemock.Blocks
headers *storagemock.Headers
collections *storagemock.Collections
transactions *storagemock.Transactions
colClient *access.AccessAPIClient
execClient *access.ExecutionAPIClient
chainID flow.ChainID
blocks *storagemock.Blocks
headers *storagemock.Headers
collections *storagemock.Collections
transactions *storagemock.Transactions
colClient *access.AccessAPIClient
execClient *access.ExecutionAPIClient
historicalAccessClient *access.AccessAPIClient
chainID flow.ChainID
}

func TestHandler(t *testing.T) {
Expand All @@ -59,6 +60,7 @@ func (suite *Suite) SetupTest() {
suite.colClient = new(access.AccessAPIClient)
suite.execClient = new(access.ExecutionAPIClient)
suite.chainID = flow.Testnet
suite.historicalAccessClient = new(access.AccessAPIClient)
}

func (suite *Suite) TestPing() {
Expand All @@ -74,7 +76,7 @@ func (suite *Suite) TestPing() {
suite.state,
suite.execClient,
suite.colClient,
nil, nil, nil, nil,
nil, nil, nil, nil, nil,
suite.chainID,
metrics.NewNoopCollector(),
0,
Expand All @@ -96,7 +98,7 @@ func (suite *Suite) TestGetLatestFinalizedBlockHeader() {
backend := New(
suite.state,
suite.execClient,
nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil,
suite.chainID,
metrics.NewNoopCollector(),
0,
Expand All @@ -123,7 +125,7 @@ func (suite *Suite) TestGetLatestSealedBlockHeader() {

backend := New(
suite.state,
nil, nil, nil,
nil, nil, nil, nil,
suite.headers, nil, nil,
suite.chainID,
metrics.NewNoopCollector(),
Expand Down Expand Up @@ -155,7 +157,7 @@ func (suite *Suite) TestGetTransaction() {

backend := New(
suite.state,
nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil,
suite.transactions,
suite.chainID,
metrics.NewNoopCollector(),
Expand All @@ -182,7 +184,7 @@ func (suite *Suite) TestGetCollection() {

backend := New(
suite.state,
nil, nil, nil, nil,
nil, nil, nil, nil, nil,
suite.collections,
suite.transactions,
suite.chainID,
Expand Down Expand Up @@ -249,6 +251,7 @@ func (suite *Suite) TestTransactionStatusTransition() {
suite.state,
suite.execClient,
nil,
nil,
suite.blocks,
suite.headers,
suite.collections,
Expand Down Expand Up @@ -350,6 +353,7 @@ func (suite *Suite) TestTransactionExpiredStatusTransition() {
suite.state,
suite.execClient,
nil,
nil,
suite.blocks,
suite.headers,
suite.collections,
Expand Down Expand Up @@ -396,7 +400,7 @@ func (suite *Suite) TestGetLatestFinalizedBlock() {

backend := New(
suite.state,
nil, nil,
nil, nil, nil,
suite.blocks,
nil, nil, nil,
suite.chainID,
Expand Down Expand Up @@ -480,7 +484,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() {
backend := New(
suite.state,
suite.execClient,
nil,
nil, nil,
suite.blocks,
nil, nil, nil,
suite.chainID,
Expand Down Expand Up @@ -573,7 +577,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {
suite.Run("invalid request max height < min height", func() {
backend := New(
suite.state,
nil, nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil,
suite.chainID,
metrics.NewNoopCollector(),
0,
Expand All @@ -600,7 +604,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {
backend := New(
suite.state,
suite.execClient,
nil,
nil, nil,
suite.blocks,
suite.headers,
nil, nil,
Expand Down Expand Up @@ -629,7 +633,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {
backend := New(
suite.state,
suite.execClient,
nil,
nil, nil,
suite.blocks,
suite.headers,
nil, nil,
Expand Down Expand Up @@ -691,7 +695,7 @@ func (suite *Suite) TestGetAccount() {
backend := New(
suite.state,
suite.execClient,
nil, nil,
nil, nil, nil,
suite.headers,
nil, nil,
suite.chainID,
Expand Down Expand Up @@ -750,7 +754,7 @@ func (suite *Suite) TestGetAccountAtBlockHeight() {
backend := New(
suite.state,
suite.execClient,
nil, nil,
nil, nil, nil,
suite.headers,
nil, nil,
flow.Testnet,
Expand All @@ -774,7 +778,7 @@ func (suite *Suite) TestGetNetworkParameters() {
expectedChainID := flow.Mainnet

backend := New(
nil, nil, nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil,
flow.Mainnet,
metrics.NewNoopCollector(),
0,
Expand Down
61 changes: 56 additions & 5 deletions engine/access/rpc/backend/backend_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type backendTransactions struct {
retry *Retry
collectionGRPCPort uint
connFactory ConnectionFactory

previousAccessNodes []accessproto.AccessAPIClient
}

// SendTransaction forwards the transaction to the collection node
Expand Down Expand Up @@ -173,11 +175,16 @@ func (b *backendTransactions) SendRawTransaction(
return b.trySendTransaction(ctx, tx)
}

func (b *backendTransactions) GetTransaction(_ context.Context, txID flow.Identifier) (*flow.TransactionBody, error) {
func (b *backendTransactions) GetTransaction(ctx context.Context, txID flow.Identifier) (*flow.TransactionBody, error) {
// look up transaction from storage
tx, err := b.transactions.ByID(txID)
if err != nil {
return nil, convertStorageError(err)
txErr := convertStorageError(err)
if txErr != nil {
if status.Code(txErr) == codes.NotFound {
return b.getHistoricalTransaction(ctx, txID)
}
// Other Error trying to retrieve the transaction, return with err
return nil, txErr
}

return tx, nil
Expand All @@ -189,8 +196,13 @@ func (b *backendTransactions) GetTransactionResult(
) (*access.TransactionResult, error) {
// look up transaction from storage
tx, err := b.transactions.ByID(txID)
if err != nil {
return nil, convertStorageError(err)
txErr := convertStorageError(err)
if txErr != nil {
if status.Code(txErr) == codes.NotFound {
// Tx not found. If we have historical Sporks setup, lets look through those as well
return b.getHistoricalTransactionResult(ctx, txID)
}
return nil, txErr
}

// get events for the transaction
Expand Down Expand Up @@ -317,6 +329,45 @@ func (b *backendTransactions) lookupTransactionResult(
return true, events, txStatus, message, nil
}

func (b *backendTransactions) getHistoricalTransaction(
ctx context.Context,
txID flow.Identifier,
) (*flow.TransactionBody, error) {
for _, historicalNode := range b.previousAccessNodes {
txResp, err := historicalNode.GetTransaction(ctx, &accessproto.GetTransactionRequest{Id: txID[:]})
if err == nil {
tx, err := convert.MessageToTransaction(txResp.Transaction, b.chainID.Chain())
// Found on a historical node. Report
return &tx, err
}
// Otherwise, if not found, just continue
if status.Code(err) == codes.NotFound {
continue
}
// TODO should we do something if the error isn't not found?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log that the error was not "NotFound"

}
return nil, status.Errorf(codes.NotFound, "no known transaction with ID %s", txID)
}

func (b *backendTransactions) getHistoricalTransactionResult(
ctx context.Context,
txID flow.Identifier,
) (*access.TransactionResult, error) {
for _, historicalNode := range b.previousAccessNodes {
result, err := historicalNode.GetTransactionResult(ctx, &accessproto.GetTransactionRequest{Id: txID[:]})
if err == nil {
// Found on a historical node. Report
return access.MessageToTransactionResult(result), nil
}
// Otherwise, if not found, just continue
if status.Code(err) == codes.NotFound {
continue
}
// TODO should we do something if the error isn't not found?
}
return nil, status.Errorf(codes.NotFound, "no known transaction with ID %s", txID)
}

func (b *backendTransactions) registerTransactionForRetry(tx *flow.TransactionBody) {
referenceBlock, err := b.state.AtBlockID(tx.ReferenceBlockID).Head()
if err != nil {
Expand Down
Loading