Improve simulation of bulk-indexing conflicts

With this commit we introduce a new property `conflict-probability` for
the bulk-indexing parameter source. Previously the conflict probability
was hard-coded to 25%; now users can control it.

We also add a new property `on-conflict` that lets users define whether
the action-and-metadata line should use "index" or "update" on simulated
id conflicts (the default is "index").

Closes #422
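
For illustration, a simulated conflict changes only the action-and-metadata
line of the bulk body (index, type and id below are made-up values):

    # on-conflict=index (the default): the conflicting document is indexed again
    {"index": {"_index": "test-index", "_type": "docs", "_id": "550"}}
    {"location": [-0.1485188, 51.5250666]}

    # on-conflict=update: the conflicting document is sent as a partial update
    {"update": {"_index": "test-index", "_type": "docs", "_id": "550"}}
    {"doc": {"location": [-0.1485188, 51.5250666]}}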
danielmitterdorfer authored Apr 25, 2018
1 parent 8e6c39a commit 581b7b1
Showing 3 changed files with 197 additions and 55 deletions.
docs/track.rst (2 additions, 0 deletions)

@@ -313,6 +313,8 @@ With the operation type ``bulk`` you can execute `bulk requests <http://www.elas
 * ``batch-size`` (optional): Defines how many documents Rally will read at once. This is an expert setting and only meant to avoid accidental bottlenecks for very small bulk sizes (e.g. if you want to benchmark with a bulk-size of 1, you should set ``batch-size`` higher).
 * ``pipeline`` (optional): Defines the name of an (existing) ingest pipeline that should be used (only supported from Elasticsearch 5.0).
 * ``conflicts`` (optional): Type of index conflicts to simulate. If not specified, no conflicts will be simulated. Valid values are: 'sequential' (a document id is replaced with a sequentially increasing id) and 'random' (a document id is replaced with another random id).
+* ``conflict-probability`` (optional, defaults to 25 percent): A number in the interval (0, 100] that defines the probability (as a percentage) that a document id will be replaced with a conflicting one.
+* ``on-conflict`` (optional, defaults to ``index``): Determines whether Rally should use the action ``index`` or ``update`` on simulated id conflicts.
 * ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data for bulk requests. As it analyzes the corresponding bulk response in more detail, this might incur additional overhead which can skew measurement results.

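A minimal sketch of how these settings could be combined in a track operation (the operation name and all values are illustrative, not taken from this commit)::

    {
      "name": "index-with-simulated-conflicts",
      "operation-type": "bulk",
      "bulk-size": 5000,
      "conflicts": "random",
      "conflict-probability": 25,
      "on-conflict": "update"
    }
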
esrally/track/params.py (66 additions, 26 deletions)

@@ -442,6 +442,15 @@ def __init__(self, track, params, **kwargs):
         else:
             raise exceptions.InvalidSyntax("Unknown 'conflicts' setting [%s]" % id_conflicts)

+        if self.id_conflicts != IndexIdConflict.NoConflicts:
+            self.conflict_probability = self.float_param(params, name="conflict-probability", default_value=25, min_value=0, max_value=100)
+            self.on_conflict = params.get("on-conflict", "index")
+            if self.on_conflict not in ["index", "update"]:
+                raise exceptions.InvalidSyntax("Unknown 'on-conflict' setting [{}]".format(self.on_conflict))
+        else:
+            self.conflict_probability = None
+            self.on_conflict = None
+
         self.corpora = self.used_corpora(track, params)

         for corpus in self.corpora:
@@ -473,13 +482,17 @@ def __init__(self, track, params, **kwargs):
         except ValueError:
             raise exceptions.InvalidSyntax("'batch-size' must be numeric")

+        self.ingest_percentage = self.float_param(params, name="ingest-percentage", default_value=100, min_value=0, max_value=100)
+
+    def float_param(self, params, name, default_value, min_value, max_value):
         try:
-            self.ingest_percentage = float(params.get("ingest-percentage", 100.0))
-            if self.ingest_percentage <= 0 or self.ingest_percentage > 100.0:
+            value = float(params.get(name, default_value))
+            if value <= min_value or value > max_value:
                 raise exceptions.InvalidSyntax(
-                    "'ingest-percentage' must be in the range (0.0, 100.0] but was {:.1f}".format(self.ingest_percentage))
+                    "'{}' must be in the range ({:.1f}, {:.1f}] but was {:.1f}".format(name, min_value, max_value, value))
+            return value
         except ValueError:
-            raise exceptions.InvalidSyntax("'ingest-percentage' must be numeric")
+            raise exceptions.InvalidSyntax("'{}' must be numeric".format(name))

     def used_corpora(self, t, params):
         corpora = []
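
Note that `float_param` validates against a half-open interval: the minimum is exclusive and the maximum inclusive, so a `conflict-probability` of 0 is rejected while 100 is accepted. A standalone sketch of the boundary check (plain Python, independent of Rally; `in_range` is a hypothetical helper mirroring the condition above):

    def in_range(value, min_value=0, max_value=100):
        # same condition as float_param: exclusive minimum, inclusive maximum
        return not (value <= min_value or value > max_value)

    for p in [0, 0.5, 25, 100, 100.1]:
        # prints: 0 False / 0.5 True / 25 True / 100 True / 100.1 False
        print(p, in_range(p))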
@@ -503,7 +516,8 @@ def used_corpora(self, t, params):

     def partition(self, partition_index, total_partitions):
         return PartitionBulkIndexParamSource(self.corpora, partition_index, total_partitions, self.batch_size, self.bulk_size,
-                                             self.ingest_percentage, self.id_conflicts, self.pipeline, self._params)
+                                             self.ingest_percentage, self.id_conflicts, self.conflict_probability, self.on_conflict,
+                                             self.pipeline, self._params)

     def params(self):
         raise exceptions.RallyError("Do not use a BulkIndexParamSource without partitioning")
@@ -513,8 +527,8 @@ def size(self):


 class PartitionBulkIndexParamSource:
-    def __init__(self, corpora, partition_index, total_partitions, batch_size, bulk_size, ingest_percentage, id_conflicts=None,
-                 pipeline=None, original_params=None):
+    def __init__(self, corpora, partition_index, total_partitions, batch_size, bulk_size, ingest_percentage,
+                 id_conflicts, conflict_probability, on_conflict, pipeline=None, original_params=None):
         """
         :param corpora: Specification of affected document corpora.
@@ -524,6 +538,8 @@ def __init__(self, corpora, partition_index, total_partitions, batch_size, bulk_
         :param bulk_size: The size of bulk index operations (number of documents per bulk).
         :param ingest_percentage: A number between (0.0, 100.0] that defines how much of the whole corpus should be ingested.
         :param id_conflicts: The type of id conflicts.
+        :param conflict_probability: A number between (0.0, 100.0] that defines the probability that a document is replaced by another one.
+        :param on_conflict: A string indicating which action should be taken on id conflicts (either "index" or "update").
         :param pipeline: The name of the ingest pipeline to run.
         :param original_params: The original dict passed to the parent parameter source.
         """
@@ -536,7 +552,7 @@ def __init__(self, corpora, partition_index, total_partitions, batch_size, bulk_
         self.id_conflicts = id_conflicts
         self.pipeline = pipeline
         self.internal_params = bulk_data_based(total_partitions, partition_index, corpora, batch_size,
-                                               bulk_size, id_conflicts, pipeline, original_params)
+                                               bulk_size, id_conflicts, conflict_probability, on_conflict, pipeline, original_params)

     def partition(self, partition_index, total_partitions):
         raise exceptions.RallyError("Cannot partition a PartitionBulkIndexParamSource further")
@@ -593,18 +609,20 @@ def chain(*iterables):
             yield element


-def create_default_reader(docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts):
+def create_default_reader(docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict):
     source = Slice(io.FileSource, offset, num_lines)

     if docs.includes_action_and_meta_data:
         am_handler = SourceActionMetaData(source)
     else:
-        am_handler = GenerateActionMetaData(docs.target_index, docs.target_type, build_conflicting_ids(id_conflicts, num_docs, offset))
+        am_handler = GenerateActionMetaData(docs.target_index, docs.target_type,
+                                            build_conflicting_ids(id_conflicts, num_docs, offset), conflict_probability, on_conflict)

     return IndexDataReader(docs.document_file, batch_size, bulk_size, source, am_handler, docs.target_index, docs.target_type)


-def create_readers(num_clients, client_index, corpora, batch_size, bulk_size, id_conflicts, create_reader):
+def create_readers(num_clients, client_index, corpora, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict,
+                   create_reader):
     readers = []
     for corpus in corpora:
         for docs in corpus.documents:
@@ -613,7 +631,8 @@ def create_readers(num_clients, client_index, corpora, batch_size, bulk_size, id
             if num_docs > 0:
                 logger.info("Task-relative client at index [%d] will bulk index [%d] docs starting from line offset [%d] for [%s/%s] "
                             "from corpus [%s]." % (client_index, num_docs, offset, docs.target_index, docs.target_type, corpus.name))
-                readers.append(create_reader(docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts))
+                readers.append(create_reader(docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability,
+                                             on_conflict))
             else:
                 logger.info("Task-relative client at index [%d] skips [%s] (no documents to read)." % (client_index, corpus.name))
     return readers
@@ -673,8 +692,8 @@ def bulk_generator(readers, client_index, pipeline, original_params):
         yield params


-def bulk_data_based(num_clients, client_index, corpora, batch_size, bulk_size, id_conflicts, pipeline, original_params,
-                    create_reader=create_default_reader):
+def bulk_data_based(num_clients, client_index, corpora, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, pipeline,
+                    original_params, create_reader=create_default_reader):
     """
     Calculates the necessary schedule for bulk operations.
@@ -684,38 +703,54 @@ def bulk_data_based(num_clients, client_index, corpora, batch_size, bulk_size, i
     :param batch_size: The number of documents to read in one go.
     :param bulk_size: The size of bulk index operations (number of documents per bulk).
     :param id_conflicts: The type of id conflicts to simulate.
+    :param conflict_probability: A number between (0.0, 100.0] that defines the probability that a document is replaced by another one.
+    :param on_conflict: A string indicating which action should be taken on id conflicts (either "index" or "update").
     :param pipeline: Name of the ingest pipeline to use. May be None.
     :param original_params: A dict of original parameters that were passed from the track. They will be merged into the returned parameters.
     :param create_reader: A function to create the index reader. By default a file based index reader will be created. This parameter is
                           intended for testing only.
     :return: A generator for the bulk operations of the given client.
     """
-    readers = create_readers(num_clients, client_index, corpora, batch_size, bulk_size, id_conflicts, create_reader)
+    readers = create_readers(num_clients, client_index, corpora, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict,
+                             create_reader)
     return bulk_generator(chain(*readers), client_index, pipeline, original_params)


 class GenerateActionMetaData:
-    def __init__(self, index_name, type_name, conflicting_ids, rand=random.randint):
+    def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probability=None, on_conflict=None,
+                 rand=random.random, randint=random.randint):
         self.index_name = index_name
         self.type_name = type_name
         self.conflicting_ids = conflicting_ids
+        self.on_conflict = on_conflict
+        # random() produces numbers between 0 and 1 and the user denotes the probability in percentage between 0 and 100.
+        self.conflict_probability = conflict_probability / 100.0 if conflict_probability else None
+
         self.rand = rand
+        self.randint = randint
         self.id_up_to = 0

     def __iter__(self):
         return self

     def __next__(self):
         if self.conflicting_ids is not None:
-            # 25% of the time we replace a doc:
-            if self.id_up_to > 0 and self.rand(0, 3) == 3:
-                doc_id = self.conflicting_ids[self.rand(0, self.id_up_to - 1)]
+            if self.id_up_to > 0 and self.rand() <= self.conflict_probability:
+                doc_id = self.conflicting_ids[self.randint(0, self.id_up_to - 1)]
+                action = self.on_conflict
             else:
                 doc_id = self.conflicting_ids[self.id_up_to]
                 self.id_up_to += 1
-            return '{"index": {"_index": "%s", "_type": "%s", "_id": "%s"}}' % (self.index_name, self.type_name, doc_id)
+                action = "index"
+
+            if action == "index":
+                return "index", '{"index": {"_index": "%s", "_type": "%s", "_id": "%s"}}' % (self.index_name, self.type_name, doc_id)
+            elif action == "update":
+                return "update", '{"update": {"_index": "%s", "_type": "%s", "_id": "%s"}}' % (self.index_name, self.type_name, doc_id)
+            else:
+                raise exceptions.RallyAssertionError("Unknown action [{}]".format(action))
         else:
-            return '{"index": {"_index": "%s", "_type": "%s"}}' % (self.index_name, self.type_name)
+            return "index", '{"index": {"_index": "%s", "_type": "%s"}}' % (self.index_name, self.type_name)


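A usage sketch of the extended generator (assuming a checkout of Rally with this commit on the import path; index, type and ids are made up). With `conflict_probability=100` every document after the first collides, because `random.random()` never exceeds 1.0:

    from esrally.track.params import GenerateActionMetaData

    gen = GenerateActionMetaData(index_name="test-index", type_name="docs",
                                 conflicting_ids=["100", "200", "300"],
                                 conflict_probability=100, on_conflict="update")

    # first document: nothing to conflict with yet, so a plain "index" action
    print(next(gen))  # ('index', '{"index": {"_index": "test-index", "_type": "docs", "_id": "100"}}')
    # afterwards every id collides and the action follows on_conflict
    print(next(gen))  # ('update', '{"update": {"_index": "test-index", "_type": "docs", "_id": "100"}}')
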
class SourceActionMetaData:
@@ -726,7 +761,7 @@ def __iter__(self):
         return self

     def __next__(self):
-        return next(self.source)
+        return "source", next(self.source)


class Slice:
@@ -815,11 +850,16 @@ def __next__(self):
     def read_bulk(self):
         docs_in_bulk = 0
         current_bulk = []
-
-        for action_metadata_line, document in zip(self.action_metadata, self.file_source):
-            if action_metadata_line:
+        for action_metadata_item, document in zip(self.action_metadata, self.file_source):
+            if action_metadata_item:
+                action_type, action_metadata_line = action_metadata_item
                 current_bulk.append(action_metadata_line)
-                current_bulk.append(document)
+                if action_type == "update":
+                    current_bulk.append("{\"doc\":%s}" % document)
+                else:
+                    current_bulk.append(document)
             else:
                 current_bulk.append(document)
             docs_in_bulk += 1
             if docs_in_bulk == self.bulk_size:
                 break
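
The `{"doc": ...}` wrapper above is needed because, unlike `index`, the `update` action of the bulk API expects a partial document (or a script) on its source line. For one simulated conflict with `on-conflict` set to `update`, the generated bulk body would contain lines like these (hypothetical id and document):

    {"update": {"_index": "test-index", "_type": "docs", "_id": "250"}}
    {"doc":{"location": [-0.1485188, 51.5250666]}}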