Add data streams telemetry device #1296

Merged: 9 commits, Jul 13, 2021
39 changes: 39 additions & 0 deletions docs/telemetry.rst
@@ -29,6 +29,7 @@ You probably want to gain additional insights from a race. Therefore, we have ad
transform-stats Transform Stats Regularly samples transform stats
searchable-snapshots-stats Searchable Snapshots Stats Regularly samples searchable snapshots stats
shard-stats Shard Stats Regularly samples nodes stats at shard level
data-stream-stats Data Stream Stats Regularly samples data stream stats

Keep in mind that each telemetry device may incur a runtime overhead which can skew results.

@@ -188,3 +189,41 @@ Example of a recorded document::
Supported telemetry parameters:

* ``shard-stats-sample-interval`` (default 60): A positive number greater than zero denoting the sampling interval in seconds.

data-stream-stats
-----------------

The data-stream-stats telemetry device regularly calls the `data stream stats API <https://www.elastic.co/guide/en/elasticsearch/reference/master/data-stream-stats-api.html>`_ and records one metrics document for cluster-level stats (``_all``) and one metrics document per data stream.

Example of recorded documents given two data streams in the cluster::

{
"data_stream": "_all",
"name": "data-stream-stats",
"shards": {
"total": 4,
"successful_shards": 2,
"failed_shards": 0
},
"data_stream_count": 2,
"backing_indices": 2,
"total_store_size_bytes": 878336
},
{
"name": "data-stream-stats",
"data_stream": "my-data-stream-1",
"backing_indices": 1,
"store_size_bytes": 439137,
"maximum_timestamp": 1579936446448
},
{
"name": "data-stream-stats",
"data_stream": "my-data-stream-2",
"backing_indices": 1,
"store_size_bytes": 439199,
"maximum_timestamp": 1579936446448
}

Supported telemetry parameters:

* ``data-stream-stats-sample-interval`` (default 10): A positive number greater than zero denoting the sampling interval in seconds.
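
As with the devices above, this one is opt-in. A usage sketch, assuming Rally's standard ``--telemetry`` and ``--telemetry-params`` flags and the ``geonames`` example track (both are assumptions, not part of this diff)::

    esrally race --track=geonames --telemetry="data-stream-stats" --telemetry-params="data-stream-stats-sample-interval:30"
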
1 change: 1 addition & 0 deletions esrally/driver/driver.py
@@ -592,6 +592,7 @@ def prepare_telemetry(self, es, enable):
telemetry.ShardStats(telemetry_params, es, self.metrics_store),
telemetry.TransformStats(telemetry_params, es, self.metrics_store),
telemetry.SearchableSnapshotsStats(telemetry_params, es, self.metrics_store),
telemetry.DataStreamStats(telemetry_params, es, self.metrics_store),
]
else:
devices = []
129 changes: 125 additions & 4 deletions esrally/telemetry.py
@@ -26,7 +26,7 @@
from esrally import exceptions, metrics, time
from esrally.metrics import MetaInfoScope
from esrally.utils import console, io, opts, process, sysstats
-from esrally.utils.versions import components
+from esrally.utils.versions import Version


def list_telemetry():
@@ -45,6 +45,7 @@ def list_telemetry():
TransformStats,
SearchableSnapshotsStats,
ShardStats,
DataStreamStats,
]
]
console.println(tabulate.tabulate(devices, ["Command", "Name", "Description"]))
@@ -728,9 +729,7 @@ def __init__(self, telemetry_params, clients, metrics_store):
def on_benchmark_start(self):
default_client = self.clients["default"]
distribution_version = default_client.info()["version"]["number"]
-        major, minor = components(distribution_version)[:2]
-
-        if major < 7 or (major == 7 and minor < 2):
+        if Version.from_string(distribution_version) < Version(major=7, minor=2, patch=0):
console.warn(NodeStats.warning, logger=self.logger)

for cluster_name in self.specified_cluster_names:
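
Note on the hunk above: the refactor replaces manual major/minor tuple logic with an ordered ``Version`` comparison. A minimal sketch of the new idiom, assuming ``Version`` implements ordering, as its use with ``<`` in this diff implies:

    from esrally.utils.versions import Version

    # Version.from_string parses "7.1.3" into a comparable Version object,
    # so a minimum-version gate becomes a single readable comparison.
    if Version.from_string("7.1.3") < Version(major=7, minor=2, patch=0):
        print("cluster is older than 7.2.0, so the node-stats warning applies")
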
@@ -1267,6 +1266,128 @@ def _match_list_or_pattern(self, idx):
return False


class DataStreamStats(TelemetryDevice):
"""
Collects and pushes data stream stats for the specified cluster to the metric store.
"""

internal = False
command = "data-stream-stats"
human_name = "Data Stream Stats"
help = "Regularly samples data stream stats"

def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
May optionally specify:
``data-stream-stats-sample-interval``: An integer controlling the interval, in seconds,
between collecting samples. Default: 10s.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()

self.telemetry_params = telemetry_params
self.clients = clients
self.specified_cluster_names = self.clients.keys()
self.sample_interval = telemetry_params.get("data-stream-stats-sample-interval", 10)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'data-stream-stats-sample-interval' must be greater than zero " f"but was {self.sample_interval}."
)
self.metrics_store = metrics_store
self.samplers = []

def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
recorder = DataStreamStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval)
client_info = self.clients[cluster_name].info()
distribution_version = client_info["version"]["number"]
distribution_flavor = client_info["version"].get("build_flavor", "oss")
if Version.from_string(distribution_version) < Version(major=7, minor=9, patch=0):
raise exceptions.SystemSetupError(
"The data-stream-stats telemetry device can only be used with clusters from version 7.9 onwards"
)
if distribution_flavor == "oss":
raise exceptions.SystemSetupError(
"The data-stream-stats telemetry device cannot be used with an OSS distribution of Elasticsearch"
)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()

def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()


class DataStreamStatsRecorder:
"""
Collects and pushes data stream stats for the specified cluster to the metric store.
"""

def __init__(self, cluster_name, client, metrics_store, sample_interval):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: An integer controlling the interval, in seconds, between collecting samples.
"""

self.cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.logger = logging.getLogger(__name__)

def __str__(self):
return "data stream stats"

def record(self):
"""
Collect _data_stream/stats and push to metrics store.
"""
# pylint: disable=import-outside-toplevel
import elasticsearch

try:
sample = self.client.indices.data_streams_stats(name="")
except elasticsearch.TransportError:
msg = f"A transport error occurred while collecting data stream stats on cluster [{self.cluster_name}]"
self.logger.exception(msg)
raise exceptions.RallyError(msg)

data_stream_metadata = {"cluster": self.cluster_name}

doc = {
"data_stream": "_all",
"name": "data-stream-stats",
"shards": {
"total": sample["_shards"]["total"],
"successful_shards": sample["_shards"]["successful"],
"failed_shards": sample["_shards"]["failed"],
},
"data_stream_count": sample["data_stream_count"],
"backing_indices": sample["backing_indices"],
"total_store_size_bytes": sample["total_store_size_bytes"],
}

self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=data_stream_metadata)

for ds in sample["data_streams"]:
doc = {
"name": "data-stream-stats",
"data_stream": ds["data_stream"],
"backing_indices": ds["backing_indices"],
"store_size_bytes": ds["store_size_bytes"],
"maximum_timestamp": ds["maximum_timestamp"],
}
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=data_stream_metadata)
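
For context, the call in ``record()`` maps to the data stream stats endpoint (``GET /_data_stream/_stats`` when ``name`` is empty). A standalone sketch against a plain elasticsearch-py 7.x client; the host and printed fields are illustrative, not part of this diff:

    import elasticsearch

    # name="" asks for stats covering all data streams, which is what
    # DataStreamStatsRecorder.record() samples on each interval.
    es = elasticsearch.Elasticsearch(hosts=["http://localhost:9200"])
    sample = es.indices.data_streams_stats(name="")
    print(sample["data_stream_count"], sample["total_store_size_bytes"])
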


class StartupTime(InternalTelemetryDevice):
def __init__(self, stopwatch=time.StopWatch):
super().__init__()
130 changes: 129 additions & 1 deletion tests/telemetry_test.py
@@ -116,11 +116,12 @@ def info(self):


class SubClient:
-    def __init__(self, stats=None, info=None, recovery=None, transform_stats=None):
+    def __init__(self, stats=None, info=None, recovery=None, transform_stats=None, data_streams_stats=None):
self._stats = wrap(stats)
self._info = wrap(info)
self._recovery = wrap(recovery)
self._transform_stats = wrap(transform_stats)
self._data_streams_stats = wrap(data_streams_stats)

def stats(self, *args, **kwargs):
return self._stats()
@@ -134,6 +135,9 @@ def recovery(self, *args, **kwargs):
def get_transform_stats(self, *args, **kwargs):
return self._transform_stats()

def data_streams_stats(self, *args, **kwargs):
return self._data_streams_stats()


def wrap(it):
return it if callable(it) else ResponseSupplier(it)
@@ -941,6 +945,130 @@ def test_stores_multi_index_multi_shard_stats(self, metrics_store_put_doc):
)


class DataStreamStatsTests(TestCase):
def test_failure_if_feature_not_implemented_in_version(self):
# Data Streams aren't available prior to 7.9
clients = {"default": Client(info={"version": {"number": "7.6.0"}})}
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
telemetry_params = {"data-stream-stats-sample-interval": random.randint(1, 100)}
t = telemetry.DataStreamStats(telemetry_params, clients, metrics_store)
with self.assertRaisesRegex(
exceptions.SystemSetupError, r"The data-stream-stats telemetry device can only be used with clusters from version 7.9 onwards"
):
t.on_benchmark_start()

def test_failure_if_feature_not_implemented_in_distribution(self):
# Data Streams aren't available with the OSS distribution
clients = {"default": Client(info={"version": {"number": "7.9.0", "build_flavor": "oss"}})}
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
telemetry_params = {"data-stream-stats-sample-interval": random.randint(1, 100)}
t = telemetry.DataStreamStats(telemetry_params, clients, metrics_store)
with self.assertRaisesRegex(
exceptions.SystemSetupError, r"The data-stream-stats telemetry device cannot be used with an OSS distribution of Elasticsearch"
):
t.on_benchmark_start()

def test_negative_sample_interval_forbidden(self):
clients = {"default": Client(), "cluster_b": Client()}
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
telemetry_params = {"data-stream-stats-sample-interval": -1 * random.random()}
with self.assertRaisesRegex(
exceptions.SystemSetupError,
r"The telemetry parameter 'data-stream-stats-sample-interval' must be greater than zero but was .*\.",
):
telemetry.DataStreamStats(telemetry_params, clients, metrics_store)


class DataStreamStatsRecorderTests(TestCase):
data_streams_stats_response = {
"_shards": {"total": 4, "successful": 2, "failed": 0},
"data_stream_count": 2,
"backing_indices": 2,
"total_store_size_bytes": 878336,
"data_streams": [
{"data_stream": "my-data-stream-1", "backing_indices": 1, "store_size_bytes": 439137, "maximum_timestamp": 1579936446448},
{"data_stream": "my-data-stream-2", "backing_indices": 1, "store_size_bytes": 439199, "maximum_timestamp": 1579936446448},
],
}

@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
def test_store_multiple_data_stream_stats(self, metrics_store_put_doc):
client = Client(indices=SubClient(data_streams_stats=self.data_streams_stats_response))
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
recorder = telemetry.DataStreamStatsRecorder(
cluster_name="remote", client=client, metrics_store=metrics_store, sample_interval=1 * random.randint(1, 100)
)
recorder.record()

data_stream_metadata = {"cluster": "remote"}

metrics_store_put_doc.assert_has_calls(
[
mock.call(
{
"data_stream": "_all",
"name": "data-stream-stats",
"shards": {"total": 4, "successful_shards": 2, "failed_shards": 0},
"data_stream_count": 2,
"backing_indices": 2,
"total_store_size_bytes": 878336,
},
level=MetaInfoScope.cluster,
meta_data=data_stream_metadata,
),
mock.call(
{
"name": "data-stream-stats",
"data_stream": "my-data-stream-1",
"backing_indices": 1,
"store_size_bytes": 439137,
"maximum_timestamp": 1579936446448,
},
level=MetaInfoScope.cluster,
meta_data=data_stream_metadata,
),
mock.call(
{
"name": "data-stream-stats",
"data_stream": "my-data-stream-2",
"backing_indices": 1,
"store_size_bytes": 439199,
"maximum_timestamp": 1579936446448,
},
level=MetaInfoScope.cluster,
meta_data=data_stream_metadata,
),
],
any_order=True,
)

@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
def test_empty_data_streams_list(self, metrics_store_put_doc):
response = {
"_shards": {"total": 0, "successful": 0, "failed": 0},
"data_stream_count": 0,
"backing_indices": 0,
"total_store_size_bytes": 0,
"data_streams": [],
}

client = Client(indices=SubClient(data_streams_stats=response))
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
recorder = telemetry.DataStreamStatsRecorder(
cluster_name="default", client=client, metrics_store=metrics_store, sample_interval=1 * random.randint(1, 100)
)
recorder.record()

# Given an empty list of 'data_streams' we should only be
# sending a total of one metric document containing both the `_shards` and overall stats
self.assertEqual(1, metrics_store_put_doc.call_count)


class ShardStatsTests(TestCase):
@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
def test_stores_single_shard_stats(self, metrics_store_put_doc):