Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail Rally early if there are unused variables in track-params #688

Merged
merged 18 commits into from
May 23, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ def with_actor_system(runnable, cfg):
if not already_running:
shutdown_complete = False
times_interrupted = 0
# give some time for any outstanding messages to be delivered to the actor system
time.sleep(3)
while not shutdown_complete and times_interrupted < 2:
try:
logger.info("Attempting to shutdown internal actor system.")
Expand Down
81 changes: 50 additions & 31 deletions esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
# specific language governing permissions and limitations
# under the License.

import copy
import difflib
import json
import logging
import os
Expand All @@ -35,6 +33,7 @@
from esrally.utils import io, convert, net, console, modules, opts, repo
from jinja2 import meta


class TrackSyntaxError(exceptions.InvalidSyntax):
"""
Raised whenever a syntax problem is encountered when loading the track specification.
Expand Down Expand Up @@ -497,7 +496,7 @@ def load_template_from_file(self):
try:
base_track = loader.get_source(jinja2.Environment(), self.template_file_name)
except jinja2.TemplateNotFound:
self.logger.exception("Could not track from [%s].", self.template_file_name)
self.logger.exception("Could not load track from [%s].", self.template_file_name)
raise TrackSyntaxError("Could not load track from '{}'".format(self.template_file_name))
self.assembled_source = self.replace_includes(self.base_path, base_track[0])

Expand Down Expand Up @@ -525,12 +524,28 @@ def read_glob_files(self, pattern):
source = []
files = self.fileglobber(pattern)
for fname in files:
with self.source(fname, mode="rt") as fp:
with self.source(fname, mode="rt", encoding="utf-8") as fp:
source.append(fp.read())
return ",\n".join(source)


def render_template(template_source, template_vars=None, glob_helper=lambda f: [], clock=time.Clock, loader=None):
def default_internal_template_vars(glob_helper=lambda f: [], clock=time.Clock):
    """
    Builds the internal variables that our jinja2 renderers register on their environment.

    :param glob_helper: Callable exposed to templates as the ``glob`` global. The default ignores its argument and
                        returns an empty list.
    :param clock: Object providing ``now()``. It is called once here and the result is exposed as the ``now`` global.
    :return: A dict with the keys ``globals`` and ``filters``, each mapping a name to the value to register under it.
    """
    internal_globals = {
        "now": clock.now(),
        "glob": glob_helper,
    }
    internal_filters = {
        "days_ago": time.days_ago,
    }
    return {
        "globals": internal_globals,
        "filters": internal_filters,
    }


def render_template(template_source, template_vars=None, template_internal_vars=None, loader=None):
macros = """
{% macro collect(parts) -%}
{% set comma = joiner() %}
Expand All @@ -554,24 +569,28 @@ def render_template(template_source, template_vars=None, glob_helper=lambda f: [
for k, v in template_vars.items():
env.globals[k] = v
# ensure that user variables never override our internal variables
env.globals["now"] = clock.now()
env.globals["glob"] = glob_helper
env.filters["days_ago"] = time.days_ago
if template_internal_vars:
for macro_type in template_internal_vars:
for env_global_key, env_global_value in template_internal_vars[macro_type].items():
getattr(env, macro_type)[env_global_key] = env_global_value

template = env.from_string(template_source)
return template.render()


def register_all_params_in_track(assembled_source, complete_track_params=None):
j2env = jinja2.Environment()

# we don't need the following j2 filters/macros but we define them anyway to prevent parsing failures
j2env.globals["now"] = time.Clock()
# use dummy macro for glob, we don't require it to assemble the source
j2env.globals["glob"] = lambda c: ""
j2env.filters["days_ago"] = time.days_ago
internal_template_vars = default_internal_template_vars()
for macro_type in internal_template_vars:
for env_global_key, env_global_value in internal_template_vars[macro_type].items():
getattr(j2env, macro_type)[env_global_key] = env_global_value

ast = j2env.parse(assembled_source)
j2_variables = meta.find_undeclared_variables(ast)
# print("The j2 variables are {}".format(j2_variables))
complete_track_params.populate_track_defined_params(j2_variables)
if complete_track_params:
complete_track_params.populate_track_defined_params(j2_variables)


def render_template_from_file(template_file_name, template_vars, complete_track_params=None):
Expand All @@ -585,13 +604,12 @@ def relative_glob(start, f):
base_path = io.dirname(template_file_name)
template_source = TemplateSource(base_path, io.basename(template_file_name))
template_source.load_template_from_file()
# print(template_source.assembled_source)
register_all_params_in_track(template_source.assembled_source, complete_track_params)

return render_template(loader=jinja2.FileSystemLoader(base_path),
template_source=template_source.assembled_source,
template_vars=template_vars,
glob_helper=lambda f: relative_glob(base_path, f))
template_internal_vars=default_internal_template_vars(glob_helper=lambda f: relative_glob(base_path, f)))


def filter_included_tasks(t, filters):
Expand Down Expand Up @@ -707,8 +725,9 @@ def post_process_for_test_mode(t):


class CompleteTrackParams:
def __init__(self):
def __init__(self, user_specified_track_params=None):
self.track_defined_params = set()
self.user_specified_track_params = user_specified_track_params

def populate_track_defined_params(self, list_of_track_params=None):
    """
    Adds parameter names to the set of parameters defined by the track.

    :param list_of_track_params: Iterable of parameter names to add. ``None`` (the default) or an empty
                                 iterable leaves the set unchanged.
    """
    # The parameter defaults to None; set(None) would raise a TypeError, so treat "nothing given" as a no-op.
    if list_of_track_params:
        self.track_defined_params.update(list_of_track_params)
Expand All @@ -717,6 +736,12 @@ def populate_track_defined_params(self, list_of_track_params=None):
def sorted_track_defined_params(self):
return sorted(self.track_defined_params)

def unused_user_defined_track_params(self):
    """
    Determines which user-specified track parameters are not among the parameters defined by the track.

    :return: A list (in no particular order) of the user-specified parameter names that are missing from
             ``track_defined_params``. Empty if the user specified no parameters.
    """
    # user_specified_track_params defaults to None in the constructor; treat that as "no parameters supplied"
    # instead of failing on None.keys().
    user_params = self.user_specified_track_params or {}
    return list(set(user_params).difference(self.track_defined_params))


class TrackFileReader:
MINIMUM_SUPPORTED_TRACK_VERSION = 2
Expand All @@ -730,7 +755,7 @@ def __init__(self, cfg):
with open(track_schema_file, mode="rt", encoding="utf-8") as f:
self.track_schema = json.loads(f.read())
self.track_params = cfg.opts("track", "params")
self.complete_track_params = CompleteTrackParams()
self.complete_track_params = CompleteTrackParams(user_specified_track_params=self.track_params)
self.read_track = TrackSpecificationReader(
track_params=self.track_params,
complete_track_params=self.complete_track_params)
Expand Down Expand Up @@ -786,36 +811,30 @@ def read(self, track_name, track_spec_file, mapping_dir):

current_track = self.read_track(track_name, track_spec, mapping_dir)

unused_user_defined_track_params = self.unused_user_defined_track_params()
unused_user_defined_track_params = self.complete_track_params.unused_user_defined_track_params()
if len(unused_user_defined_track_params) > 0:
err_msg = (
"Some of your track parameter(s) {} are not used by this track; perhaps you intend to use {} instead.\n\n"
"All track parameters you provided are:\n"
"{}\n\n"
"All parameters exposed by this track:\n"
"{}".format(
",".join(opts.list_as_double_quoted_list(sorted(unused_user_defined_track_params))),
",".join(opts.list_as_double_quoted_list(sorted(opts.make_list_of_close_matches(
",".join(opts.double_quoted_list_of(sorted(unused_user_defined_track_params))),
",".join(opts.double_quoted_list_of(sorted(opts.make_list_of_close_matches(
unused_user_defined_track_params,
self.complete_track_params.track_defined_params
)))),
"\n".join(opts.list_as_bulleted_list(sorted(list(self.track_params.keys())))),
"\n".join(opts.list_as_bulleted_list(self.complete_track_params.sorted_track_defined_params))))
"\n".join(opts.bulleted_list_of(sorted(list(self.track_params.keys())))),
"\n".join(opts.bulleted_list_of(self.complete_track_params.sorted_track_defined_params))))

self.logger.exception(err_msg)
self.logger.critical(err_msg)
# also dump the message on the console
console.println(err_msg)
raise exceptions.TrackConfigError(
"There is a problem with one or more of the supplied track-params. Check the Rally log file for details."
"Unused track parameters {}.".format(sorted(unused_user_defined_track_params))
)
return current_track

def unused_user_defined_track_params(self):
set_user_params = set(list(self.track_params.keys()))
set_user_params.difference_update(self.complete_track_params.track_defined_params)

return list(set_user_params)


class TrackPluginReader:
"""
Expand Down
7 changes: 4 additions & 3 deletions esrally/utils/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,20 @@ class DictStringFileSourceFactory:
def __init__(self, name_to_contents):
self.name_to_contents = name_to_contents

def __call__(self, name, mode):
return StringAsFileSource(self.name_to_contents[name], mode)
def __call__(self, name, mode, encoding="utf-8"):
return StringAsFileSource(self.name_to_contents[name], mode, encoding)


class StringAsFileSource:
"""
Implementation of ``FileSource`` intended for tests. It's kept close to ``FileSource`` to simplify maintenance but it is not meant to
be used in production code.
"""
def __init__(self, contents, mode):
def __init__(self, contents, mode, encoding="utf-8"):
"""
:param contents: The file contents as an array of strings. Each item in the array should correspond to one line.
:param mode: The file mode. It is ignored in this implementation but kept to implement the same interface as ``FileSource``.
:param encoding: The file encoding. It is ignored in this implementation but kept to implement the same interface as ``FileSource``.
"""
self.contents = contents
self.current_index = 0
Expand Down
4 changes: 2 additions & 2 deletions esrally/utils/opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ def to_dict(arg, default_parser=kv_to_map):
return default_parser(csv_to_list(arg))


def list_as_bulleted_list(src_list):
def bulleted_list_of(src_list):
    """
    Formats every item as a bullet point.

    :param src_list: Iterable of items to format.
    :return: A list with one string per item, each prefixed with "- ".
    """
    as_bullet = "- {}".format
    return list(map(as_bullet, src_list))


def list_as_double_quoted_list(src_list):
def double_quoted_list_of(src_list):
    """
    Wraps every item in double quotes.

    :param src_list: Iterable of items to format.
    :return: A list with one string per item, each enclosed in double quotes.
    """
    in_quotes = "\"{}\"".format
    return list(map(in_quotes, src_list))


Expand Down
11 changes: 8 additions & 3 deletions integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,14 @@ function set_up_metrics_store {
popd > /dev/null
}

function docker_is_running {
    # Exit status is that of `docker ps`: zero if the command succeeds, non-zero otherwise.
    # Only stdout is discarded; anything docker writes to stderr is still shown.
    # A function returns the status of its last command, so no explicit `return` is needed.
    docker ps > /dev/null
}

function set_up_proxy_server {
# we want to see output to stderr for diagnosing problems
if docker ps > /dev/null; then
if docker_is_running; then
info "Docker is available. Proxy-related tests will be run"
local config_dir="$PWD/.rally_it/proxy_tmp"
mkdir -p ${config_dir}
Expand Down Expand Up @@ -292,9 +297,9 @@ function test_distribution_fails_with_wrong_track_params {
error "Rally didn't fail trying to use the undefined track-param ${undefined_track_params}. Check ${RALLY_LOG}."
error ${err_msg}
exit ${ret_code}
elif [[ ${ret_code} -ne 0 ]]; then
elif docker_is_running && [[ ${ret_code} -ne 0 ]]; then
# need to use grep -P which is unavailable with macOS grep
if ! docker run --rm -v ${RALLY_LOG}:/rally.log:ro ubuntu grep -Pzoq '.*ERROR Some of your track parameter\(s\) "number_of-replicas" are not used by this track; perhaps you intend to use "number_of_replicas" instead\.\n\nAll track parameters you provided are:\n- conflict_probability\n- number_of-replicas\n\nAll parameters exposed by this track:\n- bulk_indexing_clients\n- bulk_size\n- cluster_health\n- conflict_probability\n- index_settings\n- ingest_percentage\n- number_of_replicas\n- number_of_shards\n- on_conflict\n- recency\n- source_enabled\n*' /rally.log; then
if ! docker run --rm -v ${RALLY_LOG}:/rally.log:ro ubuntu:xenial grep -Pzoq '.*CRITICAL Some of your track parameter\(s\) "number_of-replicas" are not used by this track; perhaps you intend to use "number_of_replicas" instead\.\n\nAll track parameters you provided are:\n- conflict_probability\n- number_of-replicas\n\nAll parameters exposed by this track:\n- bulk_indexing_clients\n- bulk_size\n- cluster_health\n- conflict_probability\n- index_settings\n- ingest_percentage\n- number_of_replicas\n- number_of_shards\n- on_conflict\n- recency\n- source_enabled\n*' /rally.log; then
error ${err_msg}
exit ${ret_code}
fi
Expand Down
73 changes: 32 additions & 41 deletions tests/track/loader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,8 @@ def test_read_glob_files(self):


class TemplateRenderTests(TestCase):
unittest_template_internal_vars = loader.default_internal_template_vars(clock=StaticClock)

def test_render_simple_template(self):
template = """
{
Expand All @@ -798,7 +800,7 @@ def test_render_simple_template(self):
}
"""

rendered = loader.render_template(template, clock=StaticClock)
rendered = loader.render_template(template, template_internal_vars=TemplateRenderTests.unittest_template_internal_vars)

expected = """
{
Expand All @@ -816,7 +818,8 @@ def test_render_template_with_external_variables(self):
}
"""

rendered = loader.render_template(template, template_vars={"greeting": "Hi"}, clock=StaticClock)
rendered = loader.render_template(template, template_vars={"greeting": "Hi"},
template_internal_vars=TemplateRenderTests.unittest_template_internal_vars)

expected = """
{
Expand Down Expand Up @@ -846,7 +849,7 @@ def key_globber(e):
}
"""

source=io.DictStringFileSourceFactory({
source = io.DictStringFileSourceFactory({
"dynamic-key-1": [
textwrap.dedent('"dkey1": "value1"')
],
Expand All @@ -863,7 +866,7 @@ def key_globber(e):

rendered = loader.render_template(
template_source.assembled_source,
clock=StaticClock)
template_internal_vars=TemplateRenderTests.unittest_template_internal_vars)

expected = """
{
Expand All @@ -888,9 +891,9 @@ def test_render_template_with_variables(self):
}
"""
rendered = loader.render_template(
template,
template_vars={"clients": 8},
clock=StaticClock)
template,
template_vars={"clients": 8},
template_internal_vars=TemplateRenderTests.unittest_template_internal_vars)

expected = """
{
Expand Down Expand Up @@ -931,6 +934,28 @@ def test_check_complete_track_params_does_not_fail_with_no_track_params(self):
complete_track_params.sorted_track_defined_params
)

def test_unused_user_defined_track_params(self):
    """
    User-specified parameters that the track does not define are reported as unused.
    """
    user_params = {
        "number_of_repliacs": 1,  # deliberate typo
        "enable_source": True,  # unknown parameter
        "number_of_shards": 5
    }
    defined_by_track = [
        "bulk_indexing_clients",
        "bulk_indexing_iterations",
        "bulk_size",
        "cluster_health",
        "number_of_replicas",
        "number_of_shards"
    ]

    complete_track_params = loader.CompleteTrackParams(user_specified_track_params=user_params)
    complete_track_params.populate_track_defined_params(list_of_track_params=defined_by_track)

    # the result has no guaranteed order, so sort before comparing
    unused = sorted(complete_track_params.unused_user_defined_track_params())
    self.assertEqual(["enable_source", "number_of_repliacs"], unused)


class TrackPostProcessingTests(TestCase):
track_with_params_as_string = textwrap.dedent('''{
Expand Down Expand Up @@ -1319,40 +1344,6 @@ def test_filters_tasks(self):
self.assertEqual("cluster-stats", schedule[2].name)


class TrackFileReaderTests(TestCase):
@mock.patch("esrally.track.loader.TrackSpecificationReader")
def test_unused_user_defined_track_params(self, mocked_track_specification_reader):
track_params = {
"number_of_repliacs": 1, # deliberate typo
"enable_source": True, # unknown parameter
"number_of_shards": 5
}

class MockedCompletedTrackParams:
def __init__(self):
self.track_defined_params = {
"bulk_indexing_clients",
"bulk_indexing_iterations",
"bulk_size",
"cluster_health",
"number_of_replicas",
"number_of_shards"
}

cfg = config.Config()
cfg.add(config.Scope.application, "node", "rally.root", "/unittest_patch")
cfg.add(config.Scope.application, "track", "params", track_params)
with mock.patch("builtins.open", new_callable=mock.mock_open()) as _:
with mock.patch("json.loads") as _:
track_file_reader = loader.TrackFileReader(cfg)
track_file_reader.complete_track_params = MockedCompletedTrackParams()

self.assertEqual(
["enable_source", "number_of_repliacs"],
sorted(track_file_reader.unused_user_defined_track_params())
)


class TrackSpecificationReaderTests(TestCase):
def test_description_is_optional(self):
track_specification = {
Expand Down
Loading