Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More multi message tests for WebSockets #2184

Merged
merged 7 commits into from
Oct 19, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 160 additions & 26 deletions js/modules/k6/ws/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strconv"
Expand All @@ -42,6 +43,8 @@ import (
"go.k6.io/k6/stats"
)

const statusProtocolSwitch = 101

func assertSessionMetricsEmitted(t *testing.T, sampleContainers []stats.SampleContainer, subprotocol, url string, status int, group string) {
seenSessions := false
seenSessionDuration := false
Expand Down Expand Up @@ -71,21 +74,20 @@ func assertSessionMetricsEmitted(t *testing.T, sampleContainers []stats.SampleCo
assert.True(t, seenSessionDuration, "url %s didn't emit SessionDuration", url)
}

func assertMetricEmitted(t *testing.T, metricName string, sampleContainers []stats.SampleContainer, url string) {
seenMetric := false
func assertMetricEmittedCount(t *testing.T, metricName string, sampleContainers []stats.SampleContainer, url string, count int) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should make this function accept a struct. It has a lot of parameters, and, IMHO, it makes it hard to follow what's going on when reading the usage of this function inside the tests.

t.Helper()
actualCount := 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make this helper function a t.Helper?


for _, sampleContainer := range sampleContainers {
for _, sample := range sampleContainer.GetSamples() {
surl, ok := sample.Tags.Get("url")
assert.True(t, ok)
if surl == url {
if sample.Metric.Name == metricName {
seenMetric = true
}
if surl == url && sample.Metric.Name == metricName {
actualCount++
}
}
}
assert.True(t, seenMetric, "url %s didn't emit %s", url, metricName)
assert.Equal(t, count, actualCount, "url %s emitted %s %d times, expected was %d times", url, metricName, actualCount, count)
}

func TestSession(t *testing.T) {
Expand Down Expand Up @@ -131,7 +133,7 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")

t.Run("connect_wss", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand All @@ -142,7 +144,7 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-echo"), statusProtocolSwitch, "")

t.Run("open", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand All @@ -157,15 +159,15 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")

t.Run("send_receive", func(t *testing.T) {
_, err := rt.RunString(sr(`
var res = ws.connect("WSBIN_URL/ws-echo", function(socket){
socket.on("open", function() {
socket.send("test")
})
socket.on("message", function (data){
socket.on("message", function (data) {
if (!data=="test") {
throw new Error ("echo'd data doesn't match our message!");
}
Expand All @@ -177,9 +179,9 @@ func TestSession(t *testing.T) {
})

samplesBuf := stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "")
assertMetricEmitted(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertMetricEmitted(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1)
assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1)

t.Run("interval", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand All @@ -194,7 +196,7 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")
t.Run("bad interval", func(t *testing.T) {
_, err := rt.RunString(sr(`
var counter = 0;
Expand Down Expand Up @@ -240,7 +242,7 @@ func TestSession(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "setTimeout requires a >0 timeout parameter, received 0.00 ")
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")

t.Run("ping", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand All @@ -263,8 +265,8 @@ func TestSession(t *testing.T) {
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "")
assertMetricEmitted(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1)

t.Run("multiple_handlers", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand Down Expand Up @@ -297,8 +299,8 @@ func TestSession(t *testing.T) {
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "")
assertMetricEmitted(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1)

t.Run("client_close", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand All @@ -315,7 +317,7 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")

serverCloseTests := []struct {
name string
Expand Down Expand Up @@ -346,6 +348,138 @@ func TestSession(t *testing.T) {
assert.NoError(t, err)
})
}

t.Run("multi_message", func(t *testing.T) {
t.Parallel()

tb.Mux.HandleFunc("/ws-echo-multi", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header())
if err != nil {
return
}

for {
messageType, r, e := conn.NextReader()
if e != nil {
return
}
var wc io.WriteCloser
wc, err = conn.NextWriter(messageType)
if err != nil {
return
}
if _, err = io.Copy(wc, r); err != nil {
return
}
if err = wc.Close(); err != nil {
return
}
}
}))

t.Run("send_receive_multiple_ws", func(t *testing.T) {
_, err := rt.RunString(sr(`
var msg1 = "test1"
var msg2 = "test2"
var msg3 = "test3"
var allMsgsRecvd = false
var res = ws.connect("WSBIN_URL/ws-echo-multi", (socket) => {
socket.on("open", () => {
socket.send(msg1)
})
socket.on("message", (data) => {
if (data == msg1){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest doing:

socket.on("message", function (data){ -> socket.on("message", function (data) {

The same, also, for the if statements below.

socket.send(msg2)
}
if (data == msg2){
socket.send(msg3)
}
if (data == msg3){
allMsgsRecvd = true
socket.close()
}
});
});

if (!allMsgsRecvd) {
throw new Error ("messages 1,2,3 in sequence, was not received from server");
}
`))
assert.NoError(t, err)
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-multi"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 3)
assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 3)

t.Run("send_receive_multiple_wss", func(t *testing.T) {
_, err := rt.RunString(sr(`
var msg1 = "test1"
var msg2 = "test2"
var secondMsgReceived = false
var res = ws.connect("WSSBIN_URL/ws-echo-multi", (socket) => {
socket.on("open", () => {
socket.send(msg1)
})
socket.on("message", (data) => {
if (data == msg1){
socket.send(msg2)
}
if (data == msg2){
secondMsgReceived = true
socket.close()
}
});
});

if (!secondMsgReceived) {
throw new Error ("second test message was not received from server!");
}
`))
assert.NoError(t, err)
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSSBIN_URL/ws-echo-multi"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSSBIN_URL/ws-echo-multi"), 2)
assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSSBIN_URL/ws-echo-multi"), 2)

t.Run("send_receive_text_binary", func(t *testing.T) {
_, err := rt.RunString(sr(`
var msg1 = "test1"
var msg2 = new Uint8Array([116, 101, 115, 116, 50]); // 'test2'
var secondMsgReceived = false
var res = ws.connect("WSBIN_URL/ws-echo-multi", (socket) => {
socket.on("open", () => {
socket.send(msg1)
})
socket.on("message", (data) => {
if (data == msg1){
socket.sendBinary(msg2.buffer)
}
});
socket.on("binaryMessage", (data) => {
let data2 = new Uint8Array(data)
if(JSON.stringify(msg2) == JSON.stringify(data2)){
secondMsgReceived = true
}
socket.close()
})
});

if (!secondMsgReceived) {
throw new Error ("second test message was not received from server!");
}
`))
assert.NoError(t, err)
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-multi"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 2)
assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 2)
})
}

func TestSocketSendBinary(t *testing.T) { //nolint: tparallel
Expand Down Expand Up @@ -519,7 +653,7 @@ func TestErrors(t *testing.T) {
}
`))
assert.NoError(t, err)
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), statusProtocolSwitch, "")
})

t.Run("error on close", func(t *testing.T) {
Expand Down Expand Up @@ -548,7 +682,7 @@ func TestErrors(t *testing.T) {
});
`))
assert.NoError(t, err)
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-close"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-close"), statusProtocolSwitch, "")
})
}

Expand Down Expand Up @@ -659,7 +793,7 @@ func TestTLSConfig(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), statusProtocolSwitch, "")

t.Run("custom certificates", func(t *testing.T) {
state.TLSConfig = tb.TLSClientConfig
Expand All @@ -674,7 +808,7 @@ func TestTLSConfig(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), statusProtocolSwitch, "")
}

func TestReadPump(t *testing.T) {
Expand Down Expand Up @@ -800,5 +934,5 @@ func TestUserAgent(t *testing.T) {
`))
assert.NoError(t, err)

assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-useragent"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-useragent"), statusProtocolSwitch, "")
}