From 35df543ca58ad0d1e933ef1f0ddade9ec5df8ce1 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Mon, 9 Jan 2023 17:51:29 +0200 Subject: [PATCH] http: add asyncRequest This is implementation of http.request, but it returns a promise and does all the networking off the event loop. The code is pretty simple and seems fairly safe given that most possible race conditions would also happen in the case of `http.batch`. Even with that there is at least one test that previously couldn't have happened with http.batch. closes #2825 --- js/modules/k6/http/async_request_test.go | 290 +++++++++++++++++++ js/modules/k6/http/http.go | 1 + js/modules/k6/http/request.go | 84 ++++-- js/modules/k6/http/response_callback_test.go | 22 +- 4 files changed, 374 insertions(+), 23 deletions(-) create mode 100644 js/modules/k6/http/async_request_test.go diff --git a/js/modules/k6/http/async_request_test.go b/js/modules/k6/http/async_request_test.go new file mode 100644 index 00000000000..79be3d1d121 --- /dev/null +++ b/js/modules/k6/http/async_request_test.go @@ -0,0 +1,290 @@ +package http + +import ( + "testing" + "time" + + "github.com/sirupsen/logrus" + logtest "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.k6.io/k6/js/modulestest" +) + +func wrapInAsyncLambda(input string) string { + // This makes it possible to use `await` freely on the "top" level + return "(async () => {\n " + input + "\n })()" +} + +func runOnEventLoop(runtime *modulestest.Runtime, code string) error { + // TODO move this in modulestest.Runtime and extend it + err := runtime.EventLoop.Start(func() error { + _, err := runtime.VU.Runtime().RunString(wrapInAsyncLambda(code)) + return err + }) + runtime.EventLoop.WaitOnRegistered() + return err +} + +func TestAsyncRequest(t *testing.T) { + t.Parallel() + t.Run("EmptyBody", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + + sr := ts.tb.Replacer.Replace + err := runOnEventLoop(ts.runtime, sr(` + var reqUrl = "HTTPBIN_URL/cookies" + var res = await http.asyncRequest("GET", reqUrl); + var jar = new http.CookieJar(); + + jar.set("HTTPBIN_URL/cookies", "key", "value"); + res = await http.asyncRequest("GET", reqUrl, null, { cookies: { key2: "value2" }, jar: jar }); + + if (res.json().key != "value") { throw new Error("wrong cookie value: " + res.json().key); } + + if (res.status != 200) { throw new Error("wrong status: " + res.status); } + if (res.request["method"] !== "GET") { throw new Error("http request method was not \"GET\": " + JSON.stringify(res.request)) } + if (res.request["body"].length != 0) { throw new Error("http request body was not null: " + JSON.stringify(res.request["body"])) } + if (res.request["url"] != reqUrl) { + throw new Error("wrong http request url: " + JSON.stringify(res.request)) + } + if (res.request["cookies"]["key2"][0].name != "key2") { throw new Error("wrong http request cookies: " + JSON.stringify(JSON.stringify(res.request["cookies"]["key2"]))) } + if (res.request["headers"]["User-Agent"][0] != "TestUserAgent") { throw new Error("wrong http request headers: " + JSON.stringify(res.request)) } + `)) + assert.NoError(t, err) + }) + t.Run("NonEmptyBody", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + + sr := ts.tb.Replacer.Replace + err := runOnEventLoop(ts.runtime, sr(` + var res = await http.asyncRequest("POST", "HTTPBIN_URL/post", {a: "a", b: 2}, {headers: {"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"}}); + if (res.status != 200) { throw new Error("wrong status: " + res.status); } + if (res.request["body"] != "a=a&b=2") { throw new Error("http request body was not set properly: " + JSON.stringify(res.request))} + `)) + assert.NoError(t, err) + }) + t.Run("Concurrent", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + sr := ts.tb.Replacer.Replace + err := runOnEventLoop(ts.runtime, sr(` + let start = Date.now() + let p1 = http.asyncRequest("GET", "HTTPBIN_URL/delay/200ms").then(() => { return Date.now() - start}) + let p2 = http.asyncRequest("GET", "HTTPBIN_URL/delay/100ms").then(() => { return Date.now() - start}) + let time1 = await p1; + let time2 = await p2; + if (time1 < time2) { + throw("request that should've taken 200ms took less time then one that should take 100ms " + time1 +">" + time2 ) + } + + `)) + assert.NoError(t, err) + }) +} + +func TestAsyncRequestResponseCallbackRace(t *testing.T) { + // This test is here only to tease out race conditions + t.Parallel() + ts := newTestCase(t) + err := ts.runtime.VU.Runtime().Set("q", func(f func()) { + rg := ts.runtime.EventLoop.RegisterCallback() + time.AfterFunc(time.Millisecond*5, func() { + rg(func() error { + f() + return nil + }) + }) + }) + require.NoError(t, err) + err = ts.runtime.VU.Runtime().Set("log", func(s string) { + // t.Log(s) // uncomment for debugging + }) + require.NoError(t, err) + err = runOnEventLoop(ts.runtime, ts.tb.Replacer.Replace(` + let call = (i) => { + log("s"+i) + if (i > 200) { return null; } + http.setResponseCallback(http.expectedStatuses(i)) + q(() => call(i+1)) // don't use promises as they resolve before eventloop callbacks such as the one from asyncRequest + } + for (let j = 0; j< 50; j++) { + call(0) + await http.asyncRequest("GET", "HTTPBIN_URL/redirect/20").then(() => log("!!!!!!!!!!!!!!!"+j)) + } + `)) + require.NoError(t, err) +} + +func TestAsyncRequestErrors(t *testing.T) { + // This likely should have a way to do the same for http.request and http.asyncRequest with the same tests + t.Parallel() + t.Run("Invalid", func(t *testing.T) { + t.Parallel() + t.Run("unsupported protocol", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + state := ts.runtime.VU.State() + + hook := logtest.NewLocal(state.Logger) + defer hook.Reset() + + err := runOnEventLoop(ts.runtime, ` + try { + http.asyncRequest("", "").catch((e) => globalThis.promiseRejected = e ) + } catch (e) { + globalThis.exceptionThrown = e + } + `) + require.NoError(t, err) + promiseRejected := ts.runtime.VU.Runtime().Get("promiseRejected") + exceptionThrown := ts.runtime.VU.Runtime().Get("exceptionThrown") + require.NotNil(t, promiseRejected) + require.True(t, promiseRejected.ToBoolean()) + require.Nil(t, exceptionThrown) + assert.Contains(t, promiseRejected.ToString(), "unsupported protocol scheme") + + logEntry := hook.LastEntry() + assert.Nil(t, logEntry) + }) + + t.Run("throw=false", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + state := ts.runtime.VU.State() + hook := logtest.NewLocal(state.Logger) + defer hook.Reset() + + err := runOnEventLoop(ts.runtime, ` + var res = await http.asyncRequest("GET", "some://example.com", null, { throw: false }); + if (res.error.search('unsupported protocol scheme "some"') == -1) { + throw new Error("wrong error:" + res.error); + } + throw new Error("another error"); + `) + require.ErrorContains(t, err, "another error") + + logEntry := hook.LastEntry() + if assert.NotNil(t, logEntry) { + assert.Equal(t, logrus.WarnLevel, logEntry.Level) + err, ok := logEntry.Data["error"].(error) + require.True(t, ok) + assert.ErrorContains(t, err, "unsupported protocol scheme") + assert.Equal(t, "Request Failed", logEntry.Message) + } + }) + }) + t.Run("InvalidURL", func(t *testing.T) { + t.Parallel() + + expErr := `invalid URL: parse "https:// test.k6.io": invalid character " " in host name` + t.Run("throw=true", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + + js := ` + try { + http.asyncRequest("GET", "https:// test.k6.io").catch((e) => globalThis.promiseRejected = e ) + } catch (e) { + globalThis.exceptionThrown = e + } + ` + err := runOnEventLoop(ts.runtime, js) + require.NoError(t, err) + promiseRejected := ts.runtime.VU.Runtime().Get("promiseRejected") + exceptionThrown := ts.runtime.VU.Runtime().Get("exceptionThrown") + require.NotNil(t, promiseRejected) + require.True(t, promiseRejected.ToBoolean()) + require.Nil(t, exceptionThrown) + assert.Contains(t, promiseRejected.ToString(), expErr) + }) + + t.Run("throw=false", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + rt := ts.runtime.VU.Runtime() + state := ts.runtime.VU.State() + state.Options.Throw.Bool = false + defer func() { state.Options.Throw.Bool = true }() + + hook := logtest.NewLocal(state.Logger) + defer hook.Reset() + + js := ` + var r = await http.asyncRequest("GET", "https:// test.k6.io"); + globalThis.ret = {error: r.error, error_code: r.error_code}; + ` + err := runOnEventLoop(ts.runtime, js) + require.NoError(t, err) + ret := rt.GlobalObject().Get("ret") + var retobj map[string]interface{} + var ok bool + if retobj, ok = ret.Export().(map[string]interface{}); !ok { + require.Fail(t, "got wrong return object: %#+v", retobj) + } + require.Equal(t, int64(1020), retobj["error_code"]) + require.Equal(t, expErr, retobj["error"]) + + logEntry := hook.LastEntry() + require.NotNil(t, logEntry) + assert.Equal(t, logrus.WarnLevel, logEntry.Level) + err, ok = logEntry.Data["error"].(error) + require.True(t, ok) + assert.ErrorContains(t, err, expErr) + assert.Equal(t, "Request Failed", logEntry.Message) + }) + + t.Run("throw=false,nopanic", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + rt := ts.runtime.VU.Runtime() + state := ts.runtime.VU.State() + state.Options.Throw.Bool = false + defer func() { state.Options.Throw.Bool = true }() + + hook := logtest.NewLocal(state.Logger) + defer hook.Reset() + + js := ` + var r = await http.asyncRequest("GET", "https:// test.k6.io"); + r.html(); + r.json(); + globalThis.ret = r.error_code; // not reached because of json() + ` + err := runOnEventLoop(ts.runtime, js) + ret := rt.GlobalObject().Get("ret") + require.Error(t, err) + assert.Nil(t, ret) + assert.Contains(t, err.Error(), "unexpected end of JSON input") + + logEntry := hook.LastEntry() + require.NotNil(t, logEntry) + assert.Equal(t, logrus.WarnLevel, logEntry.Level) + err, ok := logEntry.Data["error"].(error) + require.True(t, ok) + assert.ErrorContains(t, err, expErr) + assert.Equal(t, "Request Failed", logEntry.Message) + }) + }) + + t.Run("Unroutable", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + err := runOnEventLoop(ts.runtime, ` + try { + http.asyncRequest("GET", "http://sdafsgdhfjg/").catch((e) => globalThis.promiseRejected = e ) + } catch (e) { + globalThis.exceptionThrown = e + }`) + expErr := "lookup sdafsgdhfjg" + require.NoError(t, err) + promiseRejected := ts.runtime.VU.Runtime().Get("promiseRejected") + exceptionThrown := ts.runtime.VU.Runtime().Get("exceptionThrown") + require.NotNil(t, promiseRejected) + require.True(t, promiseRejected.ToBoolean()) + require.Nil(t, exceptionThrown) + assert.Contains(t, promiseRejected.ToString(), expErr) + }) +} diff --git a/js/modules/k6/http/http.go b/js/modules/k6/http/http.go index be95412f597..87f65670328 100644 --- a/js/modules/k6/http/http.go +++ b/js/modules/k6/http/http.go @@ -86,6 +86,7 @@ func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { mustExport("del", mi.defaultClient.getMethodClosure(http.MethodDelete)) mustExport("options", mi.defaultClient.getMethodClosure(http.MethodOptions)) mustExport("request", mi.defaultClient.Request) + mustExport("asyncRequest", mi.defaultClient.asyncRequest) mustExport("batch", mi.defaultClient.Batch) mustExport("setResponseCallback", mi.defaultClient.SetResponseCallback) diff --git a/js/modules/k6/http/request.go b/js/modules/k6/http/request.go index 6abca5d40ec..fc54e820af2 100644 --- a/js/modules/k6/http/request.go +++ b/js/modules/k6/http/request.go @@ -39,38 +39,85 @@ func (c *Client) Request(method string, url goja.Value, args ...goja.Value) (*Re if state == nil { return nil, ErrHTTPForbiddenInInitContext } + body, params := splitRequestArgs(args) - var body interface{} - var params goja.Value + req, err := c.parseRequest(method, url, body, params) + if err != nil { + return c.handleParseRequestError(err) + } + resp, err := httpext.MakeRequest(c.moduleInstance.vu.Context(), state, req) + if err != nil { + return nil, err + } + c.processResponse(resp, req.ResponseType) + return c.responseFromHTTPext(resp), nil +} + +func splitRequestArgs(args []goja.Value) (body interface{}, params goja.Value) { if len(args) > 0 { body = args[0].Export() } if len(args) > 1 { params = args[1] } + return body, params +} + +func (c *Client) handleParseRequestError(err error) (*Response, error) { + state := c.moduleInstance.vu.State() + + if state.Options.Throw.Bool { + return nil, err + } + state.Logger.WithField("error", err).Warn("Request Failed") + r := httpext.NewResponse() + r.Error = err.Error() + var k6e httpext.K6Error + if errors.As(err, &k6e) { + r.ErrorCode = int(k6e.Code) + } + return &Response{Response: r, client: c}, nil +} +// asyncRequest makes an http request of the provided `method` and returns a promise. All the networking is done off +// the event loop and the returned promise will be resolved with the response or rejected with an error +func (c *Client) asyncRequest(method string, url goja.Value, args ...goja.Value) (*goja.Promise, error) { + state := c.moduleInstance.vu.State() + if c.moduleInstance.vu.State() == nil { + return nil, ErrHTTPForbiddenInInitContext + } + + body, params := splitRequestArgs(args) + rt := c.moduleInstance.vu.Runtime() req, err := c.parseRequest(method, url, body, params) + p, resolve, reject := rt.NewPromise() if err != nil { - if state.Options.Throw.Bool { - return nil, err + var resp *Response + if resp, err = c.handleParseRequestError(err); err != nil { + reject(err) + } else { + resolve(resp) } - state.Logger.WithField("error", err).Warn("Request Failed") - r := httpext.NewResponse() - r.Error = err.Error() - var k6e httpext.K6Error - if errors.As(err, &k6e) { - r.ErrorCode = int(k6e.Code) - } - return &Response{Response: r, client: c}, nil + return p, nil } - resp, err := httpext.MakeRequest(c.moduleInstance.vu.Context(), state, req) - if err != nil { - return nil, err - } - c.processResponse(resp, req.ResponseType) - return c.responseFromHTTPext(resp), nil + callback := c.moduleInstance.vu.RegisterCallback() + + go func() { + resp, err := httpext.MakeRequest(c.moduleInstance.vu.Context(), state, req) + callback(func() error { + if err != nil { + reject(err) + return nil //nolint:nilerr // we want to reject the promise in this case + } + c.processResponse(resp, req.ResponseType) + resolve(c.responseFromHTTPext(resp)) + return nil + }) + }() + + return p, nil } // processResponse stores the body as an ArrayBuffer if indicated by @@ -87,6 +134,7 @@ func (c *Client) responseFromHTTPext(resp *httpext.Response) *Response { } // TODO: break this function up +// //nolint:gocyclo, cyclop, funlen, gocognit func (c *Client) parseRequest( method string, reqURL, body interface{}, params goja.Value, diff --git a/js/modules/k6/http/response_callback_test.go b/js/modules/k6/http/response_callback_test.go index 7dfeb1c09f5..c333173262a 100644 --- a/js/modules/k6/http/response_callback_test.go +++ b/js/modules/k6/http/response_callback_test.go @@ -3,6 +3,7 @@ package http import ( "fmt" "sort" + "strings" "testing" "github.com/dop251/goja" @@ -85,7 +86,6 @@ func TestResponseCallbackInAction(t *testing.T) { ts := newTestCase(t) tb := ts.tb samples := ts.samples - rt := ts.runtime.VU.Runtime() sr := tb.Replacer.Replace @@ -138,7 +138,7 @@ func TestResponseCallbackInAction(t *testing.T) { "overwrite per request": { code: ` http.setResponseCallback(http.expectedStatuses(200)); - res = http.request("GET", "HTTPBIN_URL/redirect/1"); + http.request("GET", "HTTPBIN_URL/redirect/1"); `, expectedSamples: []expectedSample{ { @@ -227,7 +227,7 @@ func TestResponseCallbackInAction(t *testing.T) { "global overwrite with null": { code: ` http.setResponseCallback(null); - res = http.request("GET", "HTTPBIN_URL/redirect/1"); + http.request("GET", "HTTPBIN_URL/redirect/1"); `, expectedSamples: []expectedSample{ { @@ -257,10 +257,16 @@ func TestResponseCallbackInAction(t *testing.T) { } for name, testCase := range testCases { testCase := testCase - t.Run(name, func(t *testing.T) { + + runCode := func(code string) { + t.Helper() ts.instance.defaultClient.responseCallback = defaultExpectedStatuses.match - _, err := rt.RunString(sr(testCase.code)) + err := ts.runtime.EventLoop.Start(func() error { + _, err := ts.runtime.VU.Runtime().RunString(sr(code)) + return err + }) + ts.runtime.EventLoop.WaitOnRegistered() assert.NoError(t, err) bufSamples := metrics.GetBufferedSamples(samples) @@ -278,6 +284,12 @@ func TestResponseCallbackInAction(t *testing.T) { for i, expectedSample := range testCase.expectedSamples { assertRequestMetricsEmittedSingle(t, bufSamples[i], expectedSample.tags, expectedSample.metrics, nil) } + } + t.Run(name, func(t *testing.T) { + runCode(testCase.code) + }) + t.Run("async_"+name, func(t *testing.T) { + runCode(strings.ReplaceAll(testCase.code, "http.request", "http.asyncRequest")) }) } }