add support for transform APIs #1011

Closed
wants to merge 17 commits into from
25 changes: 18 additions & 7 deletions docs/telemetry.rst
@@ -18,13 +18,14 @@ You probably want to gain additional insights from a race. Therefore, we have ad

Command Name Description
-------------- --------------------- --------------------------------------------------------------------
jit JIT Compiler Profiler Enables JIT compiler logs.
gc GC log Enables GC logs.
jfr Flight Recorder Enables Java Flight Recorder (requires an Oracle JDK or OpenJDK 11+)
heapdump Heap Dump Captures a heap dump.
node-stats Node Stats Regularly samples node stats
recovery-stats Recovery Stats Regularly samples shard recovery stats
ccr-stats CCR Stats Regularly samples Cross Cluster Replication (CCR) related stats
transform-stats Transform Stats Regularly samples transform stats

Keep in mind that each telemetry device may incur a runtime overhead which can skew results.

@@ -134,3 +135,13 @@ Supported telemetry parameters:

* ``ccr-stats-indices`` (default: all indices): An index pattern for which ccr stats should be checked.
* ``ccr-stats-sample-interval`` (default 1): A positive number greater than zero denoting the sampling interval in seconds.

transform-stats
---------------

The transform-stats telemetry device regularly calls the `transform stats API <https://www.elastic.co/guide/en/elasticsearch/reference/current/get-transform-stats.html>`_ and records one metrics document per transform.

Supported telemetry parameters:

* ``transform-stats-transforms`` (default: all transforms): A list of transforms per cluster for which transform stats should be checked.
* ``transform-stats-sample-interval`` (default 1): A positive number greater than zero denoting the sampling interval in seconds.
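
As a sketch of how the device might be enabled, both parameters can be passed via ``--telemetry-params`` on the Rally command line (the interval value below is purely illustrative)::

    esrally --telemetry="transform-stats" --telemetry-params="transform-stats-sample-interval:10"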
63 changes: 63 additions & 0 deletions docs/track.rst
@@ -1151,6 +1151,69 @@ With the operation ``wait-for-recovery`` you can wait until an ongoing shard rec

This operation is :ref:`retryable <track_operations>`.

create-transform
~~~~~~~~~~~~~~~~

With the operation ``create-transform`` you can execute the `create transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/put-transform.html>`_. It supports the following parameters:

* ``transform-id`` (mandatory): The id of the transform to create.
* ``body`` (mandatory): Request body containing the configuration of the transform. Please see the `create transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/put-transform.html>`__ documentation for more details.
* ``defer-validation`` (optional, defaults to false): When true, deferrable validations are not run. This behavior may be desired if the source index does not exist until after the transform is created.

This operation requires at least Elasticsearch 7.5.0 (non-OSS). This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.
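
For illustration, a schedule entry using this operation might look like the following sketch. The index names and the transform body are made up; see the create transform API documentation for the full body format::

    {
      "operation": {
        "operation-type": "create-transform",
        "transform-id": "ecommerce-customer-transform",
        "body": {
          "source": { "index": "ecommerce" },
          "dest": { "index": "ecommerce-by-customer" },
          "pivot": {
            "group_by": { "customer_id": { "terms": { "field": "customer_id" } } },
            "aggregations": { "total_spent": { "sum": { "field": "taxful_total_price" } } }
          }
        },
        "defer-validation": true
      }
    }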

start-transform
~~~~~~~~~~~~~~~

With the operation ``start-transform`` you can execute the `start transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/start-transform.html>`_. It supports the following parameters:

* ``transform-id`` (mandatory): The id of the transform to start.
* ``timeout`` (optional, defaults to empty): Amount of time to wait until a transform starts.

This operation requires at least Elasticsearch 7.5.0 (non-OSS). This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.
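
A corresponding schedule entry could look like this sketch (the transform id and timeout value are illustrative)::

    {
      "operation": {
        "operation-type": "start-transform",
        "transform-id": "ecommerce-customer-transform",
        "timeout": "30s"
      }
    }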

stop-transform
~~~~~~~~~~~~~~

With the operation ``stop-transform`` you can execute the `stop transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/stop-transform.html>`_. It supports the following parameters:

* ``transform-id`` (mandatory): The id of the transform to stop.
* ``force`` (optional, defaults to false): Whether to forcefully stop the transform.
* ``timeout`` (optional, defaults to empty): Amount of time to wait until a transform stops.
* ``wait-for-completion`` (optional, defaults to false): If set to true, the API blocks until the indexer has completely stopped.
* ``wait-for-checkpoint`` (optional, defaults to false): If set to true, the transform does not completely stop until the current checkpoint is completed.

This operation requires at least Elasticsearch 7.5.0 (non-OSS). This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.
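
A corresponding schedule entry might look like this sketch (values are illustrative)::

    {
      "operation": {
        "operation-type": "stop-transform",
        "transform-id": "ecommerce-customer-transform",
        "wait-for-completion": true,
        "wait-for-checkpoint": true
      }
    }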

delete-transform
~~~~~~~~~~~~~~~~

With the operation ``delete-transform`` you can execute the `delete transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-transform.html>`_. It supports the following parameters:

* ``transform-id`` (mandatory): The id of the transform to delete.
* ``force`` (optional, defaults to false): Whether to delete the transform regardless of its current state.

This operation requires at least Elasticsearch 7.5.0 (non-OSS). This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.
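
For example, a schedule entry could look like this sketch (the transform id is illustrative)::

    {
      "operation": {
        "operation-type": "delete-transform",
        "transform-id": "ecommerce-customer-transform",
        "force": true
      }
    }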

execute-transform
~~~~~~~~~~~~~~~~~

With the operation ``execute-transform`` you can run a full cycle of a batch transform: it starts the transform and stops it once all data has been transformed. The transform must be created upfront. The ``execute-transform`` operation supports the following parameters:

* ``transform-id`` (mandatory): The id of the transform to execute.
* ``poll-interval`` (optional, defaults to ``0.5``): How often transform stats are polled; this is used to report progress and to check the transform state. Do not set this too low, because frequent polling can skew the result.
* ``transform-timeout`` (optional, defaults to ``3600``, i.e. one hour): Overall runtime timeout of the batch transform in seconds.

This operation requires at least Elasticsearch 7.5.0 (non-OSS).
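
A sketch of a schedule entry using this operation (values are illustrative; the transform is assumed to have been created earlier in the schedule, e.g. with ``create-transform``)::

    {
      "operation": {
        "operation-type": "execute-transform",
        "transform-id": "ecommerce-customer-transform",
        "poll-interval": 1,
        "transform-timeout": 7200
      }
    }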

Examples
========

3 changes: 2 additions & 1 deletion esrally/driver/driver.py
@@ -400,7 +400,8 @@ def prepare_telemetry(self, es):
telemetry.MlBucketProcessingTime(es_default, self.metrics_store),
telemetry.SegmentStats(log_root, es_default),
telemetry.CcrStats(telemetry_params, es, self.metrics_store),
telemetry.RecoveryStats(telemetry_params, es, self.metrics_store)
telemetry.RecoveryStats(telemetry_params, es, self.metrics_store),
telemetry.TransformStats(telemetry_params, es, self.metrics_store)
])

def wait_for_rest_api(self, es):
168 changes: 168 additions & 0 deletions esrally/driver/runner.py
@@ -65,6 +65,11 @@ def register_default_runners():
register_runner(track.OperationType.CreateSnapshotRepository.name, Retry(CreateSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.WaitForRecovery.name, Retry(IndicesRecovery()), async_runner=True)
register_runner(track.OperationType.PutSettings.name, Retry(PutSettings()), async_runner=True)
register_runner(track.OperationType.CreateTransform.name, Retry(CreateTransform()), async_runner=True)
register_runner(track.OperationType.StartTransform.name, Retry(StartTransform()), async_runner=True)
register_runner(track.OperationType.StopTransform.name, Retry(StopTransform()), async_runner=True)
register_runner(track.OperationType.ExecuteTransform.name, ExecuteTransform(), async_runner=True)
register_runner(track.OperationType.DeleteTransform.name, Retry(DeleteTransform()), async_runner=True)


def runner_for(operation_type):
@@ -1563,6 +1568,169 @@ def __repr__(self, *args, **kwargs):
return "put-settings"


class CreateTransform(Runner):
"""
Execute the `create transform API <https://www.elastic.co/guide/en/elasticsearch/reference/current/put-transform.html>`_.
"""

async def __call__(self, es, params):
transform_id = mandatory(params, "transform-id", self)
body = mandatory(params, "body", self)
defer_validation = params.get("defer-validation", False)
await es.transform.put_transform(transform_id=transform_id, body=body, defer_validation=defer_validation)

def __repr__(self, *args, **kwargs):
return "create-transform"


class StartTransform(Runner):
"""
Execute the `start transform API
<https://www.elastic.co/guide/en/elasticsearch/reference/current/start-transform.html>`_.
"""

async def __call__(self, es, params):
transform_id = mandatory(params, "transform-id", self)
timeout = params.get("timeout")

await es.transform.start_transform(transform_id=transform_id, timeout=timeout)

def __repr__(self, *args, **kwargs):
return "start-transform"


class StopTransform(Runner):
"""
Execute the `stop transform API
<https://www.elastic.co/guide/en/elasticsearch/reference/current/stop-transform.html>`_.
"""

async def __call__(self, es, params):
transform_id = mandatory(params, "transform-id", self)
force = params.get("force", False)
timeout = params.get("timeout")
wait_for_completion = params.get("wait-for-completion", False)
wait_for_checkpoint = params.get("wait-for-checkpoint", False)

await es.transform.stop_transform(transform_id=transform_id,
force=force,
timeout=timeout,
wait_for_completion=wait_for_completion,
wait_for_checkpoint=wait_for_checkpoint)

def __repr__(self, *args, **kwargs):
return "stop-transform"


class ExecuteTransform(Runner):
"""
Execute - start and wait for stop - a batch transform and report its stats.
"""

def __init__(self):
super().__init__()
self._completed = False
self._percent_completed = 0.0

@property
def completed(self):
return self._completed

# todo: progress reporting does not work at the moment, although self._percent_completed gets set
@property
def percent_completed(self):
return self._percent_completed

async def __call__(self, es, params):
"""
Runs a batch transform

:param es: The Elasticsearch client.
:param params: A hash with all parameters. See below for details.
:return: A hash with stats from the run.

It expects a parameter dict with the following mandatory keys:

* ``transform-id``: the id of the transform to start; the transform must have been created upfront.

The following keys are optional:

* ``poll-interval``: how often transform stats are polled; used to report progress and to check the state,
default 0.5.
* ``transform-timeout``: overall runtime timeout of the batch transform in seconds, default 3600 (one hour).
"""
import time
transform_id = mandatory(params, "transform-id", self)
poll_interval = params.get("poll-interval", 0.5)
transform_timeout = params.get("transform-timeout", 60.0 * 60.0)
start_time = time.time()
await es.transform.start_transform(transform_id=transform_id)

stats_response = await es.transform.get_transform_stats(transform_id=transform_id)
state = stats_response["transforms"][0].get("state")

while state in ("started", "indexing"):
if (time.time() - start_time) > transform_timeout:
raise exceptions.RallyAssertionError(
f"Transform [{transform_id}] timed out after [{transform_timeout}] seconds. "
"Please consider increasing the timeout in the track.")
await asyncio.sleep(poll_interval)
stats_response = await es.transform.get_transform_stats(transform_id=transform_id)
state = stats_response["transforms"][0].get("state")

self._percent_completed = stats_response["transforms"][0].get("checkpointing", {}).get("next", {}).get(
"checkpoint_progress", {}).get("percent_complete", 0.0)

if state == "failed":
failure_reason = stats_response["transforms"][0].get("reason", "unknown")
raise exceptions.RallyAssertionError(
f"Transform [{transform_id}] failed with [{failure_reason}].")

self._completed = True
transform_stats = stats_response["transforms"][0].get("stats", {})

ops = {
"pages_processed": transform_stats.get("pages_processed", 0),
"documents_processed": transform_stats.get("documents_processed", 0),
"documents_indexed": transform_stats.get("documents_indexed", 0),
"index_total": transform_stats.get("index_total", 0),
"transform_index_failures": transform_stats.get("index_failures", 0),
"transform_search_total": transform_stats.get("search_total", 0),
"transform_search_failures": transform_stats.get("search_failures", 0),
"transform_processing_total": transform_stats.get("processing_total", 0)
}
stats = {
"search_time": transform_stats.get("search_time_in_ms", 0),
"processing_time": transform_stats.get("processing_time_in_ms", 0),
"index_time": transform_stats.get("index_time_in_ms", 0),
"weight": transform_stats.get("documents_processed", 0),
"unit": "docs",
"ops": ops
}

return stats


def __repr__(self, *args, **kwargs):
return "execute-transform"


class DeleteTransform(Runner):
"""
Execute the `delete transform API
<https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-transform.html>`_.
"""

async def __call__(self, es, params):
transform_id = mandatory(params, "transform-id", self)
force = params.get("force", False)
# we don't want to fail if the transform does not exist, thus we ignore 404s.
await es.transform.delete_transform(transform_id=transform_id, force=force, ignore=[404])

def __repr__(self, *args, **kwargs):
return "delete-transform"


# TODO: Allow to use this from (selected) regular runners and add user documentation.
# TODO: It would maybe be interesting to add meta-data on how many retries there were.
class Retry(Runner, Delegator):
Expand Down
31 changes: 31 additions & 0 deletions esrally/metrics.py
@@ -1671,6 +1671,19 @@ def __call__(self):
# convert to int, fraction counts are senseless
median_segment_count = self.median("segments_count")
result.segment_count = int(median_segment_count) if median_segment_count is not None else median_segment_count

result.transform_pages_processed = self.max("transform_pages_processed")
result.transform_documents_processed = self.max("transform_documents_processed")
result.transform_documents_indexed = self.max("transform_documents_indexed")
result.transform_search_time_in_ms = self.max("transform_search_time_in_ms")
result.transform_index_time_in_ms = self.max("transform_index_time_in_ms")
result.transform_processing_time_in_ms = self.max("transform_processing_time_in_ms")
result.transform_index_total = self.max("transform_index_total")
result.transform_index_failures = self.max("transform_index_failures")
result.transform_search_total = self.max("transform_search_total")
result.transform_search_failures = self.max("transform_search_failures")
result.transform_processing_total = self.max("transform_processing_total")

return result

def merge(self, *args):
@@ -1747,6 +1760,12 @@ def error_rate(self, task_name):
def median(self, metric_name, task_name=None, operation_type=None, sample_type=None):
return self.store.get_median(metric_name, task=task_name, operation_type=operation_type, sample_type=sample_type)

def max(self, metric_name, task_name=None, operation_type=None, sample_type=None):
stats = self.store.get_stats(metric_name, task=task_name, operation_type=operation_type, sample_type=sample_type)
if stats is not None:
return stats["max"]
return 0

def single_latency(self, task, metric_name="latency"):
sample_type = SampleType.Normal
sample_size = self.store.get_count(metric_name, task=task, sample_type=sample_type)
@@ -1805,6 +1824,18 @@ def __init__(self, d=None):
self.translog_size = self.v(d, "translog_size")
self.segment_count = self.v(d, "segment_count")

self.transform_pages_processed = self.v(d, "transform_pages_processed")
self.transform_documents_processed = self.v(d, "transform_documents_processed")
self.transform_documents_indexed = self.v(d, "transform_documents_indexed")
self.transform_search_time_in_ms = self.v(d, "transform_search_time_in_ms")
self.transform_index_time_in_ms = self.v(d, "transform_index_time_in_ms")
self.transform_processing_time_in_ms = self.v(d, "transform_processing_time_in_ms")
self.transform_index_total = self.v(d, "transform_index_total")
self.transform_index_failures = self.v(d, "transform_index_failures")
self.transform_search_total = self.v(d, "transform_search_total")
self.transform_search_failures = self.v(d, "transform_search_failures")
self.transform_processing_total = self.v(d, "transform_processing_total")

def as_dict(self):
return self.__dict__

16 changes: 16 additions & 0 deletions esrally/reporter.py
@@ -119,6 +119,8 @@ def report(self):
metrics_table.extend(self.report_segment_memory(stats))
metrics_table.extend(self.report_segment_counts(stats))

metrics_table.extend(self.report_transform_stats(stats))

for record in stats.op_metrics:
task = record["task"]
metrics_table.extend(self.report_throughput(record, task))
@@ -267,6 +269,20 @@ def report_segment_counts(self, stats):
self.line("Segment count", "", stats.segment_count, "")
)

def report_transform_stats(self, stats):
if stats.transform_documents_processed is None or stats.transform_documents_processed == 0:
return []

return self.join(
self.line("Transform documents_processed", "", stats.transform_documents_processed, ""),
self.line("Transform documents_indexed", "", stats.transform_documents_indexed, ""),
self.line("Transform transform_search_time", "", stats.transform_search_time_in_ms, "ms"),
self.line("Transform transform_index_time", "", stats.transform_index_time_in_ms, "ms"),
self.line("Transform transform_processing_time", "", stats.transform_processing_time_in_ms, "ms"),
self.line("Transform transform_index_failures", "", stats.transform_index_failures, ""),
self.line("Transform transform_search_failures", "", stats.transform_search_failures, "")
)

def join(self, *args):
lines = []
for arg in args: