Be more lenient with custom parameter sources
Closes #354
danielmitterdorfer committed Nov 9, 2017
1 parent 3ef40d9 commit fb31a30
Showing 2 changed files with 155 additions and 17 deletions.
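
In practice, "more lenient" means a custom parameter source for a query operation now only has to provide "body": "index" falls back to "_all", while "type" and "use_request_cache" may be omitted entirely, and missing mandatory keys fail with a descriptive DataError instead of a bare KeyError. A minimal sketch of a parameter source that is valid after this change (the function name and the "restrictive-query-source" key are made up for illustration; the registration call follows Rally's custom parameter source documentation of that era):

# track.py of a custom track
def restrictive_query_source(indices, params):
    # Only "body" is mandatory for a query operation after this commit;
    # "index", "type" and "use_request_cache" are optional.
    return {
        "body": {
            "query": {
                "match_all": {}
            }
        }
    }


def register(registry):
    registry.register_param_source("restrictive-query-source", restrictive_query_source)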
esrally/driver/runner.py (21 additions, 17 deletions)
@@ -72,6 +72,14 @@ def __repr__(self, *args, **kwargs):
         return "user-defined runner for [%s]" % self.name
 
 
+def mandatory(params, key, op):
+    try:
+        return params[key]
+    except KeyError:
+        raise exceptions.DataError("Parameter source for operation '%s' did not provide the mandatory parameter '%s'. Please add it to your"
+                                   " parameter source." % (op, key))
+
+
 class BulkIndex(Runner):
     """
     Bulk indexes the given documents.
@@ -245,12 +253,8 @@ def __call__(self, es, params):
         if "pipeline" in params:
             bulk_params["pipeline"] = params["pipeline"]
 
-        with_action_metadata = params["action_metadata_present"]
-        try:
-            bulk_size = params["bulk-size"]
-        except KeyError:
-            raise exceptions.DataError(
-                "Bulk parameter source did not provide a 'bulk-size' parameter. Please add it to your parameter source.")
+        with_action_metadata = mandatory(params, "action_metadata_present", "bulk-index")
+        bulk_size = mandatory(params, "bulk-size", "bulk-index")
 
         if with_action_metadata:
             # only half of the lines are documents
@@ -372,15 +376,14 @@ def __call__(self, es, params):
         logger.info("Force merging all indices.")
         import elasticsearch
         try:
-            if ("max_num_segments" in params):
+            if "max_num_segments" in params:
                 es.indices.forcemerge(index="_all", max_num_segments=params["max_num_segments"])
             else:
                 es.indices.forcemerge(index="_all")
         except elasticsearch.TransportError as e:
             # this is caused by older versions of Elasticsearch (< 2.1), fall back to optimize
             if e.status_code == 400:
-                # es.indices.optimize(index="_all")
-                if ("max_num_segments" in params):
+                if "max_num_segments" in params:
                     es.transport.perform_request("POST", "/_optimize?max_num_segments=%s" % (params["max_num_segments"]))
                 else:
                     es.transport.perform_request("POST", "/_optimize")
@@ -460,11 +463,12 @@ def __call__(self, es, params):
 
     def request_body_query(self, es, params):
         request_params = params.get("request_params", {})
+        if "use_request_cache" in params:
+            request_params["request_cache"] = params["use_request_cache"]
         r = es.search(
-            index=params["index"],
-            doc_type=params["type"],
-            request_cache=params["use_request_cache"],
-            body=params["body"],
+            index=params.get("index", "_all"),
+            doc_type=params.get("type"),
+            body=mandatory(params, "body", "query"),
             **request_params)
         hits = r["hits"]["total"]
         return {
@@ -488,13 +492,13 @@ def scroll_query(self, es, params):
         for page in range(total_pages):
             if page == 0:
                 r = es.search(
-                    index=params["index"],
-                    doc_type=params["type"],
-                    body=params["body"],
+                    index=params.get("index", "_all"),
+                    doc_type=params.get("type"),
+                    body=mandatory(params, "body", "query"),
                     sort="_doc",
                     scroll="10s",
                     size=params["items_per_page"],
-                    request_cache=params["use_request_cache"],
+                    request_cache=params.get("use_request_cache"),
                     **request_params
                 )
                 # This should only happen if we concurrently create an index and start searching
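
For contrast, a tiny sketch (not part of this commit) of the error path that the new mandatory() helper standardizes; the params dict here is invented:

from esrally import exceptions
from esrally.driver.runner import mandatory

try:
    # "body" is deliberately missing from the params dict
    mandatory({"index": "logs"}, "body", "query")
except exceptions.DataError as e:
    # prints: Parameter source for operation 'query' did not provide the
    # mandatory parameter 'body'. Please add it to your parameter source.
    print(e)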
tests/driver/runner_test.py (134 additions, 0 deletions)
@@ -2,6 +2,7 @@
 from unittest import TestCase
 
 from esrally.driver import runner
+from esrally import exceptions
 
 
 class RegisterRunnerTests(TestCase):
@@ -48,6 +49,53 @@ def __str__(self):
 
 
 class BulkIndexRunnerTests(TestCase):
+    @mock.patch("elasticsearch.Elasticsearch")
+    def test_bulk_index_missing_action_metadata_present(self, es):
+        es.bulk.return_value = {
+            "errors": False
+        }
+        bulk = runner.BulkIndex()
+
+        bulk_params = {
+            "body": [
+                "action_meta_data",
+                "index_line",
+                "action_meta_data",
+                "index_line",
+                "action_meta_data",
+                "index_line"
+            ]
+        }
+
+        with self.assertRaises(exceptions.DataError) as ctx:
+            bulk(es, bulk_params)
+        self.assertEqual("Parameter source for operation 'bulk-index' did not provide the mandatory parameter 'action_metadata_present'. "
+                         "Please add it to your parameter source.", ctx.exception.args[0])
+
+    @mock.patch("elasticsearch.Elasticsearch")
+    def test_bulk_index_missing_bulk_size(self, es):
+        es.bulk.return_value = {
+            "errors": False
+        }
+        bulk = runner.BulkIndex()
+
+        bulk_params = {
+            "body": [
+                "action_meta_data",
+                "index_line",
+                "action_meta_data",
+                "index_line",
+                "action_meta_data",
+                "index_line"
+            ],
+            "action_metadata_present": True,
+        }
+
+        with self.assertRaises(exceptions.DataError) as ctx:
+            bulk(es, bulk_params)
+        self.assertEqual("Parameter source for operation 'bulk-index' did not provide the mandatory parameter 'bulk-size'. "
+                         "Please add it to your parameter source.", ctx.exception.args[0])
+
     @mock.patch("elasticsearch.Elasticsearch")
     def test_bulk_index_success_with_metadata(self, es):
         es.bulk.return_value = {
@@ -469,6 +517,44 @@ def test_mixed_bulk_with_detailed_stats(self, es):
 
 
 class QueryRunnerTests(TestCase):
+    @mock.patch("elasticsearch.Elasticsearch")
+    def test_query_match_only_request_body_defined(self, es):
+        es.search.return_value = {
+            "timed_out": False,
+            "took": 5,
+            "hits": {
+                "total": 2,
+                "hits": [
+                    {
+                        "some-doc-1"
+                    },
+                    {
+                        "some-doc-2"
+                    }
+                ]
+            }
+        }
+
+        query_runner = runner.Query()
+
+        params = {
+            "body": {
+                "query": {
+                    "match_all": {}
+                }
+            }
+        }
+
+        with query_runner:
+            result = query_runner(es, params)
+
+        self.assertEqual(1, result["weight"])
+        self.assertEqual("ops", result["unit"])
+        self.assertEqual(2, result["hits"])
+        self.assertFalse(result["timed_out"])
+        self.assertEqual(5, result["took"])
+        self.assertFalse("error-type" in result)
+
     @mock.patch("elasticsearch.Elasticsearch")
     def test_query_match_all(self, es):
         es.search.return_value = {
@@ -561,6 +647,54 @@ def test_scroll_query_only_one_page(self, es):
         self.assertFalse(results["timed_out"])
         self.assertFalse("error-type" in results)
 
+    @mock.patch("elasticsearch.Elasticsearch")
+    def test_scroll_query_only_one_page_only_request_body_defined(self, es):
+        # page 1
+        es.search.return_value = {
+            "_scroll_id": "some-scroll-id",
+            "took": 4,
+            "timed_out": False,
+            "hits": {
+                "hits": [
+                    {
+                        "some-doc-1"
+                    },
+                    {
+                        "some-doc-2"
+                    }
+                ]
+            }
+        }
+        es.transport.perform_request.side_effect = [
+            # delete scroll id response
+            {
+                "acknowledged": True
+            }
+        ]
+
+        query_runner = runner.Query()
+
+        params = {
+            "pages": 1,
+            "items_per_page": 100,
+            "body": {
+                "query": {
+                    "match_all": {}
+                }
+            }
+        }
+
+        with query_runner:
+            results = query_runner(es, params)
+
+        self.assertEqual(1, results["weight"])
+        self.assertEqual(1, results["pages"])
+        self.assertEqual(2, results["hits"])
+        self.assertEqual(4, results["took"])
+        self.assertEqual("ops", results["unit"])
+        self.assertFalse(results["timed_out"])
+        self.assertFalse("error-type" in results)
+
     @mock.patch("elasticsearch.Elasticsearch")
     def test_scroll_query_with_explicit_number_of_pages(self, es):
         # page 1
