Skip to content

Commit

Permalink
rpc: add synchronisation for httptest.Server in tests
Browse files Browse the repository at this point in the history
Add waiting for startSending to ensure that the client is ready before
the server starts sending messages.

Close #3005
Close #3312

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Mar 13, 2024
1 parent b12ef70 commit ae77cd0
Showing 1 changed file with 52 additions and 23 deletions.
75 changes: 52 additions & 23 deletions pkg/rpcclient/wsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,13 @@ func TestWSClientEvents(t *testing.T) {
fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose),
`{"jsonrpc":"2.0","method":"event_missed","params":[]}`, // the last one, will trigger receiver channels closing.
}
startSending := make(chan struct{})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/ws" && req.Method == "GET" {
var upgrader = websocket.Upgrader{}
ws, err := upgrader.Upgrade(w, req, nil)
require.NoError(t, err)
<-startSending
for _, event := range events {
err = ws.SetWriteDeadline(time.Now().Add(2 * time.Second))
require.NoError(t, err)
Expand Down Expand Up @@ -209,6 +211,7 @@ func TestWSClientEvents(t *testing.T) {
// MissedEvent must close the channels above.

wsc.subscriptionsLock.Unlock()
close(startSending)

var (
b1Cnt, b2Cnt int
Expand Down Expand Up @@ -297,11 +300,13 @@ func TestWSClientNonBlockingEvents(t *testing.T) {
require.True(t, chCap < len(events))

var blocksSent atomic.Bool
startSending := make(chan struct{})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/ws" && req.Method == "GET" {
var upgrader = websocket.Upgrader{}
ws, err := upgrader.Upgrade(w, req, nil)
require.NoError(t, err)
<-startSending
for _, event := range events {
err = ws.SetWriteDeadline(time.Now().Add(2 * time.Second))
require.NoError(t, err)
Expand Down Expand Up @@ -331,6 +336,7 @@ func TestWSClientNonBlockingEvents(t *testing.T) {
wsc.receivers[chan<- *block.Block(bCh)] = []string{"0", "1"}
wsc.subscriptionsLock.Unlock()

close(startSending)
// Check that events are sent to WSClient.
require.Eventually(t, func() bool {
return blocksSent.Load()
Expand Down Expand Up @@ -391,11 +397,12 @@ func TestWSExecutionNotificationNameCheck(t *testing.T) {
func TestWSFilteredSubscriptions(t *testing.T) {
var cases = []struct {
name string
clientCode func(*testing.T, *WSClient)
clientCode func(*testing.T, *WSClient, chan struct{})
serverCode func(*testing.T, *params.Params)
}{
{"block header primary",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
primary := byte(3)
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Primary: &primary}, make(chan *block.Header))
require.NoError(t, err)
Expand All @@ -410,7 +417,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"header since",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
var since uint32 = 3
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Since: &since}, make(chan *block.Header))
require.NoError(t, err)
Expand All @@ -425,7 +433,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"header till",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
var till uint32 = 3
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Till: &till}, make(chan *block.Header))
require.NoError(t, err)
Expand All @@ -440,12 +449,13 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"header primary, since and till",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
var (
since uint32 = 3
primary = byte(2)
till uint32 = 5
)
close(done)
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{
Primary: &primary,
Since: &since,
Expand All @@ -463,7 +473,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"blocks primary",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
primary := byte(3)
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Primary: &primary}, make(chan *block.Block))
require.NoError(t, err)
Expand All @@ -478,7 +489,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"blocks since",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
var since uint32 = 3
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, make(chan *block.Block))
require.NoError(t, err)
Expand All @@ -493,7 +505,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"blocks till",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
var till uint32 = 3
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Till: &till}, make(chan *block.Block))
require.NoError(t, err)
Expand All @@ -508,12 +521,13 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"blocks primary, since and till",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
var (
since uint32 = 3
primary = byte(2)
till uint32 = 5
)
close(done)
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{
Primary: &primary,
Since: &since,
Expand All @@ -531,7 +545,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"transactions sender",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
sender := util.Uint160{1, 2, 3, 4, 5}
_, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Sender: &sender}, make(chan *transaction.Transaction))
require.NoError(t, err)
Expand All @@ -545,7 +560,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"transactions signer",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
signer := util.Uint160{0, 42}
_, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Signer: &signer}, make(chan *transaction.Transaction))
require.NoError(t, err)
Expand All @@ -559,7 +575,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"transactions sender and signer",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
sender := util.Uint160{1, 2, 3, 4, 5}
signer := util.Uint160{0, 42}
_, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Sender: &sender, Signer: &signer}, make(chan *transaction.Transaction))
Expand All @@ -574,7 +591,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"notifications contract hash",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
contract := util.Uint160{1, 2, 3, 4, 5}
_, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, make(chan *state.ContainedNotificationEvent))
require.NoError(t, err)
Expand All @@ -588,7 +606,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"notifications name",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
name := "my_pretty_notification"
_, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Name: &name}, make(chan *state.ContainedNotificationEvent))
require.NoError(t, err)
Expand All @@ -602,7 +621,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"notifications contract hash and name",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
contract := util.Uint160{1, 2, 3, 4, 5}
name := "my_pretty_notification"
_, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract, Name: &name}, make(chan *state.ContainedNotificationEvent))
Expand All @@ -617,7 +637,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"executions state",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
vmstate := "FAULT"
_, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &vmstate}, make(chan *state.AppExecResult))
require.NoError(t, err)
Expand All @@ -631,7 +652,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"executions container",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
container := util.Uint256{1, 2, 3}
_, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &container}, make(chan *state.AppExecResult))
require.NoError(t, err)
Expand All @@ -645,7 +667,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"executions state and container",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
vmstate := "FAULT"
container := util.Uint256{1, 2, 3}
_, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &vmstate, Container: &container}, make(chan *state.AppExecResult))
Expand All @@ -661,7 +684,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
{
"notary request sender",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
sender := util.Uint160{1, 2, 3, 4, 5}
_, err := wsc.ReceiveNotaryRequests(&neorpc.NotaryRequestFilter{Sender: &sender}, make(chan *result.NotaryRequestEvent))
require.NoError(t, err)
Expand All @@ -677,7 +701,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
{
"notary request signer",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
signer := util.Uint160{0, 42}
_, err := wsc.ReceiveNotaryRequests(&neorpc.NotaryRequestFilter{Signer: &signer}, make(chan *result.NotaryRequestEvent))
require.NoError(t, err)
Expand All @@ -693,7 +718,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
{
"notary request type",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
mempoolType := mempoolevent.TransactionAdded
_, err := wsc.ReceiveNotaryRequests(&neorpc.NotaryRequestFilter{Type: &mempoolType}, make(chan *result.NotaryRequestEvent))
require.NoError(t, err)
Expand All @@ -708,7 +734,8 @@ func TestWSFilteredSubscriptions(t *testing.T) {
},
},
{"notary request sender, signer and type",
func(t *testing.T, wsc *WSClient) {
func(t *testing.T, wsc *WSClient, done chan struct{}) {
close(done)
sender := util.Uint160{1, 2, 3, 4, 5}
signer := util.Uint160{0, 42}
mempoolType := mempoolevent.TransactionAdded
Expand All @@ -727,11 +754,13 @@ func TestWSFilteredSubscriptions(t *testing.T) {
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
startSending := make(chan struct{})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/ws" && req.Method == "GET" {
var upgrader = websocket.Upgrader{}
ws, err := upgrader.Upgrade(w, req, nil)
require.NoError(t, err)
<-startSending
err = ws.SetReadDeadline(time.Now().Add(2 * time.Second))
require.NoError(t, err)
req := params.In{}
Expand All @@ -752,7 +781,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
wsc.getNextRequestID = getTestRequestID
wsc.cache.network = netmode.UnitTestNet
wsc.cache.initDone = true
c.clientCode(t, wsc)
c.clientCode(t, wsc, startSending)
wsc.Close()
})
}
Expand Down

0 comments on commit ae77cd0

Please sign in to comment.