Skip to content
This repository has been archived by the owner on Feb 17, 2025. It is now read-only.

Improve WS subscription #2635

Merged
merged 12 commits into from
Oct 13, 2023
Next Next commit
WIP: WS subscriptions
  • Loading branch information
tclemos committed Oct 4, 2023
commit 2af514ab0e52bfd5e801d723decba93bca1c5a64
6 changes: 6 additions & 0 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/big"
"net/http"
"strings"
"time"

"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/client"
Expand Down Expand Up @@ -1048,6 +1049,8 @@ func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error {

// onNewL2Block is triggered when the state triggers the event for a new l2 block
func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64())
start := time.Now()
blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all block filters with web sockets connections: %v", err)
Expand All @@ -1061,7 +1064,9 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
}
}
}
log.Debugf("[onNewL2Block] new l2 block event for block %v took %vms to send all the messages for block filters", event.Block.NumberU64(), time.Since(start).Milliseconds())

start = time.Now()
logFilters, err := e.storage.GetAllLogFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all log filters with web sockets connections: %v", err)
Expand All @@ -1081,6 +1086,7 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
}
}
}
log.Debugf("[onNewL2Block] new l2 block event for block %v took %vms to send all the messages for log filters", event.Block.NumberU64(), time.Since(start).Milliseconds())
}

func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data interface{}) {
Expand Down
10 changes: 5 additions & 5 deletions jsonrpc/mocks/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ func NewServer(
storage storageInterface,
services []Service,
) *Server {
s.PrepareWebSocket()
if cfg.WebSockets.Enabled {
s.StartToMonitorNewL2Blocks()
}

handler := newJSONRpcHandler()

for _, service := range services {
Expand Down
2 changes: 1 addition & 1 deletion jsonrpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newMockedServer(t *testing.T, cfg Config) (*mockedServer, *mocksWrapper, *e

var newL2BlockEventHandler state.NewL2BlockEventHandler = func(e state.NewL2BlockEvent) {}
st.On("RegisterNewL2BlockEventHandler", mock.IsType(newL2BlockEventHandler)).Once()
st.On("PrepareWebSocket").Once()
st.On("StartToMonitorNewL2Blocks").Once()

services := []Service{}
if _, ok := apis[APIEth]; ok {
Expand Down
2 changes: 1 addition & 1 deletion jsonrpc/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type PoolInterface interface {

// StateInterface gathers the methods required to interact with the state.
type StateInterface interface {
PrepareWebSocket()
StartToMonitorNewL2Blocks()
BeginStateTransaction(ctx context.Context) (pgx.Tx, error)
DebugTransaction(ctx context.Context, transactionHash common.Hash, traceConfig state.TraceConfig, dbTx pgx.Tx) (*runtime.ExecutionResult, error)
EstimateGas(transaction *types.Transaction, senderAddress common.Address, l2BlockNumber *uint64, dbTx pgx.Tx) (uint64, []byte, error)
Expand Down
31 changes: 23 additions & 8 deletions state/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ type NewL2BlockEvent struct {
Block types.Block
}

// PrepareWebSocket allows the RPC to prepare ws
func (s *State) PrepareWebSocket() {
// StartToMonitorNewL2Blocks starts 2 go routines that will
// monitor new blocks and execute handlers registered to be executed
// when a new l2 block is detected. This is used by the RPC WebSocket
// filter subscription but can be used by any other component that
// needs to react to a new L2 block added to the state.
func (s *State) StartToMonitorNewL2Blocks() {
lastL2Block, err := s.GetLastL2Block(context.Background(), nil)
if errors.Is(err, ErrStateNotSynchronized) {
lastL2Block = types.NewBlockWithHeader(&types.Header{Number: big.NewInt(0)})
Expand All @@ -43,22 +47,26 @@ func (s *State) RegisterNewL2BlockEventHandler(h NewL2BlockEventHandler) {

func (s *State) handleEvents() {
for newL2BlockEvent := range s.newL2BlockEvents {
log.Debugf("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64())
if len(s.newL2BlockEventHandlers) == 0 {
continue
}

wg := sync.WaitGroup{}
for _, handler := range s.newL2BlockEventHandlers {
wg.Add(1)
go func(h NewL2BlockEventHandler) {
go func(h NewL2BlockEventHandler, e NewL2BlockEvent) {
defer func() {
wg.Done()
if r := recover(); r != nil {
log.Errorf("failed and recovered in NewL2BlockEventHandler: %v", r)
}
}()
h(newL2BlockEvent)
}(handler)
log.Debugf("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64())
start := time.Now()
h(e)
log.Debugf("[handleEvents] new l2 block event handler for block %v took %vms to be executed", e.Block.NumberU64(), time.Since(start).Milliseconds())
}(handler, newL2BlockEvent)
}
wg.Wait()
}
Expand Down Expand Up @@ -91,17 +99,24 @@ func (s *State) monitorNewL2Blocks() {
continue
}

for bn := s.lastL2BlockSeen.NumberU64() + uint64(1); bn <= lastL2Block.NumberU64(); bn++ {
fromBlockNumber := s.lastL2BlockSeen.NumberU64() + uint64(1)
toBlockNumber := lastL2Block.NumberU64()
log.Debugf("[monitorNewL2Blocks] new l2 block detected from block %v to %v", fromBlockNumber, toBlockNumber)

for bn := fromBlockNumber; bn <= toBlockNumber; bn++ {
block, err := s.GetL2BlockByNumber(context.Background(), bn, nil)
if err != nil {
log.Errorf("failed to l2 block while monitoring new blocks: %v", err)
log.Errorf("failed to get l2 block while monitoring new blocks: %v", err)
break
}

log.Debugf("[monitorNewL2Blocks] sending NewL2BlockEvent for block %v", block.NumberU64())
start := time.Now()
s.newL2BlockEvents <- NewL2BlockEvent{
Block: *block,
}
log.Infof("new l2 blocks detected, Number %v, Hash %v", block.NumberU64(), block.Hash().String())
log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %vms to be sent", block.NumberU64(), time.Since(start).Milliseconds())
log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String())
s.lastL2BlockSeen = *block
}

Expand Down
5 changes: 4 additions & 1 deletion state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/jackc/pgx/v4"
Expand Down Expand Up @@ -1460,6 +1461,8 @@ func scanLogs(rows pgx.Rows) ([]*types.Log, error) {

// AddL2Block adds a new L2 block to the State Store
func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2Block *types.Block, receipts []*types.Receipt, effectivePercentage uint8, dbTx pgx.Tx) error {
log.Debugf("[AddL2Block] adding l2 block: %v", l2Block.NumberU64())
start := time.Now()
e := p.getExecQuerier(dbTx)

const addTransactionSQL = "INSERT INTO state.transaction (hash, encoded, decoded, l2_block_num, effective_percentage) VALUES($1, $2, $3, $4, $5)"
Expand Down Expand Up @@ -1523,7 +1526,7 @@ func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2
}
}
}

log.Debugf("[AddL2Block] l2 block %v took %vms to be added", l2Block.NumberU64(), time.Since(start).Milliseconds())
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
)

const newL2BlockEventBufferSize = 1000

var (
// ZeroHash is the hash 0x0000000000000000000000000000000000000000000000000000000000000000
ZeroHash = common.Hash{}
Expand Down Expand Up @@ -48,7 +50,7 @@ func NewState(cfg Config, storage *PostgresStorage, executorClient executor.Exec
executorClient: executorClient,
tree: stateTree,
eventLog: eventLog,
newL2BlockEvents: make(chan NewL2BlockEvent),
newL2BlockEvents: make(chan NewL2BlockEvent, newL2BlockEventBufferSize),
newL2BlockEventHandlers: []NewL2BlockEventHandler{},
}

Expand Down