Skip to content

Commit

Permalink
cherry pick WS Improvements into v0.3.4 (#2752)
Browse files Browse the repository at this point in the history
  • Loading branch information
tclemos authored Nov 8, 2023
1 parent d36d1d3 commit 71ad87a
Show file tree
Hide file tree
Showing 20 changed files with 663 additions and 186 deletions.
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ require (
github.com/umbracle/ethgo v0.1.3
github.com/urfave/cli/v2 v2.25.7
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.13.0
golang.org/x/net v0.15.0
golang.org/x/crypto v0.14.0
golang.org/x/net v0.17.0
golang.org/x/sync v0.3.0
google.golang.org/grpc v1.56.1
google.golang.org/protobuf v1.31.0
Expand Down Expand Up @@ -139,8 +139,8 @@ require (
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/term v0.12.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.13.0 // indirect
Expand All @@ -151,7 +151,7 @@ require (
)

require (
github.com/gorilla/websocket v1.5.0
github.com/gorilla/websocket v1.5.1
github.com/holiman/uint256 v1.2.3
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
)
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/habx/pg-commands v0.6.1 h1:+9vo6+N/usIZ5rF6jIJle5Tjvf01B09i0FPfzIvgoIg=
github.com/habx/pg-commands v0.6.1/go.mod h1:PkBR8QOJKbIjv4r1NuOFrz+LyjsbiAtmQbuu6+w0SAA=
Expand Down Expand Up @@ -862,8 +862,8 @@ golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -958,8 +958,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -1067,17 +1067,17 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20220722155259-a9ba230a4035/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/term v0.12.0 h1:/ZfYdc3zq+q02Rv9vGqTeSItdzZTSNDmfTi0mBAuidU=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
136 changes: 96 additions & 40 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"math/big"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/client"
Expand Down Expand Up @@ -790,7 +793,7 @@ func (e *EthEndpoints) NewBlockFilter() (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newBlockFilter(wsConn *websocket.Conn) (interface{}, types.Error) {
func (e *EthEndpoints) newBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
id, err := e.storage.NewBlockFilter(wsConn)
if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, "failed to create new block filter", err)
Expand All @@ -807,7 +810,7 @@ func (e *EthEndpoints) NewFilter(filter LogFilter) (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newFilter(wsConn *websocket.Conn, filter LogFilter) (interface{}, types.Error) {
func (e *EthEndpoints) newFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (interface{}, types.Error) {
id, err := e.storage.NewLogFilter(wsConn, filter)
if errors.Is(err, ErrFilterInvalidPayload) {
return RPCErrorResponse(types.InvalidParamsErrorCode, err.Error(), nil)
Expand All @@ -826,7 +829,7 @@ func (e *EthEndpoints) NewPendingTransactionFilter() (interface{}, types.Error)
}

// internal
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *websocket.Conn) (interface{}, types.Error) {
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
return nil, types.NewRPCError(types.DefaultErrorCode, "not supported yet")
// id, err := e.storage.NewPendingTransactionFilter(wsConn)
// if err != nil {
Expand Down Expand Up @@ -988,7 +991,7 @@ func (e *EthEndpoints) updateFilterLastPoll(filterID string) types.Error {
// The node will return a subscription id.
// For each event that matches the subscription a notification with relevant
// data is sent together with the subscription id.
func (e *EthEndpoints) Subscribe(wsConn *websocket.Conn, name string, logFilter *LogFilter) (interface{}, types.Error) {
func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name string, logFilter *LogFilter) (interface{}, types.Error) {
switch name {
case "newHeads":
return e.newBlockFilter(wsConn)
Expand All @@ -1014,70 +1017,123 @@ func (e *EthEndpoints) Unsubscribe(wsConn *websocket.Conn, filterID string) (int

// uninstallFilterByWSConn uninstalls the filters connected to the
// provided web socket connection
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error {
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error {
return e.storage.UninstallFilterByWSConn(wsConn)
}

// onNewL2Block is triggered when the state triggers the event for a new l2 block
func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64())
wg := sync.WaitGroup{}

wg.Add(1)
go e.notifyNewHeads(&wg, event)

wg.Add(1)
go e.notifyNewLogs(&wg, event)

wg.Wait()
}

func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
defer wg.Done()
start := time.Now()
blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all block filters with web sockets connections: %v", err)
} else {
b, err := types.NewBlock(&event.Block, nil, false, false)
if err != nil {
log.Errorf("failed to build block response to subscription: %v", err)
return
}
data, err := json.Marshal(b)
if err != nil {
log.Errorf("failed to marshal block response to subscription: %v", err)
return
}
for _, filter := range blockFilters {
b, err := types.NewBlock(&event.Block, nil, false, false)
if err != nil {
log.Errorf("failed to build block response to subscription: %v", err)
} else {
e.sendSubscriptionResponse(filter, b)
}
filter.EnqueueSubscriptionDataToBeSent(data)
go filter.SendEnqueuedSubscriptionData()
}
}
log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start))
}

func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
defer wg.Done()
start := time.Now()
logFilters, err := e.storage.GetAllLogFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all log filters with web sockets connections: %v", err)
} else {
for _, filter := range logFilters {
changes, err := e.GetFilterChanges(filter.ID)
filterParameters := filter.Parameters.(LogFilter)
bn := types.BlockNumber(event.Block.NumberU64())

// if from and to blocks are nil, set it to the current block to make
// the query faster
if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil {
filterParameters.FromBlock = &bn
filterParameters.ToBlock = &bn
} else {
// if the filter has a fromBlock value set
// and the event block number is smaller than the
// from block, skip this filter
if filterParameters.FromBlock != nil {
fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if fromBlock > event.Block.NumberU64() {
continue
}
// otherwise set the from block to a fixed number
// to avoid querying it again in the next step
fixedFromBlock := types.BlockNumber(fromBlock)
filterParameters.FromBlock = &fixedFromBlock
}

// if the filter has a toBlock value set
// and the event block number is greater than the
// to block, skip this filter
if filterParameters.ToBlock != nil {
toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if toBlock > event.Block.NumberU64() {
continue
}
// otherwise set the to block to a fixed number
// to avoid querying it again in the next step
fixedToBlock := types.BlockNumber(toBlock)
filterParameters.ToBlock = &fixedToBlock
}
}

// get new logs for this specific filter
changes, err := e.internalGetLogs(context.Background(), nil, filterParameters)
if err != nil {
log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", filter.ID, err)
continue
}

// if there are new logs for the filter, send it
if changes != nil {
ethLogs := changes.([]types.Log)
for _, ethLog := range ethLogs {
e.sendSubscriptionResponse(filter, ethLog)
data, err := json.Marshal(ethLog)
if err != nil {
log.Errorf("failed to marshal ethLog response to subscription: %v", err)
}
filter.EnqueueSubscriptionDataToBeSent(data)
go filter.SendEnqueuedSubscriptionData()
}
}
}
}
}

func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data interface{}) {
const errMessage = "Unable to write WS message to filter %v, %s"
result, err := json.Marshal(data)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
}

res := types.SubscriptionResponse{
JSONRPC: "2.0",
Method: "eth_subscription",
Params: types.SubscriptionResponseParams{
Subscription: filter.ID,
Result: result,
},
}
message, err := json.Marshal(res)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
}

err = filter.WsConn.WriteMessage(websocket.TextMessage, message)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
}
log.Debugf("WS message sent: %v", string(message))
log.Debugf("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start))
}
Loading

0 comments on commit 71ad87a

Please sign in to comment.