diff --git a/esrally/client.py b/esrally/client.py
index d8012028d..41f7efe6b 100644
--- a/esrally/client.py
+++ b/esrally/client.py
@@ -51,4 +51,4 @@ def _is_set(self, client_opts, k):
return False
def create(self):
- return self.client
\ No newline at end of file
+ return self.client
diff --git a/esrally/driver.py b/esrally/driver.py
index c3643b0df..8f43eb1f3 100644
--- a/esrally/driver.py
+++ b/esrally/driver.py
@@ -158,22 +158,27 @@ def receiveMessage(self, msg, sender):
self.send(self.myAddress, thespian.actors.ActorExitRequest())
def start_benchmark(self, msg, sender):
- logger.info("Benchmark is about to start.")
self.start_sender = sender
self.config = msg.config
+ current_track = msg.track
+
+ logger.info("Preparing track")
+ # TODO #71: Reconsider this in case we distribute drivers. *For now* the driver will only be on a single machine, so we're safe.
+ track.prepare_track(current_track, self.config)
+
+ logger.info("Benchmark is about to start.")
self.quiet = msg.config.opts("system", "quiet.mode", mandatory=False, default_value=False)
self.es = client.EsClientFactory(msg.config.opts("client", "hosts"), msg.config.opts("client", "options")).create()
self.metrics_store = metrics.InMemoryMetricsStore(config=self.config, meta_info=msg.metrics_meta_info)
invocation = self.config.opts("meta", "time.start")
expected_cluster_health = msg.config.opts("benchmarks", "cluster.health")
- track_name = self.config.opts("system", "track")
+ track_name = self.config.opts("benchmarks", "track")
challenge_name = self.config.opts("benchmarks", "challenge")
selected_car_name = self.config.opts("benchmarks", "car")
self.metrics_store.open(invocation, track_name, challenge_name, selected_car_name)
- track = msg.track
- challenge = select_challenge(self.config, track)
- setup_index(self.es, track, challenge, expected_cluster_health)
+ challenge = select_challenge(self.config, current_track)
+ setup_index(self.es, current_track, challenge, expected_cluster_health)
allocator = Allocator(challenge.schedule)
self.allocations = allocator.allocations
self.number_of_steps = len(allocator.join_points) - 1
@@ -185,7 +190,7 @@ def start_benchmark(self, msg, sender):
for client_id in range(allocator.clients):
self.drivers.append(self.createActor(LoadGenerator))
for client_id, driver in enumerate(self.drivers):
- self.send(driver, StartLoadGenerator(client_id, self.config, track.indices, self.allocations[client_id]))
+ self.send(driver, StartLoadGenerator(client_id, self.config, current_track.indices, self.allocations[client_id]))
self.update_progress_message()
self.wakeupAfter(datetime.timedelta(seconds=Driver.WAKEUP_INTERVAL_SECONDS))
@@ -422,8 +427,8 @@ def select_challenge(config, t):
for challenge in t.challenges:
if challenge.name == selected_challenge:
return challenge
- raise exceptions.ImproperlyConfigured("Unknown challenge [%s] for track [%s]. You can list the available tracks and their "
- "challenges with %s list tracks." % (selected_challenge, t.name, PROGRAM_NAME))
+ raise exceptions.SystemSetupError("Unknown challenge [%s] for track [%s]. You can list the available tracks and their "
+ "challenges with %s list tracks." % (selected_challenge, t.name, PROGRAM_NAME))
def setup_index(es, t, challenge, expected_cluster_health):
diff --git a/esrally/exceptions.py b/esrally/exceptions.py
index 3e815f5e6..a4ce646bc 100644
--- a/esrally/exceptions.py
+++ b/esrally/exceptions.py
@@ -5,13 +5,6 @@ class RallyError(Exception):
pass
-class ImproperlyConfigured(RallyError):
- """
- Thrown on configuration errors.
- """
- pass
-
-
class LaunchError(RallyError):
"""
Thrown whenever there was a problem launching the benchmark candidate
diff --git a/esrally/mechanic/builder.py b/esrally/mechanic/builder.py
deleted file mode 100644
index 25f1f9dba..000000000
--- a/esrally/mechanic/builder.py
+++ /dev/null
@@ -1,60 +0,0 @@
-import glob
-import logging
-
-from esrally import config
-from esrally.utils import io, process, console
-from esrally.exceptions import ImproperlyConfigured, BuildError
-
-logger = logging.getLogger("rally.builder")
-
-
-class Builder:
- """
- A builder is responsible for creating an installable binary from the source files.
-
- It is not intended to be used directly but should be triggered by its mechanic.
- """
-
- def __init__(self, cfg):
- self._config = cfg
-
- def build(self):
- # just Gradle is supported for now
- self._exec("gradle.tasks.clean")
- console.println(" Building from sources ...")
- self._exec("gradle.tasks.package")
-
- def add_binary_to_config(self):
- src_dir = self._config.opts("source", "local.src.dir")
- try:
- binary = glob.glob("%s/distribution/tar/build/distributions/*.tar.gz" % src_dir)[0]
- except IndexError:
- raise ImproperlyConfigured("Couldn't find a tar.gz distribution. Please run Rally with the pipeline 'from-sources-complete'.")
- self._config.add(config.Scope.invocation, "builder", "candidate.bin.path", binary)
-
- def _exec(self, task_key):
- src_dir = self._config.opts("source", "local.src.dir")
- logger.info("Building Elasticsearch from sources in [%s]." % src_dir)
- gradle = self._config.opts("build", "gradle.bin")
- task = self._config.opts("build", task_key)
- java_home = self._config.opts("runtime", "java8.home")
-
- log_root = self._config.opts("system", "log.dir")
- build_log_dir = self._config.opts("build", "log.dir")
- log_dir = "%s/%s" % (log_root, build_log_dir)
-
- logger.info("Executing %s %s..." % (gradle, task))
- io.ensure_dir(log_dir)
- log_file = "%s/build.%s.log" % (log_dir, task_key)
-
- # we capture all output to a dedicated build log file
-
- if process.run_subprocess("export JAVA_HOME=%s; cd %s; %s %s > %s 2>&1" % (java_home, src_dir, gradle, task, log_file)):
- msg = "Executing '%s %s' failed. Here are the last 20 lines in the build log file:\n" % (gradle, task)
- msg += "=========================================================================================================\n"
- with open(log_file, "r") as f:
- msg += "\t"
- msg += "\t".join(f.readlines()[-20:])
- msg += "=========================================================================================================\n"
- msg += "The full build log is available at [%s]." % log_file
- raise BuildError(msg)
diff --git a/esrally/car.py b/esrally/mechanic/car.py
similarity index 96%
rename from esrally/car.py
rename to esrally/mechanic/car.py
index f1e6eb1a0..f91375fe8 100644
--- a/esrally/car.py
+++ b/esrally/mechanic/car.py
@@ -1,5 +1,6 @@
import tabulate
+from esrally import exceptions, PROGRAM_NAME
from esrally.utils import sysstats, console
@@ -8,6 +9,14 @@ def list_cars():
console.println(tabulate.tabulate([[c.name] for c in cars], headers=["Name"]))
+def select_car(cfg):
+ name = cfg.opts("benchmarks", "car")
+ for c in cars:
+ if c.name == name:
+ return c
+ raise exceptions.SystemSetupError("Unknown car [%s]. You can list the available cars with %s list cars." % (name, PROGRAM_NAME))
+
+
mergePartsLogYmlConfig = '''
es.logger.level: INFO
rootLogger: ${es.logger.level}, console, file
diff --git a/esrally/cluster.py b/esrally/mechanic/cluster.py
similarity index 100%
rename from esrally/cluster.py
rename to esrally/mechanic/cluster.py
diff --git a/esrally/mechanic/launcher.py b/esrally/mechanic/launcher.py
index def02deef..4007bda7e 100644
--- a/esrally/mechanic/launcher.py
+++ b/esrally/mechanic/launcher.py
@@ -5,26 +5,58 @@
import subprocess
import threading
-from esrally import config, cluster, telemetry, time, exceptions
-from esrally.mechanic import gear
-from esrally.utils import versions, console
+from esrally import config, time, exceptions, client
+from esrally.mechanic import gear, telemetry, cluster
+from esrally.utils import versions, console, process
logger = logging.getLogger("rally.launcher")
class ExternalLauncher:
- def __init__(self, cfg):
+ # benchmarks with external candidates are really scary and we should warn users.
+ BOGUS_RESULTS_WARNING = """
+************************************************************************
+************** WARNING: A dark dungeon lies ahead of you **************
+************************************************************************
+
+Rally does not have control over the configuration of the benchmarked
+Elasticsearch cluster.
+
+Be aware that results may be misleading due to problems with the setup.
+Rally also cannot gather some metrics at all (like the CPU usage of the
+benchmarked cluster) and may even produce misleading metrics (like the
+index size).
+
+************************************************************************
+****** Use this pipeline only if you are aware of the tradeoffs. ******
+*************************** Watch your step! ***************************
+************************************************************************
+"""
+
+ def __init__(self, cfg, metrics_store, client_factory_class=client.EsClientFactory):
self.cfg = cfg
+ self.metrics_store = metrics_store
+ self.client_factory = client_factory_class
- def start(self, client, metrics_store):
- t = telemetry.Telemetry(self.cfg, client=client, metrics_store=metrics_store, devices=[
- telemetry.ExternalEnvironmentInfo(self.cfg, client, metrics_store),
- telemetry.NodeStats(self.cfg, client, metrics_store),
- telemetry.IndexStats(self.cfg, client, metrics_store)
+ def start(self, car=None):
+ console.println(ExternalLauncher.BOGUS_RESULTS_WARNING)
+
+ hosts = self.cfg.opts("launcher", "external.target.hosts")
+ client_options = self.cfg.opts("launcher", "client.options")
+ # unified client config
+ self.cfg.add(config.Scope.benchmark, "client", "hosts", hosts)
+ self.cfg.add(config.Scope.benchmark, "client", "options", client_options)
+
+ es = self.client_factory(hosts, client_options).create()
+
+ t = telemetry.Telemetry(self.cfg, client=es, metrics_store=self.metrics_store, devices=[
+ telemetry.ExternalEnvironmentInfo(self.cfg, es, self.metrics_store),
+ telemetry.NodeStats(self.cfg, es, self.metrics_store),
+ telemetry.IndexStats(self.cfg, es, self.metrics_store)
])
c = cluster.Cluster([], t)
user_defined_version = self.cfg.opts("source", "distribution.version", mandatory=False)
- distribution_version = client.info()["version"]["number"]
+ distribution_version = es.info()["version"]["number"]
if not user_defined_version or user_defined_version.strip() == "":
logger.info("Distribution version was not specified by user. Rally-determined version is [%s]" % distribution_version)
self.cfg.add(config.Scope.benchmark, "source", "distribution.version", distribution_version)
@@ -35,6 +67,9 @@ def start(self, client, metrics_store):
t.attach_to_cluster(c)
return c
+ def stop(self, cluster):
+ pass
+
class InProcessLauncher:
"""
@@ -68,18 +103,30 @@ class InProcessLauncher:
}
}
- def __init__(self, cfg, clock=time.Clock):
+ def __init__(self, cfg, metrics_store, clock=time.Clock):
self.cfg = cfg
+ self.metrics_store = metrics_store
self._clock = clock
self._servers = []
- def start(self, car, client, metrics_store):
- if self._servers:
- logger.warn("There are still referenced servers on startup. Did the previous shutdown succeed?")
+ def start(self, car):
+ port = self.cfg.opts("provisioning", "node.http.port")
+ hosts = [{"host": "localhost", "port": port}]
+ client_options = self.cfg.opts("launcher", "client.options")
+ # unified client config
+ self.cfg.add(config.Scope.benchmark, "client", "hosts", hosts)
+ self.cfg.add(config.Scope.benchmark, "client", "options", client_options)
+
+ es = client.EsClientFactory(hosts, client_options).create()
+
+ # we're very specific which nodes we kill as there is potentially also an Elasticsearch based metrics store running on this machine
+ node_prefix = self.cfg.opts("provisioning", "node.name.prefix")
+ process.kill_running_es_instances(node_prefix)
+
logger.info("Starting a cluster based on car [%s] with [%d] nodes." % (car, car.nodes))
- t = telemetry.Telemetry(self.cfg, client, metrics_store)
- c = cluster.Cluster([self._start_node(node, car, client, metrics_store) for node in range(car.nodes)], t)
+ t = telemetry.Telemetry(self.cfg, es, self.metrics_store)
+ c = cluster.Cluster([self._start_node(node, car, es, self.metrics_store) for node in range(car.nodes)], t)
t.attach_to_cluster(c)
return c
diff --git a/esrally/mechanic/mechanic.py b/esrally/mechanic/mechanic.py
index d60e435f8..a381df43a 100644
--- a/esrally/mechanic/mechanic.py
+++ b/esrally/mechanic/mechanic.py
@@ -1,65 +1,59 @@
import logging
-from esrally import metrics, paths, config
-from esrally.utils import console
-from esrally.mechanic import builder, supplier, provisioner, launcher
+from esrally import paths, config
+from esrally.mechanic import supplier, provisioner, launcher
logger = logging.getLogger("rally.mechanic")
+def create(cfg, metrics_store, sources=False, build=False, distribution=False, external=False):
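+    """
+    Assembles a Mechanic with the supplier, provisioner and launcher that match the chosen benchmark mode:
+    build from sources, use a downloaded distribution, or attach to an externally provisioned cluster.
+    """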
+ if sources:
+ s = lambda: supplier.from_sources(cfg, build)
+ p = provisioner.local_provisioner(cfg)
+ l = launcher.InProcessLauncher(cfg, metrics_store)
+ elif distribution:
+ s = lambda: supplier.from_distribution(cfg)
+ p = provisioner.local_provisioner(cfg)
+ l = launcher.InProcessLauncher(cfg, metrics_store)
+ elif external:
+ s = lambda: None
+ p = provisioner.no_op_provisioner()
+ l = launcher.ExternalLauncher(cfg, metrics_store)
+ else:
+ raise RuntimeError("One of sources, distribution or external must be True")
+
+ return Mechanic(cfg, s, p, l)
+
+
class Mechanic:
"""
Mechanic is responsible for preparing the benchmark candidate (i.e. all benchmark candidate related activities before and after
running the benchmark).
"""
- def __init__(self, cfg):
+ def __init__(self, cfg, s, p, l):
self._config = cfg
- self._supplier = supplier.Supplier(cfg)
- self._builder = builder.Builder(cfg)
- self._provisioner = provisioner.Provisioner(cfg)
- self._launcher = launcher.InProcessLauncher(cfg)
- self._metrics_store = None
+ self.supplier = s
+ self.provisioner = p
+ self.launcher = l
- # TODO dm module refactoring: just moved it to the right place. Simplify (this should actually not be needed at all. It's just there
- # to ensure we don't mix ES installs)
- track_name = self._config.opts("system", "track")
+ # TODO dm: Check whether we can remove this completely
+ # ensure we don't mix ES installs
+ track_name = self._config.opts("benchmarks", "track")
challenge_name = self._config.opts("benchmarks", "challenge")
race_paths = paths.Paths(self._config)
self._config.add(config.Scope.challenge, "system", "challenge.root.dir",
race_paths.challenge_root(track_name, challenge_name))
self._config.add(config.Scope.challenge, "system", "challenge.log.dir",
- race_paths.challenge_logs(track_name, challenge_name))
+ race_paths.challenge_logs(track_name, challenge_name))
-
- # This is the one-time setup the mechanic performs (once for all benchmarks run)
def prepare_candidate(self):
- console.println("Preparing for race (might take a few moments) ...")
- self._supplier.fetch()
- self._builder.build()
-
- def find_candidate(self):
- self._builder.add_binary_to_config()
+ self.supplier()
- def start_metrics(self, track, challenge, car):
- invocation = self._config.opts("meta", "time.start")
- self._metrics_store = metrics.metrics_store(self._config)
- self._metrics_store.open(invocation, track, challenge, car, create=True)
-
- def start_engine(self, car, client, http_port):
- self._provisioner.prepare(car, http_port)
- return self._launcher.start(car, client, self._metrics_store)
-
- def start_engine_external(self, client):
- external_launcher = launcher.ExternalLauncher(self._config)
- return external_launcher.start(client, self._metrics_store)
+ def start_engine(self):
+ selected_car = self.provisioner.prepare()
+ return self.launcher.start(selected_car)
def stop_engine(self, cluster):
- self._launcher.stop(cluster)
-
- def stop_metrics(self):
- self._metrics_store.close()
-
- def revise_candidate(self):
- self._provisioner.cleanup()
-
+ self.launcher.stop(cluster)
+ self.provisioner.cleanup()
diff --git a/esrally/mechanic/provisioner.py b/esrally/mechanic/provisioner.py
index 6ecb93e14..7d190789d 100644
--- a/esrally/mechanic/provisioner.py
+++ b/esrally/mechanic/provisioner.py
@@ -4,11 +4,20 @@
import logging
from esrally import config, exceptions
+from esrally.mechanic import car
from esrally.utils import io, versions, console
logger = logging.getLogger("rally.provisioner")
+def local_provisioner(cfg):
+ return Provisioner(cfg)
+
+
+def no_op_provisioner():
+ return NoOpProvisioner()
+
+
class Provisioner:
"""
The provisioner prepares the runtime environment for running the benchmark. It prepares all configuration files and copies the binary
@@ -19,10 +28,12 @@ def __init__(self, cfg):
self._config = cfg
self.preserve = self._config.opts("provisioning", "install.preserve")
- # TODO #71: This should be split into an environment independent configuration (car) and environment dependent configuration (http_port)
- def prepare(self, car, http_port):
+ def prepare(self):
+ selected_car = car.select_car(self._config)
+ http_port = self._config.opts("provisioning", "node.http.port")
self._install_binary()
- self._configure(car, http_port)
+ self._configure(selected_car, http_port)
+ return selected_car
def cleanup(self):
install_dir = self._install_dir()
@@ -121,3 +132,11 @@ def _install_dir(self):
root = self._config.opts("system", "challenge.root.dir")
install = self._config.opts("provisioning", "local.install.dir")
return "%s/%s" % (root, install)
+
+
+class NoOpProvisioner:
+ def prepare(self):
+ pass
+
+ def cleanup(self):
+ pass
\ No newline at end of file
diff --git a/esrally/mechanic/supplier.py b/esrally/mechanic/supplier.py
index df3106dbe..6afa3f7ee 100644
--- a/esrally/mechanic/supplier.py
+++ b/esrally/mechanic/supplier.py
@@ -1,13 +1,62 @@
+import os
+import glob
import logging
+import urllib.error
+import xml.etree.ElementTree
-from esrally.utils import git, console
+from esrally import config, exceptions, PROGRAM_NAME
+from esrally.utils import git, console, io, process, net, versions
+from esrally.exceptions import BuildError, SystemSetupError
logger = logging.getLogger("rally.supplier")
-class Supplier:
+def from_sources(cfg, build=True):
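+    """
+    Prepares the benchmark candidate from the source tree: fetches (or updates) the sources and optionally builds them.
+    """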
+ console.println("Preparing for race (might take a few moments) ...")
+ builder = Builder(cfg)
+ SourceRepository(cfg).fetch()
+
+ if build:
+ builder.build()
+ builder.add_binary_to_config()
+
+
+def from_distribution(cfg):
+ version = cfg.opts("source", "distribution.version")
+ repo_name = cfg.opts("source", "distribution.repository")
+ if version.strip() == "":
+ raise exceptions.SystemSetupError("Could not determine version. Please specify the Elasticsearch distribution "
+ "to download with the command line parameter --distribution-version. "
+ "E.g. --distribution-version=5.0.0")
+ distributions_root = "%s/%s" % (cfg.opts("system", "root.dir"), cfg.opts("source", "distribution.dir"))
+ io.ensure_dir(distributions_root)
+ distribution_path = "%s/elasticsearch-%s.tar.gz" % (distributions_root, version)
+
+ try:
+ repo = distribution_repos[repo_name]
+ except KeyError:
+ raise exceptions.SystemSetupError("Unknown distribution repository [%s]. Valid values are: [%s]"
+ % (repo_name, ",".join(distribution_repos.keys())))
+
+ download_url = repo.download_url(version)
+ logger.info("Resolved download URL [%s] for version [%s]" % (download_url, version))
+ if not os.path.isfile(distribution_path) or repo.must_download:
+ try:
+ console.println("Downloading Elasticsearch %s ..." % version, logger=logger.info)
+ net.download(download_url, distribution_path)
+ except urllib.error.HTTPError:
+ logging.exception("Cannot download Elasticsearch distribution for version [%s] from [%s]." % (version, download_url))
+ raise exceptions.SystemSetupError("Cannot download Elasticsearch distribution from [%s]. Please check that the specified "
+ "version [%s] is correct." % (download_url, version))
+ else:
+ logger.info("Skipping download for version [%s]. Found an existing binary locally at [%s]." % (version, distribution_path))
+
+    cfg.add(config.Scope.invocation, "builder", "candidate.bin.path", distribution_path)
+
+
+class SourceRepository:
"""
- Supplier fetches the benchmark candidate source tree from the remote repository. In the current implementation, only git is supported.
+    SourceRepository fetches the benchmark candidate source tree from the remote repository.
"""
def __init__(self, cfg):
@@ -31,7 +80,7 @@ def _update(self):
elif revision == "current":
logger.info("Skip fetching sources")
elif revision.startswith("@"):
- # concert timestamp annotated for Rally to something git understands -> we strip leading and trailing " and the @.
+ # convert timestamp annotated for Rally to something git understands -> we strip leading and trailing " and the @.
git.pull_ts(self.src_dir, revision[1:])
else: # assume a git commit hash
git.pull_revision(self.src_dir, revision)
@@ -47,3 +96,116 @@ def src_dir(self):
def remote_url(self):
return self.cfg.opts("source", "remote.repo.url")
+
+class Builder:
+ """
+ A builder is responsible for creating an installable binary from the source files.
+
+ It is not intended to be used directly but should be triggered by its mechanic.
+ """
+
+ def __init__(self, cfg):
+ self._config = cfg
+
+ def build(self):
+ self._exec("gradle.tasks.clean")
+ console.println(" Building from sources ...")
+ self._exec("gradle.tasks.package")
+
+ def add_binary_to_config(self):
+ src_dir = self._config.opts("source", "local.src.dir")
+ try:
+ binary = glob.glob("%s/distribution/tar/build/distributions/*.tar.gz" % src_dir)[0]
+ except IndexError:
+ raise SystemSetupError("Couldn't find a tar.gz distribution. Please run Rally with the pipeline 'from-sources-complete'.")
+ self._config.add(config.Scope.invocation, "builder", "candidate.bin.path", binary)
+
+ def _exec(self, task_key):
+ try:
+ src_dir = self._config.opts("source", "local.src.dir")
+ except config.ConfigError:
+ logging.exception("Rally is not configured to build from sources")
+ raise SystemSetupError("Rally is not setup to build from sources. You can either benchmark a binary distribution or "
+ "install the required software and reconfigure Rally with %s --configure." % PROGRAM_NAME)
+
+ logger.info("Building Elasticsearch from sources in [%s]." % src_dir)
+ gradle = self._config.opts("build", "gradle.bin")
+ task = self._config.opts("build", task_key)
+ java_home = self._config.opts("runtime", "java8.home")
+
+ log_root = self._config.opts("system", "log.dir")
+ build_log_dir = self._config.opts("build", "log.dir")
+ log_dir = "%s/%s" % (log_root, build_log_dir)
+
+ logger.info("Executing %s %s..." % (gradle, task))
+ io.ensure_dir(log_dir)
+ log_file = "%s/build.%s.log" % (log_dir, task_key)
+
+ # we capture all output to a dedicated build log file
+
+ if process.run_subprocess("export JAVA_HOME=%s; cd %s; %s %s > %s 2>&1" % (java_home, src_dir, gradle, task, log_file)):
+ msg = "Executing '%s %s' failed. Here are the last 20 lines in the build log file:\n" % (gradle, task)
+ msg += "=========================================================================================================\n"
+ with open(log_file, "r") as f:
+ msg += "\t"
+ msg += "\t".join(f.readlines()[-20:])
+ msg += "=========================================================================================================\n"
+ msg += "The full build log is available at [%s]." % log_file
+ raise BuildError(msg)
+
+
+class ReleaseDistributionRepo:
+ def __init__(self):
+ self.must_download = False
+
+ def download_url(self, version):
+ major_version = int(versions.components(version)["major"])
+ if major_version > 1 and not self.on_or_after_5_0_0_beta1(version):
+ download_url = "https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/%s/" \
+ "elasticsearch-%s.tar.gz" % (version, version)
+ elif self.on_or_after_5_0_0_beta1(version):
+ download_url = "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-%s.tar.gz" % version
+ else:
+ download_url = "https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-%s.tar.gz" % version
+ return download_url
+
+ def on_or_after_5_0_0_beta1(self, version):
+ components = versions.components(version)
+ major_version = int(components["major"])
+ minor_version = int(components["minor"])
+ patch_version = int(components["patch"])
+ suffix = components["suffix"] if "suffix" in components else None
+
+ if major_version < 5:
+ return False
+ elif major_version == 5 and minor_version == 0 and patch_version == 0 and suffix and suffix.startswith("alpha"):
+ return False
+ return True
+
+
+class SnapshotDistributionRepo:
+ def __init__(self):
+ self.must_download = True
+
+ def download_url(self, version):
+ root_path = "https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/distribution/tar/elasticsearch/%s" % version
+ metadata_url = "%s/maven-metadata.xml" % root_path
+ try:
+ metadata = net.retrieve_content_as_string(metadata_url)
+ x = xml.etree.ElementTree.fromstring(metadata)
+ found_snapshot_versions = x.findall("./versioning/snapshotVersions/snapshotVersion/[extension='tar.gz']/value")
+ except Exception:
+ logger.exception("Could not retrieve a valid metadata.xml file from remote URL [%s]." % metadata_url)
+ raise exceptions.SystemSetupError("Cannot derive download URL for Elasticsearch %s" % version)
+
+ if len(found_snapshot_versions) == 1:
+ snapshot_id = found_snapshot_versions[0].text
+ return "%s/elasticsearch-%s.tar.gz" % (root_path, snapshot_id)
+ else:
+ logger.error("Found [%d] version identifiers in [%s]. Contents: %s" % (len(found_snapshot_versions), metadata_url, metadata))
+ raise exceptions.SystemSetupError("Cannot derive download URL for Elasticsearch %s" % version)
+
+distribution_repos = {
+ "release": ReleaseDistributionRepo(),
+ "snapshot": SnapshotDistributionRepo()
+}
diff --git a/esrally/telemetry.py b/esrally/mechanic/telemetry.py
similarity index 100%
rename from esrally/telemetry.py
rename to esrally/mechanic/telemetry.py
diff --git a/esrally/metrics.py b/esrally/metrics.py
index dda93200d..e842b5b33 100644
--- a/esrally/metrics.py
+++ b/esrally/metrics.py
@@ -116,18 +116,28 @@ class MetaInfoScope(Enum):
"""
-def metrics_store(config):
+def metrics_store(config, read_only=True, invocation=None, track=None, challenge=None, car=None):
"""
Creates a proper metrics store based on the current configuration.
- :param config: Config object. Mandatory.
+
+ :param config: Config object.
+ :param read_only: Whether to open the metrics store only for reading (Default: True).
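+    :param invocation: The invocation (timestamp) for which to open the store. Defaults to the value in the config.
+    :param track: The track for which to open the store. Defaults to the value in the config.
+    :param challenge: The challenge for which to open the store. Defaults to the value in the config.
+    :param car: The car for which to open the store. Defaults to the value in the config.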
:return: A metrics store implementation.
"""
if config.opts("reporting", "datastore.type") == "elasticsearch":
logger.info("Creating ES metrics store")
- return EsMetricsStore(config)
+ store = EsMetricsStore(config)
else:
logger.info("Creating in-memory metrics store")
- return InMemoryMetricsStore(config)
+ store = InMemoryMetricsStore(config)
+
+ selected_invocation = config.opts("meta", "time.start") if invocation is None else invocation
+ selected_track = config.opts("benchmarks", "track") if track is None else track
+ selected_challenge = config.opts("benchmarks", "challenge") if challenge is None else challenge
+ selected_car = config.opts("benchmarks", "car") if car is None else car
+
+ store.open(selected_invocation, selected_track, selected_challenge, selected_car, create=not read_only)
+ return store
class SampleType(IntEnum):
@@ -218,7 +228,7 @@ def add_meta_info(self, scope, scope_key, key, value):
self._meta_info[MetaInfoScope.node][scope_key] = {}
self._meta_info[MetaInfoScope.node][scope_key][key] = value
else:
- raise exceptions.ImproperlyConfigured("Unknown meta info scope [%s]" % scope)
+ raise exceptions.SystemSetupError("Unknown meta info scope [%s]" % scope)
@property
def meta_info(self):
@@ -309,7 +319,7 @@ def _put(self, level, level_key, name, value, unit, operation, operation_type, s
meta = self._meta_info[MetaInfoScope.cluster].copy()
meta.update(self._meta_info[MetaInfoScope.node][level_key])
else:
- raise exceptions.ImproperlyConfigured("Unknown meta info level [%s] for metric [%s]" % (level, name))
+ raise exceptions.SystemSetupError("Unknown meta info level [%s] for metric [%s]" % (level, name))
if absolute_time is None:
absolute_time = self._clock.now()
if relative_time is None:
diff --git a/esrally/racecontrol.py b/esrally/racecontrol.py
index 21dd57709..6768749ae 100644
--- a/esrally/racecontrol.py
+++ b/esrally/racecontrol.py
@@ -1,32 +1,16 @@
import logging
-import os
+import shutil
import sys
-import urllib.error
-import xml.etree.ElementTree
import tabulate
import thespian.actors
-from esrally import config, driver, exceptions, sweeper, track, reporter, metrics, car, client, PROGRAM_NAME
+from esrally import config, driver, exceptions, paths, track, reporter, metrics, PROGRAM_NAME
from esrally.mechanic import mechanic
-from esrally.utils import process, net, io, versions, console
+from esrally.utils import console, io
logger = logging.getLogger("rally.racecontrol")
-
-class PipelineStep:
- """
- Describes a single step in an execution pipeline, e.g. "build" or "benchmark".
- """
- def __init__(self, name, ctx, command):
- self.name = name
- self.ctx = ctx
- self.command = command
-
- def __call__(self, challenge=None, car=None):
- if challenge is None and car is None:
- self.command(self.ctx)
- else:
- self.command(self.ctx, challenge, car)
+pipelines = {}
class Pipeline:
@@ -40,355 +24,121 @@ class Pipeline:
* Report results
"""
- def __init__(self, name, description, steps):
+ def __init__(self, name, description, target):
"""
Creates a new pipeline.
:param name: A short name of the pipeline. This name will be used to reference it from the command line.
:param description: A human-readable description what the pipeline does.
- :param steps: The concrete steps to execute. The steps will be executed in the provided order. A pipeline should consist of at
- least one step.
+ :param target: A function that implements this pipeline
"""
self.name = name
self.description = description
- self.steps = steps
-
- def __call__(self):
- for step in self.steps:
- step()
-
-#################################################
-#
-# Internal helper functions for pipeline steps
-#
-# If a helper function is short enough it can
-# also be added just as a lambda.
-#
-#################################################
+ self.target = target
+ pipelines[name] = self
-
-# TODO dm module refactoring: mechanic
-def check_can_handle_source_distribution(ctx):
- try:
- ctx.config.opts("source", "local.src.dir")
- except config.ConfigError:
- logging.exception("Rally is not configured to build from sources")
- raise exceptions.SystemSetupError("Rally is not setup to build from sources. You can either benchmark a binary distribution or "
- "install the required software and reconfigure Rally with %s --configure." % PROGRAM_NAME)
+ def __call__(self, cfg):
+ self.target(cfg)
-# TODO dm module refactoring: mechanic (once per node or actually once per machine)
-def kill(ctx):
- # we're very specific which nodes we kill as there is potentially also an Elasticsearch based metrics store running on this machine
- node_prefix = ctx.config.opts("provisioning", "node.name.prefix")
- process.kill_running_es_instances(node_prefix)
+def sweep(cfg):
+ invocation_root = cfg.opts("system", "invocation.root.dir")
+ track_name = cfg.opts("benchmarks", "track")
+ challenge_name = cfg.opts("benchmarks", "challenge")
+ car_name = cfg.opts("benchmarks", "car")
+ log_root = paths.Paths(cfg).log_root()
+ archive_path = "%s/logs-%s-%s-%s.zip" % (invocation_root, track_name, challenge_name, car_name)
+ io.compress(log_root, archive_path)
+ console.println("\nLogs for this race are archived in %s" % archive_path)
+ shutil.rmtree(log_root)
-# TODO dm module refactoring: reporter?
-def store_race(ctx):
- metrics.race_store(ctx.config).store_race(ctx.track)
+def benchmark(cfg, mechanic, metrics_store):
+ track_name = cfg.opts("benchmarks", "track")
+ challenge_name = cfg.opts("benchmarks", "challenge")
+ selected_car_name = cfg.opts("benchmarks", "car")
-def prepare_track(ctx):
- track_name = ctx.config.opts("system", "track")
- try:
- ctx.track = track.load_track(ctx.config, track_name)
- except FileNotFoundError:
- logger.error("Cannot load track [%s]" % track_name)
- raise exceptions.ImproperlyConfigured("Cannot load track %s. You can list the available tracks with %s list tracks." %
- (track_name, PROGRAM_NAME))
- # TODO #71: Reconsider this in case we distribute drivers. *For now* the driver will only be on a single machine, so we're safe.
- track.prepare_track(ctx.track, ctx.config)
+ console.println("Racing on track [%s] and challenge [%s] with car [%s]" % (track_name, challenge_name, selected_car_name))
+ mechanic.prepare_candidate()
+ cluster = mechanic.start_engine()
-# benchmark when we provision ourselves
-def benchmark_internal(ctx):
- track_name = ctx.config.opts("system", "track")
- challenge_name = ctx.config.opts("benchmarks", "challenge")
- selected_car_name = ctx.config.opts("benchmarks", "car")
+ t = track.load_track(cfg)
+ metrics.race_store(cfg).store_race(t)
- console.println("Racing on track [%s] and challenge [%s] with car [%s]" % (track_name, challenge_name, selected_car_name))
- # TODO dm module refactoring: mechanic
- selected_car = None
- for c in car.cars:
- if c.name == selected_car_name:
- selected_car = c
-
- if not selected_car:
- raise exceptions.ImproperlyConfigured("Unknown car [%s]. You can list the available cars with %s list cars."
- % (selected_car_name, PROGRAM_NAME))
-
- port = ctx.config.opts("provisioning", "node.http.port")
- hosts = [{"host": "localhost", "port": port}]
- client_options = ctx.config.opts("launcher", "client.options")
- # unified client config
- ctx.config.add(config.Scope.benchmark, "client", "hosts", hosts)
- ctx.config.add(config.Scope.benchmark, "client", "options", client_options)
-
- es_client = client.EsClientFactory(hosts, client_options).create()
-
- # TODO dm module refactoring: separate module? don't let the mechanic handle the metrics store but rather just provide it
- ctx.mechanic.start_metrics(track_name, challenge_name, selected_car_name)
- cluster = ctx.mechanic.start_engine(selected_car, es_client, port)
actors = thespian.actors.ActorSystem()
main_driver = actors.createActor(driver.Driver)
- #TODO dm: Retrieving the metrics store here is *dirty*...
- metrics_store = ctx.mechanic._metrics_store
-
cluster.on_benchmark_start()
- result = actors.ask(main_driver, driver.StartBenchmark(ctx.config, ctx.track, metrics_store.meta_info))
+ result = actors.ask(main_driver, driver.StartBenchmark(cfg, t, metrics_store.meta_info))
if isinstance(result, driver.BenchmarkComplete):
cluster.on_benchmark_stop()
metrics_store.bulk_add(result.metrics)
- ctx.mechanic.stop_engine(cluster)
- ctx.mechanic.revise_candidate()
- ctx.mechanic.stop_metrics()
+ mechanic.stop_engine(cluster)
+ metrics_store.close()
+ reporter.summarize(cfg, t)
+ sweep(cfg)
elif isinstance(result, driver.BenchmarkFailure):
raise exceptions.RallyError(result.message, result.cause)
else:
raise exceptions.RallyError("Driver has returned no metrics but instead [%s]. Terminating race without result." % str(result))
-# TODO dm module refactoring: mechanic
-def prepare_benchmark_external(ctx):
- track_name = ctx.config.opts("system", "track")
- challenge_name = ctx.config.opts("benchmarks", "challenge")
- # override externally used car name for this benchmark. We'll use a fixed one for external benchmarks.
- car_name = "external"
- ctx.config.add(config.Scope.benchmark, "benchmarks", "car", car_name)
+# Poor man's curry
+def from_sources_complete(cfg):
+ metrics_store = metrics.metrics_store(cfg, read_only=False)
+ return benchmark(cfg, mechanic.create(cfg, metrics_store, sources=True, build=True), metrics_store)
- ctx.mechanic.start_metrics(track_name, challenge_name, car_name)
- hosts = ctx.config.opts("launcher", "external.target.hosts")
- client_options = ctx.config.opts("launcher", "client.options")
- # unified client config
- ctx.config.add(config.Scope.benchmark, "client", "hosts", hosts)
- ctx.config.add(config.Scope.benchmark, "client", "options", client_options)
+def from_sources_skip_build(cfg):
+ metrics_store = metrics.metrics_store(cfg, read_only=False)
+ return benchmark(cfg, mechanic.create(cfg, metrics_store, sources=True, build=False), metrics_store)
- es_client = client.EsClientFactory(hosts, client_options).create()
- ctx.cluster = ctx.mechanic.start_engine_external(es_client)
+def from_distribution(cfg):
+ metrics_store = metrics.metrics_store(cfg, read_only=False)
+ return benchmark(cfg, mechanic.create(cfg, metrics_store, distribution=True), metrics_store)
-# benchmark assuming Elasticsearch is already running externally
-def benchmark_external(ctx):
- # TODO dm module refactoring: we can just inline prepare_benchmark_external and simplify this code a bit
- track_name = ctx.config.opts("system", "track")
- challenge_name = ctx.config.opts("benchmarks", "challenge")
- console.println("Racing on track [%s] and challenge [%s]" % (track_name, challenge_name))
- actors = thespian.actors.ActorSystem()
- main_driver = actors.createActor(driver.Driver)
- #TODO dm: Retrieving the metrics store here is *dirty*...
- metrics_store = ctx.mechanic._metrics_store
- ctx.cluster.on_benchmark_start()
- result = actors.ask(main_driver, driver.StartBenchmark(ctx.config, ctx.track, metrics_store.meta_info))
- if isinstance(result, driver.BenchmarkComplete):
- ctx.cluster.on_benchmark_stop()
- metrics_store.bulk_add(result.metrics)
- ctx.mechanic.stop_metrics()
- elif isinstance(result, driver.BenchmarkFailure):
- raise exceptions.RallyError(result.message, result.cause)
- else:
- raise exceptions.RallyError("Driver has returned no metrics but instead [%s]. Terminating race without result." % str(result))
+def benchmark_only(cfg):
+ # We'll use a special car name for external benchmarks.
+ cfg.add(config.Scope.benchmark, "benchmarks", "car", "external")
+ metrics_store = metrics.metrics_store(cfg, read_only=False)
+ return benchmark(cfg, mechanic.create(cfg, metrics_store, external=True), metrics_store)
-# TODO dm module refactoring: mechanic
-def download_benchmark_candidate(ctx):
- version = ctx.config.opts("source", "distribution.version")
- repo_name = ctx.config.opts("source", "distribution.repository")
- if version.strip() == "":
- raise exceptions.SystemSetupError("Could not determine version. Please specify the Elasticsearch distribution "
- "to download with the command line parameter --distribution-version. "
- "E.g. --distribution-version=5.0.0")
- distributions_root = "%s/%s" % (ctx.config.opts("system", "root.dir"), ctx.config.opts("source", "distribution.dir"))
- io.ensure_dir(distributions_root)
- distribution_path = "%s/elasticsearch-%s.tar.gz" % (distributions_root, version)
+Pipeline("from-sources-complete",
+ "Builds and provisions Elasticsearch, runs a benchmark and reports results.", from_sources_complete)
- try:
- repo = distribution_repos[repo_name]
- except KeyError:
- raise exceptions.SystemSetupError("Unknown distribution repository [%s]. Valid values are: [%s]"
- % (repo_name, ",".join(distribution_repos.keys())))
-
- download_url = repo.download_url(version)
- logger.info("Resolved download URL [%s] for version [%s]" % (download_url, version))
- if not os.path.isfile(distribution_path) or repo.must_download:
- try:
- console.println("Downloading Elasticsearch %s ..." % version, logger=logger.info)
- net.download(download_url, distribution_path)
- except urllib.error.HTTPError:
- logging.exception("Cannot download Elasticsearch distribution for version [%s] from [%s]." % (version, download_url))
- raise exceptions.SystemSetupError("Cannot download Elasticsearch distribution from [%s]. Please check that the specified "
- "version [%s] is correct." % (download_url, version))
- else:
- logger.info("Skipping download for version [%s]. Found an existing binary locally at [%s]." % (version, distribution_path))
-
- ctx.config.add(config.Scope.invocation, "builder", "candidate.bin.path", distribution_path)
-
-
-class ReleaseDistributionRepo:
- def __init__(self):
- self.must_download = False
-
- def download_url(self, version):
- major_version = int(versions.components(version)["major"])
- if major_version > 1 and not self.on_or_after_5_0_0_beta1(version):
- download_url = "https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/%s/" \
- "elasticsearch-%s.tar.gz" % (version, version)
- elif self.on_or_after_5_0_0_beta1(version):
- download_url = "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-%s.tar.gz" % version
- else:
- download_url = "https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-%s.tar.gz" % version
- return download_url
-
- def on_or_after_5_0_0_beta1(self, version):
- components = versions.components(version)
- major_version = int(components["major"])
- minor_version = int(components["minor"])
- patch_version = int(components["patch"])
- suffix = components["suffix"] if "suffix" in components else None
-
- if major_version < 5:
- return False
- elif major_version == 5 and minor_version == 0 and patch_version == 0 and suffix and suffix.startswith("alpha"):
- return False
- return True
-
-class SnapshotDistributionRepo:
- def __init__(self):
- self.must_download = True
-
- def download_url(self, version):
- root_path = "https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/distribution/tar/elasticsearch/%s" % version
- metadata_url = "%s/maven-metadata.xml" % root_path
- try:
- metadata = net.retrieve_content_as_string(metadata_url)
- x = xml.etree.ElementTree.fromstring(metadata)
- found_snapshot_versions = x.findall("./versioning/snapshotVersions/snapshotVersion/[extension='tar.gz']/value")
- except Exception:
- logger.exception("Could not retrieve a valid metadata.xml file from remote URL [%s]." % metadata_url)
- raise exceptions.SystemSetupError("Cannot derive download URL for Elasticsearch %s" % version)
-
- if len(found_snapshot_versions) == 1:
- snapshot_id = found_snapshot_versions[0].text
- return "%s/elasticsearch-%s.tar.gz" % (root_path, snapshot_id)
- else:
- logger.error("Found [%d] version identifiers in [%s]. Contents: %s" % (len(found_snapshot_versions), metadata_url, metadata))
- raise exceptions.SystemSetupError("Cannot derive download URL for Elasticsearch %s" % version)
-
-distribution_repos = {
- "release": ReleaseDistributionRepo(),
- "snapshot": SnapshotDistributionRepo()
-}
-
-
-# benchmarks with external candidates are really scary and we should warn users.
-bogus_results_warning = """
-************************************************************************
-************** WARNING: A dark dungeon lies ahead of you **************
-************************************************************************
-
-Rally does not have control over the configuration of the benchmarked
-Elasticsearch cluster.
-
-Be aware that results may be misleading due to problems with the setup.
-Rally is also not able to gather lots of metrics at all (like CPU usage
-of the benchmarked cluster) or may even produce misleading metrics (like
-the index size).
-
-************************************************************************
-****** Use this pipeline only if you are aware of the tradeoffs. ******
-*************************** Watch your step! ***************************
-************************************************************************
-"""
-
-pipelines = {
- "from-sources-complete":
- lambda ctx=None: Pipeline("from-sources-complete", "Builds and provisions Elasticsearch, runs a benchmark and reports results.",
- [
- PipelineStep("check-can-handle-sources", ctx, check_can_handle_source_distribution),
- PipelineStep("kill-es", ctx, kill),
- PipelineStep("prepare-track", ctx, prepare_track),
- PipelineStep("store-race", ctx, store_race),
- PipelineStep("build", ctx, lambda ctx: ctx.mechanic.prepare_candidate()),
- PipelineStep("find-candidate", ctx, lambda ctx: ctx.mechanic.find_candidate()),
- PipelineStep("benchmark", ctx, benchmark_internal),
- PipelineStep("report", ctx, lambda ctx: ctx.reporter.report(ctx.track)),
- PipelineStep("sweep", ctx, sweeper.sweep)
- ]
- ),
- "from-sources-skip-build":
- lambda ctx=None: Pipeline("from-sources-skip-build", "Provisions Elasticsearch (skips the build), runs a benchmark and reports results.",
- [
- PipelineStep("check-can-handle-sources", ctx, check_can_handle_source_distribution),
- PipelineStep("kill-es", ctx, kill),
- PipelineStep("prepare-track", ctx, prepare_track),
- PipelineStep("store-race", ctx, store_race),
- PipelineStep("find-candidate", ctx, lambda ctx: ctx.mechanic.find_candidate()),
- PipelineStep("benchmark", ctx, benchmark_internal),
- PipelineStep("report", ctx, lambda ctx: ctx.reporter.report(ctx.track)),
- PipelineStep("sweep", ctx, sweeper.sweep)
- ]
-
- ),
- "from-distribution":
- lambda ctx=None: Pipeline("from-distribution", "Downloads an Elasticsearch distribution, provisions it, runs a benchmark and reports "
- "results.",
- [
- PipelineStep("kill-es", ctx, kill),
- PipelineStep("download-candidate", ctx, download_benchmark_candidate),
- PipelineStep("prepare-track", ctx, prepare_track),
- PipelineStep("store-race", ctx, store_race),
- PipelineStep("benchmark", ctx, benchmark_internal),
- PipelineStep("report", ctx, lambda ctx: ctx.reporter.report(ctx.track)),
- PipelineStep("sweep", ctx, sweeper.sweep)
- ]
-
- ),
- "benchmark-only":
- lambda ctx=None: Pipeline("benchmark-only", "Assumes an already running Elasticsearch instance, runs a benchmark and reports results",
- [
- PipelineStep("warn-bogus", ctx, lambda ctx: console.println(bogus_results_warning)),
- PipelineStep("prepare-track", ctx, prepare_track),
- PipelineStep("prepare-benchmark-external", ctx, prepare_benchmark_external),
- PipelineStep("store-race", ctx, store_race),
- PipelineStep("benchmark", ctx, benchmark_external),
- PipelineStep("report", ctx, lambda ctx: ctx.reporter.report(ctx.track)),
- PipelineStep("sweep", ctx, sweeper.sweep)
- ]
- ),
-
-}
+Pipeline("from-sources-skip-build",
+ "Provisions Elasticsearch (skips the build), runs a benchmark and reports results.", from_sources_skip_build)
+
+Pipeline("from-distribution",
+ "Downloads an Elasticsearch distribution, provisions it, runs a benchmark and reports results.", from_distribution)
+
+Pipeline("benchmark-only",
+         "Assumes an already running Elasticsearch instance, runs a benchmark and reports results.", benchmark_only)
def list_pipelines():
console.println("Available pipelines:\n")
console.println(
- tabulate.tabulate([[pipeline().name, pipeline().description] for pipeline in pipelines.values()], headers=["Name", "Description"]))
+ tabulate.tabulate([[pipeline.name, pipeline.description] for pipeline in pipelines.values()], headers=["Name", "Description"]))
def run(cfg):
name = cfg.opts("system", "pipeline")
try:
- pipeline = pipelines[name](RacingContext(cfg))
+ pipeline = pipelines[name]
except KeyError:
- raise exceptions.ImproperlyConfigured(
+ raise exceptions.SystemSetupError(
"Unknown pipeline [%s]. You can list the available pipelines with %s list pipelines." % (name, PROGRAM_NAME))
try:
- pipeline()
+ pipeline(cfg)
except exceptions.RallyError as e:
# just pass on our own errors. It should be treated differently on top-level
raise e
except BaseException:
tb = sys.exc_info()[2]
raise exceptions.RallyError("This race ended early with a fatal crash. For details please see the logs.").with_traceback(tb)
-
-
-class RacingContext:
- def __init__(self, cfg):
- self.config = cfg
- self.mechanic = mechanic.Mechanic(cfg)
- self.reporter = reporter.SummaryReporter(cfg)
- self.track = None
- self.cluster = None
diff --git a/esrally/rally.py b/esrally/rally.py
index f2efb9d04..a80a07024 100644
--- a/esrally/rally.py
+++ b/esrally/rally.py
@@ -8,8 +8,9 @@
import pkg_resources
import thespian.actors
-from esrally import config, paths, racecontrol, reporter, metrics, telemetry, track, car, exceptions, PROGRAM_NAME
+from esrally import config, paths, racecontrol, reporter, metrics, track, exceptions, PROGRAM_NAME
from esrally.utils import io, convert, git, process, console
+from esrally.mechanic import car, telemetry
__version__ = pkg_resources.require("esrally")[0].version
@@ -189,6 +190,7 @@ def parse_args():
try:
int(os.environ["COLUMNS"])
except (KeyError, ValueError):
+ # noinspection PyBroadException
try:
os.environ['COLUMNS'] = str(shutil.get_terminal_size().columns)
except BaseException:
@@ -412,7 +414,7 @@ def list(cfg):
elif what == "cars":
car.list_cars()
else:
- raise exceptions.ImproperlyConfigured("Cannot list unknown configuration option [%s]" % what)
+ raise exceptions.SystemSetupError("Cannot list unknown configuration option [%s]" % what)
def print_help_on_errors(cfg):
@@ -434,7 +436,7 @@ def dispatch_sub_command(cfg, sub_command):
elif sub_command == "race":
racecontrol.run(cfg)
else:
- raise exceptions.ImproperlyConfigured("Unknown subcommand [%s]" % sub_command)
+ raise exceptions.SystemSetupError("Unknown subcommand [%s]" % sub_command)
return True
except exceptions.RallyError as e:
logging.exception("Cannot run subcommand [%s]." % sub_command)
@@ -530,12 +532,12 @@ def main():
cfg.add(config.Scope.applicationOverride, "source", "distribution.repository", args.distribution_repository)
cfg.add(config.Scope.applicationOverride, "system", "pipeline", args.pipeline)
cfg.add(config.Scope.applicationOverride, "system", "track.repository", args.track_repository)
- cfg.add(config.Scope.applicationOverride, "system", "track", args.track)
cfg.add(config.Scope.applicationOverride, "system", "quiet.mode", args.quiet)
cfg.add(config.Scope.applicationOverride, "system", "offline.mode", args.offline)
cfg.add(config.Scope.applicationOverride, "system", "user.tag", args.user_tag)
cfg.add(config.Scope.applicationOverride, "system", "logging.output", args.logging)
cfg.add(config.Scope.applicationOverride, "telemetry", "devices", csv_to_list(args.telemetry))
+ cfg.add(config.Scope.applicationOverride, "benchmarks", "track", args.track)
cfg.add(config.Scope.applicationOverride, "benchmarks", "challenge", args.challenge)
cfg.add(config.Scope.applicationOverride, "benchmarks", "car", args.car)
cfg.add(config.Scope.applicationOverride, "benchmarks", "cluster.health", args.cluster_health)
@@ -560,6 +562,7 @@ def main():
logger.info("Command line arguments: %s" % args)
# Kill any lingering Rally processes before attempting to continue - the actor system needs to a singleton on this machine
+ # noinspection PyBroadException
try:
process.kill_running_rally_instances()
except BaseException:
diff --git a/esrally/reporter.py b/esrally/reporter.py
index 1e2635c2a..dcb53f452 100644
--- a/esrally/reporter.py
+++ b/esrally/reporter.py
@@ -13,24 +13,26 @@
MEDIAN = "50.0"
+def summarize(cfg, track):
+ SummaryReporter(cfg).report(track)
+
+
def compare(cfg):
baseline_ts = cfg.opts("report", "comparison.baseline.timestamp")
contender_ts = cfg.opts("report", "comparison.contender.timestamp")
if not baseline_ts or not contender_ts:
- raise exceptions.ImproperlyConfigured("compare needs baseline and a contender")
+ raise exceptions.SystemSetupError("compare needs baseline and a contender")
race_store = metrics.race_store(cfg)
ComparisonReporter(cfg).report(
race_store.find_by_timestamp(baseline_ts),
race_store.find_by_timestamp(contender_ts))
-# TODO dm: Inline?
def print_internal(message):
console.println(message, logger=logger.info)
-# TODO dm: Inline?
def print_header(message):
print_internal(console.format.bold(message))
@@ -163,15 +165,9 @@ def report(self, t):
print_internal("")
selected_challenge = self._config.opts("benchmarks", "challenge")
- selected_car = self._config.opts("benchmarks", "car")
- invocation = self._config.opts("meta", "time.start")
- logger.info("Generating summary report for invocation=[%s], track=[%s], challenge=[%s], car=[%s]" %
- (invocation, t.name, selected_challenge, selected_car))
for challenge in t.challenges:
if challenge.name == selected_challenge:
store = metrics.metrics_store(self._config)
- store.open(invocation, t.name, challenge.name, selected_car)
-
stats = Stats(store, challenge)
metrics_table = []
@@ -339,12 +335,12 @@ def report(self, r1, r2):
(r1.trial_timestamp, r1.track, r1.challenge, r1.car,
r2.trial_timestamp, r2.track, r2.challenge, r2.car))
# we don't verify anything about the races as it is possible that the user benchmarks two different tracks intentionally
- baseline_store = metrics.metrics_store(self._config)
- baseline_store.open(r1.trial_timestamp, r1.track, r1.challenge.name, r1.car)
+ baseline_store = metrics.metrics_store(self._config,
+ invocation=r1.trial_timestamp, track=r1.track, challenge=r1.challenge.name, car=r1.car)
baseline_stats = Stats(baseline_store, r1.challenge)
- contender_store = metrics.metrics_store(self._config)
- contender_store.open(r2.trial_timestamp, r2.track, r2.challenge.name, r2.car)
+ contender_store = metrics.metrics_store(self._config,
+ invocation=r2.trial_timestamp, track=r2.track, challenge=r2.challenge.name, car=r2.car)
contender_stats = Stats(contender_store, r2.challenge)
print_internal("")
diff --git a/esrally/sweeper.py b/esrally/sweeper.py
deleted file mode 100644
index fb908f54b..000000000
--- a/esrally/sweeper.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import shutil
-
-from esrally import paths
-from esrally.utils import io, console
-
-
-def sweep(ctx):
- invocation_root = paths.Paths(ctx.config).invocation_root()
- track_name = ctx.config.opts("system", "track")
- challenge_name = ctx.config.opts("benchmarks", "challenge")
- car_name = ctx.config.opts("benchmarks", "car")
-
- log_root = paths.Paths(ctx.config).log_root()
- # for external benchmarks, there is no match to a car
- car_suffix = "-%s" % car_name if car_name else ""
- archive_path = "%s/logs-%s-%s%s.zip" % (invocation_root, track_name, challenge_name, car_suffix)
- io.compress(log_root, archive_path)
- console.println("\nLogs for this race are archived in %s" % archive_path)
- shutil.rmtree(log_root)
-
diff --git a/esrally/track.py b/esrally/track.py
index 3427e48cf..97445fac0 100644
--- a/esrally/track.py
+++ b/esrally/track.py
@@ -10,7 +10,7 @@
import jsonschema
import tabulate
-from esrally import exceptions, time
+from esrally import exceptions, time, PROGRAM_NAME
from esrally.utils import io, convert, net, git, versions, console
logger = logging.getLogger("rally.track")
@@ -193,21 +193,26 @@ def list_tracks(cfg):
headers=["Name", "Description", "Challenges"]))
-def load_track(cfg, track_name):
+def load_track(cfg):
"""
- Loads a track with the given name.
+ Loads a track
- :param cfg: The config object.
- :param track_name: The name of a track to load.
+ :param cfg: The config object. It contains the name of the track to load.
:return: The loaded track.
"""
- repo = TrackRepository(cfg)
- reader = TrackFileReader(cfg)
- distribution_version = cfg.opts("source", "distribution.version", mandatory=False)
- data_root = cfg.opts("benchmarks", "local.dataset.cache")
- return reader.read(track_name, repo.track_file(distribution_version, track_name), repo.track_dir(track_name),
- "%s/%s" % (data_root, track_name.lower()))
+ track_name = cfg.opts("benchmarks", "track")
+ try:
+ repo = TrackRepository(cfg)
+ reader = TrackFileReader(cfg)
+ distribution_version = cfg.opts("source", "distribution.version", mandatory=False)
+ data_root = cfg.opts("benchmarks", "local.dataset.cache")
+ return reader.read(track_name, repo.track_file(distribution_version, track_name), repo.track_dir(track_name),
+ "%s/%s" % (data_root, track_name.lower()))
+ except FileNotFoundError:
+ logger.exception("Cannot load track [%s]" % track_name)
+ raise exceptions.SystemSetupError("Cannot load track %s. You can list the available tracks with %s list tracks." %
+ (track_name, PROGRAM_NAME))
def prepare_track(track, cfg):
diff --git a/tests/mechanic/builder_test.py b/tests/mechanic/builder_test.py
deleted file mode 100644
index 5af16c595..000000000
--- a/tests/mechanic/builder_test.py
+++ /dev/null
@@ -1,42 +0,0 @@
-from unittest import TestCase
-import unittest.mock as mock
-
-from esrally import config
-from esrally.mechanic import builder
-
-
-class BuilderTests(TestCase):
-
- @mock.patch("esrally.utils.process.run_subprocess")
- def test_build(self, mock_run_subprocess):
- mock_run_subprocess.return_value = False
-
- cfg = config.Config()
- cfg.add(config.Scope.application, "source", "local.src.dir", "/src")
- cfg.add(config.Scope.application, "runtime", "java8.home", "/opt/jdk8")
- cfg.add(config.Scope.application, "build", "gradle.bin", "/usr/local/gradle")
- cfg.add(config.Scope.application, "build", "gradle.tasks.clean", "clean")
- cfg.add(config.Scope.application, "build", "gradle.tasks.package", "assemble")
- cfg.add(config.Scope.application, "system", "log.dir", "logs")
- cfg.add(config.Scope.application, "build", "log.dir", "build")
-
- b = builder.Builder(cfg)
- b.build()
-
- calls = [
- # Actual call
- mock.call("export JAVA_HOME=/opt/jdk8; cd /src; /usr/local/gradle clean > logs/build/build.gradle.tasks.clean.log 2>&1"),
- # Return value check
- mock.call("export JAVA_HOME=/opt/jdk8; cd /src; /usr/local/gradle assemble > logs/build/build.gradle.tasks.package.log 2>&1"),
- ]
-
- mock_run_subprocess.assert_has_calls(calls)
-
- @mock.patch("glob.glob", lambda p: ["elasticsearch.zip"])
- def test_add_binary_to_config(self):
- cfg = config.Config()
- cfg.add(config.Scope.application, "source", "local.src.dir", "/src")
- b = builder.Builder(cfg)
- b.add_binary_to_config()
- self.assertEqual(cfg.opts("builder", "candidate.bin.path"), "elasticsearch.zip")
-
diff --git a/tests/mechanic/launcher_test.py b/tests/mechanic/launcher_test.py
index 4894a6a6d..78c9cf0ba 100644
--- a/tests/mechanic/launcher_test.py
+++ b/tests/mechanic/launcher_test.py
@@ -9,6 +9,14 @@ def add_meta_info(self, scope, scope_key, key, value):
pass
+class MockClientFactory:
+ def __init__(self, hosts, client_options):
+ pass
+
+ def create(self):
+ return MockClient()
+
+
class MockClient:
def __init__(self):
self.cluster = SubClient({
@@ -64,9 +72,11 @@ class ExternalLauncherTests(TestCase):
def test_setup_external_cluster_single_node(self):
cfg = config.Config()
cfg.add(config.Scope.application, "telemetry", "devices", [])
+ cfg.add(config.Scope.application, "launcher", "external.target.hosts", ["10.0.0.10:9200", "10.0.0.11:9200"])
+ cfg.add(config.Scope.application, "launcher", "client.options", [])
- m = launcher.ExternalLauncher(cfg)
- m.start(MockClient(), MockMetricsStore())
+ m = launcher.ExternalLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
+ m.start()
# automatically determined by launcher on attach
self.assertEqual(cfg.opts("source", "distribution.version"), "5.0.0")
@@ -74,9 +84,11 @@ def test_setup_external_cluster_single_node(self):
def test_setup_external_cluster_multiple_nodes(self):
cfg = config.Config()
cfg.add(config.Scope.application, "telemetry", "devices", [])
+ cfg.add(config.Scope.application, "launcher", "external.target.hosts", ["10.0.0.10:9200", "10.0.0.11:9200"])
+ cfg.add(config.Scope.application, "launcher", "client.options", [])
cfg.add(config.Scope.application, "source", "distribution.version", "2.3.3")
- m = launcher.ExternalLauncher(cfg)
- m.start(MockClient(), MockMetricsStore())
+ m = launcher.ExternalLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
+ m.start()
# did not change user defined value
self.assertEqual(cfg.opts("source", "distribution.version"), "2.3.3")
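The adjusted tests rely on ExternalLauncher building its Elasticsearch client itself from the two new launcher config keys, via an injectable factory. A hedged sketch of that shape follows; only the constructor signature, the start() call, and the two config keys are visible in the test, everything else (attribute names, the default factory being esrally.client.EsClientFactory) is an assumption:

    from esrally import client

    class ExternalLauncher:
        def __init__(self, cfg, metrics_store, client_factory_class=client.EsClientFactory):
            self.cfg = cfg
            self.metrics_store = metrics_store
            # the factory is injectable so tests can substitute MockClientFactory
            self.client_factory = client_factory_class(cfg.opts("launcher", "external.target.hosts"),
                                                       cfg.opts("launcher", "client.options"))

        def start(self):
            es = self.client_factory.create()
            # attach telemetry and derive source/distribution.version from the running cluster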
diff --git a/tests/mechanic/provisioner_test.py b/tests/mechanic/provisioner_test.py
index d86137752..37057a122 100644
--- a/tests/mechanic/provisioner_test.py
+++ b/tests/mechanic/provisioner_test.py
@@ -1,7 +1,7 @@
import unittest.mock as mock
from unittest import TestCase
-from esrally import config, car
+from esrally import config
from esrally.mechanic import provisioner
@@ -69,12 +69,14 @@ def test_prepare(self, mock_path_exists, mock_rm, mock_ensure_dir, mock_decompre
cfg = config.Config()
cfg.add(config.Scope.application, "system", "env.name", "unittest")
cfg.add(config.Scope.application, "system", "challenge.root.dir", "/rally-root/track/challenge")
+ cfg.add(config.Scope.application, "benchmarks", "car", "defaults")
cfg.add(config.Scope.application, "builder", "candidate.bin.path", "/data/builds/distributions/")
cfg.add(config.Scope.application, "provisioning", "local.install.dir", "es-bin")
cfg.add(config.Scope.application, "provisioning", "install.preserve", False)
cfg.add(config.Scope.application, "provisioning", "datapaths", [])
+ cfg.add(config.Scope.application, "provisioning", "node.http.port", 39200)
p = provisioner.Provisioner(cfg)
- p.prepare(car.Car(name="test-car"), 39200)
+ p.prepare()
self.assertEqual(cfg.opts("provisioning", "local.binary.path"), "/install/elasticsearch-5.0.0-SNAPSHOT")
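Analogously, prepare() no longer takes the car and HTTP port as parameters; the test seeds them as benchmarks/car and provisioning/node.http.port instead. A rough sketch of how the provisioner side might read them (the _config attribute name is an assumption; the real implementation lives in esrally/mechanic/provisioner.py):

    def prepare(self):
        # both values were previously passed in by the caller
        car_name = self._config.opts("benchmarks", "car")
        http_port = self._config.opts("provisioning", "node.http.port")
        ...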
diff --git a/tests/mechanic/supplier_test.py b/tests/mechanic/supplier_test.py
index bf530998e..aca5c9fab 100644
--- a/tests/mechanic/supplier_test.py
+++ b/tests/mechanic/supplier_test.py
@@ -1,11 +1,13 @@
+import urllib.error
+
from unittest import TestCase
import unittest.mock as mock
-from esrally import config
+from esrally import config, exceptions
from esrally.mechanic import supplier
-class SupplierTests(TestCase):
+class SourceRepositoryTests(TestCase):
@mock.patch("esrally.utils.git.head_revision", autospec=True)
@mock.patch("esrally.utils.git.pull", autospec=True)
@mock.patch("esrally.utils.git.clone", autospec=True)
@@ -19,7 +21,7 @@ def test_intial_checkout_latest(self, mock_is_working_copy, mock_clone, mock_pul
mock_is_working_copy.return_value = False
mock_head_revision.return_value = "HEAD"
- s = supplier.Supplier(cfg)
+ s = supplier.SourceRepository(cfg)
s.fetch()
mock_is_working_copy.assert_called_with("/src")
@@ -40,7 +42,7 @@ def test_checkout_current(self, mock_is_working_copy, mock_clone, mock_pull, moc
mock_is_working_copy.return_value = True
mock_head_revision.return_value = "HEAD"
- s = supplier.Supplier(cfg)
+ s = supplier.SourceRepository(cfg)
s.fetch()
mock_is_working_copy.assert_called_with("/src")
@@ -60,7 +62,7 @@ def test_checkout_ts(self, mock_is_working_copy, mock_pull_ts, mock_head_revisio
mock_is_working_copy.return_value = True
mock_head_revision.return_value = "HEAD"
- s = supplier.Supplier(cfg)
+ s = supplier.SourceRepository(cfg)
s.fetch()
mock_is_working_copy.assert_called_with("/src")
@@ -79,9 +81,146 @@ def test_checkout_revision(self, mock_is_working_copy, mock_pull_revision, mock_
mock_is_working_copy.return_value = True
mock_head_revision.return_value = "HEAD"
- s = supplier.Supplier(cfg)
+ s = supplier.SourceRepository(cfg)
s.fetch()
mock_is_working_copy.assert_called_with("/src")
mock_pull_revision.assert_called_with("/src", "67c2f42")
mock_head_revision.assert_called_with("/src")
+
+
+class BuilderTests(TestCase):
+
+ @mock.patch("esrally.utils.process.run_subprocess")
+ def test_build(self, mock_run_subprocess):
+ mock_run_subprocess.return_value = False
+
+ cfg = config.Config()
+ cfg.add(config.Scope.application, "source", "local.src.dir", "/src")
+ cfg.add(config.Scope.application, "runtime", "java8.home", "/opt/jdk8")
+ cfg.add(config.Scope.application, "build", "gradle.bin", "/usr/local/gradle")
+ cfg.add(config.Scope.application, "build", "gradle.tasks.clean", "clean")
+ cfg.add(config.Scope.application, "build", "gradle.tasks.package", "assemble")
+ cfg.add(config.Scope.application, "system", "log.dir", "logs")
+ cfg.add(config.Scope.application, "build", "log.dir", "build")
+
+ b = supplier.Builder(cfg)
+ b.build()
+
+ calls = [
+ # Actual call
+ mock.call("export JAVA_HOME=/opt/jdk8; cd /src; /usr/local/gradle clean > logs/build/build.gradle.tasks.clean.log 2>&1"),
+ # Return value check
+ mock.call("export JAVA_HOME=/opt/jdk8; cd /src; /usr/local/gradle assemble > logs/build/build.gradle.tasks.package.log 2>&1"),
+ ]
+
+ mock_run_subprocess.assert_has_calls(calls)
+
+ @mock.patch("glob.glob", lambda p: ["elasticsearch.zip"])
+ def test_add_binary_to_config(self):
+ cfg = config.Config()
+ cfg.add(config.Scope.application, "source", "local.src.dir", "/src")
+ b = supplier.Builder(cfg)
+ b.add_binary_to_config()
+ self.assertEqual(cfg.opts("builder", "candidate.bin.path"), "elasticsearch.zip")
+
+
+class SnapshotDistributionRepositoryTests(TestCase):
+ @mock.patch("esrally.utils.net.retrieve_content_as_string")
+ def test_download_url_for_valid_version(self, content_as_string):
+ content_as_string.return_value = """
+ <metadata>
+   <groupId>org.elasticsearch.distribution.tar</groupId>
+   <artifactId>elasticsearch</artifactId>
+   <version>5.0.0-SNAPSHOT</version>
+   <versioning>
+     <snapshot>
+       <timestamp>20160613.162731</timestamp>
+       <buildNumber>397</buildNumber>
+     </snapshot>
+     <lastUpdated>20160616030717</lastUpdated>
+     <snapshotVersions>
+       <snapshotVersion>
+         <extension>pom</extension>
+         <value>5.0.0-20160613.162731-397</value>
+         <updated>20160613162731</updated>
+       </snapshotVersion>
+       <snapshotVersion>
+         <extension>tar.gz</extension>
+         <value>5.0.0-20160613.162731-397</value>
+         <updated>20160613162731</updated>
+       </snapshotVersion>
+     </snapshotVersions>
+   </versioning>
+ </metadata>
+"""
+ repo = supplier.SnapshotDistributionRepo()
+ self.assertEqual("https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/distribution/tar/elasticsearch/"
+ "5.0.0-SNAPSHOT/elasticsearch-5.0.0-20160613.162731-397.tar.gz", repo.download_url("5.0.0-SNAPSHOT"))
+
+ @mock.patch("esrally.utils.net.retrieve_content_as_string")
+ def test_download_url_for_invalid_metadata(self, content_as_string):
+ content_as_string.return_value = """
+ <metadata>
+   <groupId>org.elasticsearch.distribution.tar</groupId>
+   <artifactId>elasticsearch</artifactId>
+   <version>5.0.0-SNAPSHOT</version>
+   <versioning>
+     <snapshot>
+       <timestamp>20160613.162731</timestamp>
+       <buildNumber>397</buildNumber>
+     </snapshot>
+     <lastUpdated>20160616030717</lastUpdated>
+     <snapshotVersions>
+       <snapshotVersion>
+         <extension>pom</extension>
+         <value>5.0.0-20160613.162731-397</value>
+         <updated>20160613162731</updated>
+       </snapshotVersion>
+     </snapshotVersions>
+   </versioning>
+ </metadata>
+"""
+ repo = supplier.SnapshotDistributionRepo()
+ with self.assertRaises(exceptions.SystemSetupError) as ctx:
+ repo.download_url("5.0.0-SNAPSHOT")
+ self.assertEqual("Cannot derive download URL for Elasticsearch 5.0.0-SNAPSHOT", ctx.exception.args[0])
+
+ @mock.patch("esrally.utils.net.retrieve_content_as_string")
+ def test_download_url_for_corrupt_metadata(self, content_as_string):
+ content_as_string.return_value = """
+
-   <groupId>org.elasticsearch.distribution.tar</groupId>
-   <artifactId>elasticsearch</artifactId>
-   <version>5.0.0-SNAPSHOT</version>
-   <versioning>
-     <snapshot>
-       <timestamp>20160613.162731</timestamp>
-       <buildNumber>397</buildNumber>
-     </snapshot>
-     <lastUpdated>20160616030717</lastUpdated>
-     <snapshotVersions>
-       <snapshotVersion>
-         <extension>pom</extension>
-         <value>5.0.0-20160613.162731-397</value>
-         <updated>20160613162731</updated>
-       </snapshotVersion>
-       <snapshotVersion>
-         <extension>tar.gz</extension>
-         <value>5.0.0-20160613.162731-397</value>
-         <updated>20160613162731</updated>
-       </snapshotVersion>
-     </snapshotVersions>
-   </versioning>
- </metadata>
-"""
- repo = racecontrol.SnapshotDistributionRepo()
- self.assertEqual("https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/distribution/tar/elasticsearch/"
- "5.0.0-SNAPSHOT/elasticsearch-5.0.0-20160613.162731-397.tar.gz", repo.download_url("5.0.0-SNAPSHOT"))
-
- @mock.patch("esrally.utils.net.retrieve_content_as_string")
- def test_download_url_for_invalid_metadata(self, content_as_string):
- content_as_string.return_value = """
- <metadata>
-   <groupId>org.elasticsearch.distribution.tar</groupId>
-   <artifactId>elasticsearch</artifactId>
-   <version>5.0.0-SNAPSHOT</version>
-   <versioning>
-     <snapshot>
-       <timestamp>20160613.162731</timestamp>
-       <buildNumber>397</buildNumber>
-     </snapshot>
-     <lastUpdated>20160616030717</lastUpdated>
-     <snapshotVersions>
-       <snapshotVersion>
-         <extension>pom</extension>
-         <value>5.0.0-20160613.162731-397</value>
-         <updated>20160613162731</updated>
-       </snapshotVersion>
-     </snapshotVersions>
-   </versioning>
- </metadata>
-"""
- repo = racecontrol.SnapshotDistributionRepo()
- with self.assertRaises(exceptions.SystemSetupError) as ctx:
- repo.download_url("5.0.0-SNAPSHOT")
- self.assertEqual("Cannot derive download URL for Elasticsearch 5.0.0-SNAPSHOT", ctx.exception.args[0])
-
- @mock.patch("esrally.utils.net.retrieve_content_as_string")
- def test_download_url_for_corrupt_metadata(self, content_as_string):
- content_as_string.return_value = """
-