Parallelize on corpora basis in bulk task clients #1412

Merged: 5 commits, Feb 15, 2022
Changes from 3 commits
16 changes: 8 additions & 8 deletions esrally/track/params.py
@@ -885,7 +885,7 @@ def chain(*iterables):
    :param iterables: A number of iterables that should be chained.
:return: An iterable that will delegate to all provided iterables in turn.
"""
-    for it in iterables:
+    for it in filter(lambda x: x is not None, iterables):
         # execute within a context
         with it:
             for element in it:
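The `None`-filtering added to `chain` matters because `create_readers` (below) now pre-sizes the `readers` list with `None` placeholders, and any slot that never receives a reader must be skipped. A minimal, self-contained sketch of that behavior — `ListReader` is a toy stand-in for Rally's reader API, not part of the PR:

```python
class ListReader:
    """Toy stand-in for a corpus reader: iterable plus context manager."""

    def __init__(self, items):
        self.items = items

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return False

    def __iter__(self):
        return iter(self.items)


def chain(*iterables):
    # skip None placeholders left over from the pre-sized readers list
    for it in filter(lambda x: x is not None, iterables):
        # execute within a context
        with it:
            for element in it:
                yield element


print(list(chain(ListReader([1, 2]), None, ListReader([3]))))  # → [1, 2, 3]
```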
@@ -936,9 +936,11 @@ def create_readers(
create_reader,
):
logger = logging.getLogger(__name__)
-    readers = []
-    for corpus in corpora:
-        for docs in corpus.documents:
+    # pre-initialize in order to assign parallel tasks for different corpora through assignment
+    readers = [None for corpus in corpora for _ in corpus.documents] * num_clients
+    # stagger which corpus each client starts with for better parallelism
+    for group, corpus in enumerate(corpora[(start_client_index + mod) % len(corpora)] for mod in range(len(corpora))):
Contributor (Author):
I'm primarily looking for feedback on the round-robin approach implemented between this line and line 961 below. I feel like it's unreadable, but I was stumped with regard to making it better.

Contributor:
I think this approach makes sense. As you correctly identified, there are tracks like http_logs that currently assume specific corpus files contain a certain time range of events; to run them realistically we'd need the existing sequential strategy, or we'd have to change the corpora files in the track.

I'd be interested to see the impact of this approach also in the solutions/logs tracks.
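The round-robin scheme under discussion can be modeled in isolation. The sketch below is a simplification (plain lists stand in for corpora and readers, the `num_clients` scaling of the slot list is omitted, and `assign_readers` is a hypothetical name, not from the PR): rotating the corpus order by `start_client_index` staggers which corpus each client begins with, and writing into slot `len(corpora) * entry + group` interleaves documents from different corpora.

```python
def assign_readers(corpora, start_client_index):
    """Interleave per-corpus documents into a flat slot list, staggered by client.

    `corpora` is a list of corpora, each a list of document names.
    """
    n = len(corpora)
    # pre-size the slot list; unfilled slots stay None and are filtered out later by chain()
    readers = [None] * sum(len(corpus) for corpus in corpora)
    # rotate the corpus order so each client starts with a different corpus
    staggered = (corpora[(start_client_index + mod) % n] for mod in range(n))
    for group, corpus in enumerate(staggered):
        for entry, docs in enumerate(corpus):
            # consecutive slots come from different corpora
            readers[n * entry + group] = docs
    return readers


# two corpora, two documents each
corpora = [["a1", "a2"], ["b1", "b2"]]
print(assign_readers(corpora, 0))  # → ['a1', 'b1', 'a2', 'b2']
print(assign_readers(corpora, 1))  # → ['b1', 'a1', 'b2', 'a2']
```

With equally sized corpora this alternates corpora in the reader stream; as the reviewer notes, corpora whose files encode a time range (e.g. http_logs) lose their sequential ordering under such a scheme.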

+        for entry, docs in enumerate(corpus.documents):
offset, num_docs, num_lines = bounds(
docs.number_of_documents, start_client_index, end_client_index, num_clients, docs.includes_action_and_meta_data
)
@@ -956,10 +958,8 @@
target,
corpus.name,
)
-            readers.append(
-                create_reader(
-                    docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency
-                )
-            )
+            readers[len(corpora) * entry + group] = create_reader(
+                docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency
+            )
else:
logger.info(
4 changes: 2 additions & 2 deletions tests/track/params_test.py
@@ -1467,7 +1467,7 @@ def test_generate_bulks_from_multiple_corpora(self):
"body": ["1", "2", "3", "4", "5"],
"bulk-size": 5,
"unit": "docs",
-    "index": "logs-2018-02",
+    "index": "logs-2017-01",
"type": "docs",
"my-custom-parameter": "foo",
"my-custom-parameter-2": True,
@@ -1481,7 +1481,7 @@
"body": ["1", "2", "3", "4", "5"],
"bulk-size": 5,
"unit": "docs",
-    "index": "logs-2017-01",
+    "index": "logs-2018-02",
"type": "docs",
"my-custom-parameter": "foo",
"my-custom-parameter-2": True,