add support for transform APIs #1011

Closed
wants to merge 17 commits into from
26 changes: 19 additions & 7 deletions docs/telemetry.rst
@@ -18,13 +18,15 @@ You probably want to gain additional insights from a race. Therefore, we have ad

Command Name Description
-------------- --------------------- --------------------------------------------------------------------
jit JIT Compiler Profiler Enables JIT compiler logs.
gc GC log Enables GC logs.
jfr Flight Recorder Enables Java Flight Recorder (requires an Oracle JDK or OpenJDK 11+)
heapdump Heap Dump Captures a heap dump.
node-stats Node Stats Regularly samples node stats
recovery-stats Recovery Stats Regularly samples shard recovery stats
ccr-stats CCR Stats Regularly samples Cross Cluster Replication (CCR) related stats
jit JIT Compiler Profiler Enables JIT compiler logs.
gc GC log Enables GC logs.
jfr Flight Recorder Enables Java Flight Recorder (requires an Oracle JDK or OpenJDK 11+)
heapdump Heap Dump Captures a heap dump.
node-stats Node Stats Regularly samples node stats
recovery-stats Recovery Stats Regularly samples shard recovery stats
ccr-stats CCR Stats Regularly samples Cross Cluster Replication (CCR) related stats
segment-stats Segment Stats Determines segment stats at the end of the benchmark.
transform-stats Transform Stats Regularly samples transform stats

Keep in mind that each telemetry device may incur a runtime overhead which can skew results.

@@ -134,3 +136,13 @@ Supported telemetry parameters:

* ``ccr-stats-indices`` (default: all indices): An index pattern for which CCR stats should be checked.
* ``ccr-stats-sample-interval`` (default 1): A positive number denoting the sampling interval in seconds.

transform-stats
---------------

The transform-stats telemetry device regularly calls the `transform stats API <https://www.elastic.co/guide/en/elasticsearch/reference/current/get-transform-stats.html>`_ and records one metrics document per transform.

Supported telemetry parameters:

* ``transform-stats-transforms`` (default: all transforms): A list of transforms per cluster for which transform stats should be checked.
* ``transform-stats-sample-interval`` (default 1): A positive number denoting the sampling interval in seconds.
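
As a sketch, these parameters could be supplied via a JSON parameters file (the transform name and sampling interval below are placeholders, and this assumes ``--telemetry-params`` accepts a file path like Rally's other ``*-params`` flags):

```json
{
  "transform-stats-transforms": ["my-transform"],
  "transform-stats-sample-interval": 2
}
```

Saved, for example, as ``telemetry-params.json`` and referenced with ``--telemetry=transform-stats --telemetry-params=./telemetry-params.json``.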
63 changes: 63 additions & 0 deletions docs/track.rst
@@ -1151,6 +1151,69 @@ With the operation ``wait-for-recovery`` you can wait until an ongoing shard rec

This operation is :ref:`retryable <track_operations>`.

create-transform
~~~~~~~~~~~~~~~~

With the operation ``create-transform`` you can execute the `create transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/put-transform.html>`_. It supports the following parameters:

* ``transform-id`` (mandatory): The id of the transform to create.
* ``body`` (mandatory): Request body containing the configuration of the transform. Please see the `create transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/put-transform.html>`__ documentation for more details.
* ``defer-validation`` (optional, defaults to false): When true, deferrable validations are not run. This behavior may be desired if the source index does not exist until after the transform is created.

This operation requires at least Elasticsearch 7.5.0 (non-OSS). This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.
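
A minimal track operation using ``create-transform`` might look as follows; the index names and the pivot configuration are illustrative placeholders, not part of this PR:

```json
{
  "operation": {
    "operation-type": "create-transform",
    "transform-id": "my-transform",
    "body": {
      "source": { "index": "my-source-index" },
      "dest": { "index": "my-dest-index" },
      "pivot": {
        "group_by": { "user": { "terms": { "field": "user_id" } } },
        "aggregations": { "avg_price": { "avg": { "field": "price" } } }
      }
    }
  }
}
```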

start-transform
~~~~~~~~~~~~~~~

With the operation ``start-transform`` you can execute the `start transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/start-transform.html>`_. It supports the following parameters:

* ``transform-id`` (mandatory): The id of the transform to start.
* ``timeout`` (optional, defaults to empty): Amount of time to wait until a transform starts.

This operation requires at least Elasticsearch 7.5.0 (non-OSS). This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

stop-transform
~~~~~~~~~~~~~~

With the operation ``stop-transform`` you can execute the `stop transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/stop-transform.html>`_. It supports the following parameters:

* ``transform-id`` (mandatory): The id of the transform to stop.
* ``force`` (optional, defaults to false): Whether to forcefully stop the transform.
* ``timeout`` (optional, defaults to empty): Amount of time to wait until a transform stops.
* ``wait-for-completion`` (optional, defaults to false): If set to true, the API blocks until the indexer state completely stops.
* ``wait-for-checkpoint`` (optional, defaults to false): If set to true, the transform does not completely stop until the current checkpoint is completed.

This operation requires at least Elasticsearch 7.5.0 (non-OSS). This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.
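
In a track schedule, ``start-transform`` and ``stop-transform`` could be combined like this (the transform id and timeout value are placeholders):

```json
{
  "schedule": [
    { "operation": { "operation-type": "start-transform", "transform-id": "my-transform" } },
    { "operation": { "operation-type": "stop-transform", "transform-id": "my-transform",
                     "wait-for-completion": true, "timeout": "5m" } }
  ]
}
```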

delete-transform
~~~~~~~~~~~~~~~~

With the operation ``delete-transform`` you can execute the `delete transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-transform.html>`_. It supports the following parameters:

* ``transform-id`` (mandatory): The id of the transform to delete.
* ``force`` (optional, defaults to false): Whether to delete the transform regardless of its current state.

This operation requires at least Elasticsearch 7.5.0 (non-OSS). This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

execute-transform
~~~~~~~~~~~~~~~~~

With the operation ``execute-transform`` you can execute a full run of a data transformation: it starts the transform and stops it once all data has been transformed. The transform must have been created upfront. The ``execute-transform`` operation supports the following parameters:

* ``transform-id`` (mandatory): The id of the transform to execute.
* ``poll-interval`` (optional, defaults to ``0.5`` seconds): How often transform stats are polled; used to update progress and check the state.
Contributor:
Does it make sense to specify the time unit for poll-interval? i.e. 0.5 (seconds)?

Contributor:

Also, would the poll-interval also affect the performance? Not sure if we want to mention that.

Author:

Yes, it might affect performance, but I can't say how much. It shouldn't be expensive.

FWIW: I debugged the progress reporting and it seems progress reporting isn't working in Rally. Is there an existing issue? Is there a plan to fix this? If not, it might be better to remove the progress report.

Contributor:

Progress reporting might be a bit of a trade-off.
Functions that implement `completed` and `percent_completed` are called until `_completed` is set to `True`. The issue is that `execute-transform` just has a `while` loop and blocks there until `completed` is set to `True`, so `percent_completed` is never updated. For it to work we would need to start the transform outside of `execute_transform`; `execute_transform` would then just check `get_transform_stats`, update `_percent_completed` after each sleep, and set `_completed` to `True` once it's done. In that case it should probably be named something like `wait-for-transform`, and in the track we would need both operations: `start-transform` followed by `wait-for-transform`. We used to have an example of that for the snapshot recovery operation, but it got changed to just waiting for the restore to complete in a loop, without updating `percent_completed`, to get more accurate throughput and time-to-complete information, as otherwise it could be off by the sleep time. Here is the change 9f73d5d where you can see both how it was before and after. Although I think in your case it should be okay to use percent completed, since you are relying on information from `get_transform_stats` to get the performance.

Author:

Thanks for the explanation! Let's discuss this later; I lean towards keeping execute-transform the way it is and rather removing the progress info.

* ``transform-timeout`` (optional, defaults to ``3600``, i.e. 1h): Overall runtime timeout of the batch transform in seconds.

This operation requires at least Elasticsearch 7.5.0 (non-OSS).
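
The polling approach described above — and the alternative discussed in the review thread, where the transform is started separately and the runner only waits and reports progress — can be sketched as follows. ``WaitForTransform`` and ``fake_stats`` are hypothetical illustrations, not code from this PR:

```python
import asyncio


class WaitForTransform:
    """Sketch of the alternative discussed in the review thread: poll
    transform stats, update percent_completed between sleeps, and flag
    completion when the transform leaves the started/indexing states."""

    def __init__(self):
        self._completed = False
        self._percent_completed = 0.0

    @property
    def completed(self):
        return self._completed

    @property
    def percent_completed(self):
        return self._percent_completed

    async def __call__(self, get_stats, poll_interval=0.01):
        while True:
            stats = await get_stats()
            transform = stats["transforms"][0]
            # update progress on every poll so an external progress
            # reporter can observe intermediate values
            self._percent_completed = (transform.get("checkpointing", {})
                                       .get("next", {})
                                       .get("checkpoint_progress", {})
                                       .get("percent_complete", 0.0))
            if transform["state"] not in ("started", "indexing"):
                self._completed = True
                return transform["state"]
            await asyncio.sleep(poll_interval)


# minimal fake stats source standing in for es.transform.get_transform_stats
_responses = iter([
    {"transforms": [{"state": "indexing",
                     "checkpointing": {"next": {"checkpoint_progress": {"percent_complete": 40.0}}}}]},
    {"transforms": [{"state": "stopped",
                     "checkpointing": {"next": {"checkpoint_progress": {"percent_complete": 100.0}}}}]},
])


async def fake_stats():
    return next(_responses)


runner = WaitForTransform()
final_state = asyncio.run(runner(fake_stats))
```

Updating ``_percent_completed`` between sleeps is what would let Rally's progress reporting observe intermediate values — something the blocking loop inside ``execute-transform`` cannot do, which is the trade-off discussed in the thread above.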

Examples
========

3 changes: 2 additions & 1 deletion esrally/driver/driver.py
@@ -400,7 +400,8 @@ def prepare_telemetry(self, es):
telemetry.MlBucketProcessingTime(es_default, self.metrics_store),
telemetry.SegmentStats(log_root, es_default),
telemetry.CcrStats(telemetry_params, es, self.metrics_store),
telemetry.RecoveryStats(telemetry_params, es, self.metrics_store)
telemetry.RecoveryStats(telemetry_params, es, self.metrics_store),
telemetry.TransformStats(telemetry_params, es, self.metrics_store)
])

def wait_for_rest_api(self, es):
168 changes: 168 additions & 0 deletions esrally/driver/runner.py
@@ -65,6 +65,11 @@ def register_default_runners():
register_runner(track.OperationType.CreateSnapshotRepository.name, Retry(CreateSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.WaitForRecovery.name, Retry(IndicesRecovery()), async_runner=True)
register_runner(track.OperationType.PutSettings.name, Retry(PutSettings()), async_runner=True)
register_runner(track.OperationType.CreateTransform.name, Retry(CreateTransform()), async_runner=True)
register_runner(track.OperationType.StartTransform.name, Retry(StartTransform()), async_runner=True)
register_runner(track.OperationType.StopTransform.name, Retry(StopTransform()), async_runner=True)
register_runner(track.OperationType.ExecuteTransform.name, ExecuteTransform(), async_runner=True)
register_runner(track.OperationType.DeleteTransform.name, Retry(DeleteTransform()), async_runner=True)


def runner_for(operation_type):
@@ -1563,6 +1568,169 @@ def __repr__(self, *args, **kwargs):
return "put-settings"


class CreateTransform(Runner):
"""
Execute the `create transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/put-transform.html>`_.
"""

async def __call__(self, es, params):
transform_id = mandatory(params, "transform-id", self)
body = mandatory(params, "body", self)
defer_validation = params.get("defer-validation", False)
await es.transform.put_transform(transform_id=transform_id, body=body, defer_validation=defer_validation)

def __repr__(self, *args, **kwargs):
return "create-transform"


class StartTransform(Runner):
"""
Execute the `start transform API
<https://www.elastic.co/guide/en/elasticsearch/reference/current/start-transform.html>`_.
"""

async def __call__(self, es, params):
transform_id = mandatory(params, "transform-id", self)
timeout = params.get("timeout")

await es.transform.start_transform(transform_id=transform_id, timeout=timeout)

def __repr__(self, *args, **kwargs):
return "start-transform"


class StopTransform(Runner):
"""
Execute the `stop transform API
<https://www.elastic.co/guide/en/elasticsearch/reference/current/stop-transform.html>`_.
"""

async def __call__(self, es, params):
transform_id = mandatory(params, "transform-id", self)
force = params.get("force", False)
timeout = params.get("timeout")
wait_for_completion = params.get("wait-for-completion", False)
wait_for_checkpoint = params.get("wait-for-checkpoint", False)

await es.transform.stop_transform(transform_id=transform_id,
force=force,
timeout=timeout,
wait_for_completion=wait_for_completion,
wait_for_checkpoint=wait_for_checkpoint)

def __repr__(self, *args, **kwargs):
return "stop-transform"


class ExecuteTransform(Runner):
"""
Execute - start and wait for stop - a batch transform and report its stats.
"""

def __init__(self):
super().__init__()
self._completed = False
self._percent_completed = 0.0

@property
def completed(self):
return self._completed

# todo: progress reporting does not work at the moment, although self._percent_completed gets set
@property
def percent_completed(self):
return self._percent_completed

async def __call__(self, es, params):
"""
Runs a batch transform

:param es: The Elasticsearch client.
:param params: A hash with all parameters. See below for details.
:return: A hash with stats from the run.

It expects a parameter dict with the following mandatory keys:

* ``transform-id``: the id of the transform to start; the transform must have been created upfront.

The following keys are optional:

* ``poll-interval``: how often transform stats are polled, used to set progress and check the state,
default 0.5 seconds.
* ``transform-timeout``: overall runtime timeout of the batch transform in seconds, default 3600 (1h)
"""
import time
transform_id = mandatory(params, "transform-id", self)
poll_interval = params.get("poll-interval", 0.5)
transform_timeout = params.get("transform-timeout", 60.0 * 60.0)
start_time = time.time()
await es.transform.start_transform(transform_id=transform_id)

stats_response = await es.transform.get_transform_stats(transform_id=transform_id)
state = stats_response['transforms'][0].get("state")

while state in ('started', 'indexing'):
if (time.time() - start_time) > transform_timeout:
raise exceptions.RallyAssertionError(
"Transform [{}] timed out after [{}] seconds. "
"Please consider increasing the timeout in the track.".format(
transform_id, transform_timeout))
await asyncio.sleep(poll_interval)
stats_response = await es.transform.get_transform_stats(transform_id=transform_id)
state = stats_response['transforms'][0].get("state")

self._percent_completed = stats_response['transforms'][0].get("checkpointing", {}).get("next", {}).get(
"checkpoint_progress", {}).get("percent_complete", 0.0)

if state == "failed":
raise exceptions.RallyAssertionError(
"Transform [{}] failed with [{}].".format(transform_id,
stats_response['transforms'][0].get("reason", "unknown")))

self._completed = True
transform_stats = stats_response['transforms'][0].get("stats", {})

ops = {
"pages_processed": transform_stats.get("pages_processed", 0),
"documents_processed": transform_stats.get("documents_processed", 0),
"documents_indexed": transform_stats.get("documents_indexed", 0),
"index_total": transform_stats.get("index_total", 0),
"transform_index_failures": transform_stats.get("index_failures", 0),
"transform_search_total": transform_stats.get("search_total", 0),
"transform_search_failures": transform_stats.get("search_failures", 0),
"transform_processing_total": transform_stats.get("processing_total", 0)
}
stats = {
"search_time": transform_stats.get("search_time_in_ms", 0),
"processing_time": transform_stats.get("processing_time_in_ms", 0),
"index_time": transform_stats.get("index_time_in_ms", 0),
"unit": "ms",
"ops": ops
}

return stats


def __repr__(self, *args, **kwargs):
return "execute-transform"


class DeleteTransform(Runner):
"""
Execute the `delete transform API
<https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-transform.html>`_.
"""

async def __call__(self, es, params):
transform_id = mandatory(params, "transform-id", self)
force = params.get("force", False)
# we don't want to fail if a transform does not exist, thus we ignore 404s.
await es.transform.delete_transform(transform_id=transform_id, force=force, ignore=[404])

def __repr__(self, *args, **kwargs):
return "delete-transform"


# TODO: Allow to use this from (selected) regular runners and add user documentation.
# TODO: It would maybe be interesting to add meta-data on how many retries there were.
class Retry(Runner, Delegator):
31 changes: 31 additions & 0 deletions esrally/metrics.py
@@ -1671,6 +1671,19 @@ def __call__(self):
# convert to int, fraction counts are senseless
median_segment_count = self.median("segments_count")
result.segment_count = int(median_segment_count) if median_segment_count is not None else median_segment_count

result.transform_pages_processed = self.max("transform_pages_processed")
result.transform_documents_processed = self.max("transform_documents_processed")
result.transform_documents_indexed = self.max("transform_documents_indexed")
result.transform_search_time_in_ms = self.max("transform_search_time_in_ms")
result.transform_index_time_in_ms = self.max("transform_index_time_in_ms")
result.transform_processing_time_in_ms = self.max("transform_processing_time_in_ms")
result.transform_index_total = self.max("transform_index_total")
result.transform_index_failures = self.max("transform_index_failures")
result.transform_search_total = self.max("transform_search_total")
result.transform_search_failures = self.max("transform_search_failures")
result.transform_processing_total = self.max("transform_processing_total")

return result

def merge(self, *args):
@@ -1747,6 +1760,12 @@ def error_rate(self, task_name):
def median(self, metric_name, task_name=None, operation_type=None, sample_type=None):
return self.store.get_median(metric_name, task=task_name, operation_type=operation_type, sample_type=sample_type)

def max(self, metric_name, task_name=None, operation_type=None, sample_type=None):
stats = self.store.get_stats(metric_name, task=task_name, operation_type=operation_type, sample_type=sample_type)
if stats is not None:
return stats["max"]
return 0

def single_latency(self, task, metric_name="latency"):
sample_type = SampleType.Normal
sample_size = self.store.get_count(metric_name, task=task, sample_type=sample_type)
@@ -1805,6 +1824,18 @@ def __init__(self, d=None):
self.translog_size = self.v(d, "translog_size")
self.segment_count = self.v(d, "segment_count")

self.transform_pages_processed = self.v(d, "transform_pages_processed")
self.transform_documents_processed = self.v(d, "transform_documents_processed")
self.transform_documents_indexed = self.v(d, "transform_documents_indexed")
self.transform_search_time_in_ms = self.v(d, "transform_search_time_in_ms")
self.transform_index_time_in_ms = self.v(d, "transform_index_time_in_ms")
self.transform_processing_time_in_ms = self.v(d, "transform_processing_time_in_ms")
self.transform_index_total = self.v(d, "transform_index_total")
self.transform_index_failures = self.v(d, "transform_index_failures")
self.transform_search_total = self.v(d, "transform_search_total")
self.transform_search_failures = self.v(d, "transform_search_failures")
self.transform_processing_total = self.v(d, "transform_processing_total")

def as_dict(self):
return self.__dict__

16 changes: 16 additions & 0 deletions esrally/reporter.py
@@ -119,6 +119,8 @@ def report(self):
metrics_table.extend(self.report_segment_memory(stats))
metrics_table.extend(self.report_segment_counts(stats))

metrics_table.extend(self.report_transform_stats(stats))

for record in stats.op_metrics:
task = record["task"]
metrics_table.extend(self.report_throughput(record, task))
@@ -267,6 +269,20 @@ def report_segment_counts(self, stats):
self.line("Segment count", "", stats.segment_count, "")
)

def report_transform_stats(self, stats):
if stats.transform_documents_processed is None or stats.transform_documents_processed == 0:
return []

return self.join(
self.line("Transform documents_processed", "", stats.transform_documents_processed, ""),
self.line("Transform documents_indexed", "", stats.transform_documents_indexed, ""),
self.line("Transform search_time", "", stats.transform_search_time_in_ms, "ms"),
self.line("Transform index_time", "", stats.transform_index_time_in_ms, "ms"),
self.line("Transform processing_time", "", stats.transform_processing_time_in_ms, "ms"),
self.line("Transform index_failures", "", stats.transform_index_failures, ""),
self.line("Transform search_failures", "", stats.transform_search_failures, "")
)

def join(self, *args):
lines = []
for arg in args: