Simplify ML metrics structure
With this commit we simplify the structure of metrics records as well as
their flattened representation in the results and race structures.

Relates #572
danielmitterdorfer committed Sep 19, 2018
1 parent 87d7625 commit 89a1829
Showing 6 changed files with 137 additions and 47 deletions.
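In short, the per-job statistics keys lose their `_millis` suffix, `job_name` becomes `job`, and the unit is now carried in the record itself. A before/after sketch of one metrics record (values taken from the updated tests below):

```python
# Before this commit: the unit was baked into the key names.
old_record = {
    "name": "ml_processing_time",
    "job_name": "benchmark_ml_job_1",
    "min_millis": 2.2,
    "mean_millis": 12.3,
    "median_millis": 17.2,
    "max_millis": 36.0
}

# After this commit: plain stat names plus an explicit "unit" field,
# so consumers no longer hard-code a millisecond conversion.
new_record = {
    "name": "ml_processing_time",
    "job": "benchmark_ml_job_1",
    "min": 2.2,
    "mean": 12.3,
    "median": 17.2,
    "max": 36.0,
    "unit": "ms"
}
```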
11 changes: 6 additions & 5 deletions esrally/mechanic/telemetry.py
@@ -1166,11 +1166,12 @@ def detach_from_cluster(self, cluster):
             for job in results["aggregations"]["jobs"]["buckets"]:
                 ml_job_stats = collections.OrderedDict()
                 ml_job_stats["name"] = "ml_processing_time"
-                ml_job_stats["job_name"] = job["key"]
-                ml_job_stats["min_millis"] = job["min_pt"]["value"]
-                ml_job_stats["mean_millis"] = job["mean_pt"]["value"]
-                ml_job_stats["median_millis"] = job["median_pt"]["values"]["50.0"]
-                ml_job_stats["max_millis"] = job["max_pt"]["value"]
+                ml_job_stats["job"] = job["key"]
+                ml_job_stats["min"] = job["min_pt"]["value"]
+                ml_job_stats["mean"] = job["mean_pt"]["value"]
+                ml_job_stats["median"] = job["median_pt"]["values"]["50.0"]
+                ml_job_stats["max"] = job["max_pt"]["value"]
+                ml_job_stats["unit"] = "ms"
                 self.metrics_store.put_doc(doc=dict(ml_job_stats), level=MetaInfoScope.cluster)
         except KeyError:
             # no ML running
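For reference, the `job[...]` lookups above consume an Elasticsearch aggregation over ML job processing times. A sketch of one bucket as the code assumes it (field names from the code above; values illustrative):

```python
# Assumed shape of one entry in results["aggregations"]["jobs"]["buckets"].
# "median_pt" is a percentiles aggregation, hence the nested "values"
# object keyed by percentile.
job = {
    "key": "benchmark_ml_job_1",              # -> ml_job_stats["job"]
    "min_pt": {"value": 2.2},                 # -> ml_job_stats["min"]
    "mean_pt": {"value": 12.3},               # -> ml_job_stats["mean"]
    "median_pt": {"values": {"50.0": 17.2}},  # -> ml_job_stats["median"]
    "max_pt": {"value": 36.0}                 # -> ml_job_stats["max"]
}
```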
66 changes: 46 additions & 20 deletions esrally/reporter.py
@@ -154,7 +154,7 @@ def __call__(self):
         result.merge_part_time_points = self.sum("merge_parts_total_time_points")

         self.logger.debug("Gathering ML max processing times.")
-        result.ml_processing_time = self.all("ml_processing_time")
+        result.ml_processing_time = self.ml_processing_time_stats()

         self.logger.debug("Gathering CPU usage metrics.")
         result.median_cpu_usage = self.median("cpu_utilization_1s", sample_type=metrics.SampleType.Normal)
@@ -195,9 +195,6 @@ def sum(self, metric_name):
     def one(self, metric_name):
         return self.store.get_one(metric_name, lap=self.lap)

-    def all(self, metric_name):
-        return self.store.get_raw(metric_name, lap=self.lap)
-
     def summary_stats(self, metric_name, task_name):
         median = self.store.get_median(metric_name, task=task_name, sample_type=metrics.SampleType.Normal, lap=self.lap)
         unit = self.store.get_unit(metric_name, task=task_name)
@@ -231,6 +228,21 @@ def shard_stats(self, metric_name):
         else:
             return {}

+    def ml_processing_time_stats(self):
+        values = self.store.get_raw("ml_processing_time")
+        result = []
+        if values:
+            for v in values:
+                result.append({
+                    "job": v["job"],
+                    "min": v["min"],
+                    "mean": v["mean"],
+                    "median": v["median"],
+                    "max": v["max"],
+                    "unit": v["unit"]
+                })
+        return result
+
     def error_rate(self, task_name):
         return self.store.get_error_rate(task=task_name, sample_type=metrics.SampleType.Normal, lap=self.lap)

@@ -272,7 +284,7 @@ def __init__(self, d=None):
         self.flush_time_per_shard = self.v(d, "flush_time_per_shard", default={})
         self.merge_throttle_time = self.v(d, "merge_throttle_time")
         self.merge_throttle_time_per_shard = self.v(d, "merge_throttle_time_per_shard", default={})
-        self.ml_processing_time = self.v(d, "ml_processing_time")
+        self.ml_processing_time = self.v(d, "ml_processing_time", default=[])

         self.merge_part_time_postings = self.v(d, "merge_part_time_postings")
         self.merge_part_time_stored_fields = self.v(d, "merge_part_time_stored_fields")
@@ -321,6 +333,18 @@ def as_flat_list(self):
                     all_results.append(
                         {"task": item["task"], "operation": item["operation"], "name": "error_rate",
                          "value": {"single": item["error_rate"]}})
+            elif metric == "ml_processing_time":
+                for item in value:
+                    all_results.append({
+                        "job": item["job"],
+                        "name": "ml_processing_time",
+                        "value": {
+                            "min": item["min"],
+                            "mean": item["mean"],
+                            "median": item["median"],
+                            "max": item["max"]
+                        }
+                    })
             elif metric == "node_metrics":
                 for item in value:
                     if "startup_time" in item:
@@ -525,11 +549,12 @@ def report_merge_part_times(self, stats):
     def report_ml_processing_times(self, stats):
         lines = []
         for processing_time in stats.ml_processing_time:
-            job_name = processing_time["job_name"]
-            lines.append(self.line("Min ML processing time", job_name, processing_time["min_millis"], "s", convert.ms_to_seconds)),
-            lines.append(self.line("Mean ML processing time", job_name, processing_time["mean_millis"], "s", convert.ms_to_seconds)),
-            lines.append(self.line("Median ML processing time", job_name, processing_time["median_millis"], "s", convert.ms_to_seconds)),
-            lines.append(self.line("Max ML processing time", job_name, processing_time["max_millis"], "s", convert.ms_to_seconds))
+            job_name = processing_time["job"]
+            unit = processing_time["unit"]
+            lines.append(self.line("Min ML processing time", job_name, processing_time["min"], unit)),
+            lines.append(self.line("Mean ML processing time", job_name, processing_time["mean"], unit)),
+            lines.append(self.line("Median ML processing time", job_name, processing_time["median"], unit)),
+            lines.append(self.line("Max ML processing time", job_name, processing_time["max"], unit))
         return lines

     def report_cpu_usage(self, stats):
@@ -714,18 +739,19 @@ def report_merge_part_times(self, baseline_stats, contender_stats):
     def report_ml_processing_times(self, baseline_stats, contender_stats):
         lines = []
         for baseline in baseline_stats.ml_processing_time:
-            job_name = baseline["job_name"]
+            job_name = baseline["job"]
+            unit = baseline["unit"]
             # O(n^2) but we assume here only a *very* limited number of jobs (usually just one)
             for contender in contender_stats.ml_processing_time:
-                if contender["job_name"] == job_name:
-                    lines.append(self.line("Min ML processing time", baseline["min_millis"], contender["min_millis"],
-                                           job_name, "s", treat_increase_as_improvement=False, formatter=convert.ms_to_seconds))
-                    lines.append(self.line("Mean ML processing time", baseline["mean_millis"], contender["mean_millis"],
-                                           job_name, "s", treat_increase_as_improvement=False, formatter=convert.ms_to_seconds))
-                    lines.append(self.line("Median ML processing time", baseline["median_millis"], contender["median_millis"],
-                                           job_name, "s", treat_increase_as_improvement=False, formatter=convert.ms_to_seconds))
-                    lines.append(self.line("Max ML processing time", baseline["max_millis"], contender["max_millis"],
-                                           job_name, "s", treat_increase_as_improvement=False, formatter=convert.ms_to_seconds))
+                if contender["job"] == job_name:
+                    lines.append(self.line("Min ML processing time", baseline["min"], contender["min"],
+                                           job_name, unit, treat_increase_as_improvement=False))
+                    lines.append(self.line("Mean ML processing time", baseline["mean"], contender["mean"],
+                                           job_name, unit, treat_increase_as_improvement=False))
+                    lines.append(self.line("Median ML processing time", baseline["median"], contender["median"],
+                                           job_name, unit, treat_increase_as_improvement=False))
+                    lines.append(self.line("Max ML processing time", baseline["max"], contender["max"],
+                                           job_name, unit, treat_increase_as_improvement=False))
         return lines

     def report_total_times(self, baseline_stats, contender_stats):
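Taken together, `ml_processing_time_stats` now hands the reporters one plain dict per ML job, and `as_flat_list` nests the four statistics under `value` for the results store. A minimal sketch of that flow, assuming a single job populated as in the tests below:

```python
# One entry of stats.ml_processing_time, as built by
# ml_processing_time_stats() above:
stats_entry = {
    "job": "benchmark_ml_job_1",
    "min": 2.2,
    "mean": 12.3,
    "median": 17.2,
    "max": 36.0,
    "unit": "ms"
}

# The flattened record emitted by as_flat_list() for that entry:
# the stats are grouped under "value", the job name stays top-level.
flat_entry = {
    "job": stats_entry["job"],
    "name": "ml_processing_time",
    "value": {k: stats_entry[k] for k in ("min", "mean", "median", "max")}
}
```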
15 changes: 15 additions & 0 deletions esrally/resources/metrics-template.json
@@ -66,6 +66,18 @@
"value": {
"type": "float"
},
"min": {
"type": "float"
},
"max": {
"type": "float"
},
"mean": {
"type": "float"
},
"median": {
"type": "float"
},
"unit": {
"type": "keyword"
},
@@ -80,6 +92,9 @@
         },
         "operation-type": {
           "type": "keyword"
+        },
+        "job": {
+          "type": "keyword"
         }
       }
     }
6 changes: 6 additions & 0 deletions esrally/resources/results-template.json
@@ -74,6 +74,9 @@
"operation": {
"type": "keyword"
},
"job": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
@@ -86,6 +89,9 @@
"min": {
"type": "double"
},
"mean": {
"type": "double"
},
"median": {
"type": "double"
},
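Note that the two templates type the same statistics differently: the metrics index maps them as `float`, the results index as `double`. A hypothetical flattened document the results template is meant to accept, assuming the `min`/`mean`/`median`/`max` mappings sit beneath the `value` object, matching the `as_flat_list` output above (values from the tests below):

```python
# Hypothetical document in the Rally results index: "job" is a keyword,
# the statistics under "value" are doubles.
result_doc = {
    "name": "ml_processing_time",
    "job": "job_1",
    "value": {
        "min": 3.3,
        "mean": 5.2,
        "median": 5.8,
        "max": 12.34
    }
}
```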
22 changes: 12 additions & 10 deletions tests/mechanic/telemetry_test.py
@@ -2046,19 +2046,21 @@ def test_result_is_stored(self, es, metrics_store_put_doc):
         metrics_store_put_doc.assert_has_calls([
             mock.call(doc={
                 "name": "ml_processing_time",
-                "job_name": "benchmark_ml_job_1",
-                "min_millis": 2.2,
-                "mean_millis": 12.3,
-                "median_millis": 17.2,
-                "max_millis": 36.0
+                "job": "benchmark_ml_job_1",
+                "min": 2.2,
+                "mean": 12.3,
+                "median": 17.2,
+                "max": 36.0,
+                "unit": "ms"
             }, level=metrics.MetaInfoScope.cluster),
             mock.call(doc={
                 "name": "ml_processing_time",
-                "job_name": "benchmark_ml_job_2",
-                "min_millis": 32.2,
-                "mean_millis": 78.3,
-                "median_millis": 37.4,
-                "max_millis": 226.3
+                "job": "benchmark_ml_job_2",
+                "min": 32.2,
+                "mean": 78.3,
+                "median": 37.4,
+                "max": 226.3,
+                "unit": "ms"
             }, level=metrics.MetaInfoScope.cluster)
         ])
64 changes: 52 additions & 12 deletions tests/reporter_test.py
@@ -47,11 +47,12 @@ def test_calculate_simple_index_stats(self):
                                       meta_data={"success": True})
         store.put_doc(doc={
             "name": "ml_processing_time",
-            "job_name": "benchmark_ml_job_1",
-            "min_millis": 2.2,
-            "mean_millis": 12.3,
-            "median_millis": 17.2,
-            "max_millis": 36.0
+            "job": "benchmark_ml_job_1",
+            "min": 2.2,
+            "mean": 12.3,
+            "median": 17.2,
+            "max": 36.0,
+            "unit": "ms"
         }, level=metrics.MetaInfoScope.cluster)
         store.put_count_node_level("rally-node-0", "final_index_size_bytes", 2048, unit="bytes")
         store.put_count_node_level("rally-node-1", "final_index_size_bytes", 4096, unit="bytes")
@@ -68,16 +69,17 @@

         self.assertEqual(6144, stats.index_size)
         self.assertEqual(1, len(stats.ml_processing_time))
-        self.assertEqual("benchmark_ml_job_1", stats.ml_processing_time[0]["job_name"])
-        self.assertEqual(2.2, stats.ml_processing_time[0]["min_millis"])
-        self.assertEqual(12.3, stats.ml_processing_time[0]["mean_millis"])
-        self.assertEqual(17.2, stats.ml_processing_time[0]["median_millis"])
-        self.assertEqual(36.0, stats.ml_processing_time[0]["max_millis"])
+        self.assertEqual("benchmark_ml_job_1", stats.ml_processing_time[0]["job"])
+        self.assertEqual(2.2, stats.ml_processing_time[0]["min"])
+        self.assertEqual(12.3, stats.ml_processing_time[0]["mean"])
+        self.assertEqual(17.2, stats.ml_processing_time[0]["median"])
+        self.assertEqual(36.0, stats.ml_processing_time[0]["max"])
+        self.assertEqual("ms", stats.ml_processing_time[0]["unit"])


-def select(l, name, operation=None, node=None):
+def select(l, name, operation=None, job=None, node=None):
     for item in l:
-        if item["name"] == name and item.get("operation") == operation and item.get("node") == node:
+        if item["name"] == name and item.get("operation") == operation and item.get("node") == node and item.get("job") == job:
             return item
     return None
@@ -116,6 +118,22 @@ def test_as_flat_list(self):
"startup_time": 4.2
}
],
"ml_processing_time": [
{
"job": "job_1",
"min": 3.3,
"mean": 5.2,
"median": 5.8,
"max": 12.34
},
{
"job": "job_2",
"min": 3.55,
"mean": 4.2,
"median": 4.9,
"max": 9.4
},
],
"young_gc_time": 68,
"old_gc_time": 0,
"refresh_time": 596,
@@ -188,6 +206,28 @@ def test_as_flat_list(self):
             }
         }, select(metric_list, "startup_time", node="rally-node-1"))

+        self.assertEqual({
+            "name": "ml_processing_time",
+            "job": "job_1",
+            "value": {
+                "min": 3.3,
+                "mean": 5.2,
+                "median": 5.8,
+                "max": 12.34
+            }
+        }, select(metric_list, "ml_processing_time", job="job_1"))
+
+        self.assertEqual({
+            "name": "ml_processing_time",
+            "job": "job_2",
+            "value": {
+                "min": 3.55,
+                "mean": 4.2,
+                "median": 4.9,
+                "max": 9.4
+            }
+        }, select(metric_list, "ml_processing_time", job="job_2"))
+
         self.assertEqual({
             "name": "young_gc_time",
             "value": {
