Skip to content

Commit

Permalink
Always rely on source artifact caching (#1178)
Browse files Browse the repository at this point in the history
With this commit we remove the undocumented command-line parameter
`--skip-build`, which was useful before Rally implemented source
artifact caching. As Rally is aware of whether an artifact needs to be
built, this flag can be removed.
  • Loading branch information
danielmitterdorfer authored Feb 22, 2021
1 parent 5461f0c commit 8350479
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 120 deletions.
19 changes: 8 additions & 11 deletions esrally/mechanic/mechanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
def download(cfg):
car, plugins = load_team(cfg, external=False)

s = supplier.create(cfg, sources=False, distribution=True, build=False, car=car, plugins=plugins)
s = supplier.create(cfg, sources=False, distribution=True, car=car, plugins=plugins)
binaries = s()
console.println(json.dumps(binaries, indent=2), force=True)

Expand All @@ -48,7 +48,6 @@ def install(cfg):
# A non-empty distribution-version is provided
distribution = bool(cfg.opts("mechanic", "distribution.version", mandatory=False))
sources = not distribution
build = not cfg.opts("mechanic", "skip.build")
build_type = cfg.opts("mechanic", "build.type")
ip = cfg.opts("mechanic", "network.host")
http_port = int(cfg.opts("mechanic", "network.http.port"))
Expand All @@ -57,7 +56,7 @@ def install(cfg):
seed_hosts = cfg.opts("mechanic", "seed.hosts")

if build_type == "tar":
binary_supplier = supplier.create(cfg, sources, distribution, build, car, plugins)
binary_supplier = supplier.create(cfg, sources, distribution, car, plugins)
p = provisioner.local(cfg=cfg, car=car, plugins=plugins, cluster_settings={}, ip=ip, http_port=http_port,
all_node_ips=seed_hosts, all_node_names=master_nodes, target_root=root_path,
node_name=node_name)
Expand Down Expand Up @@ -166,13 +165,12 @@ def _delete_node_file(root_path):
##############################

class StartEngine:
def __init__(self, cfg, open_metrics_context, cluster_settings, sources, build, distribution, external, docker, ip=None, port=None,
def __init__(self, cfg, open_metrics_context, cluster_settings, sources, distribution, external, docker, ip=None, port=None,
node_id=None):
self.cfg = cfg
self.open_metrics_context = open_metrics_context
self.cluster_settings = cluster_settings
self.sources = sources
self.build = build
self.distribution = distribution
self.external = external
self.docker = docker
Expand All @@ -192,7 +190,7 @@ def for_nodes(self, all_node_ips=None, all_node_ids=None, ip=None, port=None, no
:param node_ids: A list of node id to set.
:return: A corresponding ``StartNodes`` message with the specified IP, port number and node ids.
"""
return StartNodes(self.cfg, self.open_metrics_context, self.cluster_settings, self.sources, self.build, self.distribution,
return StartNodes(self.cfg, self.open_metrics_context, self.cluster_settings, self.sources, self.distribution,
self.external, self.docker, all_node_ips, all_node_ids, ip, port, node_ids)


Expand All @@ -219,13 +217,12 @@ def __init__(self, reset_in_seconds):
##############################

class StartNodes:
def __init__(self, cfg, open_metrics_context, cluster_settings, sources, build, distribution, external, docker,
def __init__(self, cfg, open_metrics_context, cluster_settings, sources, distribution, external, docker,
all_node_ips, all_node_ids, ip, port, node_ids):
self.cfg = cfg
self.open_metrics_context = open_metrics_context
self.cluster_settings = cluster_settings
self.sources = sources
self.build = build
self.distribution = distribution
self.external = external
self.docker = docker
Expand Down Expand Up @@ -561,7 +558,7 @@ def receiveMsg_StartNodes(self, msg, sender):
# avoid follow-up errors in case we receive an unexpected ActorExitRequest due to an early failure in a parent actor.

self.mechanic = create(cfg, metrics_store, msg.ip, msg.port, msg.all_node_ips, msg.all_node_ids,
msg.cluster_settings, msg.sources, msg.build, msg.distribution,
msg.cluster_settings, msg.sources, msg.distribution,
msg.external, msg.docker)
self.mechanic.start_engine()
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
Expand Down Expand Up @@ -622,14 +619,14 @@ def load_team(cfg, external):


def create(cfg, metrics_store, node_ip, node_http_port, all_node_ips, all_node_ids, cluster_settings=None,
sources=False, build=False, distribution=False, external=False, docker=False):
sources=False, distribution=False, external=False, docker=False):
race_root_path = paths.race_root(cfg)
node_ids = cfg.opts("provisioning", "node.ids", mandatory=False)
node_name_prefix = cfg.opts("provisioning", "node.name.prefix")
car, plugins = load_team(cfg, external)

if sources or distribution:
s = supplier.create(cfg, sources, distribution, build, car, plugins)
s = supplier.create(cfg, sources, distribution, car, plugins)
p = []
all_node_names = ["%s-%s" % (node_name_prefix, n) for n in all_node_ids]
for node_id in node_ids:
Expand Down
18 changes: 5 additions & 13 deletions esrally/mechanic/supplier.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
REVISION_PATTERN = r"(\w.*?):(.*)"


def create(cfg, sources, distribution, build, car, plugins=None):
def create(cfg, sources, distribution, car, plugins=None):
logger = logging.getLogger(__name__)
if plugins is None:
plugins = []
caching_enabled = cfg.opts("source", "cache", mandatory=False, default_value=True)
revisions = _extract_revisions(cfg.opts("mechanic", "source.revision", mandatory=sources))
distribution_version = cfg.opts("mechanic", "distribution.version", mandatory=False)
supply_requirements = _supply_requirements(sources, distribution, build, plugins, revisions, distribution_version)
supply_requirements = _supply_requirements(sources, distribution, plugins, revisions, distribution_version)
build_needed = any([build for _, _, build in supply_requirements.values()])
es_supplier_type, es_version, _ = supply_requirements["elasticsearch"]
src_config = cfg.all_opts("source")
Expand Down Expand Up @@ -153,15 +153,15 @@ def _required_revision(revisions, key, name=None):
raise exceptions.SystemSetupError("No revision specified for %s" % n)


def _supply_requirements(sources, distribution, build, plugins, revisions, distribution_version):
def _supply_requirements(sources, distribution, plugins, revisions, distribution_version):
# per artifact (elasticsearch or a specific plugin):
# * key: artifact
# * value: ("source" | "distribution", distribution_version | revision, build = True | False)
supply_requirements = {}

# can only build Elasticsearch with source-related pipelines -> ignore revision in that case
if "elasticsearch" in revisions and sources:
supply_requirements["elasticsearch"] = ("source", _required_revision(revisions, "elasticsearch", "Elasticsearch"), build)
supply_requirements["elasticsearch"] = ("source", _required_revision(revisions, "elasticsearch", "Elasticsearch"), True)
else:
# no revision given or explicitly specified that it's from a distribution -> must use a distribution
supply_requirements["elasticsearch"] = ("distribution", _required_version(distribution_version), False)
Expand All @@ -173,14 +173,6 @@ def _supply_requirements(sources, distribution, build, plugins, revisions, distr
else:
# allow catch-all only if we're generally building from sources. If it is mixed, the user should tell explicitly.
if plugin.name in revisions or ("all" in revisions and sources):
# this plugin always needs to built unless we explicitly disable it; we cannot solely rely on the Rally pipeline.
# We either have:
#
# * --pipeline=from-sources --distribution-version=X.Y.Z where the plugin should not be built but ES should be
# a distributed version.
# * --distribution-version=X.Y.Z --revision="my-plugin:abcdef" where the plugin should be built from sources.
# pylint: disable=consider-using-ternary
plugin_needs_build = (sources and build) or distribution
# be a bit more lenient when checking for plugin revisions. This allows users to specify `--revision="current"` and
# rely on Rally to do the right thing.
try:
Expand All @@ -193,7 +185,7 @@ def _supply_requirements(sources, distribution, build, plugins, revisions, distr
else:
logging.getLogger(__name__).info("Revision for [%s] is not explicitly defined. Using catch-all revision [%s].",
plugin.name, plugin_revision)
supply_requirements[plugin.name] = ("source", plugin_revision, plugin_needs_build)
supply_requirements[plugin.name] = ("source", plugin_revision, True)
else:
supply_requirements[plugin.name] = (distribution, _required_version(distribution_version), False)
return supply_requirements
Expand Down
10 changes: 4 additions & 6 deletions esrally/racecontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ def __call__(self, cfg):


class Setup:
def __init__(self, cfg, sources=False, build=False, distribution=False, external=False, docker=False):
def __init__(self, cfg, sources=False, distribution=False, external=False, docker=False):
self.cfg = cfg
self.sources = sources
self.build = build
self.distribution = distribution
self.external = external
self.docker = docker
Expand Down Expand Up @@ -105,7 +104,6 @@ def receiveMsg_Setup(self, msg, sender):
self.coordinator.metrics_store.open_context,
cluster_settings,
msg.sources,
msg.build,
msg.distribution,
msg.external,
msg.docker))
Expand Down Expand Up @@ -234,13 +232,13 @@ def on_benchmark_complete(self, new_metrics):
self.metrics_store.close()


def race(cfg, sources=False, build=False, distribution=False, external=False, docker=False):
def race(cfg, sources=False, distribution=False, external=False, docker=False):
logger = logging.getLogger(__name__)
# at this point an actor system has to run and we should only join
actor_system = actor.bootstrap_actor_system(try_join=True)
benchmark_actor = actor_system.createActor(BenchmarkActor, targetActorRequirements={"coordinator": True})
try:
result = actor_system.ask(benchmark_actor, Setup(cfg, sources, build, distribution, external, docker))
result = actor_system.ask(benchmark_actor, Setup(cfg, sources, distribution, external, docker))
if isinstance(result, Success):
logger.info("Benchmark has finished successfully.")
# may happen if one of the load generators has detected that the user has cancelled the benchmark.
Expand Down Expand Up @@ -276,7 +274,7 @@ def set_default_hosts(cfg, host="127.0.0.1", port=9200):
def from_sources(cfg):
port = cfg.opts("provisioning", "node.http.port")
set_default_hosts(cfg, port=port)
return race(cfg, sources=True, build=True)
return race(cfg, sources=True)


def from_distribution(cfg):
Expand Down
7 changes: 0 additions & 7 deletions esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,6 @@ def add_track_source(subparser):
" The timestamp must be specified as: \"@ts\" where \"ts\" must be a valid ISO 8601 timestamp, "
"e.g. \"@2013-07-27T10:37:00Z\" (default: current).",
default="current") # optimized for local usage, don't fetch sources
install_parser.add_argument(
"--skip-build",
help="Whether Rally should skip rebuilding Elasticsearch (default: false).",
default=False,
action="store_true")
# Intentionally undocumented as we do not consider Docker a fully supported option.
install_parser.add_argument(
"--build-type",
Expand Down Expand Up @@ -793,8 +788,6 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "mechanic", "network.host", args.network_host)
cfg.add(config.Scope.applicationOverride, "mechanic", "network.http.port", args.http_port)
cfg.add(config.Scope.applicationOverride, "mechanic", "source.revision", args.revision)
# TODO: Remove this special treatment and rely on artifact caching (follow-up PR)
cfg.add(config.Scope.applicationOverride, "mechanic", "skip.build", args.skip_build)
cfg.add(config.Scope.applicationOverride, "mechanic", "build.type", args.build_type)
cfg.add(config.Scope.applicationOverride, "mechanic", "runtime.jdk", args.runtime_jdk)
cfg.add(config.Scope.applicationOverride, "mechanic", "node.name", args.node_name)
Expand Down
Loading

0 comments on commit 8350479

Please sign in to comment.