Skip to content

Commit

Permalink
Improve module structure
Browse files Browse the repository at this point in the history
Closes #129
  • Loading branch information
danielmitterdorfer committed Oct 7, 2016
1 parent 1ec82a5 commit 35f1578
Show file tree
Hide file tree
Showing 24 changed files with 592 additions and 668 deletions.
2 changes: 1 addition & 1 deletion esrally/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ def _is_set(self, client_opts, k):
return False

def create(self):
return self.client
return self.client
21 changes: 13 additions & 8 deletions esrally/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,22 +158,27 @@ def receiveMessage(self, msg, sender):
self.send(self.myAddress, thespian.actors.ActorExitRequest())

def start_benchmark(self, msg, sender):
logger.info("Benchmark is about to start.")
self.start_sender = sender
self.config = msg.config
current_track = msg.track

logger.info("Preparing track")
# TODO #71: Reconsider this in case we distribute drivers. *For now* the driver will only be on a single machine, so we're safe.
track.prepare_track(current_track, self.config)

logger.info("Benchmark is about to start.")
self.quiet = msg.config.opts("system", "quiet.mode", mandatory=False, default_value=False)
self.es = client.EsClientFactory(msg.config.opts("client", "hosts"), msg.config.opts("client", "options")).create()
self.metrics_store = metrics.InMemoryMetricsStore(config=self.config, meta_info=msg.metrics_meta_info)
invocation = self.config.opts("meta", "time.start")
expected_cluster_health = msg.config.opts("benchmarks", "cluster.health")
track_name = self.config.opts("system", "track")
track_name = self.config.opts("benchmarks", "track")
challenge_name = self.config.opts("benchmarks", "challenge")
selected_car_name = self.config.opts("benchmarks", "car")
self.metrics_store.open(invocation, track_name, challenge_name, selected_car_name)

track = msg.track
challenge = select_challenge(self.config, track)
setup_index(self.es, track, challenge, expected_cluster_health)
challenge = select_challenge(self.config, current_track)
setup_index(self.es, current_track, challenge, expected_cluster_health)
allocator = Allocator(challenge.schedule)
self.allocations = allocator.allocations
self.number_of_steps = len(allocator.join_points) - 1
Expand All @@ -185,7 +190,7 @@ def start_benchmark(self, msg, sender):
for client_id in range(allocator.clients):
self.drivers.append(self.createActor(LoadGenerator))
for client_id, driver in enumerate(self.drivers):
self.send(driver, StartLoadGenerator(client_id, self.config, track.indices, self.allocations[client_id]))
self.send(driver, StartLoadGenerator(client_id, self.config, current_track.indices, self.allocations[client_id]))

self.update_progress_message()
self.wakeupAfter(datetime.timedelta(seconds=Driver.WAKEUP_INTERVAL_SECONDS))
Expand Down Expand Up @@ -422,8 +427,8 @@ def select_challenge(config, t):
for challenge in t.challenges:
if challenge.name == selected_challenge:
return challenge
raise exceptions.ImproperlyConfigured("Unknown challenge [%s] for track [%s]. You can list the available tracks and their "
"challenges with %s list tracks." % (selected_challenge, t.name, PROGRAM_NAME))
raise exceptions.SystemSetupError("Unknown challenge [%s] for track [%s]. You can list the available tracks and their "
"challenges with %s list tracks." % (selected_challenge, t.name, PROGRAM_NAME))


def setup_index(es, t, challenge, expected_cluster_health):
Expand Down
7 changes: 0 additions & 7 deletions esrally/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@ class RallyError(Exception):
pass


class ImproperlyConfigured(RallyError):
"""
Thrown on configuration errors.
"""
pass


class LaunchError(RallyError):
"""
Thrown whenever there was a problem launching the benchmark candidate
Expand Down
60 changes: 0 additions & 60 deletions esrally/mechanic/builder.py

This file was deleted.

9 changes: 9 additions & 0 deletions esrally/car.py → esrally/mechanic/car.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import tabulate

from esrally import exceptions, PROGRAM_NAME
from esrally.utils import sysstats, console


Expand All @@ -8,6 +9,14 @@ def list_cars():
console.println(tabulate.tabulate([[c.name] for c in cars], headers=["Name"]))


def select_car(cfg):
    """
    Resolve the car name configured in the "benchmarks" config section to a car instance.

    :param cfg: The config object; the car name is read from "benchmarks.car".
    :return: The car from the module-level ``cars`` list whose name matches the configured name.
    :raises exceptions.SystemSetupError: if no known car has the configured name.
    """
    name = cfg.opts("benchmarks", "car")
    # linear scan is fine here: ``cars`` is a small, fixed module-level list
    for c in cars:
        if c.name == name:
            return c
    raise exceptions.SystemSetupError("Unknown car [%s]. You can list the available cars with %s list cars." % (name, PROGRAM_NAME))


mergePartsLogYmlConfig = '''
es.logger.level: INFO
rootLogger: ${es.logger.level}, console, file
Expand Down
File renamed without changes.
79 changes: 63 additions & 16 deletions esrally/mechanic/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,58 @@
import subprocess
import threading

from esrally import config, cluster, telemetry, time, exceptions
from esrally.mechanic import gear
from esrally.utils import versions, console
from esrally import config, time, exceptions, client
from esrally.mechanic import gear, telemetry, cluster
from esrally.utils import versions, console, process

logger = logging.getLogger("rally.launcher")


class ExternalLauncher:
def __init__(self, cfg):
# benchmarks with external candidates are really scary and we should warn users.
BOGUS_RESULTS_WARNING = """
************************************************************************
************** WARNING: A dark dungeon lies ahead of you **************
************************************************************************
Rally does not have control over the configuration of the benchmarked
Elasticsearch cluster.
Be aware that results may be misleading due to problems with the setup.
Rally is also not able to gather lots of metrics at all (like CPU usage
of the benchmarked cluster) or may even produce misleading metrics (like
the index size).
************************************************************************
****** Use this pipeline only if you are aware of the tradeoffs. ******
*************************** Watch your step! ***************************
************************************************************************
"""

def __init__(self, cfg, metrics_store, client_factory_class=client.EsClientFactory):
self.cfg = cfg
self.metrics_store = metrics_store
self.client_factory = client_factory_class

def start(self, client, metrics_store):
t = telemetry.Telemetry(self.cfg, client=client, metrics_store=metrics_store, devices=[
telemetry.ExternalEnvironmentInfo(self.cfg, client, metrics_store),
telemetry.NodeStats(self.cfg, client, metrics_store),
telemetry.IndexStats(self.cfg, client, metrics_store)
def start(self, car=None):
console.println(ExternalLauncher.BOGUS_RESULTS_WARNING)

hosts = self.cfg.opts("launcher", "external.target.hosts")
client_options = self.cfg.opts("launcher", "client.options")
# unified client config
self.cfg.add(config.Scope.benchmark, "client", "hosts", hosts)
self.cfg.add(config.Scope.benchmark, "client", "options", client_options)

es = self.client_factory(hosts, client_options).create()

t = telemetry.Telemetry(self.cfg, client=es, metrics_store=self.metrics_store, devices=[
telemetry.ExternalEnvironmentInfo(self.cfg, es, self.metrics_store),
telemetry.NodeStats(self.cfg, es, self.metrics_store),
telemetry.IndexStats(self.cfg, es, self.metrics_store)
])
c = cluster.Cluster([], t)
user_defined_version = self.cfg.opts("source", "distribution.version", mandatory=False)
distribution_version = client.info()["version"]["number"]
distribution_version = es.info()["version"]["number"]
if not user_defined_version or user_defined_version.strip() == "":
logger.info("Distribution version was not specified by user. Rally-determined version is [%s]" % distribution_version)
self.cfg.add(config.Scope.benchmark, "source", "distribution.version", distribution_version)
Expand All @@ -35,6 +67,9 @@ def start(self, client, metrics_store):
t.attach_to_cluster(c)
return c

def stop(self, cluster):
pass


class InProcessLauncher:
"""
Expand Down Expand Up @@ -68,18 +103,30 @@ class InProcessLauncher:
}
}

def __init__(self, cfg, clock=time.Clock):
def __init__(self, cfg, metrics_store, clock=time.Clock):
self.cfg = cfg
self.metrics_store = metrics_store
self._clock = clock
self._servers = []

def start(self, car, client, metrics_store):
if self._servers:
logger.warn("There are still referenced servers on startup. Did the previous shutdown succeed?")
def start(self, car):
port = self.cfg.opts("provisioning", "node.http.port")
hosts = [{"host": "localhost", "port": port}]
client_options = self.cfg.opts("launcher", "client.options")
# unified client config
self.cfg.add(config.Scope.benchmark, "client", "hosts", hosts)
self.cfg.add(config.Scope.benchmark, "client", "options", client_options)

es = client.EsClientFactory(hosts, client_options).create()

# we're very specific which nodes we kill as there is potentially also an Elasticsearch based metrics store running on this machine
node_prefix = self.cfg.opts("provisioning", "node.name.prefix")
process.kill_running_es_instances(node_prefix)

logger.info("Starting a cluster based on car [%s] with [%d] nodes." % (car, car.nodes))

t = telemetry.Telemetry(self.cfg, client, metrics_store)
c = cluster.Cluster([self._start_node(node, car, client, metrics_store) for node in range(car.nodes)], t)
t = telemetry.Telemetry(self.cfg, es, self.metrics_store)
c = cluster.Cluster([self._start_node(node, car, es, self.metrics_store) for node in range(car.nodes)], t)
t.attach_to_cluster(c)
return c

Expand Down
76 changes: 35 additions & 41 deletions esrally/mechanic/mechanic.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,59 @@
import logging

from esrally import metrics, paths, config
from esrally.utils import console
from esrally.mechanic import builder, supplier, provisioner, launcher
from esrally import paths, config
from esrally.mechanic import supplier, provisioner, launcher

logger = logging.getLogger("rally.mechanic")


def create(cfg, metrics_store, sources=False, build=False, distribution=False, external=False):
    """
    Factory for a fully wired ``Mechanic``.

    Exactly one of the three mode flags must be truthy; they are checked in the
    order ``sources`` -> ``distribution`` -> ``external`` and select which
    supplier, provisioner and launcher the Mechanic is composed of.

    :param cfg: The config object, passed through to all collaborators.
    :param metrics_store: The metrics store handed to the launcher (not used for the external no-op case's provisioner).
    :param sources: If truthy, benchmark a candidate built from sources (in-process launch).
    :param build: Only relevant with ``sources``; forwarded to ``supplier.from_sources`` to control whether a build happens.
    :param distribution: If truthy, benchmark an official distribution (in-process launch).
    :param external: If truthy, benchmark an externally provisioned cluster (no-op provisioner, external launcher).
    :return: A ``Mechanic`` wired with the selected supplier, provisioner and launcher.
    :raises RuntimeError: if none of ``sources``, ``distribution`` or ``external`` is set.
    """
    if sources:
        # lambda defers the (potentially expensive) source fetch/build until the Mechanic invokes it
        s = lambda: supplier.from_sources(cfg, build)
        p = provisioner.local_provisioner(cfg)
        l = launcher.InProcessLauncher(cfg, metrics_store)
    elif distribution:
        s = lambda: supplier.from_distribution(cfg)
        p = provisioner.local_provisioner(cfg)
        l = launcher.InProcessLauncher(cfg, metrics_store)
    elif external:
        # nothing to supply: the cluster already exists outside of Rally's control
        s = lambda: None
        p = provisioner.no_op_provisioner()
        l = launcher.ExternalLauncher(cfg, metrics_store)
    else:
        raise RuntimeError("One of sources, distribution or external must be True")

    return Mechanic(cfg, s, p, l)


class Mechanic:
"""
Mechanic is responsible for preparing the benchmark candidate (i.e. all benchmark candidate related activities before and after
running the benchmark).
"""

def __init__(self, cfg):
def __init__(self, cfg, s, p, l):
self._config = cfg
self._supplier = supplier.Supplier(cfg)
self._builder = builder.Builder(cfg)
self._provisioner = provisioner.Provisioner(cfg)
self._launcher = launcher.InProcessLauncher(cfg)
self._metrics_store = None
self.supplier = s
self.provisioner = p
self.launcher = l

# TODO dm module refactoring: just moved it to the right place. Simplify (this should actually not be needed at all. It's just there
# to ensure we don't mix ES installs)
track_name = self._config.opts("system", "track")
# TODO dm: Check whether we can remove this completely
# ensure we don't mix ES installs
track_name = self._config.opts("benchmarks", "track")
challenge_name = self._config.opts("benchmarks", "challenge")
race_paths = paths.Paths(self._config)
self._config.add(config.Scope.challenge, "system", "challenge.root.dir",
race_paths.challenge_root(track_name, challenge_name))
self._config.add(config.Scope.challenge, "system", "challenge.log.dir",
race_paths.challenge_logs(track_name, challenge_name))
race_paths.challenge_logs(track_name, challenge_name))


# This is the one-time setup the mechanic performs (once for all benchmarks run)
def prepare_candidate(self):
console.println("Preparing for race (might take a few moments) ...")
self._supplier.fetch()
self._builder.build()

def find_candidate(self):
self._builder.add_binary_to_config()
self.supplier()

def start_metrics(self, track, challenge, car):
invocation = self._config.opts("meta", "time.start")
self._metrics_store = metrics.metrics_store(self._config)
self._metrics_store.open(invocation, track, challenge, car, create=True)

def start_engine(self, car, client, http_port):
self._provisioner.prepare(car, http_port)
return self._launcher.start(car, client, self._metrics_store)

def start_engine_external(self, client):
external_launcher = launcher.ExternalLauncher(self._config)
return external_launcher.start(client, self._metrics_store)
def start_engine(self):
selected_car = self.provisioner.prepare()
return self.launcher.start(selected_car)

def stop_engine(self, cluster):
self._launcher.stop(cluster)

def stop_metrics(self):
self._metrics_store.close()

def revise_candidate(self):
self._provisioner.cleanup()

self.launcher.stop(cluster)
self.provisioner.cleanup()
Loading

0 comments on commit 35f1578

Please sign in to comment.