Skip to content

Commit

Permalink
Sort Flow events received from Event Streaming API by their EventInde…
Browse files Browse the repository at this point in the history
…x field in ascending order
  • Loading branch information
m-Peter committed Oct 22, 2024
1 parent db833bc commit f2298e4
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 6 deletions.
7 changes: 7 additions & 0 deletions models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package models

import (
"fmt"
"sort"
"strings"

"github.com/onflow/cadence"
Expand Down Expand Up @@ -39,6 +40,12 @@ type CadenceEvents struct {

// NewCadenceEvents decodes the events into evm types.
func NewCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) {
// first we sort all the events in the block, by their EventIndex,
// to make sure that they are in the correct order.
sort.Slice(events.Events, func(i, j int) bool {
return events.Events[i].EventIndex < events.Events[j].EventIndex
})

e, err := decodeCadenceEvents(events)
if err != nil {
return nil, err
Expand Down
71 changes: 65 additions & 6 deletions models/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestCadenceEvents_Block(t *testing.T) {

// generate txs
for i := 0; i < txCount; i++ {
tx, _, txEvent, err := newTransaction(uint64(i))
tx, _, txEvent, err := newTransaction(uint64(i), uint16(i))
require.NoError(t, err)
hashes[i] = tx.Hash()
events = append(events, txEvent)
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestCadenceEvents_Block(t *testing.T) {
})

t.Run("block with more transaction hashes", func(t *testing.T) {
tx, _, _, err := newTransaction(1)
tx, _, _, err := newTransaction(1, 0)
require.NoError(t, err)

// generate single block
Expand All @@ -153,6 +153,52 @@ func TestCadenceEvents_Block(t *testing.T) {
"block 1 references missing transaction/s",
)
})

t.Run("block & transaction events are ordered by event index", func(t *testing.T) {
txCount := 10
hashes := make([]gethCommon.Hash, txCount)
blockEvents := flow.BlockEvents{
BlockID: flow.Identifier{0x1},
Height: 1,
}

// generate txs
for i := 0; i < txCount; i++ {
tx, _, txEvent, err := newTransaction(uint64(i), uint16(i))
require.NoError(t, err)

// make the Flow events come in reversed order
txIndex := (txCount - 1) - i
txEvent.EventIndex = txIndex

hashes[txIndex] = tx.Hash()
blockEvents.Events = append(blockEvents.Events, txEvent)
}

// generate single block
_, blockEvent, err := newBlock(1, hashes)
require.NoError(t, err)

blockEvent.EventIndex = txCount
blockEvents.Events = append(blockEvents.Events, blockEvent)

cdcEvents, err := NewCadenceEvents(blockEvents)
require.NoError(t, err)

// assert that Flow events are sorted by their EventIndex field
for i, event := range cdcEvents.events.Events {
assert.Equal(t, i, event.EventIndex)
}

// assert that EVM transactions & receipts are sorted by their
// TransactionIndex field
for i := 0; i < txCount; i++ {
tx := cdcEvents.transactions[i]
receipt := cdcEvents.receipts[i]
assert.Equal(t, tx.Hash(), receipt.TxHash)
assert.Equal(t, uint(i), receipt.TransactionIndex)
}
})
}

func Test_EventDecoding(t *testing.T) {
Expand All @@ -171,7 +217,7 @@ func Test_EventDecoding(t *testing.T) {
// generate txs
for i := 0; i < txCount; i++ {
var err error
txs[i], results[i], txEvents[i], err = newTransaction(uint64(i))
txs[i], results[i], txEvents[i], err = newTransaction(uint64(i), uint16(i))
require.NoError(t, err)
hashes[i] = txs[i].Hash()
blockEvents.Events = append(blockEvents.Events, txEvents[i])
Expand Down Expand Up @@ -224,12 +270,22 @@ func Test_EventDecoding(t *testing.T) {
}
}

func newTransaction(nonce uint64) (Transaction, *types.Result, flow.Event, error) {
tx := gethTypes.NewTransaction(nonce, gethCommon.HexToAddress("0x1"), big.NewInt(10), uint64(100), big.NewInt(123), nil)
func newTransaction(nonce uint64, txIndex uint16) (Transaction, *types.Result, flow.Event, error) {
tx := gethTypes.NewTransaction(
nonce,
gethCommon.HexToAddress("0x1"),
big.NewInt(10),
uint64(100),
big.NewInt(123),
nil,
)
res := &types.Result{
ValidationError: nil,
VMError: nil,
TxType: tx.Type(),
GasConsumed: 1,
CumulativeGasUsed: 1,
GasRefund: 0,
DeployedContractAddress: &types.Address{0x5, 0x6, 0x7},
ReturnedData: []byte{0x55},
Logs: []*gethTypes.Log{{
Expand All @@ -239,7 +295,10 @@ func newTransaction(nonce uint64) (Transaction, *types.Result, flow.Event, error
Address: gethCommon.Address{0x3, 0x5},
Topics: []gethCommon.Hash{{0x2, 0x66}, {0x7, 0x1}},
}},
TxHash: tx.Hash(),
TxHash: tx.Hash(),
Index: txIndex,
PrecompiledCalls: []byte{},
StateChangeCommitment: []byte{},
}

txEncoded, err := tx.MarshalBinary()
Expand Down

0 comments on commit f2298e4

Please sign in to comment.