Add Support for Datastreams #1092

Merged: 9 commits, Oct 22, 2020
45 changes: 45 additions & 0 deletions esrally/driver/runner.py
@@ -50,6 +50,8 @@ def register_default_runners():
register_runner(track.OperationType.Refresh.name, Retry(Refresh()), async_runner=True)
register_runner(track.OperationType.CreateIndex.name, Retry(CreateIndex()), async_runner=True)
register_runner(track.OperationType.DeleteIndex.name, Retry(DeleteIndex()), async_runner=True)
register_runner(track.OperationType.CreateDataStream.name, Retry(CreateDataStream()), async_runner=True)
register_runner(track.OperationType.DeleteDataStream.name, Retry(DeleteDataStream()), async_runner=True)
register_runner(track.OperationType.CreateIndexTemplate.name, Retry(CreateIndexTemplate()), async_runner=True)
register_runner(track.OperationType.DeleteIndexTemplate.name, Retry(DeleteIndexTemplate()), async_runner=True)
register_runner(track.OperationType.ShrinkIndex.name, Retry(ShrinkIndex()), async_runner=True)
@@ -1031,6 +1033,22 @@ def __repr__(self, *args, **kwargs):
return "create-index"


class CreateDataStream(Runner):
"""
Execute the `create data stream API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-data-stream.html>`_.
"""

async def __call__(self, es, params):
data_streams = mandatory(params, "data-streams", self)
request_params = mandatory(params, "request-params", self)
for data_stream in data_streams:
await es.indices.create_data_stream(data_stream, params=request_params)
return len(data_streams), "ops"

def __repr__(self, *args, **kwargs):
return "create-data-stream"


class DeleteIndex(Runner):
"""
Execute the `delete index API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-index.html>`_.
@@ -1058,6 +1076,33 @@ def __repr__(self, *args, **kwargs):
return "delete-index"


class DeleteDataStream(Runner):
"""
Execute the `delete data stream API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-data-stream.html>`_.
"""

async def __call__(self, es, params):
ops = 0

data_streams = mandatory(params, "data-streams", self)
only_if_exists = mandatory(params, "only-if-exists", self)
request_params = mandatory(params, "request-params", self)

for data_stream in data_streams:
if not only_if_exists:
await es.indices.delete_data_stream(data_stream, ignore=[404], params=request_params)
ops += 1
elif only_if_exists and await es.indices.exists(index=data_stream):
self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream)
await es.indices.delete_data_stream(data_stream, params=request_params)
ops += 1

return ops, "ops"

def __repr__(self, *args, **kwargs):
return "delete-data-stream"


class CreateIndexTemplate(Runner):
"""
Execute the `PUT index template API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html>`_.
24 changes: 23 additions & 1 deletion esrally/resources/track-schema.json
@@ -262,6 +262,24 @@
]
}
},
"data-streams": {
"type": "array",
"minItems": 1,
"uniqueItems": true,
"items": {
"title": "Data Stream",
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the data stream to create."
}
},
"required": [
"name"
]
}
},
"corpora": {
"type": "array",
"minItems": 1,
@@ -291,6 +309,10 @@
"type": "string",
"description": "The name of the associated index (if any)."
},
"target-data-stream": {
"type": "string",
"description": "The name of the associated data stream (if any)."
},
"target-type": {
"type": "string",
"description": "The name of the associated document type (if any)."
@@ -469,4 +491,4 @@
"$ref": "#/definitions/schedule"
}
}
}
}
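
Taken together, the two schema additions above let a track declare data streams and point a corpus at one of them. A hypothetical track fragment (stream and corpus names are placeholders; the remaining document fields follow the usual corpus layout) might look like:

    {
      "data-streams": [
        { "name": "logs-placeholder" }
      ],
      "corpora": [
        {
          "name": "example-corpus",
          "documents": [
            {
              "source-file": "documents.json.bz2",
              "document-count": 1000,
              "target-data-stream": "logs-placeholder"
            }
          ]
        }
      ]
    }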
79 changes: 64 additions & 15 deletions esrally/track/loader.py
@@ -723,6 +723,7 @@ def filter_out_match(task, filters, exclude):

return t


def filters_from_filtered_tasks(filtered_tasks):
filters = []
if filtered_tasks:
@@ -987,13 +988,19 @@ def __call__(self, track_name, track_specification, mapping_dir):
meta_data = self._r(track_specification, "meta", mandatory=False)
indices = [self._create_index(idx, mapping_dir)
for idx in self._r(track_specification, "indices", mandatory=False, default_value=[])]
data_streams = [self._create_data_stream(idx)
for idx in self._r(track_specification, "data-streams", mandatory=False, default_value=[])]
if len(indices) > 0 and len(data_streams) > 0:
# guard against this early: a track may specify either indices or data streams, but not both
raise TrackSyntaxError("indices and data-streams cannot both be specified")
templates = [self._create_index_template(tpl, mapping_dir)
for tpl in self._r(track_specification, "templates", mandatory=False, default_value=[])]
corpora = self._create_corpora(self._r(track_specification, "corpora", mandatory=False, default_value=[]), indices)
corpora = self._create_corpora(self._r(track_specification, "corpora", mandatory=False, default_value=[]),
indices, data_streams)
challenges = self._create_challenges(track_specification)
# at this point, *all* track params must have been referenced in the templates
return track.Track(name=self.name, meta_data=meta_data, description=description, challenges=challenges, indices=indices,
templates=templates, corpora=corpora)
return track.Track(name=self.name, meta_data=meta_data, description=description, challenges=challenges,
indices=indices, data_streams=data_streams, templates=templates, corpora=corpora)

def _error(self, msg):
raise TrackSyntaxError("Track '%s' is invalid. %s" % (self.name, msg))
@@ -1031,6 +1038,9 @@ def _create_index(self, index_spec, mapping_dir):

return track.Index(name=index_name, body=body, types=self._r(index_spec, "types", mandatory=False, default_value=[]))

def _create_data_stream(self, data_stream_spec):
return track.DataStream(name=self._r(data_stream_spec, "name"))

def _create_index_template(self, tpl_spec, mapping_dir):
name = self._r(tpl_spec, "name")
template_file = self._r(tpl_spec, "template")
@@ -1056,7 +1066,9 @@ def _load_template(self, contents, description):
self.logger.exception("Could not load file template for %s.", description)
raise TrackSyntaxError("Could not load file template for '%s'" % description, str(e))

def _create_corpora(self, corpora_specs, indices):
def _create_corpora(self, corpora_specs, indices, data_streams):
if len(indices) > 0 and len(data_streams) > 0:
raise TrackSyntaxError("indices and data-streams cannot both be specified")
document_corpora = []
known_corpora_names = set()
for corpus_spec in corpora_specs:
@@ -1069,17 +1081,29 @@ def _create_corpora(self, corpora_specs, indices):
corpus = track.DocumentCorpus(name=name)
# defaults on corpus level
default_base_url = self._r(corpus_spec, "base-url", mandatory=False, default_value=None)
default_source_format = self._r(corpus_spec, "source-format", mandatory=False, default_value=track.Documents.SOURCE_FORMAT_BULK)
default_action_and_meta_data = self._r(corpus_spec, "includes-action-and-meta-data", mandatory=False, default_value=False)
default_source_format = self._r(corpus_spec, "source-format", mandatory=False,
default_value=track.Documents.SOURCE_FORMAT_BULK)
default_action_and_meta_data = self._r(corpus_spec, "includes-action-and-meta-data", mandatory=False,
default_value=False)
corpus_target_idx = None
corpus_target_ds = None
corpus_target_type = None

if len(indices) == 1:
corpus_target_idx = self._r(corpus_spec, "target-index", mandatory=False, default_value=indices[0].name)
else:
elif len(indices) > 0:
corpus_target_idx = self._r(corpus_spec, "target-index", mandatory=False)

if len(data_streams) == 1:
corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False,
default_value=data_streams[0].name)
elif len(data_streams) > 0:
corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False)

if len(indices) == 1 and len(indices[0].types) == 1:
corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False, default_value=indices[0].types[0])
else:
corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False,
default_value=indices[0].types[0])
elif len(indices) > 0:
corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False)

for doc_spec in self._r(corpus_spec, "documents"):
@@ -1103,13 +1127,37 @@ def _create_corpora(self, corpora_specs, indices):
if includes_action_and_meta_data:
target_idx = None
target_type = None
target_ds = None
else:
# we need an index if no meta-data are present.
target_idx = self._r(doc_spec, "target-index", mandatory=corpus_target_idx is None,
default_value=corpus_target_idx, error_ctx=docs)
target_type = self._r(doc_spec, "target-type", mandatory=False,
default_value=corpus_target_type, error_ctx=docs)

# required to be specified if we're using data streams and we have no default
target_ds = self._r(doc_spec, "target-data-stream",
mandatory=len(data_streams) > 0 and corpus_target_ds is None,
default_value=corpus_target_ds,
error_ctx=docs)
if target_ds and len(indices) > 0:
# if indices are in use we error
raise TrackSyntaxError("target-data-stream cannot be used when using indices")
elif target_ds and target_type:
raise TrackSyntaxError("target-type cannot be used when using data-streams")

# need an index if we're using indices and no meta-data are present and we don't have a default
target_idx = self._r(doc_spec, "target-index",
mandatory=len(indices) > 0 and corpus_target_idx is None,
default_value=corpus_target_idx,
error_ctx=docs)
# either target_idx or target_ds
if target_idx and len(data_streams) > 0:
# if data streams are in use we error
raise TrackSyntaxError("target-index cannot be used when using data-streams")

# we need one or the other
if target_idx is None and target_ds is None:
raise TrackSyntaxError(f"a {'target-index' if len(indices) > 0 else 'target-data-stream'} "
f"is required for {docs}" )

docs = track.Documents(source_format=source_format,
document_file=document_file,
document_archive=document_archive,
Expand All @@ -1118,11 +1166,11 @@ def _create_corpora(self, corpora_specs, indices):
number_of_documents=num_docs,
compressed_size_in_bytes=compressed_bytes,
uncompressed_size_in_bytes=uncompressed_bytes,
target_index=target_idx, target_type=target_type)
target_index=target_idx, target_type=target_type,
target_data_stream=target_ds)
corpus.documents.append(docs)
else:
self._error("Unknown source-format [%s] in document corpus [%s]." % (source_format, name))

document_corpora.append(corpus)
return document_corpora

@@ -1257,7 +1305,8 @@ def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=N
warmup_iterations=self._r(task_spec, "warmup-iterations", error_ctx=op.name, mandatory=False,
default_value=default_warmup_iterations),
iterations=self._r(task_spec, "iterations", error_ctx=op.name, mandatory=False, default_value=default_iterations),
warmup_time_period=self._r(task_spec, "warmup-time-period", error_ctx=op.name, mandatory=False,
warmup_time_period=self._r(task_spec, "warmup-time-period", error_ctx=op.name,
mandatory=False,
default_value=default_warmup_time_period),
time_period=self._r(task_spec, "time-period", error_ctx=op.name, mandatory=False,
default_value=default_time_period),