Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry pick 9ea4ab4 into v0.3.4 #2752

Merged
merged 9 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 99 additions & 20 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"math/big"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/client"
Expand Down Expand Up @@ -790,7 +793,7 @@ func (e *EthEndpoints) NewBlockFilter() (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newBlockFilter(wsConn *websocket.Conn) (interface{}, types.Error) {
func (e *EthEndpoints) newBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
id, err := e.storage.NewBlockFilter(wsConn)
if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, "failed to create new block filter", err)
Expand All @@ -807,7 +810,7 @@ func (e *EthEndpoints) NewFilter(filter LogFilter) (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newFilter(wsConn *websocket.Conn, filter LogFilter) (interface{}, types.Error) {
func (e *EthEndpoints) newFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (interface{}, types.Error) {
id, err := e.storage.NewLogFilter(wsConn, filter)
if errors.Is(err, ErrFilterInvalidPayload) {
return RPCErrorResponse(types.InvalidParamsErrorCode, err.Error(), nil)
Expand All @@ -826,7 +829,7 @@ func (e *EthEndpoints) NewPendingTransactionFilter() (interface{}, types.Error)
}

// internal
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *websocket.Conn) (interface{}, types.Error) {
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
return nil, types.NewRPCError(types.DefaultErrorCode, "not supported yet")
// id, err := e.storage.NewPendingTransactionFilter(wsConn)
// if err != nil {
Expand Down Expand Up @@ -988,7 +991,7 @@ func (e *EthEndpoints) updateFilterLastPoll(filterID string) types.Error {
// The node will return a subscription id.
// For each event that matches the subscription a notification with relevant
// data is sent together with the subscription id.
func (e *EthEndpoints) Subscribe(wsConn *websocket.Conn, name string, logFilter *LogFilter) (interface{}, types.Error) {
func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name string, logFilter *LogFilter) (interface{}, types.Error) {
switch name {
case "newHeads":
return e.newBlockFilter(wsConn)
Expand All @@ -1014,70 +1017,146 @@ func (e *EthEndpoints) Unsubscribe(wsConn *websocket.Conn, filterID string) (int

// uninstallFilterByWSConn uninstalls the filters connected to the
// provided web socket connection
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error {
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error {
return e.storage.UninstallFilterByWSConn(wsConn)
}

// onNewL2Block is triggered when the state triggers the event for a new l2 block
func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64())
wg := sync.WaitGroup{}

wg.Add(1)
go e.notifyNewHeads(&wg, event)

wg.Add(1)
go e.notifyNewLogs(&wg, event)

wg.Wait()
}

func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
defer wg.Done()
start := time.Now()
blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all block filters with web sockets connections: %v", err)
} else {
b, err := types.NewBlock(&event.Block, nil, false, false)
if err != nil {
log.Errorf("failed to build block response to subscription: %v", err)
return
}
data, err := json.Marshal(b)
if err != nil {
log.Errorf("failed to marshal block response to subscription: %v", err)
return
}
for _, filter := range blockFilters {
b, err := types.NewBlock(&event.Block, nil, false, false)
if err != nil {
log.Errorf("failed to build block response to subscription: %v", err)
} else {
e.sendSubscriptionResponse(filter, b)
}
e.sendSubscriptionResponse(filter, data)
}
}
log.Debugf("[notifyNewHeads] new l2 block event for block %v took %vms to send all the messages for block filters", event.Block.NumberU64(), time.Since(start).Milliseconds())
}

func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
defer wg.Done()
start := time.Now()
logFilters, err := e.storage.GetAllLogFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all log filters with web sockets connections: %v", err)
} else {
for _, filter := range logFilters {
changes, err := e.GetFilterChanges(filter.ID)
filterParameters := filter.Parameters.(LogFilter)
bn := types.BlockNumber(event.Block.NumberU64())

// if from and to blocks are nil, set it to the current block to make
// the query faster
if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil {
filterParameters.FromBlock = &bn
filterParameters.ToBlock = &bn
} else {
// if the filter has a fromBlock value set
// and the event block number is smaller than the
// from block, skip this filter
if filterParameters.FromBlock != nil {
fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if fromBlock > event.Block.NumberU64() {
continue
}
// otherwise set the from block to a fixed number
// to avoid querying it again in the next step
fixedFromBlock := types.BlockNumber(fromBlock)
filterParameters.FromBlock = &fixedFromBlock
}

// if the filter has a toBlock value set
// and the event block number is greater than the
// to block, skip this filter
if filterParameters.ToBlock != nil {
toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if toBlock > event.Block.NumberU64() {
continue
}
// otherwise set the to block to a fixed number
// to avoid querying it again in the next step
fixedToBlock := types.BlockNumber(toBlock)
filterParameters.ToBlock = &fixedToBlock
}
}

// get new logs for this specific filter
changes, err := e.internalGetLogs(context.Background(), nil, filterParameters)
if err != nil {
log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", filter.ID, err)
continue
}

// if there are new logs for the filter, send it
if changes != nil {
ethLogs := changes.([]types.Log)
for _, ethLog := range ethLogs {
e.sendSubscriptionResponse(filter, ethLog)
data, err := json.Marshal(ethLog)
if err != nil {
log.Errorf("failed to marshal ethLog response to subscription: %v", err)
}
e.sendSubscriptionResponse(filter, data)
}
}
}
}
log.Debugf("[notifyNewLogs] new l2 block event for block %v took %vms to send all the messages for log filters", event.Block.NumberU64(), time.Since(start).Milliseconds())
}

func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data interface{}) {
func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data []byte) {
const errMessage = "Unable to write WS message to filter %v, %s"
result, err := json.Marshal(data)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
}

res := types.SubscriptionResponse{
JSONRPC: "2.0",
Method: "eth_subscription",
Params: types.SubscriptionResponseParams{
Subscription: filter.ID,
Result: result,
Result: data,
},
}
message, err := json.Marshal(res)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
return
}

err = filter.WsConn.WriteMessage(websocket.TextMessage, message)
err = filter.WsConn.Load().WriteMessage(websocket.TextMessage, message)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
return
}
log.Debugf("WS message sent: %v", string(message))
}
Loading