Skip to content

Commit

Permalink
Bulk index all data by default
Browse files Browse the repository at this point in the history
With this commit we also query the parameter source when determining the
default number of iterations. Previously, when the user did not specify
any time-period nor any number of iterations we always defaulted to zero
warmup iterations and one measurement iteration. This lead to surprising
behavior for bulk-indexing when the user forgot to add a warmup time
period because we only issued one bulk request.

Closes elastic#377
  • Loading branch information
danielmitterdorfer committed Mar 9, 2018
1 parent 1ca746a commit d73b733
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 12 deletions.
11 changes: 9 additions & 2 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1230,9 +1230,16 @@ def schedule_for(current_track, task, client_index):
"time period of [%s] seconds." % (task.schedule, task, str(warmup_time_period), str(task.time_period)))
return time_period_based(sched, warmup_time_period, task.time_period, runner_for_op, params_for_op)
else:
warmup_iterations = task.warmup_iterations if task.warmup_iterations else 0
if task.iterations:
iterations = task.iterations
elif params_for_op.size():
iterations = params_for_op.size() - warmup_iterations
else:
iterations = 1
logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%d] warmup iterations and "
"[%d] iterations." % (task.schedule, op, task.warmup_iterations, task.iterations))
return iteration_count_based(sched, task.warmup_iterations, task.iterations, runner_for_op, params_for_op)
"[%d] iterations." % (task.schedule, op, warmup_iterations, iterations))
return iteration_count_based(sched, warmup_iterations, iterations, runner_for_op, params_for_op)


def time_period_based(sched, warmup_time_period, time_period, runner, params):
Expand Down
14 changes: 7 additions & 7 deletions esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,11 +625,11 @@ def post_process_for_test_mode(t):
# we need iterate over leaf tasks and await iterating over possible intermediate 'parallel' elements
for leaf_task in task:
# iteration-based schedules are divided among all clients and we should provide at least one iteration for each client.
if leaf_task.warmup_iterations > leaf_task.clients:
if leaf_task.warmup_iterations is not None and leaf_task.warmup_iterations > leaf_task.clients:
count = leaf_task.clients
logger.info("Resetting warmup iterations to %d for [%s]" % (count, str(leaf_task)))
leaf_task.warmup_iterations = count
if leaf_task.iterations > leaf_task.clients:
if leaf_task.iterations is not None and leaf_task.iterations > leaf_task.clients:
count = leaf_task.clients
logger.info("Resetting measurement iterations to %d for [%s]" % (count, str(leaf_task)))
leaf_task.iterations = count
Expand Down Expand Up @@ -1054,8 +1054,8 @@ def _get_challenge_specs(self, track_spec):

def parse_parallel(self, ops_spec, ops, challenge_name):
# use same default values as #parseTask() in case the 'parallel' element did not specify anything
default_warmup_iterations = self._r(ops_spec, "warmup-iterations", error_ctx="parallel", mandatory=False, default_value=0)
default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False, default_value=1)
default_warmup_iterations = self._r(ops_spec, "warmup-iterations", error_ctx="parallel", mandatory=False)
default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False)
default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False)
default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False)
clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False)
Expand All @@ -1079,7 +1079,7 @@ def parse_parallel(self, ops_spec, ops, challenge_name):
"this name exists." % (challenge_name, completed_by))
return track.Parallel(tasks, clients)

def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=0, default_iterations=1,
def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=None, default_iterations=None,
default_warmup_time_period=None, default_time_period=None, completed_by_name=None):

op_spec = task_spec["operation"]
Expand All @@ -1106,10 +1106,10 @@ def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=0
schedule=schedule,
# this is to provide scheduler-specific parameters for custom schedulers.
params=task_spec)
if task.warmup_iterations != default_warmup_iterations and task.time_period is not None:
if task.warmup_iterations is not None and task.time_period is not None:
self._error("Operation '%s' in challenge '%s' defines '%d' warmup iterations and a time period of '%d' seconds. Please do not "
"mix time periods and iterations." % (op.name, challenge_name, task.warmup_iterations, task.time_period))
elif task.warmup_time_period is not None and task.iterations != default_iterations:
elif task.warmup_time_period is not None and task.iterations is not None:
self._error("Operation '%s' in challenge '%s' defines a warmup time period of '%d' seconds and '%d' iterations. Please do not "
"mix time periods and iterations." % (op.name, challenge_name, task.warmup_time_period, task.iterations))

Expand Down
4 changes: 2 additions & 2 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ def size(self):
* It will either run an operation for a pre-determined number of times or
* It can run until the parameter source is exhausted.
In the former case, return just 1. In the latter case, you should determine the number of times that `#params()` will be invoked.
With that number, Rally can show the progress made so far to the user.
In the former case, you should determine the number of times that `#params()` will be invoked. With that number, Rally can show
the progress made so far to the user. In the latter case, return ``None``.
:return: The "size" of this parameter source or ``None`` if should run eternally.
"""
Expand Down
2 changes: 1 addition & 1 deletion esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ def __eq__(self, other):


class Task:
def __init__(self, name, operation, meta_data=None, warmup_iterations=0, iterations=1, warmup_time_period=None, time_period=None,
def __init__(self, name, operation, meta_data=None, warmup_iterations=None, iterations=None, warmup_time_period=None, time_period=None,
clients=1,
completes_parent=False, schedule="deterministic", params=None):
self.name = name
Expand Down
41 changes: 41 additions & 0 deletions tests/driver/driver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,47 @@ def test_search_task_two_clients(self):
]
self.assert_schedule(expected_schedule, schedule)

def test_schedule_param_source_determines_iterations_no_warmup(self):
# we neither define any time-period nor any iteration count on the task.
task = track.Task("bulk-index", track.Operation("bulk-index", track.OperationType.Bulk.name, params={"body": ["a"], "size": 3},
param_source="driver-test-param-source"),
clients=1, params={"target-throughput": 4, "clients": 4})

invocations = driver.schedule_for(self.test_track, task, 0)

self.assert_schedule([
(0.0, metrics.SampleType.Normal, 1 / 3, {"body": ["a"], "size": 3}),
(1.0, metrics.SampleType.Normal, 2 / 3, {"body": ["a"], "size": 3}),
(2.0, metrics.SampleType.Normal, 3 / 3, {"body": ["a"], "size": 3}),
], list(invocations))

def test_schedule_param_source_determines_iterations_including_warmup(self):
task = track.Task("bulk-index", track.Operation("bulk-index", track.OperationType.Bulk.name, params={"body": ["a"], "size": 5},
param_source="driver-test-param-source"),
warmup_iterations=2, clients=1, params={"target-throughput": 4, "clients": 4})

invocations = driver.schedule_for(self.test_track, task, 0)

self.assert_schedule([
(0.0, metrics.SampleType.Warmup, 1 / 5, {"body": ["a"], "size": 5}),
(1.0, metrics.SampleType.Warmup, 2 / 5, {"body": ["a"], "size": 5}),
(2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "size": 5}),
(3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "size": 5}),
(4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "size": 5}),
], list(invocations))

def test_schedule_defaults_to_iteration_based(self):
# no time-period and no iterations specified on the task. Also, the parameter source does not define a size.
task = track.Task("bulk-index", track.Operation("bulk-index", track.OperationType.Bulk.name, params={"body": ["a"]},
param_source="driver-test-param-source"),
clients=1, params={"target-throughput": 4, "clients": 4})

invocations = driver.schedule_for(self.test_track, task, 0)

self.assert_schedule([
(0.0, metrics.SampleType.Normal, 1 / 1, {"body": ["a"]}),
], list(invocations))

def test_schedule_for_warmup_time_based(self):
task = track.Task("time-based", track.Operation("time-based", track.OperationType.Bulk.name, params={"body": ["a"], "size": 11},
param_source="driver-test-param-source"),
Expand Down

0 comments on commit d73b733

Please sign in to comment.