Skip to content
This repository has been archived by the owner on Feb 17, 2025. It is now read-only.

cherry pick 9ea4ab4 into v0.3.4 #2752

Merged
merged 9 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add queue to ws conn subscription filter
  • Loading branch information
tclemos committed Nov 6, 2023
commit fe85888f3c9235586d1c19ce028525892bc5b134
6 changes: 4 additions & 2 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,8 @@ func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block
return
}
for _, filter := range blockFilters {
go filter.SendSubscriptionResponse(data)
filter.sendSubscriptionResponse(data)
go filter.SendEnqueuedSubscriptionData()
}
}
log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start))
Expand Down Expand Up @@ -1128,7 +1129,8 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE
if err != nil {
log.Errorf("failed to marshal ethLog response to subscription: %v", err)
}
go filter.SendSubscriptionResponse(data)
filter.EnqueueSubscriptionDataToBeSent(data)
go filter.SendEnqueuedSubscriptionData()
}
}
}
Expand Down
111 changes: 72 additions & 39 deletions jsonrpc/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/types"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/ethereum/go-ethereum/common"
"github.com/gorilla/websocket"
)
Expand All @@ -23,15 +24,76 @@ const (
FilterTypePendingTx = "pendingTx"
)

// Filter represents a filter.
// Filter represents a filter used by subscriptions to be used
// when a new L2 block is detected to select the correct requested data
type Filter struct {
ID string
Type FilterType
Parameters interface{}
LastPoll time.Time
WsConn *atomic.Pointer[websocket.Conn]

mutex sync.Mutex
wsDataQueue state.Queue[[]byte]
mutex sync.Mutex
isSending bool
}

// EnqueueSubscriptionDataToBeSent enqueues subscription data to be sent
// via web sockets connection
func (f *Filter) EnqueueSubscriptionDataToBeSent(data []byte) {
f.wsDataQueue.Push(data)
}

// SendEnqueuedSubscriptionData consumes all the enqueued subscription data
// and sends it via web sockets connection.
func (f *Filter) SendEnqueuedSubscriptionData() {
if f.isSending {
return
}

f.mutex.Lock()
defer f.mutex.Unlock()
f.isSending = true
for {
d, err := f.wsDataQueue.Pop()
if err == state.ErrQueueEmpty {
break
} else if err != nil {
log.Errorf("failed to pop subscription data from queue to be sent via web sockets to filter %v, %s", f.ID, err.Error())
break
}
f.sendSubscriptionResponse(d)
}
f.isSending = false
}

// sendSubscriptionResponse send data as subscription response via
// web sockets connection controlled by a mutex
func (f *Filter) sendSubscriptionResponse(data []byte) {
const errMessage = "Unable to write WS message to filter %v, %s"

start := time.Now()
res := types.SubscriptionResponse{
JSONRPC: "2.0",
Method: "eth_subscription",
Params: types.SubscriptionResponseParams{
Subscription: f.ID,
Result: data,
},
}
message, err := json.Marshal(res)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, f.ID, err.Error()))
return
}

err = f.WsConn.Load().WriteMessage(websocket.TextMessage, message)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, f.ID, err.Error()))
return
}
log.Debugf("WS message sent: %v", string(message))
log.Debugf("[SendSubscriptionResponse] took %v", time.Since(start))
}

// FilterType express the type of the filter, block, logs, pending transactions
Expand Down Expand Up @@ -93,19 +155,19 @@ func (f *LogFilter) MarshalJSON() ([]byte, error) {
obj.BlockHash = f.BlockHash

if f.FromBlock != nil && (*f.FromBlock == types.LatestBlockNumber) {
fromblock := ""
obj.FromBlock = &fromblock
fromBlock := ""
obj.FromBlock = &fromBlock
} else if f.FromBlock != nil {
fromblock := hex.EncodeUint64(uint64(*f.FromBlock))
obj.FromBlock = &fromblock
fromBlock := hex.EncodeUint64(uint64(*f.FromBlock))
obj.FromBlock = &fromBlock
}

if f.ToBlock != nil && (*f.ToBlock == types.LatestBlockNumber) {
toblock := ""
obj.ToBlock = &toblock
toBlock := ""
obj.ToBlock = &toBlock
} else if f.ToBlock != nil {
toblock := hex.EncodeUint64(uint64(*f.ToBlock))
obj.ToBlock = &toblock
toBlock := hex.EncodeUint64(uint64(*f.ToBlock))
obj.ToBlock = &toBlock
}

if f.Addresses != nil {
Expand Down Expand Up @@ -270,32 +332,3 @@ func (f *LogFilter) Match(log *types.Log) bool {

return true
}

func (f *Filter) SendSubscriptionResponse(data []byte) {
const errMessage = "Unable to write WS message to filter %v, %s"

start := time.Now()
res := types.SubscriptionResponse{
JSONRPC: "2.0",
Method: "eth_subscription",
Params: types.SubscriptionResponseParams{
Subscription: f.ID,
Result: data,
},
}
message, err := json.Marshal(res)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, f.ID, err.Error()))
return
}

f.mutex.Lock()
defer f.mutex.Unlock()
err = f.WsConn.Load().WriteMessage(websocket.TextMessage, message)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, f.ID, err.Error()))
return
}
log.Debugf("WS message sent: %v", string(message))
log.Debugf("[SendSubscriptionResponse] took %v", time.Since(start))
}
66 changes: 66 additions & 0 deletions state/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package state

import (
"fmt"
"sync"
)

// ErrQueueEmpty is returned when a queue operation
// depends on the queue to not be empty, but it is empty
var ErrQueueEmpty = fmt.Errorf("queue is empty")

// Queue is a generic queue implementation that implements FIFO
type Queue[T any] struct {
items []T
mutex sync.Mutex
}

// NewQueue creates a new instance of queue and initializes it
func NewQueue[T any]() *Queue[T] {
return &Queue[T]{
items: make([]T, 0),
}
}

// Push enqueue an item
func (q *Queue[T]) Push(item T) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.items = append(q.items, item)
}

// Top returns the top level item without removing it
func (q *Queue[T]) Top() (T, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
var v T
if len(q.items) == 0 {
return v, ErrQueueEmpty
}
return q.items[0], nil
}

// Pop returns the top level item and unqueues it
func (q *Queue[T]) Pop() (T, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
var v T
if len(q.items) == 0 {
return v, ErrQueueEmpty
}
v = q.items[0]
q.items = q.items[1:]
return v, nil
}

// Len returns the size of the queue
func (q *Queue[T]) Len() int {
q.mutex.Lock()
defer q.mutex.Unlock()
return len(q.items)
}

// IsEmpty returns false if the queue has itens, otherwise true
func (q *Queue[T]) IsEmpty() bool {
return q.Len() == 0
}
52 changes: 52 additions & 0 deletions state/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package state

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestQueue(t *testing.T) {
q := NewQueue[int]()

q.Push(10)
q.Push(20)
q.Push(30)

top, err := q.Top()
require.NoError(t, err)
assert.Equal(t, 10, top)
assert.Equal(t, 3, q.Len())
assert.Equal(t, false, q.IsEmpty())

pop, err := q.Pop()
require.NoError(t, err)
assert.Equal(t, 10, pop)
assert.Equal(t, 2, q.Len())
assert.Equal(t, false, q.IsEmpty())

top, err = q.Top()
require.NoError(t, err)
assert.Equal(t, 20, top)
assert.Equal(t, 2, q.Len())
assert.Equal(t, false, q.IsEmpty())

pop, err = q.Pop()
require.NoError(t, err)
assert.Equal(t, 20, pop)
assert.Equal(t, 1, q.Len())
assert.Equal(t, false, q.IsEmpty())

pop, err = q.Pop()
require.NoError(t, err)
assert.Equal(t, 30, pop)
assert.Equal(t, 0, q.Len())
assert.Equal(t, true, q.IsEmpty())

_, err = q.Top()
require.Error(t, ErrQueueEmpty, err)

_, err = q.Pop()
require.Error(t, ErrQueueEmpty, err)
}