Allow to define operation parameters dynamically
Closes #107
danielmitterdorfer committed Oct 26, 2016
1 parent 98344ea commit 0b28c00
Showing 14 changed files with 309 additions and 90 deletions.
89 changes: 89 additions & 0 deletions docs/adding_benchmarks.rst
@@ -233,6 +233,95 @@ You can find an example in the logging track::

The data set that is used in the logging track starts on 26-04-1998, but we want to ignore the first few days for this query, so we start on 15-05-1998. The expression ``{{'15-05-1998' | days_ago(now)}}`` yields the difference in days between now and the fixed start date and allows us to benchmark time range queries relative to now with a predetermined data set.
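
For illustration, a hypothetical operation that embeds this filter in a date math expression could look as follows (the operation name, the field name and the exact date math are made up for this sketch and are not taken from the logging track)::

    {
      "name": "recent-logs",
      "operation-type": "search",
      "body": {
        "query": {
          "range": {
            "@timestamp": {
              "gte": "now-{{'15-05-1998' | days_ago(now)}}d/d",
              "lt": "now/d"
            }
          }
        }
      }
    }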

Custom parameter sources
^^^^^^^^^^^^^^^^^^^^^^^^

.. note::

This is a rather new feature and the API may change! However, the effort to use custom parameter sources is very low.

Consider the following operation definition::

{
"name": "term",
"operation-type": "search",
"body": {
"query": {
"term": {
"body": "physician"
}
}
}
}

This query is defined statically in the track specification, but sometimes you may want to vary its parameters, e.g. to also search for "mechanic" or "nurse". In this case, you can write your own "parameter source" with a little bit of Python code.

First, define the name of your parameter source in the operation definition::

{
"name": "term",
"operation-type": "search",
"param-source": {
"name": "my-custom-term-param-source"
}
}

Rally will recognize the parameter source and will then look for a file ``track.py`` in the same directory as the corresponding JSON file. This file contains the implementation of the parameter source::

import random


class TermParamSource:
PROFESSIONS = ["mechanic", "physician", "nurse"]

def __init__(self, *args, **kwargs):
pass

def partition(self, partition_index, total_partitions):
return self

def size(self):
return 1

def params(self):
# you must provide all parameters that the runner expects
return {
"body": {
"query": {
"term": {
"body": "%s" % self.random_profession()
}
}
},
"index": None,
"type": None,
"use_request_cache": False
}

def random_profession(self):
return random.choice(TermParamSource.PROFESSIONS)


def register(registry):
registry.register_param_source("my-custom-term-param-source", TermParamSource)


Let's walk through this code step by step:

* Note the function ``register``, where you need to bind the name used in the track specification to your parameter source implementation class. Rally calls it before the track is executed.
* The class ``TermParamSource`` is the actual parameter source and needs to fulfill a few requirements:

  * It needs to have a constructor with the signature ``__init__(self, *args, **kwargs)``.
  * ``partition(self, partition_index, total_partitions)`` is called by Rally to "assign" the parameter source across multiple clients. Typically you can just return ``self``, but in certain cases you need to do something more sophisticated: if each client needs to act differently, you can provide a different parameter source instance per client here (see the sketch after this list).
  * ``size(self)``: This method helps Rally provide a proper progress indication to users if you use a warmup time period. For bulk indexing, it would return the number of bulk requests (for a given client). As searches are typically executed with a predetermined number of iterations, just return ``1`` in this case.
  * ``params(self)``: This method needs to return a dictionary with all parameters that the corresponding "runner" expects. For the standard case, Rally provides most of these parameters as a convenience, but here you need to define all of them yourself. This method is invoked once for every iteration during the race. In this example, we randomly select a profession from a list; the corresponding runner will then execute the query with that parameter.
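
As a sketch of the "more sophisticated" case mentioned above, the following hypothetical variant of ``TermParamSource`` pins every client to its own profession in ``partition`` (the class name and the assignment strategy are made up for this example; the returned parameter structure mirrors the one above)::

    class TermPerClientParamSource:
        PROFESSIONS = ["mechanic", "physician", "nurse"]

        def __init__(self, *args, **kwargs):
            self.profession = None

        def partition(self, partition_index, total_partitions):
            # hand each client a dedicated instance that always searches for "its" profession
            instance = TermPerClientParamSource()
            instance.profession = TermPerClientParamSource.PROFESSIONS[partition_index % len(TermPerClientParamSource.PROFESSIONS)]
            return instance

        def size(self):
            return 1

        def params(self):
            return {
                "body": {
                    "query": {
                        "term": {
                            "body": self.profession
                        }
                    }
                },
                "index": None,
                "type": None,
                "use_request_cache": False
            }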

.. note::

    Be aware that ``params(self)`` is called on a performance-critical path, so don't do anything in this method that takes a lot of time (avoid any I/O). For searches, throughput is usually throttled anyway, so a slightly slower parameter source does not matter much. However, if the corresponding operation runs without throughput throttling, please double-check that your custom parameter source does not introduce a bottleneck in the load test driver.

In the implementation of custom parameter sources you can use the Python standard library. Using any additional libraries is not supported.
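
One way to keep ``params(self)`` off the performance-critical path, sketched here under the assumption that all query bodies can be computed up front (the class name is hypothetical), is to do the expensive work once in the constructor and reduce ``params(self)`` to a simple lookup::

    class PrecomputedTermParamSource:
        def __init__(self, *args, **kwargs):
            # expensive preparation happens once, before the race starts
            self.bodies = [{"query": {"term": {"body": p}}} for p in ["mechanic", "physician", "nurse"]]
            self.current = 0

        def partition(self, partition_index, total_partitions):
            return self

        def size(self):
            return 1

        def params(self):
            # performance-critical path: a list lookup only, no I/O
            body = self.bodies[self.current % len(self.bodies)]
            self.current += 1
            return {
                "body": body,
                "index": None,
                "type": None,
                "use_request_cache": False
            }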

Running tasks in parallel
^^^^^^^^^^^^^^^^^^^^^^^^^

14 changes: 13 additions & 1 deletion docs/command_line_reference.rst
@@ -86,6 +86,7 @@ Activates the provided :doc:`telemetry devices </telemetry>` for this race.
This activates Java flight recorder and the JIT compiler telemetry devices.

.. _command_line_reference_revision:

``revision``
~~~~~~~~~~~~

@@ -200,7 +201,7 @@ Tells Rally that it should assume it has no connection to the Internet when chec
Rally usually installs and launches an Elasticsearch cluster internally and wipes the entire directory after the benchmark is done. Sometimes you want to keep this cluster, including all data, after the benchmark has finished, and this flag lets you do that. Note that, depending on the track that has been run, the cluster can eat up a very significant amount of disk space (at least dozens of GB). The default value is configurable in the advanced configuration but is usually ``false``.

.. note::
    This option only affects clusters that are provisioned by Rally. More specifically, if you use the pipeline ``benchmark-only``, this option is ineffective as Rally does not provision a cluster in this case.
This option only affects clusters that are provisioned by Rally. More specifically, if you use the pipeline ``benchmark-only``, this option is ineffective as Rally does not provision a cluster in this case.


``advanced-config``
@@ -215,6 +216,17 @@ This flag determines whether Rally should present additional (advanced) configur
esrally configure --advanced-config


``assume-defaults``
~~~~~~~~~~~~~~~~~~~

This flag determines whether Rally should automatically accept all values for configuration options that provide a default. This is mainly intended to configure Rally automatically in CI runs. The default value is ``false``.

**Example**

::

esrally configure --assume-defaults=true

``user-tag``
~~~~~~~~~~~~

1 change: 1 addition & 0 deletions docs/configuration.rst
@@ -107,6 +107,7 @@ When using the advanced configuration, Rally stores its metrics not in-memory bu

* Elasticsearch: a dedicated Elasticsearch instance which acts as the metrics store for Rally. If you don't want to set it up yourself you can also use `Elastic Cloud <https://www.elastic.co/cloud>`_.
* Optional: Kibana (also included in `Elastic Cloud <https://www.elastic.co/cloud>`_).

Preparation
~~~~~~~~~~~

2 changes: 1 addition & 1 deletion docs/index.rst
@@ -145,7 +145,7 @@ When the benchmark is done, a summary report is written to the command line:::
Nodes Stats(100.0 percentile) [ms] 5.22527

.. note::
    You can also save this report to a file with ``--report-file=/path/to/your/report.md`` and write it as CSV with ``--report-format=csv``.
You can also save this report to a file with ``--report-file=/path/to/your/report.md`` and write it as CSV with ``--report-format=csv``.


Before relying too much on the numbers, please double-check that you did not introduce any bottlenecks and that your hardware is sufficient (e.g. spinning disks are not a good idea, better use SSDs). For additional insights and metrics you can activate different :doc:`telemetry devices </telemetry>` in Rally.
2 changes: 1 addition & 1 deletion docs/metrics.rst
@@ -90,7 +90,7 @@ Rally captures also some meta information for each metric record:
* Host name
* Node name: If Rally provisions the cluster, it will choose a unique name for each node.
* Source revision: We always record the git hash of the version of Elasticsearch that is benchmarked. This is even done if you benchmark an official binary release.
* Custom tag: You can define one custom tag with the command line flag ``--user-tag``. The tag is prefixed by "tag_" in order to avoid accidental clashes with Rally internal tags.
* Custom tag: You can define one custom tag with the command line flag ``--user-tag``. The tag is prefixed by ``tag_`` in order to avoid accidental clashes with Rally internal tags.

Note that depending on the "level" of a metric record, certain meta information might be missing. It makes no sense to record host level meta info for a cluster wide metric record, like a query latency (as it cannot be attributed to a single node).

2 changes: 1 addition & 1 deletion docs/pipelines.rst
@@ -48,7 +48,7 @@ You should use this pipeline when you want to build and benchmark Elasticsearch

esrally --pipeline=from-sources-complete --revision=latest

You have to specify a :ref:`revision </command_line_reference>`.
You have to specify a :ref:`revision <command_line_reference_revision>`.

from-sources-skip-build
~~~~~~~~~~~~~~~~~~~~~~~
2 changes: 1 addition & 1 deletion esrally/driver/__init__.py
@@ -1 +1 @@
from .driver import Driver, StartBenchmark, BenchmarkComplete, BenchmarkFailure
from .driver import Driver, StartBenchmark, BenchmarkComplete, BenchmarkFailure
30 changes: 16 additions & 14 deletions esrally/driver/driver.py
@@ -41,14 +41,16 @@ class StartLoadGenerator:
Starts a load generator.
"""

def __init__(self, client_id, config, tasks):
def __init__(self, client_id, config, track, tasks):
"""
:param client_id: Client id of the load generator.
:param config: Rally internal configuration object.
:param track: The track to use.
:param tasks: Tasks to run.
"""
self.client_id = client_id
self.config = config
self.track = track
self.tasks = tasks


@@ -189,7 +191,7 @@ def start_benchmark(self, msg, sender):
for client_id in range(allocator.clients):
self.drivers.append(self.createActor(LoadGenerator))
for client_id, driver in enumerate(self.drivers):
self.send(driver, StartLoadGenerator(client_id, self.config, self.allocations[client_id]))
self.send(driver, StartLoadGenerator(client_id, self.config, current_track, self.allocations[client_id]))

self.update_progress_message()
self.wakeupAfter(datetime.timedelta(seconds=Driver.WAKEUP_INTERVAL_SECONDS))
@@ -221,11 +223,11 @@ def joinpoint_reached(self, msg):
self.metrics_store.close()
self.send(self.myAddress, thespian.actors.ActorExitRequest())
else:
# start the next task in three seconds (relative to master's timestamp)
# start the next task in five seconds (relative to master's timestamp)
#
# Assumption: We don't have a lot of clock skew between reaching the join point and sending the next task
# (it doesn't matter too much if we're a few ms off).
start_next_task = time.perf_counter() + 3.0
start_next_task = time.perf_counter() + 5.0
for client_id, driver in enumerate(self.drivers):
client_ended_task_at, master_received_msg_at = clients_curr_step[client_id]
client_start_timestamp = client_ended_task_at + (start_next_task - master_received_msg_at)
@@ -289,6 +291,7 @@ def __init__(self):
self.client_id = None
self.es = None
self.config = None
self.track = None
self.tasks = None
self.current_task = 0
self.start_timestamp = None
@@ -305,9 +308,11 @@ def receiveMessage(self, msg, sender):
self.client_id = msg.client_id
self.es = client.EsClientFactory(msg.config.opts("client", "hosts"), msg.config.opts("client", "options")).create()
self.config = msg.config
self.track = msg.track
self.tasks = msg.tasks
self.current_task = 0
self.start_timestamp = time.perf_counter()
track.load_track_plugins(self.config)
self.drive()
elif isinstance(msg, Drive):
logger.debug("Client [%d] is continuing its work at task index [%d] on [%f]." %
@@ -357,7 +362,7 @@ def drive(self):
elif isinstance(task, track.Task):
logger.info("Client [%d] is executing [%s]." % (self.client_id, task))
self.sampler = Sampler(self.client_id, task.operation, self.start_timestamp)
schedule = schedule_for(task, self.client_id)
schedule = schedule_for(self.track, task, self.client_id)
self.executor_future = self.pool.submit(execute_schedule, schedule, self.es, self.sampler)
self.wakeupAfter(datetime.timedelta(seconds=LoadGenerator.WAKEUP_INTERVAL_SECONDS))
else:
@@ -610,11 +615,6 @@ def execute_schedule(schedule, es, sampler):
curr_total_it = 1
# noinspection PyBroadException
try:

# yield (wait_time * it,
# lambda start: metrics.SampleType.Warmup if time.perf_counter() - start < warmup_time_period else metrics.SampleType.Normal,
# it, iterations, iterations, runner, p)

for expected_scheduled_time, sample_type_calculator, curr_iteration, total_it_for_task, runner, params in schedule:
sample_type = sample_type_calculator(total_start)
# restart the relative time when the sample type changes. This way all warmup samples and measurement samples will start at
@@ -773,10 +773,11 @@ def clients(self):

# Runs a concrete schedule on one worker client
# Needs to determine the runners and concrete iterations per client.
def schedule_for(task, client_index):
def schedule_for(current_track, task, client_index):
"""
Calculates a client's schedule for a given task.
:param current_track: The current track.
:param task: The task that should be executed.
:param client_index: The current client index. Must be in the range [0, `task.clients').
:return: A generator for the operations the given client needs to perform for this task.
@@ -785,15 +786,16 @@ def schedule_for(task, client_index):
num_clients = task.clients
target_throughput = task.target_throughput / num_clients if task.target_throughput else None
runner_for_op = runner.runner_for(op.type)
params_for_op = track.operation_parameters(current_track, op).partition(client_index, num_clients)

if task.warmup_time_period is not None:
logger.info("Creating time period based schedule for [%s] with a warmup period of [%d] seconds." % (op, task.warmup_time_period))
return time_period_based(target_throughput, task.warmup_time_period, runner_for_op, op.params.partition(client_index, num_clients))
return time_period_based(target_throughput, task.warmup_time_period, runner_for_op, params_for_op)
else:
logger.info("Creating iteration-count based schedule for [%s] with [%d] warmup iterations and [%d] iterations." %
(op, task.warmup_iterations, task.iterations))
return iteration_count_based(target_throughput, task.warmup_iterations // num_clients, task.iterations // num_clients,
runner_for_op, op.params.partition(client_index, num_clients))
runner_for_op, params_for_op)


def time_period_based(target_throughput, warmup_time_period, runner, params):
@@ -807,7 +809,7 @@ def time_period_based(target_throughput, warmup_time_period, runner, params):
:return: A generator for the corresponding parameters.
"""
wait_time = 1 / target_throughput if target_throughput else 0
iterations = params.variation_count()
iterations = params.size()
for it in range(0, iterations):
yield (wait_time * it,
lambda start: metrics.SampleType.Warmup if time.perf_counter() - start < warmup_time_period else metrics.SampleType.Normal,
2 changes: 1 addition & 1 deletion esrally/track/__init__.py
@@ -1,4 +1,4 @@
from .loader import list_tracks, load_track, prepare_track
from .loader import list_tracks, load_track, load_track_plugins, prepare_track, operation_parameters

# expose the complete track API
from .track import *
