Improve response processing #941

Merged
1 change: 1 addition & 0 deletions docs/track.rst
@@ -405,6 +405,7 @@ With the operation type ``search`` you can execute `request body searches <http:
* ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data about queries. As it analyzes the corresponding response in more detail, this might incur additional overhead which can skew measurement results. This flag is ineffective for scroll queries.
* ``pages`` (optional): Number of pages to retrieve. If this parameter is present, a scroll query will be executed. If you want to retrieve all result pages, use the value "all".
* ``results-per-page`` (optional): Number of documents to retrieve per page for scroll queries.
* ``response-compression-enabled`` (optional, defaults to ``true``): Allows disabling HTTP compression of scroll responses. As these responses are sometimes large and decompression may be a bottleneck on the client, it is possible to turn off response compression. This option is ineffective for regular queries; see the example below.
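
For concreteness, here is a hedged sketch of a scroll operation that turns off response compression, shaped after the parameters the tests in this PR pass to the query runner; the index name and query are illustrative, not taken from the PR:

```python
# Hypothetical operation parameters for a scroll query with compression
# disabled; "logs-*" and the match_all query are placeholders.
params = {
    "index": "logs-*",
    "pages": "all",                         # fetch every page of the scroll
    "results-per-page": 1000,               # hits per scroll request
    "response-compression-enabled": False,  # sends Accept-Encoding: identity
    "body": {
        "query": {
            "match_all": {}
        }
    }
}
```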

If ``detailed-results`` is set to ``true``, the following meta-data properties will be determined and stored:

15 changes: 14 additions & 1 deletion esrally/async_connection.py
@@ -12,6 +12,18 @@
from elasticsearch.connection.http_urllib3 import create_ssl_context


class RawClientResponse(aiohttp.ClientResponse):
    """
    Returns the body as a bytes object (instead of a str) to avoid decoding overhead.
    """
    async def text(self, encoding=None, errors="strict"):
        """Read the response payload and return it as raw bytes without decoding."""
        if self._body is None:
            await self.read()

        return self._body


# This is only needed because https://github.com/elastic/elasticsearch-py-async/pull/68 is not merged yet
# In addition we have raised the connection limit in TCPConnector from 100 to 100000.

@@ -82,7 +94,8 @@ def __init__(self, host='localhost', port=9200, http_auth=None,
                limit=100000
            ),
            headers=headers,
-            trace_configs=trace_configs
+            trace_configs=trace_configs,
+            response_class=RawClientResponse
        )

self.base_url = 'http%s://%s:%d%s' % (
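A minimal, self-contained sketch of what the custom response class buys: once `response_class` is set on the session, `text()` hands back undecoded bytes. The URL is a placeholder that assumes an Elasticsearch node (or any HTTP server) on localhost:9200.

```python
import asyncio
import aiohttp

class RawClientResponse(aiohttp.ClientResponse):
    """Return the body as bytes (instead of str) to skip decoding."""
    async def text(self, encoding=None, errors="strict"):
        if self._body is None:
            await self.read()
        return self._body

async def main():
    # Plugging the subclass into the session applies it to every request.
    async with aiohttp.ClientSession(response_class=RawClientResponse) as session:
        # Placeholder URL - assumes a server is listening on localhost:9200.
        async with session.get("http://localhost:9200/") as resp:
            body = await resp.text()
            print(type(body))  # <class 'bytes'>, no str decoding happened

asyncio.run(main())
```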
2 changes: 1 addition & 1 deletion esrally/client.py
@@ -144,7 +144,7 @@ class LazyJSONSerializer(JSONSerializer):
def loads(self, s):
meta = RallyAsyncElasticsearch.request_context.get()
if "raw_response" in meta:
-            return io.StringIO(s)
+            return io.BytesIO(s)
else:
return super().loads(s)

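To make the control flow concrete, here is a stripped-down sketch of the lazy-serializer idea. The class below is a simplification: Rally's real implementation reads the flag from a contextvars-based request context rather than a plain dict.

```python
import io
import json

class LazyJSONSerializerSketch:
    """Simplified stand-in: skip eager JSON parsing when raw bytes are wanted."""
    def __init__(self, request_meta):
        self.request_meta = request_meta  # stand-in for the per-request context

    def loads(self, s):
        if "raw_response" in self.request_meta:
            # Return the undecoded bytes wrapped in a file-like object;
            # the caller extracts only the fields it needs later.
            return io.BytesIO(s)
        return json.loads(s)

# The raw path hands the bytes through untouched...
raw = LazyJSONSerializerSketch({"raw_response": True}).loads(b'{"took": 5}')
print(raw.read())   # b'{"took": 5}'
# ...while the default path parses as usual.
print(LazyJSONSerializerSketch({}).loads(b'{"took": 5}'))  # {'took': 5}
```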
36 changes: 22 additions & 14 deletions esrally/driver/runner.py
@@ -779,7 +779,7 @@ async def request_body_query(self, es, params):
es.return_raw_response()

if doc_type is not None:
-            r = await self._search_type_fallback(es, doc_type, index, body, params)
+            r = await self._raw_search(es, doc_type, index, body, params)
else:
r = await es.search(index=index, body=body, params=params)

@@ -816,6 +816,11 @@ async def scroll_query(self, es, params):
        # explicitly convert to int so that an invalid value raises an error
        total_pages = sys.maxsize if params["pages"] == "all" else int(params["pages"])
        size = params.get("results-per-page")
        # reduces overhead due to decompression of very large responses
        if params.get("response-compression-enabled", True):
            headers = None
        else:
            headers = {"Accept-Encoding": "identity"}
        scroll_id = None

        # disable eager response parsing - responses might be huge thus skewing results
@@ -830,13 +835,10 @@
scroll = "10s"
doc_type = params.get("type")
params = request_params
-                if doc_type is not None:
-                    params["sort"] = sort
-                    params["scroll"] = scroll
-                    params["size"] = size
-                    r = await self._search_type_fallback(es, doc_type, index, body, params)
-                else:
-                    r = await es.search(index=index, body=body, params=params, sort=sort, scroll=scroll, size=size)
+                params["sort"] = sort
+                params["scroll"] = scroll
+                params["size"] = size
+                r = await self._raw_search(es, doc_type, index, body, params, headers=headers)

props = parse(r,
["_scroll_id", "hits.total", "hits.total.value", "hits.total.relation", "timed_out", "took"],
@@ -848,7 +850,9 @@
took = props.get("took", 0)
all_results_collected = (size is not None and hits < size) or hits == 0
else:
-                r = await es.scroll(body={"scroll_id": scroll_id, "scroll": "10s"})
+                r = await es.transport.perform_request("GET", "/_search/scroll",
+                                                       body={"scroll_id": scroll_id, "scroll": "10s"},
+                                                       headers=headers)
props = parse(r, ["hits.total", "hits.total.value", "hits.total.relation", "timed_out", "took"], ["hits.hits"])
timed_out = timed_out or props.get("timed_out", False)
took += props.get("took", 0)
@@ -876,11 +880,15 @@
"took": took
}

async def _search_type_fallback(self, es, doc_type, index, body, params):
if doc_type and not index:
index = "_all"
path = "/%s/%s/_search" % (index, doc_type)
return await es.transport.perform_request("GET", path, params=params, body=body)
async def _raw_search(self, es, doc_type, index, body, params, headers=None):
components = []
if index:
components.append(index)
if doc_type:
components.append(doc_type)
components.append("_search")
path = "/".join(components)
return await es.transport.perform_request("GET", "/" + path, params=params, body=body, headers=headers)

def _default_request_params(self, params):
request_params = params.get("request-params", {})
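Two small, self-contained sketches of the logic introduced above; the function names `search_path` and `scroll_headers` are illustrative, not Rally API. The first mirrors the URL assembly that lets `_raw_search` serve typed, untyped, and index-less searches alike; the second mirrors the header toggle behind `response-compression-enabled`.

```python
def search_path(index=None, doc_type=None):
    # Mirrors _raw_search: keep only the URL components that are set.
    components = [c for c in (index, doc_type) if c]
    components.append("_search")
    return "/" + "/".join(components)

def scroll_headers(params):
    # Compression stays on unless the track explicitly disables it.
    if params.get("response-compression-enabled", True):
        return None  # let the client negotiate gzip as usual
    return {"Accept-Encoding": "identity"}  # request an uncompressed body

assert search_path("unittest", "type") == "/unittest/type/_search"
assert search_path("unittest") == "/unittest/_search"
assert search_path() == "/_search"
assert scroll_headers({}) is None
assert scroll_headers({"response-compression-enabled": False}) == {"Accept-Encoding": "identity"}
```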
65 changes: 35 additions & 30 deletions tests/driver/runner_test.py
@@ -1380,9 +1380,10 @@ async def test_query_match_all_doc_type_fallback(self, es):
self.assertFalse("error-type" in result)

        es.transport.perform_request.assert_called_once_with(
-            'GET', '/unittest/type/_search',
+            "GET", "/unittest/type/_search",
             body=params["body"],
-            params={}
+            params={},
+            headers=None
        )
es.clear_scroll.assert_not_called()

@@ -1410,7 +1411,7 @@ async def test_scroll_query_only_one_page(self, es):
}
}

-        es.search.return_value = as_future(io.StringIO(json.dumps(search_response)))
+        es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response)))
es.clear_scroll.return_value = as_future(io.StringIO('{"acknowledged": true}'))

query_runner = runner.Query()
@@ -1439,15 +1440,12 @@
self.assertFalse(results["timed_out"])
self.assertFalse("error-type" in results)

-        es.search.assert_called_once_with(
-            index="unittest",
-            body=params["body"],
-            scroll="10s",
-            size=100,
-            sort='_doc',
-            params={
-                "request_cache": "true"
-            }
-        )
+        es.transport.perform_request.assert_called_once_with(
+            "GET",
+            "/unittest/_search",
+            params={"request_cache": "true", "sort": "_doc", "scroll": "10s", "size": 100},
+            body=params["body"],
+            headers=None
+        )
es.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]})

@@ -1475,7 +1473,7 @@ async def test_scroll_query_no_request_cache(self, es):
}
}

-        es.search.return_value = as_future(io.StringIO(json.dumps(search_response)))
+        es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response)))
es.clear_scroll.return_value = as_future(io.StringIO('{"acknowledged": true}'))

query_runner = runner.Query()
@@ -1484,6 +1482,7 @@
"pages": 1,
"results-per-page": 100,
"index": "unittest",
"response-compression-enabled": False,
"body": {
"query": {
"match_all": {}
@@ -1503,13 +1502,12 @@
self.assertFalse(results["timed_out"])
self.assertFalse("error-type" in results)

-        es.search.assert_called_once_with(
-            index="unittest",
-            body=params["body"],
-            scroll="10s",
-            size=100,
-            sort='_doc',
-            params={}
-        )
+        es.transport.perform_request.assert_called_once_with(
+            "GET",
+            "/unittest/_search",
+            params={"sort": "_doc", "scroll": "10s", "size": 100},
+            body=params["body"],
+            headers={"Accept-Encoding": "identity"}
+        )
es.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]})

@@ -1537,7 +1535,7 @@ async def test_scroll_query_only_one_page_only_request_body_defined(self, es):
}
}

-        es.search.return_value = as_future(io.StringIO(json.dumps(search_response)))
+        es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response)))
es.clear_scroll.return_value = as_future(io.StringIO('{"acknowledged": true}'))

query_runner = runner.Query()
@@ -1564,13 +1562,12 @@ async def test_scroll_query_only_one_page_only_request_body_defined(self, es):
self.assertFalse(results["timed_out"])
self.assertFalse("error-type" in results)

-        es.search.assert_called_once_with(
-            index="_all",
-            body=params["body"],
-            scroll="10s",
-            size=100,
-            sort='_doc',
-            params={}
-        )
+        es.transport.perform_request.assert_called_once_with(
+            "GET",
+            "/_all/_search",
+            params={"sort": "_doc", "scroll": "10s", "size": 100},
+            body=params["body"],
+            headers=None
+        )

es.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]})
@@ -1599,7 +1596,6 @@ async def test_scroll_query_with_explicit_number_of_pages(self, es):
]
}
}
-        es.search.return_value = as_future(io.StringIO(json.dumps(search_response)))

# page 2
scroll_response = {
@@ -1614,7 +1610,12 @@
]
}
}
-        es.scroll.return_value = as_future(io.StringIO(json.dumps(scroll_response)))

+        es.transport.perform_request.side_effect = [
+            as_future(io.StringIO(json.dumps(search_response))),
+            as_future(io.StringIO(json.dumps(scroll_response)))
+        ]

es.clear_scroll.return_value = as_future(io.StringIO('{"acknowledged": true}'))

query_runner = runner.Query()
@@ -1667,7 +1668,7 @@ async def test_scroll_query_cannot_clear_scroll(self, es):
}
}

-        es.search.return_value = as_future(io.StringIO(json.dumps(search_response)))
+        es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response)))
es.clear_scroll.return_value = as_future(exception=elasticsearch.ConnectionTimeout())

query_runner = runner.Query()
@@ -1726,7 +1727,7 @@ async def test_scroll_query_request_all_pages(self, es):
]
}
}
-        es.search.return_value = as_future(io.StringIO(json.dumps(search_response)))

# page 2 has no results
scroll_response = {
"_scroll_id": "some-scroll-id",
Expand All @@ -1736,7 +1737,11 @@ async def test_scroll_query_request_all_pages(self, es):
"hits": []
}
}
-        es.scroll.return_value = as_future(io.StringIO(json.dumps(scroll_response)))

+        es.transport.perform_request.side_effect = [
+            as_future(io.StringIO(json.dumps(search_response))),
+            as_future(io.StringIO(json.dumps(scroll_response)))
+        ]
es.clear_scroll.return_value = as_future(io.StringIO('{"acknowledged": true}'))

query_runner = runner.Query()
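The updated tests queue consecutive transport responses through `side_effect`. A compact, hedged sketch of that pattern outside Rally; `as_future` here is a local helper mirroring the one in Rally's test utilities, and the canned page payloads are made up:

```python
import asyncio
import io
import json
from unittest import mock

async def main():
    loop = asyncio.get_running_loop()

    def as_future(result):
        # An already-resolved future, like the helper used in the tests above.
        f = loop.create_future()
        f.set_result(result)
        return f

    transport = mock.Mock()
    # Each call to perform_request pops the next canned response.
    transport.perform_request.side_effect = [
        as_future(io.StringIO(json.dumps({"page": 1}))),
        as_future(io.StringIO(json.dumps({"page": 2}))),
    ]

    first = await transport.perform_request("GET", "/_search")
    second = await transport.perform_request("GET", "/_search/scroll")
    print(json.load(first), json.load(second))  # {'page': 1} {'page': 2}

asyncio.run(main())
```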