Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate system metrics per node #803

Merged
5 changes: 5 additions & 0 deletions docs/migrate.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ Migration Guide
Migrating to Rally 1.4.0
------------------------

Index size and Total Written are not included in the command line report
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/contained/included ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I'll change this accordingly before merging.

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Elasticsearch nodes are now managed independently of benchmark execution and thus all system metrics ("index size" and "total written") may only be determined after the command line report has been written. The corresponding metrics (``final_index_size_bytes`` and ``disk_io_write_bytes``) are still written to the Elasticsearch metrics store if one is configured.

Node details are omitted from race metadata
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
12 changes: 0 additions & 12 deletions docs/summary_report.rst
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,6 @@ Total Old Gen GC
* **Definition**: The total runtime of the old generation garbage collector across the whole cluster as reported by the node stats API.
* **Corresponding metrics key**: ``node_total_old_gen_gc_time``

Index size
----------

* **Definition**: Final resulting index size on the file system after all nodes have been shut down at the end of the benchmark. It includes all files in the nodes' data directories (actual index files and translog).
* **Corresponding metrics key**: ``final_index_size_bytes``

Store size
----------

Expand All @@ -132,12 +126,6 @@ Translog size
* **Definition**: The size in bytes of the translog as reported by the indices stats API.
* **Corresponding metrics key**: ``translog_size_in_bytes``

Total written
---------------

* **Definition**: number of bytes that have been written to disk during the benchmark. On Linux this metric reports only the bytes that have been written by Elasticsearch, on Mac OS X it reports the number of bytes written by all processes.
* **Corresponding metrics key**: ``disk_io_write_bytes``

Heap used for ``X``
-------------------

Expand Down
7 changes: 2 additions & 5 deletions esrally/chart_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,10 @@ def io(title, environment, race_config):
{
"id": "1",
"enabled": True,
"type": "median",
"type": "sum",
"schema": "metric",
"params": {
"field": "value.single",
"percents": [
50
],
"customLabel": "[Bytes]"
}
},
Expand Down Expand Up @@ -936,7 +933,7 @@ def io(title, environment, race_config):
"metrics": [
{
"id": str(uuid.uuid4()),
"type": "avg",
"type": "sum",
"field": "value.single"
}
],
Expand Down
9 changes: 3 additions & 6 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@ class PrepareBenchmark:
Initiates preparation steps for a benchmark. The benchmark should only be started after StartBenchmark is sent.
"""

def __init__(self, config, track, metrics_meta_info):
def __init__(self, config, track):
"""
:param config: Rally internal configuration object.
:param track: The track to use.
:param metrics_meta_info: meta info for the metrics store.
"""
self.config = config
self.track = track
self.metrics_meta_info = metrics_meta_info


class StartBenchmark:
Expand Down Expand Up @@ -211,7 +209,7 @@ def receiveUnrecognizedMessage(self, msg, sender):
def receiveMsg_PrepareBenchmark(self, msg, sender):
self.start_sender = sender
self.coordinator = Driver(self, msg.config)
self.coordinator.prepare_benchmark(msg.track, msg.metrics_meta_info)
self.coordinator.prepare_benchmark(msg.track)

@actor.no_retry("driver")
def receiveMsg_StartBenchmark(self, msg, sender):
Expand Down Expand Up @@ -439,15 +437,14 @@ def retrieve_cluster_info(self, es):
self.logger.exception("Could not retrieve cluster info on benchmark start")
return None

def prepare_benchmark(self, t, metrics_meta_info):
def prepare_benchmark(self, t):
self.track = t
self.challenge = select_challenge(self.config, self.track)
self.quiet = self.config.opts("system", "quiet.mode", mandatory=False, default_value=False)
self.throughput_calculator = ThroughputCalculator()
self.metrics_store = metrics.metrics_store(cfg=self.config,
track=self.track.name,
challenge=self.challenge.name,
meta_info=metrics_meta_info,
read_only=False)
es_clients = self.create_es_clients()
self.wait_for_rest_api(es_clients)
Expand Down
131 changes: 59 additions & 72 deletions esrally/mechanic/mechanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ def for_nodes(self, all_node_ips=None, ip=None, port=None, node_ids=None):


class EngineStarted:
    """
    Signals that the whole cluster engine has been started.

    :param team_revision: The revision of the team repository that has been used.
    """
    def __init__(self, team_revision):
        self.team_revision = team_revision


Expand All @@ -84,8 +83,7 @@ class StopEngine:


class EngineStopped:
    # Plain marker message: no payload. System metrics are no longer shipped
    # with this signal; each node flushes its own metrics store instead.
    pass


class ResetRelativeTime:
Expand Down Expand Up @@ -115,22 +113,15 @@ def __init__(self, cfg, open_metrics_context, cluster_settings, sources, build,


class NodesStarted:
    # Plain marker message: no payload. System meta-info is no longer
    # transported via this signal; each node keeps its own metrics store.
    pass


class StopNodes:
    """Message sent to a node mechanic to request that its managed nodes be stopped."""
    pass


class NodesStopped:
    # Plain marker message: no payload. System metrics are no longer shipped
    # with this signal; each node persists its own metrics on shutdown.
    pass


def cluster_distribution_version(cfg, client_factory=client.EsClientFactory):
Expand Down Expand Up @@ -189,16 +180,13 @@ def nodes_by_host(ip_port_pairs):
class MechanicActor(actor.RallyActor):
WAKEUP_RESET_RELATIVE_TIME = "relative_time"

WAKEUP_FLUSH_METRICS = "flush_metrics"

"""
This actor coordinates all associated mechanics on remote hosts (which do the actual work).
"""

def __init__(self):
super().__init__()
self.cfg = None
self.metrics_store = None
self.race_control = None
self.cluster_launcher = None
self.cluster = None
Expand Down Expand Up @@ -232,10 +220,8 @@ def receiveMsg_StartEngine(self, msg, sender):
self.logger.info("Received signal from race control to start engine.")
self.race_control = sender
self.cfg = msg.cfg
cls = metrics.metrics_store_class(self.cfg)
self.metrics_store = cls(self.cfg)
self.metrics_store.open(ctx=msg.open_metrics_context)
self.car, _ = load_team(self.cfg, msg.external)
# TODO: This is implicitly set by #load_team() - can we gather this elsewhere?
self.team_revision = self.cfg.opts("mechanic", "repository.revision")

# In our startup procedure we first create all mechanics. Only if this succeeds we'll continue.
Expand Down Expand Up @@ -269,8 +255,6 @@ def receiveMsg_StartEngine(self, msg, sender):

@actor.no_retry("mechanic")
def receiveMsg_NodesStarted(self, msg, sender):
self.metrics_store.merge_meta_info(msg.system_meta_info)

# Initially the addresses of the children are not
# known and there is just a None placeholder in the
# array. As addresses become known, fill them in.
Expand All @@ -291,10 +275,6 @@ def receiveMsg_ResetRelativeTime(self, msg, sender):
def receiveMsg_WakeupMessage(self, msg, sender):
if msg.payload == MechanicActor.WAKEUP_RESET_RELATIVE_TIME:
self.reset_relative_time()
elif msg.payload == MechanicActor.WAKEUP_FLUSH_METRICS:
self.logger.debug("Flushing cluster-wide system metrics store.")
self.metrics_store.flush(refresh=False)
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS, payload=MechanicActor.WAKEUP_FLUSH_METRICS)
else:
raise exceptions.RallyAssertionError("Unknown wakeup reason [{}]".format(msg.payload))

Expand All @@ -312,22 +292,17 @@ def receiveMsg_StopEngine(self, msg, sender):

@actor.no_retry("mechanic")
def receiveMsg_NodesStopped(self, msg, sender):
self.metrics_store.bulk_add(msg.system_metrics)
self.transition_when_all_children_responded(sender, msg, "cluster_stopping", "cluster_stopped", self.on_all_nodes_stopped)

def on_all_nodes_started(self):
self.send(self.race_control, EngineStarted(self.metrics_store.meta_info, self.team_revision))
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS, payload=MechanicActor.WAKEUP_FLUSH_METRICS)
self.send(self.race_control, EngineStarted(self.team_revision))

def reset_relative_time(self):
self.logger.info("Resetting relative time of cluster system metrics store.")
self.metrics_store.reset_relative_time()
for m in self.children:
self.send(m, ResetRelativeTime(0))

def on_all_nodes_stopped(self):
self.metrics_store.flush(refresh=False)
self.send(self.race_control, EngineStopped(self.metrics_store.to_externalizable(clear=True)))
self.send(self.race_control, EngineStopped())
# clear all state as the mechanic might get reused later
for m in self.children:
self.send(m, thespian.actors.ActorExitRequest())
Expand Down Expand Up @@ -426,10 +401,7 @@ class NodeMechanicActor(actor.RallyActor):

def __init__(self):
super().__init__()
self.config = None
self.metrics_store = None
self.mechanic = None
self.running = False
self.host = None

def receiveMsg_StartNodes(self, msg, sender):
Expand All @@ -441,32 +413,31 @@ def receiveMsg_StartNodes(self, msg, sender):
self.logger.info("Starting node(s) %s on [%s].", msg.node_ids, msg.ip)

# Load node-specific configuration
self.config = config.auto_load_local_config(msg.cfg, additional_sections=[
cfg = config.auto_load_local_config(msg.cfg, additional_sections=[
# only copy the relevant bits
"track", "mechanic", "client", "telemetry",
# allow metrics store to extract race meta-data
"race",
"source"
])
# set root path (normally done by the main entry point)
self.config.add(config.Scope.application, "node", "rally.root", paths.rally_root())
cfg.add(config.Scope.application, "node", "rally.root", paths.rally_root())
if not msg.external:
self.config.add(config.Scope.benchmark, "provisioning", "node.ip", msg.ip)
cfg.add(config.Scope.benchmark, "provisioning", "node.ip", msg.ip)
# we need to override the port with the value that the user has specified instead of using the default value (39200)
self.config.add(config.Scope.benchmark, "provisioning", "node.http.port", msg.port)
self.config.add(config.Scope.benchmark, "provisioning", "node.ids", msg.node_ids)
cfg.add(config.Scope.benchmark, "provisioning", "node.http.port", msg.port)
cfg.add(config.Scope.benchmark, "provisioning", "node.ids", msg.node_ids)

cls = metrics.metrics_store_class(self.config)
self.metrics_store = cls(self.config)
self.metrics_store.open(ctx=msg.open_metrics_context)
cls = metrics.metrics_store_class(cfg)
metrics_store = cls(cfg)
metrics_store.open(ctx=msg.open_metrics_context)
# avoid follow-up errors in case we receive an unexpected ActorExitRequest due to an early failure in a parent actor.

self.mechanic = create(self.config, self.metrics_store, msg.all_node_ips, msg.cluster_settings, msg.sources, msg.build,
self.mechanic = create(cfg, metrics_store, msg.all_node_ips, msg.cluster_settings, msg.sources, msg.build,
msg.distribution, msg.external, msg.docker)
nodes = self.mechanic.start_engine()
self.running = True
self.mechanic.start_engine()
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
self.send(getattr(msg, "reply_to", sender), NodesStarted(self.metrics_store.meta_info))
self.send(getattr(msg, "reply_to", sender), NodesStarted())
except Exception:
self.logger.exception("Cannot process message [%s]", msg)
# avoid "can't pickle traceback objects"
Expand All @@ -487,34 +458,20 @@ def receiveUnrecognizedMessage(self, msg, sender):
# noinspection PyBroadException
try:
self.logger.debug("NodeMechanicActor#receiveMessage(msg = [%s] sender = [%s])", str(type(msg)), str(sender))
if isinstance(msg, ResetRelativeTime):
self.logger.info("Resetting relative time of system metrics store on host [%s].", self.host)
self.metrics_store.reset_relative_time()
elif isinstance(msg, thespian.actors.WakeupMessage):
if self.running:
self.logger.debug("Flushing system metrics store on host [%s].", self.host)
self.metrics_store.flush(refresh=False)
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
if isinstance(msg, ResetRelativeTime) and self.mechanic:
self.mechanic.reset_relative_time()
elif isinstance(msg, thespian.actors.WakeupMessage) and self.mechanic:
self.mechanic.flush_metrics()
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
elif isinstance(msg, StopNodes):
self.logger.info("Stopping nodes %s.", self.mechanic.nodes)
self.mechanic.stop_engine()
self.metrics_store.flush(refresh=False)
self.send(sender, NodesStopped(self.metrics_store.to_externalizable(clear=True)))
# clear all state as the mechanic might get reused later
self.metrics_store.close()
# TODO: Run the reporter (StatsCalculator) here to calculate summary stats and save them to the
# metrics store (no command line output though!)
self.running = False
self.config = None
self.send(sender, NodesStopped())
self.mechanic = None
self.metrics_store = None
elif isinstance(msg, thespian.actors.ActorExitRequest):
if self.running:
self.logger.info("Stopping nodes %s (due to ActorExitRequest)", self.mechanic.nodes)
if self.mechanic:
self.mechanic.stop_engine()
self.running = False
self.mechanic = None
except BaseException as e:
self.running = False
self.logger.exception("Cannot process message [%s]", msg)
self.send(getattr(msg, "reply_to", sender), actor.BenchmarkFailure("Error on host %s" % str(self.host), e))

Expand Down Expand Up @@ -563,7 +520,7 @@ def create(cfg, metrics_store, all_node_ips, cluster_settings=None, sources=Fals
# It is a programmer error (and not a user error) if this function is called with wrong parameters
raise RuntimeError("One of sources, distribution, docker or external must be True")

return Mechanic(s, p, l)
return Mechanic(cfg, metrics_store, s, p, l)


class Mechanic:
Expand All @@ -572,11 +529,14 @@ class Mechanic:
running the benchmark).
"""

def __init__(self, supply, provisioners, l):
def __init__(self, cfg, metrics_store, supply, provisioners, launcher):
self.cfg = cfg
self.metrics_store = metrics_store
self.supply = supply
self.provisioners = provisioners
self.launcher = l
self.launcher = launcher
self.nodes = []
self.logger = logging.getLogger(__name__)

def start_engine(self):
binaries = self.supply()
Expand All @@ -586,8 +546,35 @@ def start_engine(self):
self.nodes = self.launcher.start(node_configs)
return self.nodes

def reset_relative_time(self):
    """Reset the relative-time baseline of this node's system metrics store (e.g. when a new benchmark phase begins)."""
    self.logger.info("Resetting relative time of system metrics store.")
    self.metrics_store.reset_relative_time()

def flush_metrics(self, refresh=False):
    """
    Flush buffered system metrics to the metrics store.

    :param refresh: Forwarded to ``metrics_store.flush()``; presumably forces the
                    backing store to make flushed samples visible — TODO confirm.
    """
    self.logger.debug("Flushing system metrics.")
    self.metrics_store.flush(refresh=refresh)

def stop_engine(self):
    """
    Stop all managed nodes, flush their system metrics and persist per-node
    system results on the current race, then clean up provisioned resources.
    """
    self.logger.info("Stopping nodes %s.", self.nodes)
    self.launcher.stop(self.nodes)
    # refresh so the results calculation below sees the just-flushed samples
    self.flush_metrics(refresh=True)
    try:
        current_race = self._current_race()
        for node in self.nodes:
            self._add_results(current_race, node)
    except exceptions.NotFound as e:
        # best effort: without a race record there is nowhere to store results
        self.logger.warning("Cannot store system metrics: %s.", str(e))

    # NOTE(review): an exception other than NotFound above would skip close() —
    # confirm this is intended or whether close() belongs in a finally block.
    self.metrics_store.close()
    self.nodes = []
    for p in self.provisioners:
        p.cleanup()

def _current_race(self):
    """Look up the race record that matches the currently configured race id."""
    store = metrics.race_store(self.cfg)
    return store.find_by_race_id(self.cfg.opts("system", "race.id"))

def _add_results(self, current_race, node):
    """Calculate system results for a single node and persist them on the given race."""
    node_results = metrics.calculate_system_results(self.metrics_store, node.node_name)
    current_race.add_results(node_results)
    metrics.results_store(self.cfg).store_results(current_race)
Loading