diff --git a/esrally/exceptions.py b/esrally/exceptions.py index b8d2ffc1e..87a40c897 100644 --- a/esrally/exceptions.py +++ b/esrally/exceptions.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. + class RallyError(Exception): """ Base class for all Rally exceptions @@ -67,3 +68,11 @@ class InvalidSyntax(RallyError): class InvalidName(RallyError): pass + + +class TrackConfigError(RallyError): + """ + Thrown when something is wrong with the track config e.g. user supplied a track-param + that can't be set + """ + pass diff --git a/esrally/rally.py b/esrally/rally.py index 03bcd3dda..3c1c1e4cd 100644 --- a/esrally/rally.py +++ b/esrally/rally.py @@ -432,6 +432,8 @@ def with_actor_system(runnable, cfg): if not already_running: shutdown_complete = False times_interrupted = 0 + # give some time for any outstanding messages to be delivered to the actor system + time.sleep(3) while not shutdown_complete and times_interrupted < 2: try: logger.info("Attempting to shutdown internal actor system.") diff --git a/esrally/track/loader.py b/esrally/track/loader.py index ebccef344..b5129217c 100644 --- a/esrally/track/loader.py +++ b/esrally/track/loader.py @@ -18,6 +18,7 @@ import json import logging import os +import re import glob import urllib.error import tempfile @@ -26,9 +27,11 @@ import jinja2.exceptions import jsonschema import tabulate + from esrally import exceptions, time, PROGRAM_NAME from esrally.track import params, track -from esrally.utils import io, convert, net, console, modules, repo +from esrally.utils import io, convert, net, console, modules, opts, repo +from jinja2 import meta class TrackSyntaxError(exceptions.InvalidSyntax): @@ -107,6 +110,9 @@ def load_track(cfg): logging.getLogger(__name__).exception("Cannot load track [%s]", track_name) raise exceptions.SystemSetupError("Cannot load track %s. List the available tracks with %s list tracks." % (track_name, PROGRAM_NAME)) + except BaseException: + logging.getLogger(__name__).exception("Cannot load track [%s]", track_name) + raise def load_track_plugins(cfg, register_runner, register_scheduler): @@ -468,7 +474,78 @@ def prepare_bundled_document_set(self, document_set, data_root): return False -def render_template(loader, template_name, template_vars=None, glob_helper=lambda f: [], clock=time.Clock): +class TemplateSource: + """ + Prepares the fully assembled track file from file or string. + Doesn't render using jinja2, but embeds track fragments referenced with + rally.collect(parts=... + """ + + collect_parts_re = re.compile(r'''{{\ +?rally\.collect\(parts="(.+?(?="))"\)\ +?}}''') + + def __init__(self, base_path, template_file_name, source=io.FileSource, fileglobber=glob.glob): + self.base_path = base_path + self.template_file_name = template_file_name + self.source = source + self.fileglobber = fileglobber + self.assembled_source = None + self.logger = logging.getLogger(__name__) + + def load_template_from_file(self): + loader = jinja2.FileSystemLoader(self.base_path) + try: + base_track = loader.get_source(jinja2.Environment(), self.template_file_name) + except jinja2.TemplateNotFound: + self.logger.exception("Could not load track from [%s].", self.template_file_name) + raise TrackSyntaxError("Could not load track from '{}'".format(self.template_file_name)) + self.assembled_source = self.replace_includes(self.base_path, base_track[0]) + + def load_template_from_string(self, template_source): + self.assembled_source = self.replace_includes(self.base_path, template_source) + + def replace_includes(self, base_path, track_fragment): + match = TemplateSource.collect_parts_re.findall(track_fragment) + if match: + # Construct replacement dict for matched captures + repl = {} + for glob_pattern in match: + full_glob_path = os.path.join(base_path, glob_pattern) + sub_source = self.read_glob_files(full_glob_path) + repl[glob_pattern] = self.replace_includes(base_path=io.dirname(full_glob_path), track_fragment=sub_source) + + def replstring(matchobj): + # matchobj.groups() is a tuple and first element contains the matched group id + return repl[matchobj.groups()[0]] + + return TemplateSource.collect_parts_re.sub(replstring, track_fragment) + return track_fragment + + def read_glob_files(self, pattern): + source = [] + files = self.fileglobber(pattern) + for fname in files: + with self.source(fname, mode="rt", encoding="utf-8") as fp: + source.append(fp.read()) + return ",\n".join(source) + + +def default_internal_template_vars(glob_helper=lambda f: [], clock=time.Clock): + """ + Dict of internal global variables used by our jinja2 renderers + """ + + return { + "globals": { + "now": clock.now(), + "glob": glob_helper + }, + "filters": { + "days_ago": time.days_ago + } + } + + +def render_template(template_source, template_vars=None, template_internal_vars=None, loader=None): macros = """ {% macro collect(parts) -%} {% set comma = joiner() %} @@ -481,21 +558,42 @@ def render_template(loader, template_name, template_vars=None, glob_helper=lambd # place helpers dict loader first to prevent users from overriding our macros. env = jinja2.Environment( - loader=jinja2.ChoiceLoader([jinja2.DictLoader({"rally.helpers": macros}), loader]) + loader=jinja2.ChoiceLoader([ + jinja2.DictLoader({"rally.helpers": macros}), + jinja2.BaseLoader(), + loader + ]) ) + if template_vars: for k, v in template_vars.items(): env.globals[k] = v # ensure that user variables never override our internal variables - env.globals["now"] = clock.now() - env.globals["glob"] = glob_helper - env.filters["days_ago"] = time.days_ago - template = env.get_template(template_name) + if template_internal_vars: + for macro_type in template_internal_vars: + for env_global_key, env_global_value in template_internal_vars[macro_type].items(): + getattr(env, macro_type)[env_global_key] = env_global_value + template = env.from_string(template_source) return template.render() -def render_template_from_file(template_file_name, template_vars): +def register_all_params_in_track(assembled_source, complete_track_params=None): + j2env = jinja2.Environment() + + # we don't need the following j2 filters/macros but we define them anyway to prevent parsing failures + internal_template_vars = default_internal_template_vars() + for macro_type in internal_template_vars: + for env_global_key, env_global_value in internal_template_vars[macro_type].items(): + getattr(j2env, macro_type)[env_global_key] = env_global_value + + ast = j2env.parse(assembled_source) + j2_variables = meta.find_undeclared_variables(ast) + if complete_track_params: + complete_track_params.populate_track_defined_params(j2_variables) + + +def render_template_from_file(template_file_name, template_vars, complete_track_params=None): def relative_glob(start, f): result = glob.glob(os.path.join(start, f)) if result: @@ -504,10 +602,14 @@ def relative_glob(start, f): return [] base_path = io.dirname(template_file_name) + template_source = TemplateSource(base_path, io.basename(template_file_name)) + template_source.load_template_from_file() + register_all_params_in_track(template_source.assembled_source, complete_track_params) + return render_template(loader=jinja2.FileSystemLoader(base_path), - template_name=io.basename(template_file_name), + template_source=template_source.assembled_source, template_vars=template_vars, - glob_helper=lambda f: relative_glob(base_path, f)) + template_internal_vars=default_internal_template_vars(glob_helper=lambda f: relative_glob(base_path, f))) def filter_included_tasks(t, filters): @@ -622,6 +724,25 @@ def post_process_for_test_mode(t): return t +class CompleteTrackParams: + def __init__(self, user_specified_track_params=None): + self.track_defined_params = set() + self.user_specified_track_params = user_specified_track_params if user_specified_track_params else {} + + def populate_track_defined_params(self, list_of_track_params=None): + self.track_defined_params.update(set(list_of_track_params)) + + @property + def sorted_track_defined_params(self): + return sorted(self.track_defined_params) + + def unused_user_defined_track_params(self): + set_user_params = set(list(self.user_specified_track_params.keys())) + set_user_params.difference_update(self.track_defined_params) + + return list(set_user_params) + + class TrackFileReader: MINIMUM_SUPPORTED_TRACK_VERSION = 2 MAXIMUM_SUPPORTED_TRACK_VERSION = 2 @@ -634,7 +755,10 @@ def __init__(self, cfg): with open(track_schema_file, mode="rt", encoding="utf-8") as f: self.track_schema = json.loads(f.read()) self.track_params = cfg.opts("track", "params") - self.read_track = TrackSpecificationReader(self.track_params) + self.complete_track_params = CompleteTrackParams(user_specified_track_params=self.track_params) + self.read_track = TrackSpecificationReader( + track_params=self.track_params, + complete_track_params=self.complete_track_params) self.logger = logging.getLogger(__name__) def read(self, track_name, track_spec_file, mapping_dir): @@ -649,7 +773,7 @@ def read(self, track_name, track_spec_file, mapping_dir): self.logger.info("Reading track specification file [%s].", track_spec_file) try: - rendered = render_template_from_file(track_spec_file, self.track_params) + rendered = render_template_from_file(track_spec_file, self.track_params, complete_track_params=self.complete_track_params) # render the track to a temporary file instead of dumping it into the logs. It is easier to check for error messages # involving lines numbers and it also does not bloat Rally's log file so much. tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".json") @@ -677,14 +801,39 @@ def read(self, track_name, track_spec_file, mapping_dir): if TrackFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION < track_version: raise exceptions.RallyError("Track {} requires a newer version of Rally. Please upgrade Rally (supported track version: {}, " "required track version: {}).".format(track_name, TrackFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION, - track_version)) + track_version)) try: jsonschema.validate(track_spec, self.track_schema) except jsonschema.exceptions.ValidationError as ve: raise TrackSyntaxError( "Track '{}' is invalid.\n\nError details: {}\nInstance: {}\nPath: {}\nSchema path: {}".format( track_name, ve.message, json.dumps(ve.instance, indent=4, sort_keys=True), ve.absolute_path, ve.absolute_schema_path)) - return self.read_track(track_name, track_spec, mapping_dir) + + current_track = self.read_track(track_name, track_spec, mapping_dir) + + unused_user_defined_track_params = self.complete_track_params.unused_user_defined_track_params() + if len(unused_user_defined_track_params) > 0: + err_msg = ( + "Some of your track parameter(s) {} are not used by this track; perhaps you intend to use {} instead.\n\n" + "All track parameters you provided are:\n" + "{}\n\n" + "All parameters exposed by this track:\n" + "{}".format( + ",".join(opts.double_quoted_list_of(sorted(unused_user_defined_track_params))), + ",".join(opts.double_quoted_list_of(sorted(opts.make_list_of_close_matches( + unused_user_defined_track_params, + self.complete_track_params.track_defined_params + )))), + "\n".join(opts.bulleted_list_of(sorted(list(self.track_params.keys())))), + "\n".join(opts.bulleted_list_of(self.complete_track_params.sorted_track_defined_params)))) + + self.logger.critical(err_msg) + # also dump the message on the console + console.println(err_msg) + raise exceptions.TrackConfigError( + "Unused track parameters {}.".format(sorted(unused_user_defined_track_params)) + ) + return current_track class TrackPluginReader: @@ -733,9 +882,10 @@ class TrackSpecificationReader: Creates a track instances based on its parsed JSON description. """ - def __init__(self, track_params=None, source=io.FileSource): + def __init__(self, track_params=None, complete_track_params=None, source=io.FileSource): self.name = None self.track_params = track_params if track_params else {} + self.complete_track_params = complete_track_params self.source = source self.logger = logging.getLogger(__name__) @@ -750,7 +900,7 @@ def __call__(self, track_name, track_specification, mapping_dir): for tpl in self._r(track_specification, "templates", mandatory=False, default_value=[])] corpora = self._create_corpora(self._r(track_specification, "corpora", mandatory=False, default_value=[]), indices) challenges = self._create_challenges(track_specification) - + # at this point, *all* track params must have been referenced in the templates return track.Track(name=self.name, meta_data=meta_data, description=description, challenges=challenges, indices=indices, templates=templates, corpora=corpora) @@ -779,8 +929,12 @@ def _create_index(self, index_spec, mapping_dir): index_name = self._r(index_spec, "name") body_file = self._r(index_spec, "body", mandatory=False) if body_file: + idx_body_tmpl_src = TemplateSource(mapping_dir, body_file, self.source) with self.source(os.path.join(mapping_dir, body_file), "rt") as f: - body = self._load_template(f.read(), "definition for index {} in {}".format(index_name, body_file)) + idx_body_tmpl_src.load_template_from_string(f.read()) + body = self._load_template( + idx_body_tmpl_src.assembled_source, + "definition for index {} in {}".format(index_name, body_file)) else: body = None @@ -788,18 +942,23 @@ def _create_index(self, index_spec, mapping_dir): def _create_index_template(self, tpl_spec, mapping_dir): name = self._r(tpl_spec, "name") + template_file = self._r(tpl_spec, "template") index_pattern = self._r(tpl_spec, "index-pattern") delete_matching_indices = self._r(tpl_spec, "delete-matching-indices", mandatory=False, default_value=True) - template_file = os.path.join(mapping_dir, self._r(tpl_spec, "template")) + template_file = os.path.join(mapping_dir, template_file) + idx_tmpl_src = TemplateSource(mapping_dir, template_file, self.source) with self.source(template_file, "rt") as f: - template_content = self._load_template(f.read(), "definition for index template {} in {}".format(name, template_file)) + idx_tmpl_src.load_template_from_string(f.read()) + template_content = self._load_template( + idx_tmpl_src.assembled_source, + "definition for index template {} in {}".format(name, template_file)) return track.IndexTemplate(name, index_pattern, template_content, delete_matching_indices) def _load_template(self, contents, description): self.logger.info("Loading template [%s].", description) + register_all_params_in_track(contents, self.complete_track_params) try: - rendered = render_template(loader=jinja2.DictLoader({"default": contents}), - template_name="default", + rendered = render_template(template_source=contents, template_vars=self.track_params) return json.loads(rendered) except Exception as e: diff --git a/esrally/utils/io.py b/esrally/utils/io.py index 6b81d394c..733f7f38f 100644 --- a/esrally/utils/io.py +++ b/esrally/utils/io.py @@ -76,8 +76,8 @@ class DictStringFileSourceFactory: def __init__(self, name_to_contents): self.name_to_contents = name_to_contents - def __call__(self, name, mode): - return StringAsFileSource(self.name_to_contents[name], mode) + def __call__(self, name, mode, encoding="utf-8"): + return StringAsFileSource(self.name_to_contents[name], mode, encoding) class StringAsFileSource: @@ -85,10 +85,11 @@ class StringAsFileSource: Implementation of ``FileSource`` intended for tests. It's kept close to ``FileSource`` to simplify maintenance but it is not meant to be used in production code. """ - def __init__(self, contents, mode): + def __init__(self, contents, mode, encoding="utf-8"): """ :param contents: The file contents as an array of strings. Each item in the array should correspond to one line. :param mode: The file mode. It is ignored in this implementation but kept to implement the same interface as ``FileSource``. + :param encoding: The file encoding. It is ignored in this implementation but kept to implement the same interface as ``FileSource``. """ self.contents = contents self.current_index = 0 diff --git a/esrally/utils/opts.py b/esrally/utils/opts.py index a44940ab7..7c284b23d 100644 --- a/esrally/utils/opts.py +++ b/esrally/utils/opts.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +import difflib import json from esrally.utils import io @@ -78,6 +79,32 @@ def to_dict(arg, default_parser=kv_to_map): return default_parser(csv_to_list(arg)) +def bulleted_list_of(src_list): + return ["- {}".format(param) for param in src_list] + + +def double_quoted_list_of(src_list): + return ["\"{}\"".format(param) for param in src_list] + + +def make_list_of_close_matches(word_list, all_possibilities): + """ + Returns list of closest matches for `word_list` from `all_possibilities`. + e.g. [num_of-shards] will return [num_of_shards] when all_possibilities=["num_of_shards", "num_of_replicas"] + + :param word_list: A list of strings that we want to find closest matches for. + :param all_possibilities: List of strings that the algorithm will calculate the closest match from. + :return: + """ + close_matches = [] + for param in word_list: + matched_word = difflib.get_close_matches(param, all_possibilities, n=1) + if matched_word: + close_matches.append(matched_word[0]) + + return close_matches + + class ConnectOptions: """ Base Class to help either parsing --target-hosts or --client-options diff --git a/integration-test.sh b/integration-test.sh index f0601296a..60f6a801b 100755 --- a/integration-test.sh +++ b/integration-test.sh @@ -31,6 +31,8 @@ readonly ES_METRICS_STORE_TRANSPORT_PORT="63200" readonly ES_ARTIFACT_PATH="elasticsearch-${ES_METRICS_STORE_VERSION}" readonly ES_ARTIFACT="${ES_ARTIFACT_PATH}.tar.gz" readonly MIN_CURL_VERSION=(7 12 3) +readonly RALLY_LOG="${HOME}/.rally/logs/rally.log" +readonly RALLY_LOG_BACKUP="${HOME}/.rally/logs/rally.log.it.bak" ES_PID=-1 PROXY_CONTAINER_ID=-1 @@ -67,6 +69,18 @@ function error { log "ERROR" "${1}" } +function backup_rally_log { + set +e + mv -f ${RALLY_LOG} "${RALLY_LOG_BACKUP}" + set -e +} + +function restore_rally_log { + set +e + mv -f ${RALLY_LOG_BACKUP} "${RALLY_LOG}" + set -e +} + function kill_rally_processes { # kill all lingering Rally instances that might still be hanging set +e @@ -138,9 +152,14 @@ function set_up_metrics_store { popd > /dev/null } +function docker_is_running { + docker ps > /dev/null + return $? +} + function set_up_proxy_server { # we want to see output to stderr for diagnosing problems - if docker ps > /dev/null; then + if docker_is_running; then info "Docker is available. Proxy-related tests will be run" local config_dir="$PWD/.rally_it/proxy_tmp" mkdir -p ${config_dir} @@ -183,6 +202,16 @@ function random_configuration { eval "$1='${CONFIGURATIONS[$((RANDOM%num_configs))]}'" } +function random_track { + local num_tracks=${#TRACKS[*]} + eval "$1='${TRACKS[$((RANDOM%num_tracks))]}'" +} + +function random_distribution { + local num_distributions=${#DISTRIBUTIONS[*]} + eval "$1='${DISTRIBUTIONS[$((RANDOM%num_distributions))]}'" +} + function test_configure { info "test configure()" # just run to test the configuration procedure, don't use this configuration in other tests. @@ -233,6 +262,51 @@ function test_distributions { done } +function test_distribution_fails_with_wrong_track_params { + local cfg + local distribution + # TODO check if randomization of track is possible + local track="geonames" # fixed value for now, as the available track params vary between tracks + local track_params + local defined_track_params + local undefined_track_params + + random_configuration cfg + random_distribution dist + + undefined_track_params="number_of-replicas:0" # - simulates a typo + + if [[ ${track} == "geonames" ]]; then + defined_track_params="conflict_probability:45," + fi + + local track_params="${defined_track_params}${undefined_track_params}" + readonly err_msg="Rally didn't fail trying to use the undefined track-param ${undefined_track_params}. Check ${RALLY_LOG}." + + info "test distribution [--configuration-name=${cfg}], [--distribution-version=${dist}], [--track=${track}], [--track-params=${track_params}], [--car=4gheap]" + kill_rally_processes + + backup_rally_log + set +e + esrally --configuration-name="${cfg}" --on-error=abort --distribution-version="${dist}" --track="${track}" --track-params="${track_params}" --test-mode --car=4gheap + ret_code=$? + set -e + + # we expect Rally to fail, with full details in its log file + if [[ ${ret_code} -eq 0 ]]; then + error "Rally didn't fail trying to use the undefined track-param ${undefined_track_params}. Check ${RALLY_LOG}." + error ${err_msg} + exit ${ret_code} + elif docker_is_running && [[ ${ret_code} -ne 0 ]]; then + # need to use grep -P which is unavailable with macOS grep + if ! docker run --rm -v ${RALLY_LOG}:/rally.log:ro ubuntu:xenial grep -Pzoq '.*CRITICAL Some of your track parameter\(s\) "number_of-replicas" are not used by this track; perhaps you intend to use "number_of_replicas" instead\.\n\nAll track parameters you provided are:\n- conflict_probability\n- number_of-replicas\n\nAll parameters exposed by this track:\n- bulk_indexing_clients\n- bulk_size\n- cluster_health\n- conflict_probability\n- index_settings\n- ingest_percentage\n- number_of_replicas\n- number_of_shards\n- on_conflict\n- recency\n- source_enabled\n*' /rally.log; then + error ${err_msg} + exit ${ret_code} + fi + fi + restore_rally_log +} + function test_benchmark_only { # we just use our metrics cluster for these benchmarks. It's not ideal but simpler. local cfg @@ -252,27 +326,23 @@ function test_benchmark_only { } function test_proxy_connection { - readonly rally_log="${HOME}/.rally/logs/rally.log" - readonly rally_log_backup="${HOME}/.rally/logs/rally.log.it.bak" local cfg random_configuration cfg # isolate invocations so we see only the log output from the current invocation - set +e - mv -f ${rally_log} "${rally_log_backup}" - set -e + backup_rally_log set +e esrally list tracks --configuration-name="${cfg}" unset http_proxy set -e - if grep -F -q "Connecting directly to the Internet" "$rally_log"; then + if grep -F -q "Connecting directly to the Internet" "$RALLY_LOG"; then info "Successfully checked that direct internet connection is used." - rm -f ${rally_log} + rm -f ${RALLY_LOG} else - error "Could not find indication that direct internet connection is used. Check ${rally_log}." + error "Could not find indication that direct internet connection is used. Check ${RALLY_LOG}." exit 1 fi @@ -283,18 +353,18 @@ function test_proxy_connection { esrally list tracks --configuration-name="${cfg}" unset http_proxy set -e - if grep -F -q "Connecting via proxy URL [http://127.0.0.1:3128] to the Internet" "$rally_log"; then + if grep -F -q "Connecting via proxy URL [http://127.0.0.1:3128] to the Internet" "$RALLY_LOG"; then info "Successfully checked that proxy is used." else - error "Could not find indication that proxy access is used. Check ${rally_log}." + error "Could not find indication that proxy access is used. Check ${RALLY_LOG}." exit 1 fi - if grep -F -q "No Internet connection detected" "$rally_log"; then + if grep -F -q "No Internet connection detected" "$RALLY_LOG"; then info "Successfully checked that unauthenticated proxy access is prevented." - rm -f ${rally_log} + rm -f ${RALLY_LOG} else - error "Could not find indication that unauthenticated proxy access is prevented. Check ${rally_log}." + error "Could not find indication that unauthenticated proxy access is prevented. Check ${RALLY_LOG}." exit 1 fi @@ -307,25 +377,22 @@ function test_proxy_connection { unset http_proxy set -e - if grep -F -q "Connecting via proxy URL [http://testuser:testuser@127.0.0.1:3128] to the Internet" "$rally_log"; then + if grep -F -q "Connecting via proxy URL [http://testuser:testuser@127.0.0.1:3128] to the Internet" "$RALLY_LOG"; then info "Successfully checked that proxy is used." else - error "Could not find indication that proxy access is used. Check ${rally_log}." + error "Could not find indication that proxy access is used. Check ${RALLY_LOG}." exit 1 fi - if grep -F -q "Detected a working Internet connection" "$rally_log"; then + if grep -F -q "Detected a working Internet connection" "$RALLY_LOG"; then info "Successfully checked that authenticated proxy access is allowed." - rm -f ${rally_log} + rm -f ${RALLY_LOG} else - error "Could not find indication that authenticated proxy access is allowed. Check ${rally_log}." + error "Could not find indication that authenticated proxy access is allowed. Check ${RALLY_LOG}." exit 1 fi # restore original file (but only on success so we keep the test's Rally log file for inspection on errors). - set +e - mv -f ${rally_log_backup} "${rally_log}" - set -e - + restore_rally_log } function run_test { @@ -337,6 +404,8 @@ function run_test { test_configure echo "**************************************** TESTING RALLY LIST COMMANDS *******************************************" test_list + echo "**************************************** TESTING RALLY FAILS WITH UNUSED TRACK-PARAMS **************************" + test_distribution_fails_with_wrong_track_params echo "**************************************** TESTING RALLY WITH ES FROM SOURCES ************************************" test_sources echo "**************************************** TESTING RALLY WITH ES DISTRIBUTIONS ***********************************" diff --git a/tests/track/loader_test.py b/tests/track/loader_test.py index 726be5783..ea2bbdd15 100644 --- a/tests/track/loader_test.py +++ b/tests/track/loader_test.py @@ -15,11 +15,13 @@ # specific language governing permissions and limitations # under the License. +import json +import os import re +import textwrap import unittest.mock as mock -from unittest import TestCase -import jinja2 +from unittest import TestCase from esrally import exceptions, config from esrally.utils import io @@ -685,7 +687,111 @@ def test_prepare_bundled_document_set_uncompressed_docs_wrong_size(self, is_file self.assertEqual(0, prepare_file_offset_table.call_count) +class TemplateSource(TestCase): + @mock.patch("esrally.utils.io.dirname") + @mock.patch.object(loader.TemplateSource, "read_glob_files") + def test_entrypoint_of_replace_includes(self, patched_read_glob, patched_dirname): + track = textwrap.dedent(''' + {% import "rally.helpers" as rally with context %} + { + "version": 2, + "description": "unittest track", + "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames", + "indices": [ + { + "name": "geonames", + "body": "index.json" + } + ], + "corpora": [ + { + "name": "geonames", + "base-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames", + "documents": [ + { + "source-file": "documents-2.json.bz2", + "document-count": 11396505, + "compressed-bytes": 264698741, + "uncompressed-bytes": 3547614383 + } + ] + } + ], + "operations": [ + {{ rally.collect(parts="operations/*.json") }} + ], + "challenges": [ + {{ rally.collect(parts="challenges/*.json") }} + ] + } + ''') + + def dummy_read_glob(c): + return "{{\"replaced {}\": \"true\"}}".format(c) + + patched_read_glob.side_effect = dummy_read_glob + + base_path = "~/.rally/benchmarks/tracks/default/geonames" + template_file_name = "track.json" + tmpl_src = loader.TemplateSource(base_path, template_file_name) + expected_response = textwrap.dedent(''' + {% import "rally.helpers" as rally with context %} + { + "version": 2, + "description": "unittest track", + "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames", + "indices": [ + { + "name": "geonames", + "body": "index.json" + } + ], + "corpora": [ + { + "name": "geonames", + "base-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames", + "documents": [ + { + "source-file": "documents-2.json.bz2", + "document-count": 11396505, + "compressed-bytes": 264698741, + "uncompressed-bytes": 3547614383 + } + ] + } + ], + "operations": [ + {"replaced ~/.rally/benchmarks/tracks/default/geonames/operations/*.json": "true"} + ], + "challenges": [ + {"replaced ~/.rally/benchmarks/tracks/default/geonames/challenges/*.json": "true"} + ] + } + ''') + + self.assertEqual( + expected_response, + tmpl_src.replace_includes(base_path, track) + ) + + def test_read_glob_files(self): + tmpl_obj = loader.TemplateSource( + base_path="/some/path/to/a/rally/track", + template_file_name="track.json", + fileglobber=lambda pat: [ + os.path.join(os.path.dirname(__file__), "resources", "track_fragment_1.json"), + os.path.join(os.path.dirname(__file__), "resources", "track_fragment_2.json") + ] + ) + response = tmpl_obj.read_glob_files("*track_fragment_*.json") + expected_response = '{\n "item1": "value1"\n}\n,\n{\n "item2": "value2"\n}\n' + + self.assertEqual(expected_response, response) + + class TemplateRenderTests(TestCase): + unittest_template_internal_vars = loader.default_internal_template_vars(clock=StaticClock) + def test_render_simple_template(self): template = """ { @@ -694,8 +800,7 @@ def test_render_simple_template(self): } """ - rendered = loader.render_template( - loader=jinja2.DictLoader({"unittest": template}), template_name="unittest", clock=StaticClock) + rendered = loader.render_template(template, template_internal_vars=TemplateRenderTests.unittest_template_internal_vars) expected = """ { @@ -713,8 +818,8 @@ def test_render_template_with_external_variables(self): } """ - rendered = loader.render_template( - loader=jinja2.DictLoader({"unittest": template}), template_name="unittest", template_vars={"greeting": "Hi"}, clock=StaticClock) + rendered = loader.render_template(template, template_vars={"greeting": "Hi"}, + template_internal_vars=TemplateRenderTests.unittest_template_internal_vars) expected = """ { @@ -744,15 +849,24 @@ def key_globber(e): } """ + source = io.DictStringFileSourceFactory({ + "dynamic-key-1": [ + textwrap.dedent('"dkey1": "value1"') + ], + "dynamic-key-2": [ + textwrap.dedent('"dkey2": "value2"') + ], + "dynamic-key-3": [ + textwrap.dedent('"dkey3": "value3"') + ] + }) + + template_source = loader.TemplateSource("", "track.json", source=source, fileglobber=key_globber) + template_source.load_template_from_string(template) + rendered = loader.render_template( - loader=jinja2.DictLoader( - { - "unittest": template, - "dynamic-key-1": '"dkey1": "value1"', - "dynamic-key-2": '"dkey2": "value2"', - "dynamic-key-3": '"dkey3": "value3"', - }), - template_name="unittest", glob_helper=key_globber, clock=StaticClock) + template_source.assembled_source, + template_internal_vars=TemplateRenderTests.unittest_template_internal_vars) expected = """ { @@ -766,30 +880,20 @@ def key_globber(e): self.assertEqualIgnoreWhitespace(expected, rendered) def test_render_template_with_variables(self): - def key_globber(e): - if e == "dynamic-key-*": - return ["dynamic-key-1", "dynamic-key-2"] - else: - return [] - template = """ {% set _clients = clients if clients is defined else 16 %} {% set _bulk_size = bulk_size if bulk_size is defined else 100 %} {% import "rally.helpers" as rally with context %} { "key1": "static value", - {{ rally.collect(parts="dynamic-key-*") }} - + "dkey1": {{ _clients }}, + "dkey2": {{ _bulk_size }} } """ rendered = loader.render_template( - loader=jinja2.DictLoader( - { - "unittest": template, - "dynamic-key-1": '"dkey1": {{ _clients }}', - "dynamic-key-2": '"dkey2": {{ _bulk_size }}', - }), - template_name="unittest", template_vars={"clients": 8}, glob_helper=key_globber, clock=StaticClock) + template, + template_vars={"clients": 8}, + template_internal_vars=TemplateRenderTests.unittest_template_internal_vars) expected = """ { @@ -804,7 +908,148 @@ def assertEqualIgnoreWhitespace(self, expected, actual): self.assertEqual(strip_ws(expected), strip_ws(actual)) +class CompleteTrackParamsTests(TestCase): + assembled_source = textwrap.dedent('''{% import "rally.helpers" as rally with context %} + "key1": "value1", + "key2": {{ value2 | default(3) }}, + "key3": {{ value3 | default("default_value3") }} + "key4": {{ value2 | default(3) }} + ''') + + def test_check_complete_track_params_contains_all_track_params(self): + complete_track_params = loader.CompleteTrackParams() + loader.register_all_params_in_track(CompleteTrackParamsTests.assembled_source, complete_track_params) + + self.assertEqual( + ["value2", "value3"], + complete_track_params.sorted_track_defined_params + ) + + def test_check_complete_track_params_does_not_fail_with_no_track_params(self): + complete_track_params = loader.CompleteTrackParams() + loader.register_all_params_in_track('{}', complete_track_params) + + self.assertEqual( + [], + complete_track_params.sorted_track_defined_params + ) + + def test_unused_user_defined_track_params(self): + track_params = { + "number_of_repliacs": 1, # deliberate typo + "enable_source": True, # unknown parameter + "number_of_shards": 5 + } + + complete_track_params = loader.CompleteTrackParams(user_specified_track_params=track_params) + complete_track_params.populate_track_defined_params(list_of_track_params=[ + "bulk_indexing_clients", + "bulk_indexing_iterations", + "bulk_size", + "cluster_health", + "number_of_replicas", + "number_of_shards"] + ) + + self.assertEqual( + ["enable_source", "number_of_repliacs"], + sorted(complete_track_params.unused_user_defined_track_params()) + ) + + def test_unused_user_defined_track_params_doesnt_fail_with_detaults(self): + complete_track_params = loader.CompleteTrackParams() + complete_track_params.populate_track_defined_params(list_of_track_params=[ + "bulk_indexing_clients", + "bulk_indexing_iterations", + "bulk_size", + "cluster_health", + "number_of_replicas", + "number_of_shards"] + ) + + self.assertEqual( + [], + sorted(complete_track_params.unused_user_defined_track_params()) + ) + + class TrackPostProcessingTests(TestCase): + track_with_params_as_string = textwrap.dedent('''{ + "indices": [ + { + "name": "test-index", + "body": "test-index-body.json", + "types": ["test-type"] + } + ], + "corpora": [ + { + "name": "unittest", + "documents": [ + { + "source-file": "documents.json.bz2", + "document-count": 10, + "compressed-bytes": 100, + "uncompressed-bytes": 10000 + } + ] + } + ], + "operations": [ + { + "name": "index-append", + "operation-type": "bulk", + "bulk-size": 5000 + }, + { + "name": "search", + "operation-type": "search" + } + ], + "challenges": [ + { + "name": "default-challenge", + "description": "Default challenge", + "schedule": [ + { + "clients": {{ bulk_indexing_clients | default(8) }}, + "operation": "index-append", + "warmup-time-period": 100, + "time-period": 240 + }, + { + "parallel": { + "tasks": [ + { + "name": "search #1", + "clients": 4, + "operation": "search", + "warmup-iterations": 1000, + "iterations": 2000, + "target-interval": 30 + }, + { + "name": "search #2", + "clients": 1, + "operation": "search", + "warmup-iterations": 1000, + "iterations": 2000, + "target-throughput": 200 + }, + { + "name": "search #3", + "clients": 1, + "operation": "search", + "iterations": 1 + } + ] + } + } + ] + } + ] + }''') + def test_post_processes_track_spec(self): track_specification = { "indices": [ @@ -954,13 +1199,30 @@ def test_post_processes_track_spec(self): ] } - self.assertEqual(self.as_track(expected_post_processed), - loader.post_process_for_test_mode(self.as_track(track_specification))) + complete_track_params = loader.CompleteTrackParams() + index_body = '{"settings": {"index.number_of_shards": {{ number_of_shards | default(5) }}, '\ + '"index.number_of_replicas": {{ number_of_replicas | default(0)}} }}' - def as_track(self, track_specification): - reader = loader.TrackSpecificationReader(source=io.DictStringFileSourceFactory({ - "/mappings/test-index-body.json": ['{"settings": {}}'] - })) + self.assertEqual( + self.as_track(expected_post_processed, complete_track_params=complete_track_params, index_body=index_body), + loader.post_process_for_test_mode( + self.as_track(track_specification, complete_track_params=complete_track_params, index_body=index_body) + ) + ) + + self.assertEqual( + ["number_of_replicas", "number_of_shards"], + complete_track_params.sorted_track_defined_params + ) + + def as_track(self, track_specification, track_params=None, complete_track_params=None, index_body=None): + reader = loader.TrackSpecificationReader( + track_params=track_params, + complete_track_params=complete_track_params, + source=io.DictStringFileSourceFactory({ + "/mappings/test-index-body.json": [index_body] + }) + ) return reader("unittest", track_specification, "/mappings") @@ -1142,7 +1404,8 @@ def test_document_count_mandatory_if_file_present(self): reader("unittest", track_specification, "/mappings") self.assertEqual("Track 'unittest' is invalid. Mandatory element 'document-count' is missing.", ctx.exception.args[0]) - def test_parse_with_mixed_warmup_iterations_and_measurement(self): + @mock.patch("esrally.track.loader.register_all_params_in_track") + def test_parse_with_mixed_warmup_iterations_and_measurement(self, mocked_params_checker): track_specification = { "description": "description for unit test", "indices": [ @@ -1197,7 +1460,8 @@ def test_parse_with_mixed_warmup_iterations_and_measurement(self): "iterations and a time period of '60' seconds. Please do not mix time periods and iterations.", ctx.exception.args[0]) - def test_parse_missing_challenge_or_challenges(self): + @mock.patch("esrally.track.loader.register_all_params_in_track") + def test_parse_missing_challenge_or_challenges(self, mocked_params_checker): track_specification = { "description": "description for unit test", "indices": [ @@ -1230,7 +1494,8 @@ def test_parse_missing_challenge_or_challenges(self): self.assertEqual("Track 'unittest' is invalid. You must define 'challenge', 'challenges' or 'schedule' but none is specified.", ctx.exception.args[0]) - def test_parse_challenge_and_challenges_are_defined(self): + @mock.patch("esrally.track.loader.register_all_params_in_track") + def test_parse_challenge_and_challenges_are_defined(self, mocked_params_checker): track_specification = { "description": "description for unit test", "indices": [ @@ -1265,14 +1530,15 @@ def test_parse_challenge_and_challenges_are_defined(self): self.assertEqual("Track 'unittest' is invalid. Multiple out of 'challenge', 'challenges' or 'schedule' are defined but only " "one of them is allowed.", ctx.exception.args[0]) - def test_parse_with_mixed_warmup_time_period_and_iterations(self): + @mock.patch("esrally.track.loader.register_all_params_in_track") + def test_parse_with_mixed_warmup_time_period_and_iterations(self, mocked_params_checker): track_specification = { "description": "description for unit test", "indices": [ { "name": "test-index", "body": "index.json", - "types": [ "docs" ] + "types": ["docs"] } ], "corpora": [ @@ -1384,7 +1650,8 @@ def test_parse_duplicate_explicit_task_names(self): "'duplicate-task-name'. Please use the task's name property to assign a unique name for each task.", ctx.exception.args[0]) - def test_load_invalid_index_body(self): + @mock.patch("esrally.track.loader.register_all_params_in_track") + def test_load_invalid_index_body(self, mocked_params_checker): track_specification = { "description": "description for unit test", "indices": [ @@ -1544,8 +1811,10 @@ def test_parse_valid_track_specification(self): } ] } + complete_track_params = loader.CompleteTrackParams() reader = loader.TrackSpecificationReader( track_params={"number_of_shards": 3}, + complete_track_params=complete_track_params, source=io.DictStringFileSourceFactory({ "/mappings/body.json": [""" { @@ -1560,6 +1829,11 @@ def test_parse_valid_track_specification(self): """] })) resulting_track = reader("unittest", track_specification, "/mappings") + # j2 variables defined in the track -- used for checking mismatching user track params + self.assertEqual( + ["number_of_shards"], + complete_track_params.sorted_track_defined_params + ) self.assertEqual("unittest", resulting_track.name) self.assertEqual("description for unit test", resulting_track.description) # indices @@ -1616,7 +1890,9 @@ def test_parse_valid_track_specification(self): self.assertEqual({"append": True}, resulting_track.challenges[0].schedule[0].operation.meta_data) self.assertEqual({"operation-index": 0}, resulting_track.challenges[0].schedule[0].meta_data) - def test_parse_valid_without_types(self): + + @mock.patch("esrally.track.loader.register_all_params_in_track") + def test_parse_valid_without_types(self, mocked_param_checker): track_specification = { "description": "description for unit test", "indices": [ @@ -1707,8 +1983,10 @@ def test_parse_valid_track_specification_with_index_template(self): "operations": [], "challenges": [] } + complete_track_params = loader.CompleteTrackParams() reader = loader.TrackSpecificationReader( track_params={"index_pattern": "*"}, + complete_track_params=complete_track_params, source=io.DictStringFileSourceFactory({ "/mappings/default-template.json": [""" { @@ -1720,6 +1998,10 @@ def test_parse_valid_track_specification_with_index_template(self): """], })) resulting_track = reader("unittest", track_specification, "/mappings") + self.assertEqual( + ["index_pattern", "number_of_shards"], + complete_track_params.sorted_track_defined_params + ) self.assertEqual("unittest", resulting_track.name) self.assertEqual("description for unit test", resulting_track.description) self.assertEqual(0, len(resulting_track.indices)) diff --git a/tests/track/resources/track_fragment_1.json b/tests/track/resources/track_fragment_1.json new file mode 100644 index 000000000..cfbd6ae6d --- /dev/null +++ b/tests/track/resources/track_fragment_1.json @@ -0,0 +1,3 @@ +{ + "item1": "value1" +} diff --git a/tests/track/resources/track_fragment_2.json b/tests/track/resources/track_fragment_2.json new file mode 100644 index 000000000..61c65135d --- /dev/null +++ b/tests/track/resources/track_fragment_2.json @@ -0,0 +1,3 @@ +{ + "item2": "value2" +} diff --git a/tests/utils/opts_test.py b/tests/utils/opts_test.py index 25214ab1f..51c868a45 100644 --- a/tests/utils/opts_test.py +++ b/tests/utils/opts_test.py @@ -35,6 +35,104 @@ def test_kv_to_map(self): opts.kv_to_map(["k:'v'", "size:4", "empty:false", "temperature:0.5"])) +class GenericHelperFunctionTests(TestCase): + def test_list_as_bulleted_list(self): + src_list = ["param-1", "param-2", "a_longer-parameter"] + + self.assertEqual( + ["- param-1", "- param-2", "- a_longer-parameter"], + opts.bulleted_list_of(src_list) + ) + + def test_list_as_double_quoted_list(self): + src_list = ["oneitem", "_another-weird_item", "param-3"] + + self.assertEqual( + opts.double_quoted_list_of(src_list), + ['"oneitem"', '"_another-weird_item"', '"param-3"'] + ) + + def test_make_list_of_close_matches(self): + word_list = [ + "bulk_indexing_clients", + "bulk_indexing_iterations", + "target_throughput", + "bulk_size", + "number_of-shards", + "number_of_replicas", + "index_refresh_interval"] + + available_word_list = [ + "bulk_indexing_clients", + "bulk_indexing_iterations", + "bulk_size", + "cluster_health", + "disk_type", + "duration", + "forcemerge", + "index_alias", + "index_refresh_interval", + "indices_delete_pattern", + "joiner", + "max_rolledover_indices", + "number_of_replicas", + "number_of_shards", + "ops_per_25_gb", + "p1_bulk_indexing_clients", + "p1_bulk_size", + "p1_duration_secs", + "p2_bulk_indexing_clients", + "p2_bulk_size", + "p2_duration_secs", + "p2_ops", + "p2_query1_target_interval", + "p2_query2_target_interval", + "p2_query3_target_interval", + "p2_query4_target_interval", + "phase_duration_secs", + "pre_filter_shard_size", + "query_iterations", + "range", + "rate_limit_duration_secs", + "rate_limit_max", + "rate_limit_step", + "rolledover_indices_suffix_separator", + "rollover_max_age", + "rollover_max_size", + "shard_count", + "shard_sizing_iterations", + "shard_sizing_queries", + "source_enabled", + "target_throughput", + "translog_sync"] + + self.assertEqual( + ['bulk_indexing_clients', + 'bulk_indexing_iterations', + 'target_throughput', + 'bulk_size', + # number_of-shards had a typo + 'number_of_shards', + 'number_of_replicas', + 'index_refresh_interval'], + opts.make_list_of_close_matches(word_list, available_word_list) + ) + + def test_make_list_of_close_matches_returns_with_empty_word_list(self): + self.assertEqual( + [], + opts.make_list_of_close_matches([], ["number_of_shards"]) + ) + + def test_make_list_of_close_matches_returns_empty_list_with_no_close_matches(self): + self.assertEqual( + [], + opts.make_list_of_close_matches( + ["number_of_shards", "number_of-replicas"], + []) + ) + + class TestTargetHosts(TestCase): def test_empty_arg_parses_as_empty_list(self): self.assertEqual([], opts.TargetHosts('').default)