From 57d4b2332b6394c46691f8cb54a7a8fda8520c72 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 29 Jan 2020 12:16:35 +0100 Subject: [PATCH 1/2] Make REST API check stricter So far we have used the info API to determine whether the REST API of Elasticsearch is available. However, that check can succeed while only a quorum (but not all) of the target hosts is available. While certain nodes would then respond to HTTP requests, others might not, which can lead to situations where the REST API check succeeds but we run into connection errors later on (because we hit a different host from the connection pool). With this commit we make this check stricter by using the cluster health API and blocking until the cluster contains at least as many nodes as there are target hosts. --- esrally/client.py | 25 ++++++++++++++++++------- tests/client_test.py | 29 ++++++++++++++++++++--------- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/esrally/client.py b/esrally/client.py index 61248d3a9..04dc3dfbd 100644 --- a/esrally/client.py +++ b/esrally/client.py @@ -134,23 +134,34 @@ def wait_for_rest_layer(es, max_attempts=20): :param es: Elasticsearch client to use for connecting. :param max_attempts: The maximum number of attempts to check whether the REST API is available. - :return: True iff Elasticsearch is available. + :return: True iff Elasticsearch's REST API is available. """ + # assume that at least the hosts that we expect to contact should be available. Note that this is not 100% + # bullet-proof as a cluster could have e.g. dedicated masters which are not contained in our list of target hosts + # but this is still better than just checking for any random node's REST API being reachable. 
+ expected_node_count = len(es.transport.hosts) + logger = logging.getLogger(__name__) for attempt in range(max_attempts): + logger.debug("REST API is available after %s attempts", attempt) import elasticsearch try: - es.info() + # see also WaitForHttpResource in Elasticsearch tests. Contrary to the ES tests we consider the API also + # available when the cluster status is RED (as long as all required nodes are present) + es.cluster.health(wait_for_nodes=">={}".format(expected_node_count)) + logger.info("REST API is available for >= [%s] nodes after [%s] attempts.", expected_node_count, attempt) return True except elasticsearch.ConnectionError as e: if "SSL: UNKNOWN_PROTOCOL" in str(e): raise exceptions.SystemSetupError("Could not connect to cluster via https. Is this an https endpoint?", e) else: - time.sleep(1) + logger.debug("Got connection error on attempt [%s]. Sleeping...", attempt) + time.sleep(3) except elasticsearch.TransportError as e: - if e.status_code == 503: - time.sleep(1) - elif e.status_code == 401: - time.sleep(1) + # cluster block, x-pack not initialized yet, our wait condition is not reached + if e.status_code in (503, 401, 408): + logger.debug("Got status code [%s] on attempt [%s]. 
Sleeping...", e.status_code, attempt) + time.sleep(3) else: + logger.warning("Got unexpected status code [%s] on attempt [%s].", e.status_code, attempt) raise e return False diff --git a/tests/client_test.py b/tests/client_test.py index 8f4727e0b..c71436e15 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -277,19 +277,30 @@ def test_create_https_connection_unverified_certificate_present_client_certifica class RestLayerTests(TestCase): - @mock.patch("elasticsearch.Elasticsearch", autospec=True) + @mock.patch("elasticsearch.Elasticsearch") def test_successfully_waits_for_rest_layer(self, es): + es.transport.hosts = [ + {"host": "node-a.example.org", "port": 9200}, + {"host": "node-b.example.org", "port": 9200} + ] + self.assertTrue(client.wait_for_rest_layer(es, max_attempts=3)) + es.cluster.health.assert_has_calls([ + mock.call(wait_for_nodes=">=2"), + ]) + # don't sleep in realtime @mock.patch("time.sleep") - @mock.patch("elasticsearch.Elasticsearch", autospec=True) + @mock.patch("elasticsearch.Elasticsearch") def test_retries_on_transport_errors(self, es, sleep): import elasticsearch - es.info.side_effect = [ + es.cluster.health.side_effect = [ elasticsearch.TransportError(503, "Service Unavailable"), elasticsearch.TransportError(401, "Unauthorized"), + elasticsearch.TransportError(408, "Timed Out"), + elasticsearch.TransportError(408, "Timed Out"), { "version": { "number": "5.0.0", @@ -297,23 +308,23 @@ def test_retries_on_transport_errors(self, es, sleep): } } ] - self.assertTrue(client.wait_for_rest_layer(es, max_attempts=3)) + self.assertTrue(client.wait_for_rest_layer(es, max_attempts=5)) # don't sleep in realtime @mock.patch("time.sleep") - @mock.patch("elasticsearch.Elasticsearch", autospec=True) - def test_dont_retries_eternally_on_transport_errors(self, es, sleep): + @mock.patch("elasticsearch.Elasticsearch") + def test_dont_retry_eternally_on_transport_errors(self, es, sleep): import elasticsearch - es.info.side_effect = 
elasticsearch.TransportError(401, "Unauthorized") + es.cluster.health.side_effect = elasticsearch.TransportError(401, "Unauthorized") self.assertFalse(client.wait_for_rest_layer(es, max_attempts=3)) - @mock.patch("elasticsearch.Elasticsearch", autospec=True) + @mock.patch("elasticsearch.Elasticsearch") def test_ssl_error(self, es): import elasticsearch import urllib3.exceptions - es.info.side_effect = elasticsearch.ConnectionError("N/A", + es.cluster.health.side_effect = elasticsearch.ConnectionError("N/A", "[SSL: UNKNOWN_PROTOCOL] unknown protocol (_ssl.c:719)", urllib3.exceptions.SSLError( "[SSL: UNKNOWN_PROTOCOL] unknown protocol (_ssl.c:719)")) From cbf6decc1dd68f2a5ff8da3d32e18e9279ce1961 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 29 Jan 2020 13:06:47 +0100 Subject: [PATCH 2/2] Double the number of retries --- esrally/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esrally/client.py b/esrally/client.py index 04dc3dfbd..43bed68ab 100644 --- a/esrally/client.py +++ b/esrally/client.py @@ -128,7 +128,7 @@ def create(self): return elasticsearch.Elasticsearch(hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options) -def wait_for_rest_layer(es, max_attempts=20): +def wait_for_rest_layer(es, max_attempts=40): """ Waits for ``max_attempts`` until Elasticsearch's REST API is available.