diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index cfb506220..112471b46 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -1230,9 +1230,16 @@ def schedule_for(current_track, task, client_index): "time period of [%s] seconds." % (task.schedule, task, str(warmup_time_period), str(task.time_period))) return time_period_based(sched, warmup_time_period, task.time_period, runner_for_op, params_for_op) else: + warmup_iterations = task.warmup_iterations if task.warmup_iterations else 0 + if task.iterations: + iterations = task.iterations + elif params_for_op.size(): + iterations = params_for_op.size() - warmup_iterations + else: + iterations = 1 logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%d] warmup iterations and " - "[%d] iterations." % (task.schedule, op, task.warmup_iterations, task.iterations)) - return iteration_count_based(sched, task.warmup_iterations, task.iterations, runner_for_op, params_for_op) + "[%d] iterations." % (task.schedule, op, warmup_iterations, iterations)) + return iteration_count_based(sched, warmup_iterations, iterations, runner_for_op, params_for_op) def time_period_based(sched, warmup_time_period, time_period, runner, params): diff --git a/esrally/track/loader.py b/esrally/track/loader.py index 013a9875b..c8e7b401f 100644 --- a/esrally/track/loader.py +++ b/esrally/track/loader.py @@ -625,11 +625,11 @@ def post_process_for_test_mode(t): # we need iterate over leaf tasks and await iterating over possible intermediate 'parallel' elements for leaf_task in task: # iteration-based schedules are divided among all clients and we should provide at least one iteration for each client. - if leaf_task.warmup_iterations > leaf_task.clients: + if leaf_task.warmup_iterations is not None and leaf_task.warmup_iterations > leaf_task.clients: count = leaf_task.clients logger.info("Resetting warmup iterations to %d for [%s]" % (count, str(leaf_task))) leaf_task.warmup_iterations = count - if leaf_task.iterations > leaf_task.clients: + if leaf_task.iterations is not None and leaf_task.iterations > leaf_task.clients: count = leaf_task.clients logger.info("Resetting measurement iterations to %d for [%s]" % (count, str(leaf_task))) leaf_task.iterations = count @@ -1054,8 +1054,8 @@ def _get_challenge_specs(self, track_spec): def parse_parallel(self, ops_spec, ops, challenge_name): # use same default values as #parseTask() in case the 'parallel' element did not specify anything - default_warmup_iterations = self._r(ops_spec, "warmup-iterations", error_ctx="parallel", mandatory=False, default_value=0) - default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False, default_value=1) + default_warmup_iterations = self._r(ops_spec, "warmup-iterations", error_ctx="parallel", mandatory=False) + default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False) default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False) default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False) clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False) @@ -1079,7 +1079,7 @@ def parse_parallel(self, ops_spec, ops, challenge_name): "this name exists." % (challenge_name, completed_by)) return track.Parallel(tasks, clients) - def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=0, default_iterations=1, + def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=None, default_iterations=None, default_warmup_time_period=None, default_time_period=None, completed_by_name=None): op_spec = task_spec["operation"] @@ -1106,10 +1106,10 @@ def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=0 schedule=schedule, # this is to provide scheduler-specific parameters for custom schedulers. params=task_spec) - if task.warmup_iterations != default_warmup_iterations and task.time_period is not None: + if task.warmup_iterations is not None and task.time_period is not None: self._error("Operation '%s' in challenge '%s' defines '%d' warmup iterations and a time period of '%d' seconds. Please do not " "mix time periods and iterations." % (op.name, challenge_name, task.warmup_iterations, task.time_period)) - elif task.warmup_time_period is not None and task.iterations != default_iterations: + elif task.warmup_time_period is not None and task.iterations is not None: self._error("Operation '%s' in challenge '%s' defines a warmup time period of '%d' seconds and '%d' iterations. Please do not " "mix time periods and iterations." % (op.name, challenge_name, task.warmup_time_period, task.iterations)) diff --git a/esrally/track/params.py b/esrally/track/params.py index 65257f386..c8b662c28 100644 --- a/esrally/track/params.py +++ b/esrally/track/params.py @@ -113,8 +113,8 @@ def size(self): * It will either run an operation for a pre-determined number of times or * It can run until the parameter source is exhausted. - In the former case, return just 1. In the latter case, you should determine the number of times that `#params()` will be invoked. - With that number, Rally can show the progress made so far to the user. + In the former case, you should determine the number of times that `#params()` will be invoked. With that number, Rally can show + the progress made so far to the user. In the latter case, return ``None``. :return: The "size" of this parameter source or ``None`` if should run eternally. """ diff --git a/esrally/track/track.py b/esrally/track/track.py index 9aafbf8c4..cfd1ba0a8 100644 --- a/esrally/track/track.py +++ b/esrally/track/track.py @@ -588,7 +588,7 @@ def __eq__(self, other): class Task: - def __init__(self, name, operation, meta_data=None, warmup_iterations=0, iterations=1, warmup_time_period=None, time_period=None, + def __init__(self, name, operation, meta_data=None, warmup_iterations=None, iterations=None, warmup_time_period=None, time_period=None, clients=1, completes_parent=False, schedule="deterministic", params=None): self.name = name diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index fbfcc08af..981546e71 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -452,6 +452,47 @@ def test_search_task_two_clients(self): ] self.assert_schedule(expected_schedule, schedule) + def test_schedule_param_source_determines_iterations_no_warmup(self): + # we neither define any time-period nor any iteration count on the task. + task = track.Task("bulk-index", track.Operation("bulk-index", track.OperationType.Bulk.name, params={"body": ["a"], "size": 3}, + param_source="driver-test-param-source"), + clients=1, params={"target-throughput": 4, "clients": 4}) + + invocations = driver.schedule_for(self.test_track, task, 0) + + self.assert_schedule([ + (0.0, metrics.SampleType.Normal, 1 / 3, {"body": ["a"], "size": 3}), + (1.0, metrics.SampleType.Normal, 2 / 3, {"body": ["a"], "size": 3}), + (2.0, metrics.SampleType.Normal, 3 / 3, {"body": ["a"], "size": 3}), + ], list(invocations)) + + def test_schedule_param_source_determines_iterations_including_warmup(self): + task = track.Task("bulk-index", track.Operation("bulk-index", track.OperationType.Bulk.name, params={"body": ["a"], "size": 5}, + param_source="driver-test-param-source"), + warmup_iterations=2, clients=1, params={"target-throughput": 4, "clients": 4}) + + invocations = driver.schedule_for(self.test_track, task, 0) + + self.assert_schedule([ + (0.0, metrics.SampleType.Warmup, 1 / 5, {"body": ["a"], "size": 5}), + (1.0, metrics.SampleType.Warmup, 2 / 5, {"body": ["a"], "size": 5}), + (2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "size": 5}), + (3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "size": 5}), + (4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "size": 5}), + ], list(invocations)) + + def test_schedule_defaults_to_iteration_based(self): + # no time-period and no iterations specified on the task. Also, the parameter source does not define a size. + task = track.Task("bulk-index", track.Operation("bulk-index", track.OperationType.Bulk.name, params={"body": ["a"]}, + param_source="driver-test-param-source"), + clients=1, params={"target-throughput": 4, "clients": 4}) + + invocations = driver.schedule_for(self.test_track, task, 0) + + self.assert_schedule([ + (0.0, metrics.SampleType.Normal, 1 / 1, {"body": ["a"]}), + ], list(invocations)) + def test_schedule_for_warmup_time_based(self): task = track.Task("time-based", track.Operation("time-based", track.OperationType.Bulk.name, params={"body": ["a"], "size": 11}, param_source="driver-test-param-source"),