diff --git a/docs/adding_tracks.rst b/docs/adding_tracks.rst index 939c10edc..9139cc42a 100644 --- a/docs/adding_tracks.rst +++ b/docs/adding_tracks.rst @@ -772,13 +772,13 @@ If you need more control, you need to implement a class. Below is the implementa self._cache = params.get("cache", False) # ... but we need to resolve "profession" lazily on each invocation later self._params = params + # Determines whether this parameter source will be "exhausted" at some point or + # Rally can draw values infinitely from it. + self.infinite = True def partition(self, partition_index, total_partitions): return self - def size(self): - return 1 - def params(self): # you must provide all parameters that the runner expects return { @@ -803,17 +803,17 @@ In ``register`` you bind the name in the track specification to your parameter s * The constructor needs to have the signature ``__init__(self, track, params, **kwargs)``. * ``partition(self, partition_index, total_partitions)`` is called by Rally to "assign" the parameter source across multiple clients. Typically you can just return ``self``. If each client needs to act differently then you can provide different parameter source instances here as well. -* ``size(self)``: This method helps Rally to provide a proper progress indication to users if you use a warmup time period. For bulk indexing, return the number of bulks (for a given client). As searches are typically executed with a pre-determined amount of iterations, just return ``1`` in this case. * ``params(self)``: This method returns a dictionary with all parameters that the corresponding "runner" expects. This method will be invoked once for every iteration during the race. In the example, we parameterize the query by randomly selecting a profession from a list. +* ``infinite``: This property helps Rally to determine whether to let the parameter source determine when a task should be finished (when ``infinite`` is ``False``) or whether the task properties (e.g. 
``iterations`` or ``time-period``) determine when a task should be finished. In the former case, the parameter source needs to raise ``StopIteration`` to indicate when it is finished. -For cases, where you want to provide a progress indication but cannot calculate ``size`` up-front (e.g. when you generate bulk requests on-the fly up to a certain total size), you can implement a property ``percent_completed`` which returns a floating point value between ``0.0`` and ``1.0``. Rally will query this value before each call to ``params()`` and uses it to indicate progress. However: +For cases where you want to provide a progress indication (this is typically the case when ``infinite`` is ``False``), you can implement a property ``percent_completed`` which returns a floating point value between ``0.0`` and ``1.0``. Rally will query this value before each call to ``params()`` and use it to indicate progress. However: * Rally will not check ``percent_completed`` if it can derive progress in any other way. * The value of ``percent_completed`` is purely informational and does not influence when Rally considers an operation to be completed. .. note:: - The method ``params(self)`` is called on a performance-critical path. Don't do anything in this method that takes a lot of time (avoid any I/O). For searches, you should usually throttle throughput anyway and there it does not matter that much but if the corresponding operation is run without throughput throttling, double-check that your custom parameter source does not introduce a bottleneck. + The method ``params(self)`` as well as the property ``percent_completed`` are called on a performance-critical path. Don't do anything that takes a lot of time (avoid any I/O). For searches, you should usually throttle throughput anyway and there it does not matter that much but if the corresponding operation is run without throughput throttling, double-check that your custom parameter source does not introduce a bottleneck. 
Custom parameter sources can use the Python standard API but using any additional libraries is not supported. diff --git a/docs/migrate.rst b/docs/migrate.rst index 18d268387..56c6d5dbc 100644 --- a/docs/migrate.rst +++ b/docs/migrate.rst @@ -1,6 +1,67 @@ Migration Guide =============== +Migrating to Rally 1.4.0 +------------------------ + +Custom Parameter Sources +^^^^^^^^^^^^^^^^^^^^^^^^ + +With Rally 1.4.0, we have changed the API for custom parameter sources. The ``size()`` method is now deprecated and is instead replaced with a new property called ``infinite``. If you have previously returned ``None`` in ``size()``, ``infinite`` should be set to ``True``, otherwise ``False``. Also, we recommend implementing the property ``percent_completed`` as Rally might not be able to determine progress in some cases. See below for some examples. + +Old:: + + class CustomFiniteParamSource: + # ... + def size(self): + return calculate_size() + + def params(self): + return next_parameters() + + class CustomInfiniteParamSource: + # ... + def size(self): + return None + + # ... + + +New:: + + class CustomFiniteParamSource: + def __init__(self, track, params, **kwargs): + self.infinite = False + # to track progress + self.current_invocation = 0 + + # ... + # Note that we have removed the size() method + + def params(self): + self.current_invocation += 1 + return next_parameters() + + # Implementing this is optional but recommended for proper progress reports + @property + def percent_completed(self): + # for demonstration purposes we use calculate_size() here + # to determine the expected number of invocations. However, if + # it is possible to determine this value upfront, it is best + # to cache it in a field and just reuse the value + return self.current_invocation / calculate_size() + + + class CustomInfiniteParamSource: + def __init__(self, track, params, **kwargs): + self.infinite = True + # ... + + # ... + # Note that we have removed the size() method + # ... 
+ + Migrating to Rally 1.3.0 ------------------------ Races now stored by ID instead of timestamp diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index e9d8137a5..5a3acea5a 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -1260,105 +1260,159 @@ def schedule_for(current_track, task, client_index): runner_for_op = runner.runner_for(op.type) params_for_op = track.operation_parameters(current_track, op).partition(client_index, num_clients) - if task.warmup_time_period is not None or task.time_period is not None: + if requires_time_period_schedule(task, params_for_op): warmup_time_period = task.warmup_time_period if task.warmup_time_period else 0 - logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] seconds and a " - "time period of [%s] seconds.", task.schedule, task, str(warmup_time_period), str(task.time_period)) - return time_period_based(sched, warmup_time_period, task.time_period, runner_for_op, params_for_op) + logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] " + "seconds and a time period of [%s] seconds.", task.schedule, task.name, + str(warmup_time_period), str(task.time_period)) + loop_control = TimePeriodBased(warmup_time_period, task.time_period) else: warmup_iterations = task.warmup_iterations if task.warmup_iterations else 0 if task.iterations: iterations = task.iterations - elif params_for_op.size(): - iterations = params_for_op.size() - warmup_iterations - else: + elif params_for_op.infinite: + # this is usually the case if the parameter source provides a constant iterations = 1 - logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%d] warmup iterations and " - "[%d] iterations." 
% (task.schedule, op, warmup_iterations, iterations)) - return iteration_count_based(sched, warmup_iterations, iterations, runner_for_op, params_for_op) + else: + iterations = None + logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%s] warmup " + "iterations and [%s] iterations.", task.schedule, task.name, str(warmup_iterations), str(iterations)) + loop_control = IterationBased(warmup_iterations, iterations) + + return generator_for_schedule(task.name, sched, loop_control, runner_for_op, params_for_op) + + +def requires_time_period_schedule(task, params): + if task.warmup_time_period is not None or task.time_period is not None: + return True + # user has explicitly requested iterations + if task.warmup_iterations is not None or task.iterations is not None: + return False + # If the parameter source ends after a finite amount of iterations, we will run with a time-based schedule + return not params.infinite -def time_period_based(sched, warmup_time_period, time_period, runner, params): +def generator_for_schedule(task_name, sched, task_progress_control, runner, params): """ - Calculates the necessary schedule for time period based operations. + Creates a generator that will yield individual task invocations for the provided schedule. - :param sched: The scheduler for this task. Must not be None. - :param warmup_time_period: The time period in seconds that is considered for warmup. Must not be None; provide zero instead. - :param time_period: The time period in seconds that is considered for measurement. May be None. + :param task_name: The name of the task for which the schedule is generated. + :param sched: The scheduler for this task. + :param task_progress_control: Controls how and how often this generator will loop. :param runner: The runner for a given operation. :param params: The parameter source for a given operation. :return: A generator for the corresponding parameters. 
""" next_scheduled = 0 - start = time.perf_counter() logger = logging.getLogger(__name__) - if time_period is None: - iterations = params.size() - if iterations: - logger.info("No time-period property specified. Will run for at most [%d] iterations.", iterations) - for it in range(0, iterations): - try: - sample_type = metrics.SampleType.Warmup if time.perf_counter() - start < warmup_time_period else metrics.SampleType.Normal - percent_completed = (it + 1) / iterations - yield (next_scheduled, sample_type, percent_completed, runner, params.params()) - next_scheduled = sched.next(next_scheduled) - except StopIteration: - logger.info("Time-period-based schedule stopped due to StopIteration.") - return - logger.info("Time-period-based schedule stopped after the specified number of [%d] iterations.", iterations) - else: - logger.info("No time-period property specified. Will run as long as the parameter source provides values.") - param_source_knows_progress = hasattr(params, "percent_completed") - while True: - try: - sample_type = metrics.SampleType.Warmup if time.perf_counter() - start < warmup_time_period else metrics.SampleType.Normal - # does not contribute at all to completion. Hence, we cannot define completion. - percent_completed = params.percent_completed if param_source_knows_progress else None - yield (next_scheduled, sample_type, percent_completed, runner, params.params()) - next_scheduled = sched.next(next_scheduled) - except StopIteration: - logger.info("Time-period-based schedule stopped due to StopIteration.") - return + if task_progress_control.infinite: + logger.info("Parameter source will determine when the schedule for [%s] terminates.", task_name) + param_source_knows_progress = hasattr(params, "percent_completed") + task_progress_control.start() + while True: + try: + # does not contribute at all to completion. Hence, we cannot define completion. 
+ percent_completed = params.percent_completed if param_source_knows_progress else None + yield (next_scheduled, task_progress_control.sample_type, percent_completed, runner, params.params()) + next_scheduled = sched.next(next_scheduled) + task_progress_control.next() + except StopIteration: + logger.info("%s schedule for [%s] stopped due to StopIteration.", str(task_progress_control), task_name) + return else: - duration = warmup_time_period + time_period - end = start + duration - logger.info("Time-period-based schedule will run for a total of [%d] seconds.", duration) - while time.perf_counter() < end: + task_progress_control.start() + logger.info("%s schedule will determine when the schedule for [%s] terminates.", + str(task_progress_control), task_name) + while not task_progress_control.completed: try: - elapsed = time.perf_counter() - start - sample_type = metrics.SampleType.Warmup if elapsed < warmup_time_period else metrics.SampleType.Normal - percent_completed = elapsed / duration - yield (next_scheduled, sample_type, percent_completed, runner, params.params()) + yield (next_scheduled, + task_progress_control.sample_type, + task_progress_control.percent_completed, + runner, + params.params()) next_scheduled = sched.next(next_scheduled) + task_progress_control.next() except StopIteration: + logger.info("%s schedule for [%s] stopped due to StopIteration.", str(task_progress_control), task_name) return - logger.info("Time-period-based schedule stopped after the specified time period of [%d] seconds.", duration) + logger.info("%s schedule for [%s] stopped regularly.", str(task_progress_control), task_name) -def iteration_count_based(sched, warmup_iterations, iterations, runner, params): - """ - Calculates the necessary schedule based on a given number of iterations. 
+class TimePeriodBased: + def __init__(self, warmup_time_period, time_period): + self._warmup_time_period = warmup_time_period + self._time_period = time_period + if warmup_time_period is not None and time_period is not None: + self._duration = self._warmup_time_period + self._time_period + else: + self._duration = None + self._start = None + self._now = None - :param sched: The scheduler for this task. Must not be None. - :param warmup_iterations: The number of warmup iterations to run. 0 if no warmup should be performed. - :param iterations: The number of measurement iterations to run. - :param runner: The runner for a given operation. - :param params: The parameter source for a given operation. - :return: A generator for the corresponding parameters. - """ - next_scheduled = 0 - total_iterations = warmup_iterations + iterations - logger = logging.getLogger(__name__) - if total_iterations == 0: - raise exceptions.RallyAssertionError("Operation must run at least for one iteration.") - for it in range(0, total_iterations): - try: - sample_type = metrics.SampleType.Warmup if it < warmup_iterations else metrics.SampleType.Normal - percent_completed = (it + 1) / total_iterations - yield (next_scheduled, sample_type, percent_completed, runner, params.params()) - next_scheduled = sched.next(next_scheduled) - except StopIteration: - logger.info("Iteration-count-based schedule stopped due to StopIteration.") - return - logger.info("Iteration-count-based schedule stopped after [%d] warmup iterations and [%d] iterations.", - warmup_iterations, iterations) + def start(self): + self._now = time.perf_counter() + self._start = self._now + + @property + def _elapsed(self): + return self._now - self._start + + @property + def sample_type(self): + return metrics.SampleType.Warmup if self._elapsed < self._warmup_time_period else metrics.SampleType.Normal + + @property + def infinite(self): + return self._time_period is None + + @property + def percent_completed(self): + return 
self._elapsed / self._duration + + @property + def completed(self): + return self._now >= (self._start + self._duration) + + def next(self): + self._now = time.perf_counter() + + def __str__(self): + return "time-period-based" + + +class IterationBased: + def __init__(self, warmup_iterations, iterations): + self._warmup_iterations = warmup_iterations + self._iterations = iterations + if warmup_iterations is not None and iterations is not None: + self._total_iterations = self._warmup_iterations + self._iterations + if self._total_iterations == 0: + raise exceptions.RallyAssertionError("Operation must run at least for one iteration.") + else: + self._total_iterations = None + self._it = None + + def start(self): + self._it = 0 + + @property + def sample_type(self): + return metrics.SampleType.Warmup if self._it < self._warmup_iterations else metrics.SampleType.Normal + + @property + def infinite(self): + return self._iterations is None + + @property + def percent_completed(self): + return (self._it + 1) / self._total_iterations + + @property + def completed(self): + return self._it >= self._total_iterations + + def next(self): + self._it += 1 + + def __str__(self): + return "iteration-count-based" diff --git a/esrally/track/params.py b/esrally/track/params.py index abb807f9e..2856ce63a 100644 --- a/esrally/track/params.py +++ b/esrally/track/params.py @@ -100,6 +100,12 @@ def partition(self, partition_index, total_partitions): """ return self + @property + def infinite(self): + # for bwc + return self.size() is None + + # Deprecated def size(self): """ Rally has two modes in which it can run: @@ -520,9 +526,6 @@ def partition(self, partition_index, total_partitions): def params(self): raise exceptions.RallyError("Do not use a BulkIndexParamSource without partitioning") - def size(self): - raise exceptions.RallyError("Do not use a BulkIndexParamSource without partitioning") - class PartitionBulkIndexParamSource: def __init__(self, corpora, partition_index, 
total_partitions, batch_size, bulk_size, ingest_percentage, @@ -554,16 +557,21 @@ def __init__(self, corpora, partition_index, total_partitions, batch_size, bulk_ self.internal_params = bulk_data_based(total_partitions, partition_index, corpora, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency, pipeline, original_params) + self.current_bulk = 0 + all_bulks = number_of_bulks(self.corpora, self.partition_index, self.total_partitions, self.bulk_size) + self.total_bulks = math.ceil((all_bulks * self.ingest_percentage) / 100) + self.infinite = False def partition(self, partition_index, total_partitions): raise exceptions.RallyError("Cannot partition a PartitionBulkIndexParamSource further") def params(self): + self.current_bulk += 1 return next(self.internal_params) - def size(self): - all_bulks = number_of_bulks(self.corpora, self.partition_index, self.total_partitions, self.bulk_size) - return math.ceil((all_bulks * self.ingest_percentage) / 100) + @property + def percent_completed(self): + return self.current_bulk / self.total_bulks def number_of_bulks(corpora, partition_index, total_partitions, bulk_size): diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index f0d6a7d05..ced3213af 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -33,35 +33,23 @@ def __init__(self, track=None, params=None, **kwargs): params = {} self._indices = track.indices self._params = params + self._current = 1 + self._total = params.get("size") + self.infinite = self._total is None def partition(self, partition_index, total_partitions): return self - def size(self): - return self._params["size"] if "size" in self._params else None + @property + def percent_completed(self): + if self.infinite: + return None + return self._current / self._total def params(self): - return self._params - - -class DriverTestParamSourceWithProgress: - def __init__(self, track=None, params=None, **kwargs): - if params is None: - params = 
{} - self._indices = track.indices - self._params = params - self.percent_completed = 0.0 - - def partition(self, partition_index, total_partitions): - return self - - def size(self): - return self._params["size"] if "size" in self._params else None - - def params(self): - # just provide a simple progress indication. The important point is - # that we define it at all not so much what the actual values is. - self.percent_completed += 0.01 + if not self.infinite and self._current > self._total: + raise StopIteration() + self._current += 1 return self._params @@ -206,7 +194,10 @@ def test_client_reaches_join_point_which_completes_parent(self): class ScheduleTestCase(TestCase): - def assert_schedule(self, expected_schedule, schedule, eternal_schedule=False): + def assert_schedule(self, expected_schedule, schedule, infinite_schedule=False): + if not infinite_schedule: + self.assertEqual(len(expected_schedule), len(schedule), + msg="Number of elements in the schedules do not match") idx = 0 for invocation_time, sample_type, progress_percent, runner, params in schedule: exp_invocation_time, exp_sample_type, exp_progress_percent, exp_params = expected_schedule[idx] @@ -216,8 +207,8 @@ def assert_schedule(self, expected_schedule, schedule, eternal_schedule=False): self.assertIsNotNone(runner, "runner must be defined") self.assertEqual(exp_params, params, "Parameters do not match") idx += 1 - # for eternal schedules we only check the first few elements - if eternal_schedule and idx == len(expected_schedule): + # for infinite schedules we only check the first few elements + if infinite_schedule and idx == len(expected_schedule): break @@ -435,7 +426,6 @@ def calculate_global_throughput(self, samples): class SchedulerTests(ScheduleTestCase): def setUp(self): params.register_param_source_for_name("driver-test-param-source", DriverTestParamSource) - params.register_param_source_for_name("driver-test-param-source-with-progress", DriverTestParamSourceWithProgress) 
runner.register_default_runners() self.test_track = track.Track(name="unittest") @@ -454,7 +444,7 @@ def test_search_task_one_client(self): (0.6, metrics.SampleType.Normal, 7 / 8, {}), (0.7, metrics.SampleType.Normal, 8 / 8, {}), ] - self.assert_schedule(expected_schedule, schedule) + self.assert_schedule(expected_schedule, list(schedule)) def test_search_task_two_clients(self): task = track.Task("search", track.Operation("search", track.OperationType.Search.name, param_source="driver-test-param-source"), @@ -469,7 +459,7 @@ def test_search_task_two_clients(self): (0.8, metrics.SampleType.Normal, 5 / 6, {}), (1.0, metrics.SampleType.Normal, 6 / 6, {}), ] - self.assert_schedule(expected_schedule, schedule) + self.assert_schedule(expected_schedule, list(schedule)) def test_schedule_param_source_determines_iterations_no_warmup(self): # we neither define any time-period nor any iteration count on the task. @@ -533,7 +523,7 @@ def test_schedule_for_warmup_time_based(self): (10.0, metrics.SampleType.Normal, 11 / 11, {"body": ["a"], "size": 11}), ], list(invocations)) - def test_eternal_schedule_without_progress_indication(self): + def test_infinite_schedule_without_progress_indication(self): task = track.Task("time-based", track.Operation("time-based", track.OperationType.Bulk.name, params={"body": ["a"]}, param_source="driver-test-param-source"), warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4}) @@ -546,22 +536,22 @@ def test_eternal_schedule_without_progress_indication(self): (2.0, metrics.SampleType.Normal, None, {"body": ["a"]}), (3.0, metrics.SampleType.Normal, None, {"body": ["a"]}), (4.0, metrics.SampleType.Normal, None, {"body": ["a"]}), - ], invocations, eternal_schedule=True) + ], invocations, infinite_schedule=True) - def test_eternal_schedule_with_progress_indication(self): - task = track.Task("time-based", track.Operation("time-based", track.OperationType.Bulk.name, params={"body": ["a"]}, - 
param_source="driver-test-param-source-with-progress"), + def test_finite_schedule_with_progress_indication(self): + task = track.Task("time-based", track.Operation("time-based", track.OperationType.Bulk.name, params={"body": ["a"], "size": 5}, + param_source="driver-test-param-source"), warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4}) invocations = driver.schedule_for(self.test_track, task, 0) self.assert_schedule([ - (0.0, metrics.SampleType.Normal, 0.0, {"body": ["a"]}), - (1.0, metrics.SampleType.Normal, 0.01, {"body": ["a"]}), - (2.0, metrics.SampleType.Normal, 0.02, {"body": ["a"]}), - (3.0, metrics.SampleType.Normal, 0.03, {"body": ["a"]}), - (4.0, metrics.SampleType.Normal, 0.04, {"body": ["a"]}), - ], invocations, eternal_schedule=True) + (0.0, metrics.SampleType.Normal, 1 / 5, {"body": ["a"], "size": 5}), + (1.0, metrics.SampleType.Normal, 2 / 5, {"body": ["a"], "size": 5}), + (2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "size": 5}), + (3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "size": 5}), + (4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "size": 5}), + ], list(invocations), infinite_schedule=False) def test_schedule_for_time_based(self): task = track.Task("time-based", track.Operation("time-based", track.OperationType.Bulk.name, params={"body": ["a"], "size": 11}, diff --git a/tests/track/params_test.py b/tests/track/params_test.py index 2c4ebbd0a..9faa8c2ca 100644 --- a/tests/track/params_test.py +++ b/tests/track/params_test.py @@ -862,7 +862,7 @@ def test_ingests_all_documents_by_default(self): partition = source.partition(0, 1) # # no ingest-percentage specified, should issue all one hundred bulk requests - self.assertEqual(100, partition.size()) + self.assertEqual(100, partition.total_bulks) def test_restricts_number_of_bulks_if_required(self): corpora = [ @@ -891,7 +891,7 @@ def test_restricts_number_of_bulks_if_required(self): partition = source.partition(0, 1) # should issue three 
bulks of size 10.000 - self.assertEqual(3, partition.size()) + self.assertEqual(3, partition.total_bulks) def test_create_with_conflict_probability_zero(self): params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={