Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sort Flow EVM events received from Event Streaming API by their TransactionIndex & EventIndex fields in ascending order #622

Merged
merged 2 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package models

import (
"fmt"
"strings"
"sort"

"github.com/onflow/cadence"
"github.com/onflow/flow-go-sdk"
Expand All @@ -12,20 +12,25 @@ import (
errs "github.com/onflow/flow-evm-gateway/models/errors"
)

const (
BlockExecutedQualifiedIdentifier = string(events.EventTypeBlockExecuted)
TransactionExecutedQualifiedIdentifier = string(events.EventTypeTransactionExecuted)
)

// isBlockExecutedEvent checks whether the given event contains block executed data.
func isBlockExecutedEvent(event cadence.Event) bool {
if event.EventType == nil {
return false
}
return strings.Contains(event.EventType.ID(), string(events.EventTypeBlockExecuted))
return event.EventType.QualifiedIdentifier == BlockExecutedQualifiedIdentifier
}

// isTransactionExecutedEvent checks whether the given event contains transaction executed data.
func isTransactionExecutedEvent(event cadence.Event) bool {
if event.EventType == nil {
return false
}
return strings.Contains(event.EventType.ID(), string(events.EventTypeTransactionExecuted))
return event.EventType.QualifiedIdentifier == TransactionExecutedQualifiedIdentifier
}

// CadenceEvents contains Flow emitted events containing one or zero evm block executed event,
Expand All @@ -39,6 +44,15 @@ type CadenceEvents struct {

// NewCadenceEvents decodes the events into evm types.
func NewCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Event Streaming API returns a different order?

Can we mention what was the default order?

And have we thought about changing the order in the EventStreaming API?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. So the default order seems to be ascending, for the most part. I am not sure if the Event Streaming API does offer such a guarantee, on the received order of the events.

However, regardless of whatever fixes we chose to do on the Event Streaming API, there are blocks that have EVM-related events out-of-order, see: #622 (comment).

And some other cases:

FlowTxIndex: 0, FlowEventIndex: 15
[EVM]:  A.e467b9dd11fa00df.EVM.TransactionExecuted(hash: [...], index: 0, ...)
FlowTxIndex: 0, FlowEventIndex: 16
[EVM]:  A.e467b9dd11fa00df.EVM.TransactionExecuted(hash: [...], index: 1, ...)
FlowTxIndex: 1, FlowEventIndex: 15
[EVM]:  A.e467b9dd11fa00df.EVM.TransactionExecuted(hash: [...], index: 0, ...)

This case though seems to be a race, as the index field of EVM.TransactionExecuted is not strictly increasing in the scope of an EVM block.

In order to incorporate the offchain package from flow-go, and replay EVM blocks & transactions to build a local state index, it is a prerequisite that we supply the blocks & transactions in the correct order, otherwise there will be state mismatches.

So the change in this PR, is more of a safety check, to ensure that we can mitigate such cases, no matter how the 3rd party tools, such as Event Streaming API, behave.

// first we sort all the events in the block, by their TransactionIndex,
// and then we also sort events in the same transaction, by their EventIndex.
sort.Slice(events.Events, func(i, j int) bool {
if events.Events[i].TransactionIndex != events.Events[j].TransactionIndex {
return events.Events[i].TransactionIndex < events.Events[j].TransactionIndex
}
return events.Events[i].EventIndex < events.Events[j].EventIndex
})

e, err := decodeCadenceEvents(events)
if err != nil {
return nil, err
Expand Down
97 changes: 91 additions & 6 deletions models/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestCadenceEvents_Block(t *testing.T) {

// generate txs
for i := 0; i < txCount; i++ {
tx, _, txEvent, err := newTransaction(uint64(i))
tx, _, txEvent, err := newTransaction(uint64(i), uint16(i))
require.NoError(t, err)
hashes[i] = tx.Hash()
events = append(events, txEvent)
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestCadenceEvents_Block(t *testing.T) {
})

t.Run("block with more transaction hashes", func(t *testing.T) {
tx, _, _, err := newTransaction(1)
tx, _, _, err := newTransaction(1, 0)
require.NoError(t, err)

// generate single block
Expand All @@ -153,6 +153,78 @@ func TestCadenceEvents_Block(t *testing.T) {
"block 1 references missing transaction/s",
)
})

t.Run("EVM events are ordered by Flow TransactionIndex & EventIndex", func(t *testing.T) {
txCount := 3
blockEvents := flow.BlockEvents{
BlockID: flow.Identifier{0x1},
Height: 1,
}

// tx1 and tx2 are EVM transactions executed on a single Flow transaction.
tx1, _, txEvent1, err := newTransaction(0, 0)
require.NoError(t, err)
txEvent1.TransactionIndex = 0
txEvent1.EventIndex = 2

tx2, _, txEvent2, err := newTransaction(1, 1)
require.NoError(t, err)
txEvent2.TransactionIndex = 0
txEvent2.EventIndex = 5

// tx3 is a Flow transaction with a single EVM transaction on EventIndex=1
tx3, _, txEvent3, err := newTransaction(2, 0)
require.NoError(t, err)
txEvent3.TransactionIndex = 2
txEvent3.EventIndex = 1

// needed for computing the `TransactionHashRoot` field on
// EVM.BlockExecuted event payload. the order is sensitive.
hashes = []gethCommon.Hash{
tx1.Hash(),
tx2.Hash(),
tx3.Hash(),
}

// add the tx events in a shuffled order
blockEvents.Events = []flow.Event{
txEvent3,
txEvent1,
txEvent2,
}

// generate single block
_, blockEvent, err := newBlock(1, hashes)
require.NoError(t, err)
blockEvent.TransactionIndex = 4
blockEvent.EventIndex = 0
blockEvents.Events = append(blockEvents.Events, blockEvent)

// parse the EventStreaming API response
cdcEvents, err := NewCadenceEvents(blockEvents)
require.NoError(t, err)

// assert that Flow events are sorted by their TransactionIndex and EventIndex fields
assert.Equal(
t,
[]flow.Event{
txEvent1,
txEvent2,
txEvent3,
blockEvent,
},
cdcEvents.events.Events,
)

// assert that EVM transactions & receipts are sorted by their
// TransactionIndex field
for i := 0; i < txCount; i++ {
tx := cdcEvents.transactions[i]
receipt := cdcEvents.receipts[i]
assert.Equal(t, tx.Hash(), receipt.TxHash)
assert.Equal(t, uint(i), receipt.TransactionIndex)
}
})
}

func Test_EventDecoding(t *testing.T) {
Expand All @@ -171,7 +243,7 @@ func Test_EventDecoding(t *testing.T) {
// generate txs
for i := 0; i < txCount; i++ {
var err error
txs[i], results[i], txEvents[i], err = newTransaction(uint64(i))
txs[i], results[i], txEvents[i], err = newTransaction(uint64(i), uint16(i))
require.NoError(t, err)
hashes[i] = txs[i].Hash()
blockEvents.Events = append(blockEvents.Events, txEvents[i])
Expand Down Expand Up @@ -224,12 +296,22 @@ func Test_EventDecoding(t *testing.T) {
}
}

func newTransaction(nonce uint64) (Transaction, *types.Result, flow.Event, error) {
tx := gethTypes.NewTransaction(nonce, gethCommon.HexToAddress("0x1"), big.NewInt(10), uint64(100), big.NewInt(123), nil)
func newTransaction(nonce uint64, txIndex uint16) (Transaction, *types.Result, flow.Event, error) {
tx := gethTypes.NewTransaction(
nonce,
gethCommon.HexToAddress("0x1"),
big.NewInt(10),
uint64(100),
big.NewInt(123),
nil,
)
res := &types.Result{
ValidationError: nil,
VMError: nil,
TxType: tx.Type(),
GasConsumed: 1,
CumulativeGasUsed: 1,
GasRefund: 0,
DeployedContractAddress: &types.Address{0x5, 0x6, 0x7},
ReturnedData: []byte{0x55},
Logs: []*gethTypes.Log{{
Expand All @@ -239,7 +321,10 @@ func newTransaction(nonce uint64) (Transaction, *types.Result, flow.Event, error
Address: gethCommon.Address{0x3, 0x5},
Topics: []gethCommon.Hash{{0x2, 0x66}, {0x7, 0x1}},
}},
TxHash: tx.Hash(),
TxHash: tx.Hash(),
Index: txIndex,
PrecompiledCalls: []byte{},
StateChangeCommitment: []byte{},
}

txEncoded, err := tx.MarshalBinary()
Expand Down
Loading