Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail Rally early if there are unused variables in track-params #688

Merged
merged 18 commits into from
May 23, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions esrally/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.


class RallyError(Exception):
"""
Base class for all Rally exceptions
Expand Down Expand Up @@ -67,3 +68,11 @@ class InvalidSyntax(RallyError):

class InvalidName(RallyError):
pass


class TrackConfigError(RallyError):
"""
Thrown when something is wrong with the track config e.g. user supplied a track-param
that can't be set
"""
pass
2 changes: 2 additions & 0 deletions esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ def with_actor_system(runnable, cfg):
if not already_running:
shutdown_complete = False
times_interrupted = 0
# give some time for any outstanding messages to be delivered to the actor system
time.sleep(3)
while not shutdown_complete and times_interrupted < 2:
try:
logger.info("Attempting to shutdown internal actor system.")
Expand Down
201 changes: 180 additions & 21 deletions esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import json
import logging
import os
import re
import glob
import urllib.error
import tempfile
Expand All @@ -26,9 +27,11 @@
import jinja2.exceptions
import jsonschema
import tabulate

from esrally import exceptions, time, PROGRAM_NAME
from esrally.track import params, track
from esrally.utils import io, convert, net, console, modules, repo
from esrally.utils import io, convert, net, console, modules, opts, repo
from jinja2 import meta


class TrackSyntaxError(exceptions.InvalidSyntax):
Expand Down Expand Up @@ -107,6 +110,9 @@ def load_track(cfg):
logging.getLogger(__name__).exception("Cannot load track [%s]", track_name)
raise exceptions.SystemSetupError("Cannot load track %s. List the available tracks with %s list tracks." %
(track_name, PROGRAM_NAME))
except BaseException:
logging.getLogger(__name__).exception("Cannot load track [%s]", track_name)
raise


def load_track_plugins(cfg, register_runner, register_scheduler):
Expand Down Expand Up @@ -468,7 +474,78 @@ def prepare_bundled_document_set(self, document_set, data_root):
return False


def render_template(loader, template_name, template_vars=None, glob_helper=lambda f: [], clock=time.Clock):
class TemplateSource:
"""
Prepares the fully assembled track file from file or string.
Doesn't render using jinja2, but embeds track fragments referenced with
rally.collect(parts=...
"""

collect_parts_re = re.compile(r'''{{\ +?rally\.collect\(parts="(.+?(?="))"\)\ +?}}''')

def __init__(self, base_path, template_file_name, source=io.FileSource, fileglobber=glob.glob):
self.base_path = base_path
self.template_file_name = template_file_name
self.source = source
self.fileglobber = fileglobber
self.assembled_source = None
self.logger = logging.getLogger(__name__)

def load_template_from_file(self):
loader = jinja2.FileSystemLoader(self.base_path)
try:
base_track = loader.get_source(jinja2.Environment(), self.template_file_name)
except jinja2.TemplateNotFound:
self.logger.exception("Could not load track from [%s].", self.template_file_name)
raise TrackSyntaxError("Could not load track from '{}'".format(self.template_file_name))
self.assembled_source = self.replace_includes(self.base_path, base_track[0])

def load_template_from_string(self, template_source):
self.assembled_source = self.replace_includes(self.base_path, template_source)

def replace_includes(self, base_path, track_fragment):
match = TemplateSource.collect_parts_re.findall(track_fragment)
if match:
# Construct replacement dict for matched captures
repl = {}
for glob_pattern in match:
full_glob_path = os.path.join(base_path, glob_pattern)
sub_source = self.read_glob_files(full_glob_path)
repl[glob_pattern] = self.replace_includes(base_path=io.dirname(full_glob_path), track_fragment=sub_source)

def replstring(matchobj):
# matchobj.groups() is a tuple and first element contains the matched group id
return repl[matchobj.groups()[0]]

return TemplateSource.collect_parts_re.sub(replstring, track_fragment)
return track_fragment

def read_glob_files(self, pattern):
source = []
files = self.fileglobber(pattern)
for fname in files:
with self.source(fname, mode="rt", encoding="utf-8") as fp:
source.append(fp.read())
return ",\n".join(source)


def default_internal_template_vars(glob_helper=lambda f: [], clock=time.Clock):
"""
Dict of internal global variables used by our jinja2 renderers
"""

return {
"globals": {
"now": clock.now(),
"glob": glob_helper
},
"filters": {
"days_ago": time.days_ago
}
}


def render_template(template_source, template_vars=None, template_internal_vars=None, loader=None):
macros = """
{% macro collect(parts) -%}
{% set comma = joiner() %}
Expand All @@ -481,21 +558,42 @@ def render_template(loader, template_name, template_vars=None, glob_helper=lambd

# place helpers dict loader first to prevent users from overriding our macros.
env = jinja2.Environment(
loader=jinja2.ChoiceLoader([jinja2.DictLoader({"rally.helpers": macros}), loader])
loader=jinja2.ChoiceLoader([
jinja2.DictLoader({"rally.helpers": macros}),
jinja2.BaseLoader(),
loader
])
)

if template_vars:
for k, v in template_vars.items():
env.globals[k] = v
# ensure that user variables never override our internal variables
env.globals["now"] = clock.now()
env.globals["glob"] = glob_helper
env.filters["days_ago"] = time.days_ago
template = env.get_template(template_name)
if template_internal_vars:
for macro_type in template_internal_vars:
for env_global_key, env_global_value in template_internal_vars[macro_type].items():
getattr(env, macro_type)[env_global_key] = env_global_value

template = env.from_string(template_source)
return template.render()


def render_template_from_file(template_file_name, template_vars):
def register_all_params_in_track(assembled_source, complete_track_params=None):
j2env = jinja2.Environment()

# we don't need the following j2 filters/macros but we define them anyway to prevent parsing failures
internal_template_vars = default_internal_template_vars()
for macro_type in internal_template_vars:
for env_global_key, env_global_value in internal_template_vars[macro_type].items():
getattr(j2env, macro_type)[env_global_key] = env_global_value

ast = j2env.parse(assembled_source)
j2_variables = meta.find_undeclared_variables(ast)
if complete_track_params:
complete_track_params.populate_track_defined_params(j2_variables)


def render_template_from_file(template_file_name, template_vars, complete_track_params=None):
def relative_glob(start, f):
result = glob.glob(os.path.join(start, f))
if result:
Expand All @@ -504,10 +602,14 @@ def relative_glob(start, f):
return []

base_path = io.dirname(template_file_name)
template_source = TemplateSource(base_path, io.basename(template_file_name))
template_source.load_template_from_file()
register_all_params_in_track(template_source.assembled_source, complete_track_params)

return render_template(loader=jinja2.FileSystemLoader(base_path),
template_name=io.basename(template_file_name),
template_source=template_source.assembled_source,
template_vars=template_vars,
glob_helper=lambda f: relative_glob(base_path, f))
template_internal_vars=default_internal_template_vars(glob_helper=lambda f: relative_glob(base_path, f)))


def filter_included_tasks(t, filters):
Expand Down Expand Up @@ -622,6 +724,25 @@ def post_process_for_test_mode(t):
return t


class CompleteTrackParams:
def __init__(self, user_specified_track_params=None):
self.track_defined_params = set()
self.user_specified_track_params = user_specified_track_params

def populate_track_defined_params(self, list_of_track_params=None):
self.track_defined_params.update(set(list_of_track_params))

@property
def sorted_track_defined_params(self):
return sorted(self.track_defined_params)

def unused_user_defined_track_params(self):
set_user_params = set(list(self.user_specified_track_params.keys()))
set_user_params.difference_update(self.track_defined_params)

return list(set_user_params)


class TrackFileReader:
MINIMUM_SUPPORTED_TRACK_VERSION = 2
MAXIMUM_SUPPORTED_TRACK_VERSION = 2
Expand All @@ -634,7 +755,10 @@ def __init__(self, cfg):
with open(track_schema_file, mode="rt", encoding="utf-8") as f:
self.track_schema = json.loads(f.read())
self.track_params = cfg.opts("track", "params")
self.read_track = TrackSpecificationReader(self.track_params)
self.complete_track_params = CompleteTrackParams(user_specified_track_params=self.track_params)
self.read_track = TrackSpecificationReader(
track_params=self.track_params,
complete_track_params=self.complete_track_params)
self.logger = logging.getLogger(__name__)

def read(self, track_name, track_spec_file, mapping_dir):
Expand All @@ -649,7 +773,7 @@ def read(self, track_name, track_spec_file, mapping_dir):

self.logger.info("Reading track specification file [%s].", track_spec_file)
try:
rendered = render_template_from_file(track_spec_file, self.track_params)
rendered = render_template_from_file(track_spec_file, self.track_params, complete_track_params=self.complete_track_params)
# render the track to a temporary file instead of dumping it into the logs. It is easier to check for error messages
# involving lines numbers and it also does not bloat Rally's log file so much.
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".json")
Expand Down Expand Up @@ -677,14 +801,39 @@ def read(self, track_name, track_spec_file, mapping_dir):
if TrackFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION < track_version:
raise exceptions.RallyError("Track {} requires a newer version of Rally. Please upgrade Rally (supported track version: {}, "
"required track version: {}).".format(track_name, TrackFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION,
track_version))
track_version))
try:
jsonschema.validate(track_spec, self.track_schema)
except jsonschema.exceptions.ValidationError as ve:
raise TrackSyntaxError(
"Track '{}' is invalid.\n\nError details: {}\nInstance: {}\nPath: {}\nSchema path: {}".format(
track_name, ve.message, json.dumps(ve.instance, indent=4, sort_keys=True), ve.absolute_path, ve.absolute_schema_path))
return self.read_track(track_name, track_spec, mapping_dir)

current_track = self.read_track(track_name, track_spec, mapping_dir)

unused_user_defined_track_params = self.complete_track_params.unused_user_defined_track_params()
if len(unused_user_defined_track_params) > 0:
err_msg = (
"Some of your track parameter(s) {} are not used by this track; perhaps you intend to use {} instead.\n\n"
"All track parameters you provided are:\n"
"{}\n\n"
"All parameters exposed by this track:\n"
"{}".format(
",".join(opts.double_quoted_list_of(sorted(unused_user_defined_track_params))),
",".join(opts.double_quoted_list_of(sorted(opts.make_list_of_close_matches(
unused_user_defined_track_params,
self.complete_track_params.track_defined_params
)))),
"\n".join(opts.bulleted_list_of(sorted(list(self.track_params.keys())))),
"\n".join(opts.bulleted_list_of(self.complete_track_params.sorted_track_defined_params))))

self.logger.critical(err_msg)
# also dump the message on the console
console.println(err_msg)
raise exceptions.TrackConfigError(
"Unused track parameters {}.".format(sorted(unused_user_defined_track_params))
)
return current_track


class TrackPluginReader:
Expand Down Expand Up @@ -733,9 +882,10 @@ class TrackSpecificationReader:
Creates a track instances based on its parsed JSON description.
"""

def __init__(self, track_params=None, source=io.FileSource):
def __init__(self, track_params=None, complete_track_params=None, source=io.FileSource):
self.name = None
self.track_params = track_params if track_params else {}
self.complete_track_params = complete_track_params
self.source = source
self.logger = logging.getLogger(__name__)

Expand All @@ -750,7 +900,7 @@ def __call__(self, track_name, track_specification, mapping_dir):
for tpl in self._r(track_specification, "templates", mandatory=False, default_value=[])]
corpora = self._create_corpora(self._r(track_specification, "corpora", mandatory=False, default_value=[]), indices)
challenges = self._create_challenges(track_specification)

# at this point, *all* track params must have been referenced in the templates
return track.Track(name=self.name, meta_data=meta_data, description=description, challenges=challenges, indices=indices,
templates=templates, corpora=corpora)

Expand Down Expand Up @@ -779,27 +929,36 @@ def _create_index(self, index_spec, mapping_dir):
index_name = self._r(index_spec, "name")
body_file = self._r(index_spec, "body", mandatory=False)
if body_file:
idx_body_tmpl_src = TemplateSource(mapping_dir, body_file, self.source)
with self.source(os.path.join(mapping_dir, body_file), "rt") as f:
body = self._load_template(f.read(), "definition for index {} in {}".format(index_name, body_file))
idx_body_tmpl_src.load_template_from_string(f.read())
body = self._load_template(
idx_body_tmpl_src.assembled_source,
"definition for index {} in {}".format(index_name, body_file))
else:
body = None

return track.Index(name=index_name, body=body, types=self._r(index_spec, "types", mandatory=False, default_value=[]))

def _create_index_template(self, tpl_spec, mapping_dir):
name = self._r(tpl_spec, "name")
template_file = self._r(tpl_spec, "template")
index_pattern = self._r(tpl_spec, "index-pattern")
delete_matching_indices = self._r(tpl_spec, "delete-matching-indices", mandatory=False, default_value=True)
template_file = os.path.join(mapping_dir, self._r(tpl_spec, "template"))
template_file = os.path.join(mapping_dir, template_file)
idx_tmpl_src = TemplateSource(mapping_dir, template_file, self.source)
with self.source(template_file, "rt") as f:
template_content = self._load_template(f.read(), "definition for index template {} in {}".format(name, template_file))
idx_tmpl_src.load_template_from_string(f.read())
template_content = self._load_template(
idx_tmpl_src.assembled_source,
"definition for index template {} in {}".format(name, template_file))
return track.IndexTemplate(name, index_pattern, template_content, delete_matching_indices)

def _load_template(self, contents, description):
self.logger.info("Loading template [%s].", description)
register_all_params_in_track(contents, self.complete_track_params)
try:
rendered = render_template(loader=jinja2.DictLoader({"default": contents}),
template_name="default",
rendered = render_template(template_source=contents,
template_vars=self.track_params)
return json.loads(rendered)
except Exception as e:
Expand Down
7 changes: 4 additions & 3 deletions esrally/utils/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,20 @@ class DictStringFileSourceFactory:
def __init__(self, name_to_contents):
self.name_to_contents = name_to_contents

def __call__(self, name, mode):
return StringAsFileSource(self.name_to_contents[name], mode)
def __call__(self, name, mode, encoding="utf-8"):
return StringAsFileSource(self.name_to_contents[name], mode, encoding)


class StringAsFileSource:
"""
Implementation of ``FileSource`` intended for tests. It's kept close to ``FileSource`` to simplify maintenance but it is not meant to
be used in production code.
"""
def __init__(self, contents, mode):
def __init__(self, contents, mode, encoding="utf-8"):
"""
:param contents: The file contents as an array of strings. Each item in the array should correspond to one line.
:param mode: The file mode. It is ignored in this implementation but kept to implement the same interface as ``FileSource``.
:param encoding: The file encoding. It is ignored in this implementation but kept to implement the same interface as ``FileSource``.
"""
self.contents = contents
self.current_index = 0
Expand Down
Loading