Skip to content

Commit

Permalink
Store race results in a format optimized for reporting
Browse files Browse the repository at this point in the history
Closes #283
  • Loading branch information
danielmitterdorfer committed May 15, 2017
1 parent b3d9c37 commit 464c51f
Show file tree
Hide file tree
Showing 5 changed files with 380 additions and 12 deletions.
68 changes: 65 additions & 3 deletions esrally/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ def metrics_template(self):
def races_template(self):
    """
    :return: The raw JSON content of the index template for the races index.
    """
    template_name = "races-template"
    return self._read(template_name)

def results_template(self):
    """
    :return: The raw JSON content of the index template for the results index.
    """
    template_name = "results-template"
    return self._read(template_name)

def _read(self, template_name):
    """
    Reads an index template from Rally's resources directory.

    :param template_name: Base file name (without the ".json" extension) of the template to read.
    :return: The template file's content as a string.
    """
    # use a context manager so the file handle is closed deterministically instead of
    # being leaked until garbage collection
    with open("%s/resources/%s.json" % (self.script_dir, template_name)) as f:
        return f.read()

Expand Down Expand Up @@ -927,7 +930,7 @@ def race_store(cfg):
"""
if cfg.opts("reporting", "datastore.type") == "elasticsearch":
logger.info("Creating ES race store")
return CompositeRaceStore(EsRaceStore(cfg), FileRaceStore(cfg))
return CompositeRaceStore(EsRaceStore(cfg), EsResultsStore(cfg), FileRaceStore(cfg))
else:
logger.info("Creating file race store")
return FileRaceStore(cfg)
Expand Down Expand Up @@ -1003,6 +1006,9 @@ def results_of_lap_number(self, lap):
return self.lap_results[lap - 1]

def as_dict(self):
"""
:return: A dict representation suitable for persisting this race instance as JSON.
"""
return {
"rally-version": self.rally_version,
"environment": self.environment_name,
Expand All @@ -1017,6 +1023,30 @@ def as_dict(self):
"results": self.results.as_dict()
}

def to_result_dicts(self):
    """
    :return: a list of dicts, suitable for persisting the results of this race in a format that is Kibana-friendly.
    """
    # fields shared by every result document of this race
    shared_fields = {
        "environment": self.environment_name,
        "trial-timestamp": time.to_iso8601(self.trial_timestamp),
        "distribution-version": self.cluster.distribution_version,
        "user-tag": self.user_tag,
        "track": self.track_name,
        "challenge": self.challenge_name,
        "car": self.car,
        # allow to logically delete records, e.g. for UI purposes when we only want to show the latest result on graphs
        "active": True
    }
    # one document per flattened metric; metric-specific keys override nothing in shared_fields
    return [dict(shared_fields, **metric_record) for metric_record in self.results.as_flat_list()]

@classmethod
def from_dict(cls, d):
# Don't restore a few properties like cluster because they (a) cannot be reconstructed easily without knowledge of other modules
Expand Down Expand Up @@ -1053,10 +1083,11 @@ class CompositeRaceStore:
"""
Internal helper class to store races as file and to Elasticsearch in case users want Elasticsearch as a race store.
It provides the same API as RaceStore. It delegates writes to all stores and delegates all read operations only to the Elasticsearch race store.
"""
def __init__(self, es_store, file_store):
def __init__(self, es_store, es_results_store, file_store):
    """
    :param es_store: Elasticsearch-backed race store (serves reads and receives writes).
    :param es_results_store: Elasticsearch-backed results store (receives writes).
    :param file_store: File-based race store (receives writes).
    """
    self.file_store = file_store
    self.es_results_store = es_results_store
    self.es_store = es_store

def find_by_timestamp(self, timestamp):
Expand All @@ -1065,6 +1096,7 @@ def find_by_timestamp(self, timestamp):
def store_race(self, race):
    """
    Stores the race in all backing stores.

    :param race: The race to store.
    """
    # same write order as the delegating calls had before: file store first, then both Elasticsearch stores
    writers = (
        self.file_store.store_race,
        self.es_store.store_race,
        self.es_results_store.store_results,
    )
    for write in writers:
        write(race)

def list(self):
    """Lists races; reads are served exclusively by the Elasticsearch race store."""
    reader = self.es_store
    return reader.list()
Expand Down Expand Up @@ -1128,6 +1160,7 @@ def __init__(self, cfg, client_factory_class=EsClientFactory, index_template_pro
:param cfg: The config object. Mandatory.
:param client_factory_class: This parameter is optional and needed for testing.
:param index_template_provider_class: This parameter is optional and needed for testing.
"""
super().__init__(cfg)
self.client = client_factory_class(cfg).create()
Expand Down Expand Up @@ -1193,3 +1226,32 @@ def find_by_timestamp(self, timestamp):
return Race.from_dict(result["hits"]["hits"][0]["_source"])
else:
return None


class EsResultsStore:
    """
    Stores the results of a race in a format that is better suited for reporting with Kibana.
    """
    INDEX_PREFIX = "rally-results-"
    RESULTS_DOC_TYPE = "results"

    def __init__(self, cfg, client_factory_class=EsClientFactory, index_template_provider_class=IndexTemplateProvider):
        """
        Creates a new results store.

        :param cfg: The config object. Mandatory.
        :param client_factory_class: This parameter is optional and needed for testing.
        :param index_template_provider_class: This parameter is optional and needed for testing.
        """
        self.cfg = cfg
        self.trial_timestamp = cfg.opts("system", "time.start")
        self.client = client_factory_class(cfg).create()
        self.index_template_provider = index_template_provider_class(cfg)

    def store_results(self, race):
        """
        Stores one result document per flattened race metric.

        :param race: The race whose results should be stored.
        """
        # always update the mapping to the latest version
        self.client.put_template("rally-results", self.index_template_provider.results_template())
        self.client.bulk_index(index=self.index_name(), doc_type=self.RESULTS_DOC_TYPE, items=race.to_result_dicts())

    def index_name(self):
        """
        :return: The name of the monthly results index for this trial, e.g. "rally-results-2016-01".
        """
        ts = self.trial_timestamp
        return "{}{:04d}-{:02d}".format(self.INDEX_PREFIX, ts.year, ts.month)
26 changes: 25 additions & 1 deletion esrally/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,32 @@ def __init__(self, d=None):
def as_dict(self):
    """:return: This instance's attributes as a plain dict (the live attribute dict, not a copy)."""
    return vars(self)

def as_flat_list(self):
    """
    Flattens all metrics of this instance into a list of dicts, one dict per metric value, suitable for
    bulk-indexing into a reporting store. Operation-level metrics carry an additional "operation" key;
    scalar metrics are wrapped as {"single": value}.

    :return: A list of metric dicts, sorted by metric name.
    """
    all_results = []
    for metric, value in self.as_dict().items():
        if metric == "op_metrics":
            for item in value:
                operation = item["operation"]
                # these metrics are already structured dicts and are passed through unchanged
                # (one loop instead of three copy-pasted if-branches)
                for metric_name in ("throughput", "latency", "service_time"):
                    if metric_name in item:
                        all_results.append({"operation": operation, "name": metric_name, "value": item[metric_name]})
                if "error_rate" in item:
                    all_results.append({"operation": operation, "name": "error_rate", "value": {"single": item["error_rate"]}})
        elif value is not None:
            all_results.append({"name": metric, "value": {"single": value}})
    # sorting is just necessary to have a stable order for tests. As we just have a small number of metrics,
    # the overhead is negligible.
    return sorted(all_results, key=lambda m: m["name"])

def v(self, d, k, default=None):
    """
    Safely looks up key ``k`` in dict ``d``.

    :param d: A dict. May be None or empty.
    :param k: The key to look up.
    :param default: The value to return when ``d`` is falsy or ``k`` is missing (default: None).
    :return: ``d[k]`` if present, otherwise ``default``.
    """
    # use .get so a missing key falls back to the default instead of raising KeyError
    # (the flattened diff left both the old d[k] line and this one; only this one is correct)
    return d.get(k, default) if d else default

def add_op_metrics(self, operation, throughput, latency, service_time, error_rate):
self.op_metrics.append({
Expand Down
75 changes: 75 additions & 0 deletions esrally/resources/results-template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{
"template": "rally-results-*",
"settings": {
"index": {
"refresh_interval": "5s"
}
},
"mappings": {
"_default_": {
"dynamic_templates": [
{
"strings": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
],
"_all": {
"enabled": false
},
"_source": {
"enabled": true
},
"properties": {
"trial-timestamp": {
"type": "date",
"format": "basic_date_time_no_millis",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"active": {
"type": "boolean"
},
"environment": {
"type": "keyword"
},
"user-tag": {
"type": "keyword"
},
"track": {
"type": "keyword"
},
"challenge": {
"type": "keyword"
},
"car": {
"type": "keyword"
},
"distribution-version": {
"type": "keyword"
},
"operation": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"value": {
"type": "object",
"properties": {
"single": {
"type": "float"
}
}
}
}
}
}
}
133 changes: 126 additions & 7 deletions tests/metrics_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def metrics_template(self):
def races_template(self):
    # canned stand-in for the real races template so tests can assert on what is sent to Elasticsearch
    canned = "races-test-template"
    return canned

def results_template(self):
    # canned stand-in for the real results template so tests can assert on what is sent to Elasticsearch
    canned = "results-test-template"
    return canned


class StaticClock:
NOW = 1453362707
Expand Down Expand Up @@ -623,6 +626,115 @@ def test_store_race(self):
self.es_mock.index.assert_called_with(index="rally-races-2016-01", doc_type="races", item=expected_doc)


class EsResultsStoreTests(TestCase):
    TRIAL_TIMESTAMP = datetime.datetime(2016, 1, 31)

    def setUp(self):
        self.cfg = config.Config()
        self.cfg.add(config.Scope.application, "system", "env.name", "unittest")
        # use this class' own constant: the original referenced EsRaceStoreTests.TRIAL_TIMESTAMP,
        # which only worked because both classes happen to use the same date
        self.cfg.add(config.Scope.application, "system", "time.start", EsResultsStoreTests.TRIAL_TIMESTAMP)
        self.results_store = metrics.EsResultsStore(self.cfg,
                                                    client_factory_class=MockClientFactory,
                                                    index_template_provider_class=DummyIndexTemplateProvider,
                                                    )
        # get hold of the mocked client...
        self.es_mock = self.results_store.client

    def test_store_results(self):
        # here we need the real thing
        from esrally import reporter
        from esrally.mechanic import cluster

        schedule = [
            track.Task(track.Operation("index", track.OperationType.Index))
        ]

        t = track.Track(name="unittest-track", short_description="unittest track",
                        source_root_url="http://example.org",
                        indices=[track.Index(name="tests", auto_managed=True, types=[track.Type(name="test-type", mapping_file=None)])],
                        challenges=[
                            track.Challenge(name="index", description="Index", default=True, index_settings=None, schedule=schedule)
                        ])

        c = cluster.Cluster([], [], None)
        c.distribution_version = "5.0.0"

        race = metrics.Race(rally_version="0.4.4", environment_name="unittest", trial_timestamp=EsResultsStoreTests.TRIAL_TIMESTAMP,
                            pipeline="from-sources", user_tag="let-me-test", track=t, challenge=t.default_challenge, car="4gheap",
                            total_laps=12,
                            cluster=c,
                            lap_results=[],
                            results=reporter.Stats(
                                {
                                    "young_gc_time": 100,
                                    "old_gc_time": 5,
                                    "op_metrics": [
                                        {
                                            "operation": "index",
                                            "throughput": {
                                                "min": 1000,
                                                "median": 1250,
                                                "max": 1500,
                                                "unit": "docs/s"
                                            }
                                        }
                                    ]
                                })
                            )

        self.results_store.store_results(race)

        # one document per flattened metric, sorted by metric name
        expected_docs = [
            {
                "environment": "unittest",
                "trial-timestamp": "20160131T000000Z",
                "distribution-version": "5.0.0",
                "user-tag": "let-me-test",
                "track": "unittest-track",
                "challenge": "index",
                "car": "4gheap",
                "active": True,
                "name": "old_gc_time",
                "value": {
                    "single": 5
                }
            },
            {
                "environment": "unittest",
                "trial-timestamp": "20160131T000000Z",
                "distribution-version": "5.0.0",
                "user-tag": "let-me-test",
                "track": "unittest-track",
                "challenge": "index",
                "car": "4gheap",
                "active": True,
                "name": "throughput",
                "operation": "index",
                "value": {
                    "min": 1000,
                    "median": 1250,
                    "max": 1500,
                    "unit": "docs/s"
                }
            },
            {
                "environment": "unittest",
                "trial-timestamp": "20160131T000000Z",
                "distribution-version": "5.0.0",
                "user-tag": "let-me-test",
                "track": "unittest-track",
                "challenge": "index",
                "car": "4gheap",
                "active": True,
                "name": "young_gc_time",
                "value": {
                    "single": 100
                }
            }
        ]
        self.es_mock.bulk_index.assert_called_with(index="rally-results-2016-01", doc_type="results", items=expected_docs)


class InMemoryMetricsStoreTests(TestCase):
def setUp(self):
self.cfg = config.Config()
Expand Down Expand Up @@ -737,8 +849,10 @@ def test_get_error_rate_zero_without_samples(self):
def test_get_error_rate_by_sample_type(self):
self.metrics_store.open(EsMetricsTests.TRIAL_TIMESTAMP, "test", "append-no-conflicts", "defaults", create=True)
self.metrics_store.lap = 1
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Warmup, meta_data={"success": False})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal, meta_data={"success": True})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Warmup,
meta_data={"success": False})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal,
meta_data={"success": True})

self.metrics_store.close()

Expand All @@ -750,11 +864,16 @@ def test_get_error_rate_by_sample_type(self):
def test_get_error_rate_mixed(self):
self.metrics_store.open(EsMetricsTests.TRIAL_TIMESTAMP, "test", "append-no-conflicts", "defaults", create=True)
self.metrics_store.lap = 1
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal, meta_data={"success": True})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal, meta_data={"success": True})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal, meta_data={"success": False})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal, meta_data={"success": True})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal, meta_data={"success": True})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal,
meta_data={"success": True})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal,
meta_data={"success": True})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal,
meta_data={"success": False})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal,
meta_data={"success": True})
self.metrics_store.put_value_cluster_level("service_time", 3.0, "ms", operation="term-query", sample_type=metrics.SampleType.Normal,
meta_data={"success": True})

self.metrics_store.close()

Expand Down
Loading

0 comments on commit 464c51f

Please sign in to comment.