Add data streams telemetry device (#1296)
With this commit we add a data streams telemetry device that regularly
samples the count and store size of all data streams within a cluster.

Closes #1161
b-deam authored Jul 13, 2021
1 parent 945aa82 commit 9b6eb28
Showing 4 changed files with 294 additions and 5 deletions.
39 changes: 39 additions & 0 deletions docs/telemetry.rst
@@ -29,6 +29,7 @@ You probably want to gain additional insights from a race. Therefore, we have ad
transform-stats Transform Stats Regularly samples transform stats
searchable-snapshots-stats Searchable Snapshots Stats Regularly samples searchable snapshots stats
shard-stats Shard Stats Regularly samples nodes stats at shard level
data-stream-stats Data Streams Stats Regularly samples data streams stats

Keep in mind that each telemetry device may incur a runtime overhead which can skew results.

@@ -188,3 +189,41 @@ Example of a recorded document::
Supported telemetry parameters:

* ``shard-stats-sample-interval`` (default 60): A positive number greater than zero denoting the sampling interval in seconds.

data-stream-stats
-----------------

The data-stream-stats telemetry device regularly calls the `data stream stats API <https://www.elastic.co/guide/en/elasticsearch/reference/master/data-stream-stats-api.html>`_ and records one metrics document for cluster-level stats (``_all``) and one metrics document per data stream.
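
For reference, the same endpoint can be queried directly with the Python client; a minimal sketch, assuming a locally reachable cluster (the host is hypothetical)::

    import elasticsearch

    es = elasticsearch.Elasticsearch(hosts=["http://localhost:9200"])
    # The same API the telemetry device polls once per sampling interval.
    stats = es.indices.data_streams_stats(name="")
    print(stats["data_stream_count"], stats["total_store_size_bytes"])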

Example of recorded documents given two data streams in the cluster::

{
"data_stream": "_all",
"name": "data-stream-stats",
"shards": {
"total": 4,
"successful_shards": 2,
"failed_shards": 0
},
"data_stream_count": 2,
"backing_indices": 2,
"total_store_size_bytes": 878336
},
{
"name": "data-stream-stats",
"data_stream": "my-data-stream-1",
"backing_indices": 1,
"store_size_bytes": 439137,
"maximum_timestamp": 1579936446448
},
{
"name": "data-stream-stats",
"data_stream": "my-data-stream-2",
"backing_indices": 1,
"store_size_bytes": 439199,
"maximum_timestamp": 1579936446448
}

Supported telemetry parameters:

* ``data-stream-stats-sample-interval`` (default 10): A positive number greater than zero denoting the sampling interval in seconds.
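
Like the other devices listed above, it is enabled through Rally's telemetry flags; a minimal sketch of an invocation, assuming the standard ``--telemetry`` and ``--telemetry-params`` options (the track name and interval are illustrative)::

    esrally race --track=geonames --telemetry="data-stream-stats" --telemetry-params="data-stream-stats-sample-interval:30"
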
1 change: 1 addition & 0 deletions esrally/driver/driver.py
@@ -592,6 +592,7 @@ def prepare_telemetry(self, es, enable):
telemetry.ShardStats(telemetry_params, es, self.metrics_store),
telemetry.TransformStats(telemetry_params, es, self.metrics_store),
telemetry.SearchableSnapshotsStats(telemetry_params, es, self.metrics_store),
telemetry.DataStreamStats(telemetry_params, es, self.metrics_store),
]
else:
devices = []
129 changes: 125 additions & 4 deletions esrally/telemetry.py
@@ -26,7 +26,7 @@
from esrally import exceptions, metrics, time
from esrally.metrics import MetaInfoScope
from esrally.utils import console, io, opts, process, sysstats
-from esrally.utils.versions import components
+from esrally.utils.versions import Version


def list_telemetry():
@@ -45,6 +45,7 @@ def list_telemetry():
TransformStats,
SearchableSnapshotsStats,
ShardStats,
DataStreamStats,
]
]
console.println(tabulate.tabulate(devices, ["Command", "Name", "Description"]))
@@ -728,9 +729,7 @@ def __init__(self, telemetry_params, clients, metrics_store):
def on_benchmark_start(self):
default_client = self.clients["default"]
distribution_version = default_client.info()["version"]["number"]
-major, minor = components(distribution_version)[:2]
-
-if major < 7 or (major == 7 and minor < 2):
+if Version.from_string(distribution_version) < Version(major=7, minor=2, patch=0):
console.warn(NodeStats.warning, logger=self.logger)

for cluster_name in self.specified_cluster_names:
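
The hunk above replaces a manual major/minor tuple check with a comparable version object. A minimal sketch of the idea, not Rally's actual implementation, assuming plain dot-separated version strings:

from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class Version:
    # order=True derives <, <=, >, >= from the (major, minor, patch) field tuple
    major: int
    minor: int
    patch: int = 0

    @classmethod
    def from_string(cls, version_string):
        # tolerate suffixes such as "7.10.2-SNAPSHOT"
        parts = version_string.split("-")[0].split(".")
        return cls(*(int(p) for p in parts[:3]))


# mirrors the 7.2 warning threshold in NodeStats.on_benchmark_start above
assert Version.from_string("7.1.0") < Version(major=7, minor=2, patch=0)
assert not Version.from_string("7.9.1") < Version(major=7, minor=9, patch=0)
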
@@ -1267,6 +1266,128 @@ def _match_list_or_pattern(self, idx):
return False


class DataStreamStats(TelemetryDevice):
"""
Collects and pushes data stream stats for the specified cluster to the metric store.
"""

internal = False
command = "data-stream-stats"
human_name = "Data Stream Stats"
help = "Regularly samples data stream stats"

def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
May optionally specify:
``data-stream-stats-sample-interval``: An integer controlling the interval, in seconds,
between collecting samples. Default: 10s.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()

self.telemetry_params = telemetry_params
self.clients = clients
self.specified_cluster_names = self.clients.keys()
self.sample_interval = telemetry_params.get("data-stream-stats-sample-interval", 10)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'data-stream-stats-sample-interval' must be greater than zero " f"but was {self.sample_interval}."
)
self.metrics_store = metrics_store
self.samplers = []

def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
recorder = DataStreamStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval)
client_info = self.clients[cluster_name].info()
distribution_version = client_info["version"]["number"]
distribution_flavor = client_info["version"].get("build_flavor", "oss")
if Version.from_string(distribution_version) < Version(major=7, minor=9, patch=0):
raise exceptions.SystemSetupError(
"The data-stream-stats telemetry device can only be used with clusters from version 7.9 onwards"
)
if distribution_flavor == "oss":
raise exceptions.SystemSetupError(
"The data-stream-stats telemetry device cannot be used with an OSS distribution of Elasticsearch"
)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()

def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()


class DataStreamStatsRecorder:
"""
Collects and pushes data stream stats for the specified cluster to the metric store.
"""

def __init__(self, cluster_name, client, metrics_store, sample_interval):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: An integer controlling the interval, in seconds, between collecting samples.
"""

self.cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.logger = logging.getLogger(__name__)

def __str__(self):
return "data stream stats"

def record(self):
"""
Collect _data_stream/stats and push to metrics store.
"""
# pylint: disable=import-outside-toplevel
import elasticsearch

try:
sample = self.client.indices.data_streams_stats(name="")
except elasticsearch.TransportError:
msg = f"A transport error occurred while collecting data stream stats on cluster [{self.cluster_name}]"
self.logger.exception(msg)
raise exceptions.RallyError(msg)

data_stream_metadata = {"cluster": self.cluster_name}

doc = {
"data_stream": "_all",
"name": "data-stream-stats",
"shards": {
"total": sample["_shards"]["total"],
"successful_shards": sample["_shards"]["successful"],
"failed_shards": sample["_shards"]["failed"],
},
"data_stream_count": sample["data_stream_count"],
"backing_indices": sample["backing_indices"],
"total_store_size_bytes": sample["total_store_size_bytes"],
}

self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=data_stream_metadata)

for ds in sample["data_streams"]:
doc = {
"name": "data-stream-stats",
"data_stream": ds["data_stream"],
"backing_indices": ds["backing_indices"],
"store_size_bytes": ds["store_size_bytes"],
"maximum_timestamp": ds["maximum_timestamp"],
}
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=data_stream_metadata)


class StartupTime(InternalTelemetryDevice):
def __init__(self, stopwatch=time.StopWatch):
super().__init__()
130 changes: 129 additions & 1 deletion tests/telemetry_test.py
@@ -116,11 +116,12 @@ def info(self):


class SubClient:
-def __init__(self, stats=None, info=None, recovery=None, transform_stats=None):
+def __init__(self, stats=None, info=None, recovery=None, transform_stats=None, data_streams_stats=None):
self._stats = wrap(stats)
self._info = wrap(info)
self._recovery = wrap(recovery)
self._transform_stats = wrap(transform_stats)
self._data_streams_stats = wrap(data_streams_stats)

def stats(self, *args, **kwargs):
return self._stats()
@@ -134,6 +135,9 @@ def recovery(self, *args, **kwargs):
def get_transform_stats(self, *args, **kwargs):
return self._transform_stats()

def data_streams_stats(self, *args, **kwargs):
return self._data_streams_stats()


def wrap(it):
return it if callable(it) else ResponseSupplier(it)
@@ -941,6 +945,130 @@ def test_stores_multi_index_multi_shard_stats(self, metrics_store_put_doc):
)


class DataStreamStatsTests(TestCase):
def test_failure_if_feature_not_implemented_in_version(self):
# Data Streams aren't available prior to 7.9
clients = {"default": Client(info={"version": {"number": "7.6.0"}})}
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
telemetry_params = {"data-stream-stats-sample-interval": random.randint(1, 100)}
t = telemetry.DataStreamStats(telemetry_params, clients, metrics_store)
with self.assertRaisesRegex(
exceptions.SystemSetupError, r"The data-stream-stats telemetry device can only be used with clusters from version 7.9 onwards"
):
t.on_benchmark_start()

def test_failure_if_feature_not_implemented_in_distribution(self):
# Data Streams aren't available with the OSS distribution
clients = {"default": Client(info={"version": {"number": "7.9.0", "build_flavor": "oss"}})}
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
telemetry_params = {"data-stream-stats-sample-interval": random.randint(1, 100)}
t = telemetry.DataStreamStats(telemetry_params, clients, metrics_store)
with self.assertRaisesRegex(
exceptions.SystemSetupError, r"The data-stream-stats telemetry device cannot be used with an OSS distribution of Elasticsearch"
):
t.on_benchmark_start()

def test_negative_sample_interval_forbidden(self):
clients = {"default": Client(), "cluster_b": Client()}
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
telemetry_params = {"data-stream-stats-sample-interval": -1 * random.random()}
with self.assertRaisesRegex(
exceptions.SystemSetupError,
r"The telemetry parameter 'data-stream-stats-sample-interval' must be greater than zero but was .*\.",
):
telemetry.DataStreamStats(telemetry_params, clients, metrics_store)


class DataStreamStatsRecorderTests(TestCase):
data_streams_stats_response = {
"_shards": {"total": 4, "successful": 2, "failed": 0},
"data_stream_count": 2,
"backing_indices": 2,
"total_store_size_bytes": 878336,
"data_streams": [
{"data_stream": "my-data-stream-1", "backing_indices": 1, "store_size_bytes": 439137, "maximum_timestamp": 1579936446448},
{"data_stream": "my-data-stream-2", "backing_indices": 1, "store_size_bytes": 439199, "maximum_timestamp": 1579936446448},
],
}

@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
def test_store_multiple_data_stream_stats(self, metrics_store_put_doc):
client = Client(indices=SubClient(data_streams_stats=self.data_streams_stats_response))
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
recorder = telemetry.DataStreamStatsRecorder(
cluster_name="remote", client=client, metrics_store=metrics_store, sample_interval=1 * random.randint(1, 100)
)
recorder.record()

data_stream_metadata = {"cluster": "remote"}

metrics_store_put_doc.assert_has_calls(
[
mock.call(
{
"data_stream": "_all",
"name": "data-stream-stats",
"shards": {"total": 4, "successful_shards": 2, "failed_shards": 0},
"data_stream_count": 2,
"backing_indices": 2,
"total_store_size_bytes": 878336,
},
level=MetaInfoScope.cluster,
meta_data=data_stream_metadata,
),
mock.call(
{
"name": "data-stream-stats",
"data_stream": "my-data-stream-1",
"backing_indices": 1,
"store_size_bytes": 439137,
"maximum_timestamp": 1579936446448,
},
level=MetaInfoScope.cluster,
meta_data=data_stream_metadata,
),
mock.call(
{
"name": "data-stream-stats",
"data_stream": "my-data-stream-2",
"backing_indices": 1,
"store_size_bytes": 439199,
"maximum_timestamp": 1579936446448,
},
level=MetaInfoScope.cluster,
meta_data=data_stream_metadata,
),
],
any_order=True,
)

@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
def test_empty_data_streams_list(self, metrics_store_put_doc):
response = {
"_shards": {"total": 0, "successful": 0, "failed": 0},
"data_stream_count": 0,
"backing_indices": 0,
"total_store_size_bytes": 0,
"data_streams": [],
}

client = Client(indices=SubClient(data_streams_stats=response))
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
recorder = telemetry.DataStreamStatsRecorder(
cluster_name="default", client=client, metrics_store=metrics_store, sample_interval=1 * random.randint(1, 100)
)
recorder.record()

# Given an empty list of 'data_streams' we should only be
# sending a total of one metric document containing both the `_shards` and overall stats
self.assertEqual(1, metrics_store_put_doc.call_count)


class ShardStatsTests(TestCase):
@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
def test_stores_single_shard_stats(self, metrics_store_put_doc):
