diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 0a9060974..ca0315081 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -50,6 +50,8 @@ def register_default_runners(): register_runner(track.OperationType.Refresh.name, Retry(Refresh()), async_runner=True) register_runner(track.OperationType.CreateIndex.name, Retry(CreateIndex()), async_runner=True) register_runner(track.OperationType.DeleteIndex.name, Retry(DeleteIndex()), async_runner=True) + register_runner(track.OperationType.CreateDataStream.name, Retry(CreateDataStream()), async_runner=True) + register_runner(track.OperationType.DeleteDataStream.name, Retry(DeleteDataStream()), async_runner=True) register_runner(track.OperationType.CreateIndexTemplate.name, Retry(CreateIndexTemplate()), async_runner=True) register_runner(track.OperationType.DeleteIndexTemplate.name, Retry(DeleteIndexTemplate()), async_runner=True) register_runner(track.OperationType.ShrinkIndex.name, Retry(ShrinkIndex()), async_runner=True) @@ -1031,6 +1033,22 @@ def __repr__(self, *args, **kwargs): return "create-index" +class CreateDataStream(Runner): + """ + Execute the `create data stream API `_. + """ + + async def __call__(self, es, params): + data_streams = mandatory(params, "data-streams", self) + request_params = mandatory(params, "request-params", self) + for data_stream in data_streams: + await es.indices.create_data_stream(data_stream, params=request_params) + return len(data_streams), "ops" + + def __repr__(self, *args, **kwargs): + return "create-data-stream" + + class DeleteIndex(Runner): """ Execute the `delete index API `_. @@ -1058,6 +1076,33 @@ def __repr__(self, *args, **kwargs): return "delete-index" +class DeleteDataStream(Runner): + """ + Execute the `delete data stream API `_. + """ + + async def __call__(self, es, params): + ops = 0 + + data_streams = mandatory(params, "data-streams", self) + only_if_exists = mandatory(params, "only-if-exists", self) + request_params = mandatory(params, "request-params", self) + + for data_stream in data_streams: + if not only_if_exists: + await es.indices.delete_data_stream(data_stream, ignore=[404], params=request_params) + ops += 1 + elif only_if_exists and await es.indices.exists(index=data_stream): + self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream) + await es.indices.delete_data_stream(data_stream, params=request_params) + ops += 1 + + return ops, "ops" + + def __repr__(self, *args, **kwargs): + return "delete-data-stream" + + class CreateIndexTemplate(Runner): """ Execute the `PUT index template API `_. diff --git a/esrally/resources/track-schema.json b/esrally/resources/track-schema.json index b22be6782..ef2edf9da 100644 --- a/esrally/resources/track-schema.json +++ b/esrally/resources/track-schema.json @@ -262,6 +262,24 @@ ] } }, + "data-streams": { + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "title": "Data Stream", + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Name of the data stream to create." + } + }, + "required": [ + "name" + ] + } + }, "corpora": { "type": "array", "minItems": 1, @@ -291,6 +309,10 @@ "type": "string", "description": "The name of the associated index (if any)." }, + "target-data-stream": { + "type": "string", + "description": "The name of the associated data stream (if any)." + }, "target-type": { "type": "string", "description": "The name of the associated document type (if any)." 
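As a minimal sketch (not taken from the patch; the data stream names are invented for illustration), the params dictionaries that the CreateDataStream and DeleteDataStream runners above consume look roughly like this:

    # illustrative only: keys mirror what the runners read via mandatory()
    create_params = {
        "data-streams": ["logs-rally-a", "logs-rally-b"],
        "request-params": {"wait_for_active_shards": "true"}
    }
    delete_params = {
        "data-streams": ["logs-rally-a"],
        "only-if-exists": True,  # only issue the delete call if the data stream exists
        "request-params": {}
    }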
@@ -469,4 +491,4 @@ "$ref": "#/definitions/schedule" } } -} \ No newline at end of file +} diff --git a/esrally/track/loader.py b/esrally/track/loader.py index 105660377..51bc08c2d 100644 --- a/esrally/track/loader.py +++ b/esrally/track/loader.py @@ -723,6 +723,7 @@ def filter_out_match(task, filters, exclude): return t + def filters_from_filtered_tasks(filtered_tasks): filters = [] if filtered_tasks: @@ -987,13 +988,19 @@ def __call__(self, track_name, track_specification, mapping_dir): meta_data = self._r(track_specification, "meta", mandatory=False) indices = [self._create_index(idx, mapping_dir) for idx in self._r(track_specification, "indices", mandatory=False, default_value=[])] + data_streams = [self._create_data_stream(idx) + for idx in self._r(track_specification, "data-streams", mandatory=False, default_value=[])] + if len(indices) > 0 and len(data_streams) > 0: + # we guard against this early and support either or + raise TrackSyntaxError("indices and data-streams cannot both be specified") templates = [self._create_index_template(tpl, mapping_dir) for tpl in self._r(track_specification, "templates", mandatory=False, default_value=[])] - corpora = self._create_corpora(self._r(track_specification, "corpora", mandatory=False, default_value=[]), indices) + corpora = self._create_corpora(self._r(track_specification, "corpora", mandatory=False, default_value=[]), + indices, data_streams) challenges = self._create_challenges(track_specification) # at this point, *all* track params must have been referenced in the templates - return track.Track(name=self.name, meta_data=meta_data, description=description, challenges=challenges, indices=indices, - templates=templates, corpora=corpora) + return track.Track(name=self.name, meta_data=meta_data, description=description, challenges=challenges, + indices=indices, data_streams=data_streams, templates=templates, corpora=corpora) def _error(self, msg): raise TrackSyntaxError("Track '%s' is invalid. 
%s" % (self.name, msg)) @@ -1031,6 +1038,9 @@ def _create_index(self, index_spec, mapping_dir): return track.Index(name=index_name, body=body, types=self._r(index_spec, "types", mandatory=False, default_value=[])) + def _create_data_stream(self, data_stream_spec): + return track.DataStream(name=self._r(data_stream_spec, "name")) + def _create_index_template(self, tpl_spec, mapping_dir): name = self._r(tpl_spec, "name") template_file = self._r(tpl_spec, "template") @@ -1056,7 +1066,9 @@ def _load_template(self, contents, description): self.logger.exception("Could not load file template for %s.", description) raise TrackSyntaxError("Could not load file template for '%s'" % description, str(e)) - def _create_corpora(self, corpora_specs, indices): + def _create_corpora(self, corpora_specs, indices, data_streams): + if len(indices) > 0 and len(data_streams) > 0: + raise TrackSyntaxError("indices and data-streams cannot both be specified") document_corpora = [] known_corpora_names = set() for corpus_spec in corpora_specs: @@ -1069,17 +1081,29 @@ def _create_corpora(self, corpora_specs, indices): corpus = track.DocumentCorpus(name=name) # defaults on corpus level default_base_url = self._r(corpus_spec, "base-url", mandatory=False, default_value=None) - default_source_format = self._r(corpus_spec, "source-format", mandatory=False, default_value=track.Documents.SOURCE_FORMAT_BULK) - default_action_and_meta_data = self._r(corpus_spec, "includes-action-and-meta-data", mandatory=False, default_value=False) + default_source_format = self._r(corpus_spec, "source-format", mandatory=False, + default_value=track.Documents.SOURCE_FORMAT_BULK) + default_action_and_meta_data = self._r(corpus_spec, "includes-action-and-meta-data", mandatory=False, + default_value=False) + corpus_target_idx = None + corpus_target_ds = None + corpus_target_type = None if len(indices) == 1: corpus_target_idx = self._r(corpus_spec, "target-index", mandatory=False, default_value=indices[0].name) - else: + elif len(indices) > 0: corpus_target_idx = self._r(corpus_spec, "target-index", mandatory=False) + if len(data_streams) == 1: + corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False, + default_value=data_streams[0].name) + elif len(data_streams) > 0: + corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False) + if len(indices) == 1 and len(indices[0].types) == 1: - corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False, default_value=indices[0].types[0]) - else: + corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False, + default_value=indices[0].types[0]) + elif len(indices) > 0: corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False) for doc_spec in self._r(corpus_spec, "documents"): @@ -1103,13 +1127,37 @@ def _create_corpora(self, corpora_specs, indices): if includes_action_and_meta_data: target_idx = None target_type = None + target_ds = None else: - # we need an index if no meta-data are present. 
- target_idx = self._r(doc_spec, "target-index", mandatory=corpus_target_idx is None, - default_value=corpus_target_idx, error_ctx=docs) target_type = self._r(doc_spec, "target-type", mandatory=False, default_value=corpus_target_type, error_ctx=docs) + # require to be specified if we're using data streams and we have no default + target_ds = self._r(doc_spec, "target-data-stream", + mandatory=len(data_streams) > 0 and corpus_target_ds is None, + default_value=corpus_target_ds, + error_ctx=docs) + if target_ds and len(indices) > 0: + # if indices are in use we error + raise TrackSyntaxError("target-data-stream cannot be used when using indices") + elif target_ds and target_type: + raise TrackSyntaxError("target-type cannot be used when using data-streams") + + # need an index if we're using indices and no meta-data are present and we don't have a default + target_idx = self._r(doc_spec, "target-index", + mandatory=len(indices) > 0 and corpus_target_idx is None, + default_value=corpus_target_idx, + error_ctx=docs) + # either target_idx or target_ds + if target_idx and len(data_streams) > 0: + # if data streams are in use we error + raise TrackSyntaxError("target-index cannot be used when using data-streams") + + # we need one or the other + if target_idx is None and target_ds is None: + raise TrackSyntaxError(f"a {'target-index' if len(indices) > 0 else 'target-data-stream'} " + f"is required for {docs}" ) + docs = track.Documents(source_format=source_format, document_file=document_file, document_archive=document_archive, @@ -1118,11 +1166,11 @@ def _create_corpora(self, corpora_specs, indices): number_of_documents=num_docs, compressed_size_in_bytes=compressed_bytes, uncompressed_size_in_bytes=uncompressed_bytes, - target_index=target_idx, target_type=target_type) + target_index=target_idx, target_type=target_type, + target_data_stream=target_ds) corpus.documents.append(docs) else: self._error("Unknown source-format [%s] in document corpus [%s]." 
% (source_format, name)) - document_corpora.append(corpus) return document_corpora @@ -1257,7 +1305,8 @@ def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=N warmup_iterations=self._r(task_spec, "warmup-iterations", error_ctx=op.name, mandatory=False, default_value=default_warmup_iterations), iterations=self._r(task_spec, "iterations", error_ctx=op.name, mandatory=False, default_value=default_iterations), - warmup_time_period=self._r(task_spec, "warmup-time-period", error_ctx=op.name, mandatory=False, + warmup_time_period=self._r(task_spec, "warmup-time-period", error_ctx=op.name, + mandatory=False, default_value=default_warmup_time_period), time_period=self._r(task_spec, "time-period", error_ctx=op.name, mandatory=False, default_value=default_time_period), diff --git a/esrally/track/params.py b/esrally/track/params.py index 885943542..18f9c7352 100644 --- a/esrally/track/params.py +++ b/esrally/track/params.py @@ -202,6 +202,68 @@ def params(self): return p +class CreateDataStreamParamSource(ParamSource): + def __init__(self, track, params, **kwargs): + super().__init__(track, params, **kwargs) + self.request_params = params.get("request-params", {}) + self.data_stream_definitions = [] + if track.data_streams: + filter_ds = params.get("data-stream") + if isinstance(filter_ds, str): + filter_ds = [filter_ds] + for ds in track.data_streams: + if not filter_ds or ds.name in filter_ds: + self.data_stream_definitions.append(ds.name) + else: + try: + data_stream = params["data-stream"] + data_streams = [data_stream] if isinstance(data_stream, str) else data_stream + for ds in data_streams: + self.data_stream_definitions.append(ds) + except KeyError: + raise exceptions.InvalidSyntax("Please set the property 'data-stream' for the create-data-stream operation") + + def params(self): + p = {} + # ensure we pass all parameters... + p.update(self._params) + p.update({ + "data-streams": self.data_stream_definitions, + "request-params": self.request_params + }) + return p + + +class DeleteDataStreamParamSource(ParamSource): + def __init__(self, track, params, **kwargs): + super().__init__(track, params, **kwargs) + self.request_params = params.get("request-params", {}) + self.only_if_exists = params.get("only-if-exists", True) + + self.data_stream_definitions = [] + target_data_stream = params.get("data-stream") + if target_data_stream: + target_data_stream = [target_data_stream] if isinstance(target_data_stream, str) else target_data_stream + for ds in target_data_stream: + self.data_stream_definitions.append(ds) + elif track.data_streams: + for ds in track.data_streams: + self.data_stream_definitions.append(ds.name) + else: + raise exceptions.InvalidSyntax("delete-data-stream operation targets no data stream") + + def params(self): + p = {} + # ensure we pass all parameters... 
+        p.update(self._params)
+        p.update({
+            "data-streams": self.data_stream_definitions,
+            "request-params": self.request_params,
+            "only-if-exists": self.only_if_exists
+        })
+        return p
+
+
 class DeleteIndexParamSource(ParamSource):
     def __init__(self, track, params, **kwargs):
         super().__init__(track, params, **kwargs)
@@ -343,12 +405,19 @@ class SearchParamSource(ParamSource):
     def __init__(self, track, params, **kwargs):
         super().__init__(track, params, **kwargs)
         if len(track.indices) == 1:
-            default_index = track.indices[0].name
+            default_target = track.indices[0].name
+        elif len(track.data_streams) == 1:
+            default_target = track.data_streams[0].name
         else:
-            default_index = None
-
-        index_name = params.get("index", default_index)
+            default_target = None
+        # indices are preferred but data streams can also be queried the same way
+        target_name = params.get("index")
         type_name = params.get("type")
+        if not target_name:
+            target_name = params.get("data-stream", default_target)
+        if target_name and type_name:
+            raise exceptions.InvalidSyntax(
+                f"'type' not supported with 'data-stream' for operation '{kwargs.get('operation_name')}'")
         request_cache = params.get("cache", None)
         query_body = params.get("body", None)
         query_body_params = params.get("body-params", None)
@@ -358,7 +427,7 @@ def __init__(self, track, params, **kwargs):
         response_compression_enabled = params.get("response-compression-enabled", True)
 
         self.query_params = {
-            "index": index_name,
+            "index": target_name,
             "type": type_name,
             "cache": request_cache,
             "request-params": request_params,
@@ -366,8 +435,9 @@ def __init__(self, track, params, **kwargs):
             "body": query_body
         }
 
-        if not index_name:
-            raise exceptions.InvalidSyntax("'index' is mandatory and is missing for operation '{}'".format(kwargs.get("operation_name")))
+        if not target_name:
+            raise exceptions.InvalidSyntax(
+                f"'index' or 'data-stream' is mandatory and is missing for operation '{kwargs.get('operation_name')}'")
 
         if pages:
             self.query_params["pages"] = pages
@@ -443,6 +513,9 @@ def __init__(self, track, params, **kwargs):
             else:
                 raise exceptions.InvalidSyntax("Unknown 'conflicts' setting [%s]" % id_conflicts)
 
+        if "data-streams" in params and self.id_conflicts != IndexIdConflict.NoConflicts:
+            raise exceptions.InvalidSyntax("'conflicts' cannot be used with 'data-streams'")
+
         if self.id_conflicts != IndexIdConflict.NoConflicts:
             self.conflict_probability = self.float_param(params, name="conflict-probability", default_value=25, min_value=0,
                                                          max_value=100, min_operator=operator.lt)
@@ -517,7 +590,9 @@ def used_corpora(self, t, params):
 
         for corpus in t.corpora:
             if corpus.name in corpora_names:
-                filtered_corpus = corpus.filter(source_format=track.Documents.SOURCE_FORMAT_BULK, target_indices=params.get("indices"))
+                filtered_corpus = corpus.filter(source_format=track.Documents.SOURCE_FORMAT_BULK,
+                                                target_indices=params.get("indices"),
+                                                target_data_streams=params.get("data-streams"))
                 if filtered_corpus.number_of_documents(source_format=track.Documents.SOURCE_FORMAT_BULK) > 0:
                     corpora.append(filtered_corpus)
 
@@ -613,12 +688,16 @@ def percent_completed(self):
 class ForceMergeParamSource(ParamSource):
     def __init__(self, track, params, **kwargs):
         super().__init__(track, params, **kwargs)
-        if len(track.indices) > 0:
-            default_index = ','.join(map(str, track.indices))
+        if len(track.indices) > 0 or len(track.data_streams) > 0:
+            # force merge data streams and indices - API call is the same so treat as indices
+            default_target = ','.join(map(str, track.indices + track.data_streams))
else: - default_index = "_all" + default_target = "_all" + + self._target_name = params.get("index") + if not self._target_name: + self._target_name = params.get("data-stream", default_target) - self._index_name = params.get("index", default_index) self._max_num_segments = params.get("max-num-segments") self._request_timeout = params.get("request-timeout") self._poll_period = params.get("poll-period", 10) @@ -626,7 +705,7 @@ def __init__(self, track, params, **kwargs): def params(self): return { - "index": self._index_name, + "index": self._target_name, "max-num-segments": self._max_num_segments, "request-timeout": self._request_timeout, "mode": self._mode, @@ -679,14 +758,24 @@ def chain(*iterables): def create_default_reader(docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency): source = Slice(io.MmapSource, offset, num_lines) + target = None + use_create = False + if docs.target_index: + target = docs.target_index + elif docs.target_data_stream: + target = docs.target_data_stream + use_create = True + if id_conflicts != IndexIdConflict.NoConflicts: + # can only create docs in data streams + raise exceptions.RallyError("Conflicts cannot be generated with append only data streams") if docs.includes_action_and_meta_data: - return SourceOnlyIndexDataReader(docs.document_file, batch_size, bulk_size, source, docs.target_index, docs.target_type) + return SourceOnlyIndexDataReader(docs.document_file, batch_size, bulk_size, source, target, docs.target_type) else: - am_handler = GenerateActionMetaData(docs.target_index, docs.target_type, + am_handler = GenerateActionMetaData(target, docs.target_type, build_conflicting_ids(id_conflicts, num_docs, offset), conflict_probability, - on_conflict, recency) - return MetadataIndexDataReader(docs.document_file, batch_size, bulk_size, source, am_handler, docs.target_index, docs.target_type) + on_conflict, recency, use_create=use_create) + return MetadataIndexDataReader(docs.document_file, batch_size, bulk_size, source, am_handler, target, docs.target_type) def create_readers(num_clients, start_client_index, end_client_index, corpora, batch_size, bulk_size, id_conflicts, @@ -698,9 +787,12 @@ def create_readers(num_clients, start_client_index, end_client_index, corpora, b offset, num_docs, num_lines = bounds(docs.number_of_documents, start_client_index, end_client_index, num_clients, docs.includes_action_and_meta_data) if num_docs > 0: - logger.info("Task-relative clients at index [%d-%d] will bulk index [%d] docs starting from line offset [%d] for [%s/%s] " + target = f"{docs.target_index}/{docs.target_type}" if docs.target_index else "/" + if docs.target_data_stream: + target = docs.target_data_stream + logger.info("Task-relative clients at index [%d-%d] will bulk index [%d] docs starting from line offset [%d] for [%s] " "from corpus [%s].", start_client_index, end_client_index, num_docs, offset, - docs.target_index, docs.target_type, corpus.name) + target, corpus.name) readers.append(create_reader(docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency)) else: @@ -790,8 +882,8 @@ def bulk_data_based(num_clients, start_client_index, end_client_index, corpora, class GenerateActionMetaData: RECENCY_SLOPE = 30 - def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probability=None, on_conflict=None, - recency=None, rand=random.random, randint=random.randint, randexp=random.expovariate): + def __init__(self, index_name, 
type_name, conflicting_ids=None, conflict_probability=None, on_conflict=None, recency=None, + rand=random.random, randint=random.randint, randexp=random.expovariate, use_create=False): if type_name: self.meta_data_index_with_id = '{"index": {"_index": "%s", "_type": "%s", "_id": "%s"}}\n' % \ (index_name, type_name, "%s") @@ -802,9 +894,12 @@ def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probabi self.meta_data_index_with_id = '{"index": {"_index": "%s", "_id": "%s"}}\n' % (index_name, "%s") self.meta_data_update_with_id = '{"update": {"_index": "%s", "_id": "%s"}}\n' % (index_name, "%s") self.meta_data_index_no_id = '{"index": {"_index": "%s"}}\n' % index_name - + self.meta_data_create_no_id = '{"create": {"_index": "%s"}}\n' % index_name + if use_create and conflicting_ids: + raise exceptions.RallyError("Index mode '_create' cannot be used with conflicting ids") self.conflicting_ids = conflicting_ids self.on_conflict = on_conflict + self.use_create = use_create # random() produces numbers between 0 and 1 and the user denotes the probability in percentage between 0 and 100 self.conflict_probability = conflict_probability / 100.0 if conflict_probability is not None else 0 self.recency = recency if recency is not None else 0 @@ -857,6 +952,8 @@ def __next__(self): else: raise exceptions.RallyAssertionError("Unknown action [{}]".format(action)) else: + if self.use_create: + return "create", self.meta_data_create_no_id return "index", self.meta_data_index_no_id @@ -1020,6 +1117,8 @@ def read_bulk(self): register_param_source_for_operation(track.OperationType.Search, SearchParamSource) register_param_source_for_operation(track.OperationType.CreateIndex, CreateIndexParamSource) register_param_source_for_operation(track.OperationType.DeleteIndex, DeleteIndexParamSource) +register_param_source_for_operation(track.OperationType.CreateDataStream, CreateDataStreamParamSource) +register_param_source_for_operation(track.OperationType.DeleteDataStream, DeleteDataStreamParamSource) register_param_source_for_operation(track.OperationType.CreateIndexTemplate, CreateIndexTemplateParamSource) register_param_source_for_operation(track.OperationType.DeleteIndexTemplate, DeleteIndexTemplateParamSource) register_param_source_for_operation(track.OperationType.Sleep, SleepParamSource) diff --git a/esrally/track/track.py b/esrally/track/track.py index 52c0660ed..5a6292710 100644 --- a/esrally/track/track.py +++ b/esrally/track/track.py @@ -68,6 +68,46 @@ def __eq__(self, other): return self.name == other.name +class DataStream: + """ + Defines a data stream in Elasticsearch. + """ + + def __init__(self, name): + """ + + Creates a new data stream. + + :param name: The data stream name. Mandatory. + """ + self.name = name + + def matches(self, pattern): + if pattern is None: + return True + elif pattern in ["_all", "*"]: + return True + elif self.name == pattern: + return True + else: + return False + + def __str__(self): + return self.name + + def __repr__(self): + r = [] + for prop, value in vars(self).items(): + r.append("%s = [%s]" % (prop, repr(value))) + return ", ".join(r) + + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + return self.name == other.name + + class IndexTemplate: """ Defines an index template in Elasticsearch. 
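As a minimal sketch (not part of the patch; "logs-rally" is a made-up name), the GenerateActionMetaData change above makes bulk action lines use the "create" action when documents target a data stream, since data streams are append-only:

    # illustrative only: with use_create=True every action line becomes a "create" action
    from esrally.track.params import GenerateActionMetaData

    gen = GenerateActionMetaData("logs-rally", type_name=None, use_create=True)
    print(next(gen))  # ("create", '{"create": {"_index": "logs-rally"}}\n')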
@@ -107,8 +147,10 @@ def __eq__(self, other):
 class Documents:
     SOURCE_FORMAT_BULK = "bulk"
 
-    def __init__(self, source_format, document_file=None, document_archive=None, base_url=None, includes_action_and_meta_data=False,
-                 number_of_documents=0, compressed_size_in_bytes=0, uncompressed_size_in_bytes=0, target_index=None, target_type=None):
+    def __init__(self, source_format, document_file=None, document_archive=None, base_url=None,
+                 includes_action_and_meta_data=False,
+                 number_of_documents=0, compressed_size_in_bytes=0, uncompressed_size_in_bytes=0, target_index=None,
+                 target_data_stream=None, target_type=None):
         """
 
         :param source_format: The format of these documents. Mandatory.
@@ -123,9 +165,11 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas
                                  a document_archive is given.
         :param compressed_size_in_bytes: The compressed size in bytes of the benchmark document. Needed for verification of the download and
                                          user reporting. Only useful if a document_archive is given (optional but recommended to be set).
-        :param uncompressed_size_in_bytes: The size in bytes of the benchmark document after decompressing it. Only useful if a
-                                           document_archive is given (optional but recommended to be set).
+        :param uncompressed_size_in_bytes: The size in bytes of the benchmark document after decompressing it.
+                                           Only useful if a document_archive is given (optional but recommended to be set).
         :param target_index: The index to target for bulk operations. May be ``None`` if ``includes_action_and_meta_data``
                              is ``False``.
+        :param target_data_stream: The data stream to target for bulk operations.
+                                   May be ``None`` if ``includes_action_and_meta_data`` is ``False``.
         :param target_type: The document type to target for bulk operations. May be ``None`` if ``includes_action_and_meta_data``
                             is ``False``.
""" @@ -139,6 +183,7 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas self.compressed_size_in_bytes = compressed_size_in_bytes self.uncompressed_size_in_bytes = uncompressed_size_in_bytes self.target_index = target_index + self.target_data_stream = target_data_stream self.target_type = target_type def has_compressed_corpus(self): @@ -170,16 +215,16 @@ def __repr__(self): def __hash__(self): return hash(self.source_format) ^ hash(self.document_file) ^ hash(self.document_archive) ^ hash(self.base_url) ^ \ hash(self.includes_action_and_meta_data) ^ hash(self.number_of_documents) ^ hash(self.compressed_size_in_bytes) ^ \ - hash(self.uncompressed_size_in_bytes) ^ hash(self.target_index) ^ hash(self.target_type) + hash(self.uncompressed_size_in_bytes) ^ hash(self.target_index) ^ hash(self.target_data_stream) ^ hash(self.target_type) def __eq__(self, othr): return (isinstance(othr, type(self)) and (self.source_format, self.document_file, self.document_archive, self.base_url, self.includes_action_and_meta_data, self.number_of_documents, self.compressed_size_in_bytes, self.uncompressed_size_in_bytes, - self.target_type, self.target_type) == + self.target_type, self.target_data_stream, self.target_type) == (othr.source_format, othr.document_file, othr.document_archive, othr.base_url, othr.includes_action_and_meta_data, othr.number_of_documents, othr.compressed_size_in_bytes, othr.uncompressed_size_in_bytes, - othr.target_type, othr.target_type)) + othr.target_type, othr.target_data_stream, othr.target_type)) class DocumentCorpus: @@ -219,7 +264,7 @@ def uncompressed_size_in_bytes(self, source_format): return None return num - def filter(self, source_format=None, target_indices=None): + def filter(self, source_format=None, target_indices=None, target_data_streams=None): filtered = [] for d in self.documents: # skip if source format or target index does not match @@ -227,6 +272,8 @@ def filter(self, source_format=None, target_indices=None): continue if target_indices and d.target_index not in target_indices: continue + if target_data_streams and d.target_data_stream not in target_data_streams: + continue filtered.append(d) return DocumentCorpus(self.name, filtered) @@ -262,8 +309,8 @@ class Track: A track defines the data set that is used. It corresponds loosely to a use case (e.g. logging, event processing, analytics, ...) """ - def __init__(self, name, description=None, meta_data=None, challenges=None, indices=None, templates=None, corpora=None, - has_plugins=False): + def __init__(self, name, description=None, meta_data=None, challenges=None, indices=None, data_streams=None, + templates=None, corpora=None, has_plugins=False): """ Creates a new track. @@ -274,6 +321,7 @@ def __init__(self, name, description=None, meta_data=None, challenges=None, indi :param challenges: A list of one or more challenges to use. Precondition: If the list is non-empty it contains exactly one element with its ``default`` property set to ``True``. :param indices: A list of indices for this track. May be None. + :param data_streams: A list of data streams for this track. May be None. :param templates: A list of index templates for this track. May be None. :param corpora: A list of document corpus definitions for this track. May be None. :param has_plugins: True iff the track also defines plugins (e.g. custom runners or parameter sources). 
@@ -283,6 +331,7 @@ def __init__(self, name, description=None, meta_data=None, challenges=None, indi self.description = description if description is not None else "" self.challenges = challenges if challenges else [] self.indices = indices if indices else [] + self.data_streams = data_streams if data_streams else [] self.corpora = corpora if corpora else [] self.templates = templates if templates else [] self.has_plugins = has_plugins @@ -354,12 +403,14 @@ def __repr__(self): def __hash__(self): return hash(self.name) ^ hash(self.meta_data) ^ hash(self.description) ^ hash(self.challenges) ^ \ - hash(self.indices) ^ hash(self.templates) ^ hash(self.corpora) + hash(self.indices) ^ hash(self.data_streams) ^ hash(self.templates) ^ hash(self.corpora) def __eq__(self, othr): return (isinstance(othr, type(self)) and - (self.name, self.meta_data, self.description, self.challenges, self.indices, self.templates, self.corpora) == - (othr.name, othr.meta_data, othr.description, othr.challenges, othr.indices, othr.templates, othr.corpora)) + (self.name, self.meta_data, self.description, self.challenges, self.indices, self.data_streams, + self.templates, self.corpora) == + (othr.name, othr.meta_data, othr.description, othr.challenges, othr.indices, othr.data_streams, + othr.templates, othr.corpora)) class Challenge: @@ -449,6 +500,8 @@ class OperationType(Enum): StartTransform = 1025 WaitForTransform = 1026 DeleteTransform = 1027 + CreateDataStream = 1028 + DeleteDataStream = 1029 @property def admin_op(self): @@ -526,6 +579,10 @@ def from_hyphenated_string(cls, v): return OperationType.WaitForTransform elif v == "delete-transform": return OperationType.DeleteTransform + elif v == "create-data-stream": + return OperationType.CreateDataStream + elif v == "delete-data-stream": + return OperationType.DeleteDataStream else: raise KeyError("No enum value for [%s]" % v) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index f34e85ed6..51527738b 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -2115,6 +2115,52 @@ async def test_param_indices_mandatory(self, es): self.assertEqual(0, es.indices.create.call_count) +class CreateDataStreamRunnerTests(TestCase): + @mock.patch("elasticsearch.Elasticsearch") + @run_async + async def test_creates_multiple_data_streams(self, es): + es.indices.create_data_stream.return_value = as_future() + + r = runner.CreateDataStream() + + request_params = { + "wait_for_active_shards": "true" + } + + params = { + "data-streams": [ + "data-stream-A", + "data-stream-B" + ], + "request-params": request_params + } + + result = await r(es, params) + + self.assertEqual((2, "ops"), result) + + es.indices.create_data_stream.assert_has_calls([ + mock.call("data-stream-A", params=request_params), + mock.call("data-stream-B", params=request_params) + ]) + + + @mock.patch("elasticsearch.Elasticsearch") + @run_async + async def test_param_data_streams_mandatory(self, es): + es.indices.create_data_stream.return_value = as_future() + + r = runner.CreateDataStream() + + params = {} + with self.assertRaisesRegex(exceptions.DataError, + "Parameter source for operation 'create-data-stream' did not provide the " + "mandatory parameter 'data-streams'. 
Please add it to your parameter source."): + await r(es, params) + + self.assertEqual(0, es.indices.create_data_stream.call_count) + + class DeleteIndexRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -2162,6 +2208,54 @@ async def test_deletes_all_indices(self, es): self.assertEqual(0, es.indices.exists.call_count) +class DeleteDataStreamRunnerTests(TestCase): + @mock.patch("elasticsearch.Elasticsearch") + @run_async + async def test_deletes_existing_data_streams(self, es): + es.indices.exists.side_effect = [as_future(False), as_future(True)] + es.indices.delete_data_stream.return_value = as_future() + + r = runner.DeleteDataStream() + + params = { + "data-streams": ["data-stream-A", "data-stream-B"], + "only-if-exists": True, + "request-params": {} + } + + result = await r(es, params) + + self.assertEqual((1, "ops"), result) + + es.indices.delete_data_stream.assert_called_once_with("data-stream-B", params={}) + + @mock.patch("elasticsearch.Elasticsearch") + @run_async + async def test_deletes_all_data_streams(self, es): + es.indices.delete_data_stream.return_value = as_future() + + r = runner.DeleteDataStream() + + params = { + "data-streams": ["data-stream-A", "data-stream-B"], + "only-if-exists": False, + "request-params": { + "ignore_unavailable": "true", + "expand_wildcards": "none" + } + } + + result = await r(es, params) + + self.assertEqual((2, "ops"), result) + + es.indices.delete_data_stream.assert_has_calls([ + mock.call("data-stream-A", ignore=[404], params=params["request-params"]), + mock.call("data-stream-B", ignore=[404], params=params["request-params"]) + ]) + self.assertEqual(0, es.indices.exists.call_count) + + class CreateIndexTemplateRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async diff --git a/tests/track/loader_test.py b/tests/track/loader_test.py index 1b59bf025..2795608fd 100644 --- a/tests/track/loader_test.py +++ b/tests/track/loader_test.py @@ -851,7 +851,7 @@ def key_globber(e): source = io.DictStringFileSourceFactory({ "dynamic-key-1": [ textwrap.dedent('"dkey1": "value1"') - ], + ], "dynamic-key-2": [ textwrap.dedent('"dkey2": "value2"') ], @@ -1437,6 +1437,7 @@ def test_filters_exclude_tasks(self): self.assertEqual("node-stats", schedule[1].name) self.assertEqual("cluster-stats", schedule[2].name) + class TrackSpecificationReaderTests(TestCase): def test_description_is_optional(self): track_specification = { @@ -1453,6 +1454,7 @@ def test_can_read_track_info(self): track_specification = { "description": "description for unit test", "indices": [{"name": "test-index", "types": ["test-type"]}], + "data-streams": [], "corpora": [], "operations": [], "challenges": [] @@ -1470,8 +1472,7 @@ def test_document_count_mandatory_if_file_present(self): { "name": "test", "base-url": "https://localhost/data", - "documents": [{ "source-file": "documents-main.json.bz2"} - ] + "documents": [{"source-file": "documents-main.json.bz2"}] } ], "challenges": [] @@ -1489,7 +1490,7 @@ def test_parse_with_mixed_warmup_iterations_and_measurement(self, mocked_params_ { "name": "test-index", "body": "index.json", - "types": [ "docs" ] + "types": ["docs"] } ], "corpora": [ @@ -1545,7 +1546,7 @@ def test_parse_missing_challenge_or_challenges(self, mocked_params_checker): { "name": "test-index", "body": "index.json", - "types": [ "docs" ] + "types": ["docs"] } ], "corpora": [ @@ -1579,7 +1580,7 @@ def test_parse_challenge_and_challenges_are_defined(self, mocked_params_checker) { "name": "test-index", "body": "index.json", - "types": 
[ "docs" ] + "types": ["docs"] } ], "corpora": [ @@ -1816,7 +1817,7 @@ def test_parse_unique_task_names(self): self.assertEqual("search-two-clients", schedule[1].name) self.assertEqual("search", schedule[1].operation.name) - def test_parse_valid_track_specification(self): + def test_parse_indices_valid_track_specification(self): track_specification = { "description": "description for unit test", "indices": [ @@ -1893,7 +1894,7 @@ def test_parse_valid_track_specification(self): track_params={"number_of_shards": 3}, complete_track_params=complete_track_params, source=io.DictStringFileSourceFactory({ - "/mappings/body.json": [""" + "/mappings/body.json": [""" { "settings": { "number_of_shards": {{ number_of_shards }} @@ -1904,7 +1905,7 @@ def test_parse_valid_track_specification(self): } } """] - })) + })) resulting_track = reader("unittest", track_specification, "/mappings") # j2 variables defined in the track -- used for checking mismatching user track params self.assertEqual( @@ -1967,6 +1968,143 @@ def test_parse_valid_track_specification(self): self.assertEqual({"append": True}, resulting_track.challenges[0].schedule[0].operation.meta_data) self.assertEqual({"operation-index": 0}, resulting_track.challenges[0].schedule[0].meta_data) + def test_parse_data_streams_valid_track_specification(self): + track_specification = { + "description": "description for unit test", + "data-streams": [ + { + "name": "data-stream-historical" + } + ], + "corpora": [ + { + "name": "test", + "base-url": "https://localhost/data", + "documents": [ + { + "source-file": "documents-main.json.bz2", + "document-count": 10, + "compressed-bytes": 100, + "uncompressed-bytes": 10000, + "target-data-stream": "data-stream-historical" + }, + { + "source-file": "documents-secondary.json.bz2", + "includes-action-and-meta-data": True, + "document-count": 20, + "compressed-bytes": 200, + "uncompressed-bytes": 20000 + }, + { + "source-file": "documents-main.json.bz2", + "document-count": 10, + "compressed-bytes": 100, + "uncompressed-bytes": 10000, + "target-data-stream": "data-stream-historical" + } + ] + } + ], + "operations": [ + { + "name": "index-append", + "operation-type": "index", + "bulk-size": 5000, + "meta": { + "append": True + } + }, + { + "name": "search", + "operation-type": "search", + "data-stream": "data-stream-historical" + } + ], + "challenges": [ + { + "name": "default-challenge", + "description": "Default challenge", + "meta": { + "mixed": True, + "max-clients": 8 + }, + "schedule": [ + { + "clients": 8, + "operation": "index-append", + "meta": { + "operation-index": 0 + } + }, + { + "clients": 1, + "operation": "search" + } + ] + } + ] + } + complete_track_params = loader.CompleteTrackParams() + reader = loader.TrackSpecificationReader( + complete_track_params=complete_track_params) + resulting_track = reader("unittest", track_specification, "/mappings") + # j2 variables defined in the track -- used for checking mismatching user track params + self.assertEqual("unittest", resulting_track.name) + self.assertEqual("description for unit test", resulting_track.description) + # data streams + self.assertEqual(1, len(resulting_track.data_streams)) + self.assertEqual("data-stream-historical", resulting_track.data_streams[0].name) + # corpora + self.assertEqual(1, len(resulting_track.corpora)) + self.assertEqual("test", resulting_track.corpora[0].name) + self.assertEqual(3, len(resulting_track.corpora[0].documents)) + + docs_primary = resulting_track.corpora[0].documents[0] + 
self.assertEqual(track.Documents.SOURCE_FORMAT_BULK, docs_primary.source_format) + self.assertEqual("documents-main.json", docs_primary.document_file) + self.assertEqual("documents-main.json.bz2", docs_primary.document_archive) + self.assertEqual("https://localhost/data", docs_primary.base_url) + self.assertFalse(docs_primary.includes_action_and_meta_data) + self.assertEqual(10, docs_primary.number_of_documents) + self.assertEqual(100, docs_primary.compressed_size_in_bytes) + self.assertEqual(10000, docs_primary.uncompressed_size_in_bytes) + self.assertEqual("data-stream-historical", docs_primary.target_data_stream) + self.assertIsNone(docs_primary.target_index) + self.assertIsNone(docs_primary.target_type) + + docs_secondary = resulting_track.corpora[0].documents[1] + self.assertEqual(track.Documents.SOURCE_FORMAT_BULK, docs_secondary.source_format) + self.assertEqual("documents-secondary.json", docs_secondary.document_file) + self.assertEqual("documents-secondary.json.bz2", docs_secondary.document_archive) + self.assertEqual("https://localhost/data", docs_secondary.base_url) + self.assertTrue(docs_secondary.includes_action_and_meta_data) + self.assertEqual(20, docs_secondary.number_of_documents) + self.assertEqual(200, docs_secondary.compressed_size_in_bytes) + self.assertEqual(20000, docs_secondary.uncompressed_size_in_bytes) + # This is defined by the action-and-meta-data line! + self.assertIsNone(docs_secondary.target_data_stream) + self.assertIsNone(docs_secondary.target_index) + self.assertIsNone(docs_secondary.target_type) + + docs_tertiary = resulting_track.corpora[0].documents[2] + self.assertEqual(track.Documents.SOURCE_FORMAT_BULK, docs_tertiary.source_format) + self.assertEqual("documents-main.json", docs_tertiary.document_file) + self.assertEqual("documents-main.json.bz2", docs_tertiary.document_archive) + self.assertEqual("https://localhost/data", docs_tertiary.base_url) + self.assertFalse(docs_tertiary.includes_action_and_meta_data) + self.assertEqual(10, docs_tertiary.number_of_documents) + self.assertEqual(100, docs_tertiary.compressed_size_in_bytes) + self.assertIsNone(docs_tertiary.target_index) + self.assertIsNone(docs_tertiary.target_type) + self.assertEqual("data-stream-historical", docs_tertiary.target_data_stream) + + # challenges + self.assertEqual(1, len(resulting_track.challenges)) + self.assertEqual("default-challenge", resulting_track.challenges[0].name) + self.assertEqual("Default challenge", resulting_track.challenges[0].description) + self.assertEqual({"mixed": True, "max-clients": 8}, resulting_track.challenges[0].meta_data) + self.assertEqual({"append": True}, resulting_track.challenges[0].schedule[0].operation.meta_data) + self.assertEqual({"operation-index": 0}, resulting_track.challenges[0].schedule[0].meta_data) @mock.patch("esrally.track.loader.register_all_params_in_track") def test_parse_valid_without_types(self, mocked_param_checker): @@ -2043,6 +2181,252 @@ def test_parse_valid_without_types(self, mocked_param_checker): self.assertEqual(10000, docs_primary.uncompressed_size_in_bytes) self.assertEqual("index-historical", docs_primary.target_index) self.assertIsNone(docs_primary.target_type) + self.assertIsNone(docs_primary.target_data_stream) + + # challenges + self.assertEqual(1, len(resulting_track.challenges)) + + @mock.patch("esrally.track.loader.register_all_params_in_track") + def test_parse_invalid_data_streams_with_indices(self, mocked_param_checker): + track_specification = { + "description": "description for unit test", + "indices": [ + 
{ + "name": "index-historical", + # no type information here + } + ], + "data-streams": [ + { + "name": "historical-data-stream" + } + ], + "corpora": [ + { + "name": "test", + "base-url": "https://localhost/data", + "documents": [ + { + "source-file": "documents-main.json.bz2", + "document-count": 10, + "compressed-bytes": 100, + "uncompressed-bytes": 10000, + }, + ] + } + ], + "schedule": [ + { + "clients": 8, + "operation": { + "name": "index-append", + "operation-type": "bulk", + "bulk-size": 5000 + } + } + ] + } + complete_track_params = loader.CompleteTrackParams() + reader = loader.TrackSpecificationReader( + complete_track_params=complete_track_params) + with self.assertRaises(loader.TrackSyntaxError) as ctx: + reader("unittest", track_specification, "/mapping") + + @mock.patch("esrally.track.loader.register_all_params_in_track") + def test_parse_invalid_data_streams_with_target_index(self, mocked_param_checker): + track_specification = { + "description": "description for unit test", + "data-streams": [ + { + "name": "historical-data-stream" + } + ], + "corpora": [ + { + "name": "test", + "base-url": "https://localhost/data", + "documents": [ + { + "source-file": "documents-main.json.bz2", + "document-count": 10, + "compressed-bytes": 100, + "uncompressed-bytes": 10000, + "target-index": "historical-index", + }, + ] + } + ], + "schedule": [ + { + "clients": 8, + "operation": { + "name": "index-append", + "operation-type": "bulk", + "bulk-size": 5000 + } + } + ] + } + complete_track_params = loader.CompleteTrackParams() + reader = loader.TrackSpecificationReader( + complete_track_params=complete_track_params) + with self.assertRaises(loader.TrackSyntaxError) as ctx: + reader("unittest", track_specification, "/mapping") + + @mock.patch("esrally.track.loader.register_all_params_in_track") + def test_parse_invalid_data_streams_with_target_type(self, mocked_param_checker): + track_specification = { + "description": "description for unit test", + "data-streams": [ + { + "name": "historical-data-stream" + } + ], + "corpora": [ + { + "name": "test", + "base-url": "https://localhost/data", + "documents": [ + { + "source-file": "documents-main.json.bz2", + "document-count": 10, + "compressed-bytes": 100, + "uncompressed-bytes": 10000, + "target-type": "_doc", + }, + ] + } + ], + "schedule": [ + { + "clients": 8, + "operation": { + "name": "index-append", + "operation-type": "bulk", + "bulk-size": 5000 + } + } + ] + } + complete_track_params = loader.CompleteTrackParams() + reader = loader.TrackSpecificationReader( + complete_track_params=complete_track_params) + with self.assertRaises(loader.TrackSyntaxError) as ctx: + reader("unittest", track_specification, "/mapping") + + @mock.patch("esrally.track.loader.register_all_params_in_track") + def test_parse_invalid_no_data_stream_target(self, mocked_param_checker): + track_specification = { + "description": "description for unit test", + "data-streams": [ + { + "name": "historical-data-stream" + }, + { + "name": "historical-data-stream-2" + } + ], + "corpora": [ + { + "name": "test", + "base-url": "https://localhost/data", + "documents": [ + { + "source-file": "documents-main.json.bz2", + "document-count": 10, + "compressed-bytes": 100, + "uncompressed-bytes": 10000 + } + ] + } + ], + "schedule": [ + { + "clients": 8, + "operation": { + "name": "index-append", + "operation-type": "bulk", + "bulk-size": 5000 + } + } + ] + } + complete_track_params = loader.CompleteTrackParams() + reader = loader.TrackSpecificationReader( + 
complete_track_params=complete_track_params) + with self.assertRaises(loader.TrackSyntaxError) as ctx: + reader("unittest", track_specification, "/mapping") + + @mock.patch("esrally.track.loader.register_all_params_in_track") + def test_parse_valid_without_indices(self, mocked_param_checker): + track_specification = { + "description": "description for unit test", + "data-streams": [ + { + "name": "historical-data-stream" + } + ], + "corpora": [ + { + "name": "test", + "base-url": "https://localhost/data", + "documents": [ + { + "source-file": "documents-main.json.bz2", + "document-count": 10, + "compressed-bytes": 100, + "uncompressed-bytes": 10000, + }, + ] + } + ], + "schedule": [ + { + "clients": 8, + "operation": { + "name": "index-append", + "operation-type": "bulk", + "bulk-size": 5000 + } + } + ] + } + reader = loader.TrackSpecificationReader( + track_params={"number_of_shards": 3}, + source=io.DictStringFileSourceFactory({ + "/mappings/body.json": [""" + { + "settings": { + "number_of_shards": {{ number_of_shards }} + } + } + """] + })) + resulting_track = reader("unittest", track_specification, "/mappings") + self.assertEqual("unittest", resulting_track.name) + self.assertEqual("description for unit test", resulting_track.description) + # indices + self.assertEqual(0, len(resulting_track.indices)) + # data streams + self.assertEqual(1, len(resulting_track.data_streams)) + self.assertEqual("historical-data-stream", resulting_track.data_streams[0].name) + # corpora + self.assertEqual(1, len(resulting_track.corpora)) + self.assertEqual("test", resulting_track.corpora[0].name) + self.assertEqual(1, len(resulting_track.corpora[0].documents)) + + docs_primary = resulting_track.corpora[0].documents[0] + self.assertEqual(track.Documents.SOURCE_FORMAT_BULK, docs_primary.source_format) + self.assertEqual("documents-main.json", docs_primary.document_file) + self.assertEqual("documents-main.json.bz2", docs_primary.document_archive) + self.assertEqual("https://localhost/data", docs_primary.base_url) + self.assertFalse(docs_primary.includes_action_and_meta_data) + self.assertEqual(10, docs_primary.number_of_documents) + self.assertEqual(100, docs_primary.compressed_size_in_bytes) + self.assertEqual(10000, docs_primary.uncompressed_size_in_bytes) + self.assertEqual("historical-data-stream", docs_primary.target_data_stream) + self.assertIsNone(docs_primary.target_type) + self.assertIsNone(docs_primary.target_index) # challenges self.assertEqual(1, len(resulting_track.challenges)) diff --git a/tests/track/params_test.py b/tests/track/params_test.py index b52c5a165..83833d767 100644 --- a/tests/track/params_test.py +++ b/tests/track/params_test.py @@ -143,6 +143,16 @@ def test_generate_action_meta_data_without_id_conflicts(self): self.assertEqual(("index", '{"index": {"_index": "test_index", "_type": "test_type"}}\n'), next(params.GenerateActionMetaData("test_index", "test_type"))) + def test_generate_action_meta_data_create(self): + self.assertEqual(("create", '{"create": {"_index": "test_index"}}\n'), + next(params.GenerateActionMetaData("test_index", None, use_create=True))) + + def test_generate_action_meta_data_create_with_conflicts(self): + with self.assertRaises(exceptions.RallyError) as ctx: + params.GenerateActionMetaData("test_index", None, conflicting_ids=[100, 200, 300, 400], use_create=True) + self.assertEqual("Index mode '_create' cannot be used with conflicting ids", + ctx.exception.args[0]) + def test_generate_action_meta_data_typeless(self): self.assertEqual(("index", '{"index": 
{"_index": "test_index"}}\n'), next(params.GenerateActionMetaData("test_index", type_name=None))) @@ -471,16 +481,16 @@ def test_read_bulk_with_id_conflicts(self): bulks.append(bulk) self.assertEqual([ - b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "100"}}\n' + - b'{"key": "value1"}\n' + - b'{"update": {"_index": "test_index", "_type": "test_type", "_id": "200"}}\n' + - b'{"doc":{"key": "value2"}}\n', - b'{"update": {"_index": "test_index", "_type": "test_type", "_id": "400"}}\n' + - b'{"doc":{"key": "value3"}}\n' + - b'{"update": {"_index": "test_index", "_type": "test_type", "_id": "300"}}\n' + - b'{"doc":{"key": "value4"}}\n', - b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "200"}}\n' + - b'{"key": "value5"}\n' + b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "100"}}\n' + + b'{"key": "value1"}\n' + + b'{"update": {"_index": "test_index", "_type": "test_type", "_id": "200"}}\n' + + b'{"doc":{"key": "value2"}}\n', + b'{"update": {"_index": "test_index", "_type": "test_type", "_id": "400"}}\n' + + b'{"doc":{"key": "value3"}}\n' + + b'{"update": {"_index": "test_index", "_type": "test_type", "_id": "300"}}\n' + + b'{"doc":{"key": "value4"}}\n', + b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "200"}}\n' + + b'{"key": "value5"}\n' ], bulks) def test_read_bulk_with_external_id_and_zero_conflict_probability(self): @@ -513,15 +523,15 @@ def test_read_bulk_with_external_id_and_zero_conflict_probability(self): bulks.append(bulk) self.assertEqual([ - b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "100"}}\n' + - b'{"key": "value1"}\n' + - b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "200"}}\n' + - b'{"key": "value2"}\n', - - b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "300"}}\n' + - b'{"key": "value3"}\n' + - b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "400"}}\n' + - b'{"key": "value4"}\n' + b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "100"}}\n' + + b'{"key": "value1"}\n' + + b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "200"}}\n' + + b'{"key": "value2"}\n', + + b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "300"}}\n' + + b'{"key": "value3"}\n' + + b'{"index": {"_index": "test_index", "_type": "test_type", "_id": "400"}}\n' + + b'{"key": "value4"}\n' ], bulks) def assert_bulks_sized(self, reader, expected_bulk_sizes, expected_line_sizes): @@ -588,22 +598,22 @@ def test_calculate_bounds(self): num_docs = 1000 clients = 2 - self.assertEqual(( 0, 500, 500), params.bounds(num_docs, 0, 0, clients, includes_action_and_meta_data=False)) + self.assertEqual((0, 500, 500), params.bounds(num_docs, 0, 0, clients, includes_action_and_meta_data=False)) self.assertEqual((500, 500, 500), params.bounds(num_docs, 1, 1, clients, includes_action_and_meta_data=False)) num_docs = 800 clients = 4 - self.assertEqual(( 0, 200, 400), params.bounds(num_docs, 0, 0, clients, includes_action_and_meta_data=True)) - self.assertEqual(( 400, 200, 400), params.bounds(num_docs, 1, 1, clients, includes_action_and_meta_data=True)) - self.assertEqual(( 800, 200, 400), params.bounds(num_docs, 2, 2, clients, includes_action_and_meta_data=True)) + self.assertEqual((0, 200, 400), params.bounds(num_docs, 0, 0, clients, includes_action_and_meta_data=True)) + self.assertEqual((400, 200, 400), params.bounds(num_docs, 1, 1, clients, includes_action_and_meta_data=True)) + self.assertEqual((800, 200, 400), 
params.bounds(num_docs, 2, 2, clients, includes_action_and_meta_data=True)) self.assertEqual((1200, 200, 400), params.bounds(num_docs, 3, 3, clients, includes_action_and_meta_data=True)) num_docs = 2000 clients = 8 - self.assertEqual(( 0, 250, 250), params.bounds(num_docs, 0, 0, clients, includes_action_and_meta_data=False)) - self.assertEqual(( 250, 250, 250), params.bounds(num_docs, 1, 1, clients, includes_action_and_meta_data=False)) - self.assertEqual(( 500, 250, 250), params.bounds(num_docs, 2, 2, clients, includes_action_and_meta_data=False)) - self.assertEqual(( 750, 250, 250), params.bounds(num_docs, 3, 3, clients, includes_action_and_meta_data=False)) + self.assertEqual((0, 250, 250), params.bounds(num_docs, 0, 0, clients, includes_action_and_meta_data=False)) + self.assertEqual((250, 250, 250), params.bounds(num_docs, 1, 1, clients, includes_action_and_meta_data=False)) + self.assertEqual((500, 250, 250), params.bounds(num_docs, 2, 2, clients, includes_action_and_meta_data=False)) + self.assertEqual((750, 250, 250), params.bounds(num_docs, 3, 3, clients, includes_action_and_meta_data=False)) self.assertEqual((1000, 250, 250), params.bounds(num_docs, 4, 4, clients, includes_action_and_meta_data=False)) self.assertEqual((1250, 250, 250), params.bounds(num_docs, 5, 5, clients, includes_action_and_meta_data=False)) self.assertEqual((1500, 250, 250), params.bounds(num_docs, 6, 6, clients, includes_action_and_meta_data=False)) @@ -614,14 +624,14 @@ def test_calculate_non_multiple_bounds_16_clients(self): # lines and every third client, one line more (1334). num_docs = 16000 clients = 12 - self.assertEqual(( 0, 1333, 1333), params.bounds(num_docs, 0, 0, clients, includes_action_and_meta_data=False)) - self.assertEqual(( 1333, 1334, 1334), params.bounds(num_docs, 1, 1, clients, includes_action_and_meta_data=False)) - self.assertEqual(( 2667, 1333, 1333), params.bounds(num_docs, 2, 2, clients, includes_action_and_meta_data=False)) - self.assertEqual(( 4000, 1333, 1333), params.bounds(num_docs, 3, 3, clients, includes_action_and_meta_data=False)) - self.assertEqual(( 5333, 1334, 1334), params.bounds(num_docs, 4, 4, clients, includes_action_and_meta_data=False)) - self.assertEqual(( 6667, 1333, 1333), params.bounds(num_docs, 5, 5, clients, includes_action_and_meta_data=False)) - self.assertEqual(( 8000, 1333, 1333), params.bounds(num_docs, 6, 6, clients, includes_action_and_meta_data=False)) - self.assertEqual(( 9333, 1334, 1334), params.bounds(num_docs, 7, 7, clients, includes_action_and_meta_data=False)) + self.assertEqual((0, 1333, 1333), params.bounds(num_docs, 0, 0, clients, includes_action_and_meta_data=False)) + self.assertEqual((1333, 1334, 1334), params.bounds(num_docs, 1, 1, clients, includes_action_and_meta_data=False)) + self.assertEqual((2667, 1333, 1333), params.bounds(num_docs, 2, 2, clients, includes_action_and_meta_data=False)) + self.assertEqual((4000, 1333, 1333), params.bounds(num_docs, 3, 3, clients, includes_action_and_meta_data=False)) + self.assertEqual((5333, 1334, 1334), params.bounds(num_docs, 4, 4, clients, includes_action_and_meta_data=False)) + self.assertEqual((6667, 1333, 1333), params.bounds(num_docs, 5, 5, clients, includes_action_and_meta_data=False)) + self.assertEqual((8000, 1333, 1333), params.bounds(num_docs, 6, 6, clients, includes_action_and_meta_data=False)) + self.assertEqual((9333, 1334, 1334), params.bounds(num_docs, 7, 7, clients, includes_action_and_meta_data=False)) self.assertEqual((10667, 1333, 1333), params.bounds(num_docs, 8, 8, clients, 
includes_action_and_meta_data=False)) self.assertEqual((12000, 1333, 1333), params.bounds(num_docs, 9, 9, clients, includes_action_and_meta_data=False)) self.assertEqual((13333, 1334, 1334), params.bounds(num_docs, 10, 10, clients, includes_action_and_meta_data=False)) @@ -632,7 +642,7 @@ def test_calculate_non_multiple_bounds_6_clients(self): # 2 * 583.333 docs = 1166.6666 lines per client. We let them read 1166 and 1168 lines respectively (583 and 584 docs). num_docs = 3500 clients = 6 - self.assertEqual(( 0, 583, 1166), params.bounds(num_docs, 0, 0, clients, includes_action_and_meta_data=True)) + self.assertEqual((0, 583, 1166), params.bounds(num_docs, 0, 0, clients, includes_action_and_meta_data=True)) self.assertEqual((1166, 584, 1168), params.bounds(num_docs, 1, 1, clients, includes_action_and_meta_data=True)) self.assertEqual((2334, 583, 1166), params.bounds(num_docs, 2, 2, clients, includes_action_and_meta_data=True)) self.assertEqual((3500, 583, 1166), params.bounds(num_docs, 3, 3, clients, includes_action_and_meta_data=True)) @@ -643,11 +653,11 @@ def test_calculate_bounds_for_multiple_clients_per_worker(self): num_docs = 2000 clients = 8 # four clients per worker, each reads 250 lines - self.assertEqual(( 0, 1000, 1000), params.bounds(num_docs, 0, 3, clients, includes_action_and_meta_data=False)) + self.assertEqual((0, 1000, 1000), params.bounds(num_docs, 0, 3, clients, includes_action_and_meta_data=False)) self.assertEqual((1000, 1000, 1000), params.bounds(num_docs, 4, 7, clients, includes_action_and_meta_data=False)) # four clients per worker, each reads 500 lines (includes action and metadata) - self.assertEqual(( 0, 1000, 2000), params.bounds(num_docs, 0, 3, clients, includes_action_and_meta_data=True)) + self.assertEqual((0, 1000, 2000), params.bounds(num_docs, 0, 3, clients, includes_action_and_meta_data=True)) self.assertEqual((2000, 1000, 2000), params.bounds(num_docs, 4, 7, clients, includes_action_and_meta_data=True)) def test_calculate_number_of_bulks(self): @@ -717,7 +727,6 @@ def test_create_without_corpora_definition(self): self.assertEqual("There is no document corpus definition for track unit-test. 
" "You must add at least one before making bulk requests to Elasticsearch.", ctx.exception.args[0]) - def test_create_with_non_numeric_bulk_size(self): corpus = track.DocumentCorpus(name="default", documents=[ track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK, @@ -814,6 +823,15 @@ def test_create_with_unknown_on_conflict_setting(self): self.assertEqual("Unknown 'on-conflict' setting [delete]", ctx.exception.args[0]) + def test_create_with_conflicts_and_data_streams(self): + with self.assertRaises(exceptions.InvalidSyntax) as ctx: + params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={ + "data-streams": ["test-data-stream-1", "test-data-stream-2"], + "conflicts": "sequential" + }) + + self.assertEqual("'conflicts' cannot be used with 'data-streams'", ctx.exception.args[0]) + def test_create_with_ingest_percentage_too_low(self): corpus = track.DocumentCorpus(name="default", documents=[ track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK, @@ -939,6 +957,41 @@ def test_filters_corpora(self): partition = source.partition(0, 1) self.assertEqual(partition.corpora, [corpora[1]]) + def test_filters_corpora_by_data_stream(self): + corpora = [ + track.DocumentCorpus(name="default", documents=[ + track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK, + number_of_documents=10, + target_data_stream="test-data-stream-1" + ) + ]), + track.DocumentCorpus(name="special", documents=[ + track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK, + number_of_documents=100, + target_index="test-idx2", + target_type="type" + ) + ]), + track.DocumentCorpus(name="special-2", documents=[ + track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK, + number_of_documents=10, + target_data_stream="test-data-stream-2" + ) + ]) + ] + + source = params.BulkIndexParamSource( + track=track.Track(name="unit-test", corpora=corpora), + params={ + "data-streams": ["test-data-stream-1", "test-data-stream-2"], + "bulk-size": 5000, + "batch-size": 20000, + "pipeline": "test-pipeline" + }) + + partition = source.partition(0, 1) + self.assertEqual(partition.corpora, [corpora[0], corpora[2]]) + def test_raises_exception_if_no_corpus_matches(self): corpus = track.DocumentCorpus(name="default", documents=[ track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK, @@ -1499,6 +1552,47 @@ def test_filter_index(self): self.assertEqual("index2", index) +class CreateDataStreamParamSourceTests(TestCase): + def test_create_data_stream(self): + source = params.CreateDataStreamParamSource(track.Track(name="unit-test"), params={ + "data-stream": "test-data-stream" + }) + p = source.params() + self.assertEqual(1, len(p["data-streams"])) + ds = p["data-streams"][0] + self.assertEqual("test-data-stream", ds) + self.assertEqual({}, p["request-params"]) + + def test_create_data_stream_inline_without_body(self): + source = params.CreateDataStreamParamSource(track.Track(name="unit-test"), params={ + "data-stream": "test-data-stream", + "request-params": { + "wait_for_active_shards": True + } + }) + + p = source.params() + self.assertEqual(1, len(p["data-streams"])) + ds = p["data-streams"][0] + self.assertEqual("test-data-stream", ds) + self.assertDictEqual({ + "wait_for_active_shards": True + }, p["request-params"]) + + def test_filter_data_stream(self): + source = params.CreateDataStreamParamSource( + track.Track(name="unit-test", data_streams=[track.DataStream(name="data-stream-1"), + track.DataStream(name="data-stream-2"), + track.DataStream(name="data-stream-3")]), + 
params={"data-stream": "data-stream-2"}) + + p = source.params() + self.assertEqual(1, len(p["data-streams"])) + + ds = p["data-streams"][0] + self.assertEqual("data-stream-2", ds) + + class DeleteIndexParamSourceTests(TestCase): def test_delete_index_from_track(self): source = params.DeleteIndexParamSource(track.Track(name="unit-test", indices=[ @@ -1539,6 +1633,48 @@ def test_delete_no_index(self): self.assertEqual("delete-index operation targets no index", ctx.exception.args[0]) +class DeleteDataStreamParamSourceTests(TestCase): + def test_delete_data_stream_from_track(self): + source = params.DeleteDataStreamParamSource(track.Track(name="unit-test", data_streams=[ + track.DataStream(name="data-stream-1"), + track.DataStream(name="data-stream-2"), + track.DataStream(name="data-stream-3") + ]), params={}) + + p = source.params() + + self.assertEqual(["data-stream-1", "data-stream-2", "data-stream-3"], p["data-streams"]) + self.assertDictEqual({}, p["request-params"]) + self.assertTrue(p["only-if-exists"]) + + def test_filter_data_stream_from_track(self): + source = params.DeleteDataStreamParamSource(track.Track(name="unit-test", data_streams=[ + track.DataStream(name="data-stream-1"), + track.DataStream(name="data-stream-2"), + track.DataStream(name="data-stream-3") + ]), params={"data-stream": "data-stream-2", "only-if-exists": False, + "request-params": {"allow_no_indices": True}}) + + p = source.params() + + self.assertEqual(["data-stream-2"], p["data-streams"]) + self.assertDictEqual({"allow_no_indices": True}, p["request-params"]) + self.assertFalse(p["only-if-exists"]) + + def test_delete_data_stream_by_name(self): + source = params.DeleteDataStreamParamSource(track.Track(name="unit-test"), + params={"data-stream": "data-stream-2"}) + + p = source.params() + + self.assertEqual(["data-stream-2"], p["data-streams"]) + + def test_delete_no_data_stream(self): + with self.assertRaises(exceptions.InvalidSyntax) as ctx: + params.DeleteDataStreamParamSource(track.Track(name="unit-test"), params={}) + self.assertEqual("delete-data-stream operation targets no data stream", ctx.exception.args[0]) + + class CreateIndexTemplateParamSourceTests(TestCase): def test_create_index_template_inline(self): source = params.CreateIndexTemplateParamSource(track=track.Track(name="unit-test"), params={ @@ -1711,6 +1847,31 @@ def test_passes_cache(self): } }, p["body"]) + def test_uses_data_stream(self): + ds1 = track.DataStream(name="data-stream-1") + + source = params.SearchParamSource(track=track.Track(name="unit-test", data_streams=[ds1]), params={ + "body": { + "query": { + "match_all": {} + } + }, + "cache": True + }) + p = source.params() + + self.assertEqual(6, len(p)) + self.assertEqual("data-stream-1", p["index"]) + self.assertIsNone(p["type"]) + self.assertEqual({}, p["request-params"]) + self.assertEqual(True, p["cache"]) + self.assertEqual(True, p["response-compression-enabled"]) + self.assertEqual({ + "query": { + "match_all": {} + } + }, p["body"]) + def test_create_without_index(self): with self.assertRaises(exceptions.InvalidSyntax) as ctx: params.SearchParamSource(track=track.Track(name="unit-test"), params={ @@ -1718,11 +1879,11 @@ def test_create_without_index(self): "body": { "query": { "match_all": {} - } } + } }, operation_name="test_operation") - self.assertEqual("'index' is mandatory and is missing for operation 'test_operation'", ctx.exception.args[0]) + self.assertEqual("'index' or 'data-stream' is mandatory and is missing for operation 'test_operation'", ctx.exception.args[0]) def 
test_passes_request_parameters(self): index1 = track.Index(name="index1", types=["type1"]) @@ -1782,6 +1943,34 @@ def test_user_specified_overrides_defaults(self): } }, p["body"]) + def test_user_specified_data_stream_overrides_defaults(self): + ds1 = track.DataStream(name="data-stream-1") + + source = params.SearchParamSource(track=track.Track(name="unit-test", data_streams=[ds1]), params={ + "data-stream": "data-stream-2", + "cache": False, + "response-compression-enabled": False, + "body": { + "query": { + "match_all": {} + } + } + }) + p = source.params() + + self.assertEqual(6, len(p)) + self.assertEqual("data-stream-2", p["index"]) + self.assertIsNone(p["type"]) + self.assertDictEqual({}, p["request-params"]) + # Explicitly check for equality to `False` - assertFalse would also succeed if it is `None`. + self.assertEqual(False, p["cache"]) + self.assertEqual(False, p["response-compression-enabled"]) + self.assertEqual({ + "query": { + "match_all": {} + } + }, p["body"]) + def test_replaces_body_params(self): import copy @@ -1808,9 +1997,27 @@ def test_replaces_body_params(self): # the implementation modifies the internal dict in-place (safe because we only have one client per process) hence we need to copy. first = copy.deepcopy(search.params(choice=lambda d: d[0])) second = copy.deepcopy(search.params(choice=lambda d: d[1])) - self.assertNotEqual(first, second) + def test_invalid_data_stream_with_type(self): + with self.assertRaises(exceptions.InvalidSyntax) as ctx: + ds1 = track.DataStream(name="data-stream-1") + + params.SearchParamSource(track=track.Track(name="unit-test", data_streams=[ds1]), params={ + "data-stream": "data-stream-2", + "type": "_doc", + "cache": False, + "response-compression-enabled": False, + "body": { + "query": { + "match_all": {} + } + } + }, operation_name="test_operation") + + self.assertEqual("'type' not supported with 'data-stream' for operation 'test_operation'", + ctx.exception.args[0]) + class ForceMergeParamSourceTests(TestCase): def test_force_merge_index_from_track(self): @@ -1825,6 +2032,18 @@ def test_force_merge_index_from_track(self): self.assertEqual("index1,index2,index3", p["index"]) self.assertEqual("blocking", p["mode"]) + def test_force_merge_data_stream_from_track(self): + source = params.ForceMergeParamSource(track.Track(name="unit-test", data_streams=[ + track.DataStream(name="data-stream-1"), + track.DataStream(name="data-stream-2"), + track.DataStream(name="data-stream-3") + ]), params={}) + + p = source.params() + + self.assertEqual("data-stream-1,data-stream-2,data-stream-3", p["index"]) + self.assertEqual("blocking", p["mode"]) + def test_force_merge_index_by_name(self): source = params.ForceMergeParamSource(track.Track(name="unit-test"), params={"index": "index2"}) @@ -1833,6 +2052,14 @@ def test_force_merge_index_by_name(self): self.assertEqual("index2", p["index"]) self.assertEqual("blocking", p["mode"]) + def test_force_merge_by_data_stream_name(self): + source = params.ForceMergeParamSource(track.Track(name="unit-test"), params={"data-stream": "data-stream-2"}) + + p = source.params() + + self.assertEqual("data-stream-2", p["index"]) + self.assertEqual("blocking", p["mode"]) + def test_default_force_merge_index(self): source = params.ForceMergeParamSource(track.Track(name="unit-test"), params={}) diff --git a/tests/track/track_test.py b/tests/track/track_test.py index 6b768efc8..14d0e2950 100644 --- a/tests/track/track_test.py +++ b/tests/track/track_test.py @@ -86,6 +86,22 @@ def test_str(self): self.assertEqual("test", 
str(track.Index("test"))) +class DataStreamTests(TestCase): + def test_matches_exactly(self): + self.assertTrue(track.DataStream("test").matches("test")) + self.assertFalse(track.DataStream("test").matches(" test")) + + def test_matches_if_no_pattern_is_defined(self): + self.assertTrue(track.DataStream("test").matches(pattern=None)) + + def test_matches_if_catch_all_pattern_is_defined(self): + self.assertTrue(track.DataStream("test").matches(pattern="*")) + self.assertTrue(track.DataStream("test").matches(pattern="_all")) + + def test_str(self): + self.assertEqual("test", str(track.DataStream("test"))) + + class DocumentCorpusTests(TestCase): def test_do_not_filter(self): corpus = track.DocumentCorpus("test", documents=[ @@ -129,6 +145,21 @@ def test_filter_documents_by_indices(self): self.assertEqual(1, len(filtered_corpus.documents)) self.assertEqual("logs-02", filtered_corpus.documents[0].target_index) + def test_filter_documents_by_data_streams(self): + corpus = track.DocumentCorpus("test", documents=[ + track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK, number_of_documents=5, + target_data_stream="logs-01"), + track.Documents(source_format="other", number_of_documents=6, target_data_stream="logs-02"), + track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK, number_of_documents=7, + target_data_stream="logs-03"), + track.Documents(source_format=None, number_of_documents=8, target_data_stream=None) + ]) + + filtered_corpus = corpus.filter(target_data_streams=["logs-02"]) + self.assertEqual("test", filtered_corpus.name) + self.assertEqual(1, len(filtered_corpus.documents)) + self.assertEqual("logs-02", filtered_corpus.documents[0].target_data_stream) + def test_filter_documents_by_format_and_indices(self): corpus = track.DocumentCorpus("test", documents=[ track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK, number_of_documents=5, target_index="logs-01"),