From 6d2c5a6a4ac3040ac5487d2684f88a47ac6c3998 Mon Sep 17 00:00:00 2001 From: Mehmet Nuri Deveci <5735811+mndeveci@users.noreply.github.com> Date: Tue, 30 Jan 2024 17:22:08 -0800 Subject: [PATCH 1/4] fix: lock invocation per function for not having concurrent requests --- samcli/local/docker/container.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/samcli/local/docker/container.py b/samcli/local/docker/container.py index 1497475f18..a95d951c1a 100644 --- a/samcli/local/docker/container.py +++ b/samcli/local/docker/container.py @@ -31,6 +31,11 @@ CONTAINER_CONNECTION_TIMEOUT = float(os.environ.get("SAM_CLI_CONTAINER_CONNECTION_TIMEOUT", 20)) DEFAULT_CONTAINER_HOST_INTERFACE = "127.0.0.1" +# Keep a lock instance to access the locks for individual containers (see dict below) +CONCURRENT_CALL_MANAGER_LOCK = threading.Lock() +# Keeps locks per container (aka per function) so that one function can be invoked one at a time +CONCURRENT_CALL_MANAGER: Dict[str, threading.Lock] = {} + class ContainerResponseException(Exception): """ @@ -378,11 +383,22 @@ def wait_for_http_response(self, name, event, stdout) -> Tuple[Union[str, bytes] # NOTE(sriram-mv): There is a connection timeout set on the http call to `aws-lambda-rie`, however there is not # a read time out for the response received from the server. - resp = requests.post( - self.URL.format(host=self._container_host, port=self.rapid_port_host, function_name="function"), - data=event.encode("utf-8"), - timeout=(self.RAPID_CONNECTION_TIMEOUT, None), - ) + # generate a lock key with host-port combination which is unique per function + lock_key = f"{self._container_host}-{self.rapid_port_host}" + LOG.debug("Getting lock for the key %s", lock_key) + with CONCURRENT_CALL_MANAGER_LOCK: + lock = CONCURRENT_CALL_MANAGER.get(lock_key) + if not lock: + lock = threading.Lock() + CONCURRENT_CALL_MANAGER[lock_key] = lock + LOG.debug("Waiting to retrieve the lock (%s) to start invocation", lock_key) + with lock: + resp = requests.post( + self.URL.format(host=self._container_host, port=self.rapid_port_host, function_name="function"), + data=event.encode("utf-8"), + timeout=(self.RAPID_CONNECTION_TIMEOUT, None), + ) + try: # if response is an image then json.loads/dumps will throw a UnicodeDecodeError so return raw content if "image" in resp.headers["Content-Type"]: From 5e1290f453d7b4d750a8a0f9f4b136dea2e62b44 Mon Sep 17 00:00:00 2001 From: Mehmet Nuri Deveci <5735811+mndeveci@users.noreply.github.com> Date: Wed, 31 Jan 2024 16:37:22 -0800 Subject: [PATCH 2/4] update integration tests to run with both eager and lazy mode --- .../local/start_api/test_start_api.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/integration/local/start_api/test_start_api.py b/tests/integration/local/start_api/test_start_api.py index c0b5fb44e2..77286b6794 100644 --- a/tests/integration/local/start_api/test_start_api.py +++ b/tests/integration/local/start_api/test_start_api.py @@ -149,10 +149,11 @@ def test_large_input_request_http10(self): @parameterized_class( - ("template_path",), + ("template_path", "container_mode"), [ - ("/testdata/start_api/template.yaml",), - ("/testdata/start_api/cdk/template_cdk.yaml",), + ("/testdata/start_api/template.yaml", "LAZY"), + ("/testdata/start_api/template.yaml", "EAGER"), + ("/testdata/start_api/cdk/template_cdk.yaml", "LAZY"), ], ) class TestParallelRequests(StartApiIntegBaseClass): @@ -164,7 +165,7 @@ def setUp(self): self.url = "http://127.0.0.1:{}".format(self.port) HTTPConnection._http_vsn_str = "HTTP/1.1" - @pytest.mark.flaky(reruns=3) + #@pytest.mark.flaky(reruns=3) @pytest.mark.timeout(timeout=600, method="thread") def test_same_endpoint(self): """ @@ -179,16 +180,15 @@ def test_same_endpoint(self): for _ in range(0, number_of_requests) ] results = [r.result() for r in as_completed(futures)] - end_time = time() - self.assertEqual(len(results), 10) - self.assertGreater(end_time - start_time, 10) - for result in results: self.assertEqual(result.status_code, 200) self.assertEqual(result.json(), {"message": "HelloWorld! I just slept and waking up."}) self.assertEqual(result.raw.version, 11) # Checks if the response is HTTP/1.1 version + # after checking responses now check the time to complete + self.assertEqual(len(results), 10) + self.assertGreater(end_time - start_time, 10) @pytest.mark.flaky(reruns=3) @pytest.mark.timeout(timeout=600, method="thread") From cada0a706486d8ef36d81a07636e74b4b03da0b1 Mon Sep 17 00:00:00 2001 From: Mehmet Nuri Deveci <5735811+mndeveci@users.noreply.github.com> Date: Wed, 31 Jan 2024 16:49:01 -0800 Subject: [PATCH 3/4] formatting --- tests/integration/local/start_api/test_start_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/local/start_api/test_start_api.py b/tests/integration/local/start_api/test_start_api.py index 77286b6794..9547e1c43d 100644 --- a/tests/integration/local/start_api/test_start_api.py +++ b/tests/integration/local/start_api/test_start_api.py @@ -165,7 +165,7 @@ def setUp(self): self.url = "http://127.0.0.1:{}".format(self.port) HTTPConnection._http_vsn_str = "HTTP/1.1" - #@pytest.mark.flaky(reruns=3) + # @pytest.mark.flaky(reruns=3) @pytest.mark.timeout(timeout=600, method="thread") def test_same_endpoint(self): """ From 64b522dd5545bed6ff5bbc19422934dcb51dd965 Mon Sep 17 00:00:00 2001 From: Mehmet Nuri Deveci <5735811+mndeveci@users.noreply.github.com> Date: Thu, 1 Feb 2024 11:34:02 -0800 Subject: [PATCH 4/4] Update tests/integration/local/start_api/test_start_api.py --- tests/integration/local/start_api/test_start_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/local/start_api/test_start_api.py b/tests/integration/local/start_api/test_start_api.py index 9547e1c43d..984a4c639e 100644 --- a/tests/integration/local/start_api/test_start_api.py +++ b/tests/integration/local/start_api/test_start_api.py @@ -165,7 +165,7 @@ def setUp(self): self.url = "http://127.0.0.1:{}".format(self.port) HTTPConnection._http_vsn_str = "HTTP/1.1" - # @pytest.mark.flaky(reruns=3) + @pytest.mark.flaky(reruns=3) @pytest.mark.timeout(timeout=600, method="thread") def test_same_endpoint(self): """