Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load only required corpora #555

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,19 @@ def operation_parameters(t, op):
return params.param_source_for_operation(op.type, t, op.params)


def used_corpora(t, cfg):
    """Determine the document corpora actually referenced by the selected challenge.

    Walks every (sub-)task in the schedule of the challenge selected via the
    "challenge.name" config option, asks each task's parameter source which
    corpora it consumes and merges same-named corpora into one entry.

    :param t: The current track.
    :param cfg: The config object; provides the selected challenge name.
    :return: A view of the document corpora required by this benchmark run.
    """
    selected_challenge = t.find_challenge_or_default(cfg.opts("track", "challenge.name"))
    merged = {}
    for parent_task in selected_challenge.schedule:
        for leaf_task in parent_task:
            source = operation_parameters(t, leaf_task.operation)
            # Only parameter sources that actually consume corpora expose this attribute.
            if hasattr(source, "corpora"):
                for corpus in source.corpora:
                    # The same corpus may show up with different doc sets in
                    # different tasks, hence we union instead of overwriting.
                    seen_so_far = merged.get(corpus.name, corpus)
                    merged[corpus.name] = seen_so_far.union(corpus)
    return merged.values()


def prepare_track(t, cfg):
"""
Ensures that all track data are available for running the benchmark.
Expand All @@ -238,7 +251,7 @@ def prepare_track(t, cfg):
logger = logging.getLogger(__name__)
offline = cfg.opts("system", "offline.mode")
test_mode = cfg.opts("track", "test.mode.enabled")
for corpus in t.corpora:
for corpus in used_corpora(t, cfg):
data_root = data_dir(cfg, t.name, corpus.name)
logger.info("Resolved data root directory for document corpus [%s] in track [%s] to %s.", corpus.name, t.name, data_root)
prep = DocumentSetPreparator(t.name, offline, test_mode)
Expand Down
8 changes: 8 additions & 0 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ def filter(self, source_format=None, target_indices=None):
filtered.append(d)
return DocumentCorpus(self.name, filtered)

def union(self, other):
    """Merge this document corpus with another corpus of the same name.

    Returns ``self`` unchanged when both operands are the identical object;
    otherwise returns a new ``DocumentCorpus`` containing the deduplicated
    document sets of both corpora.

    :param other: The corpus to merge with; must have the same name.
    :raises exceptions.RallyAssertionError: if the corpora names differ.
    :return: A corpus covering the document sets of both operands.
    """
    if self.name != other.name:
        raise exceptions.RallyAssertionError("Both document corpora must have the same name")
    if self is other:
        return self
    combined_doc_sets = set(self.documents) | set(other.documents)
    return DocumentCorpus(self.name, list(combined_doc_sets))

def __str__(self):
return self.name

Expand Down
134 changes: 134 additions & 0 deletions tests/track/loader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,140 @@ def test_prepare_bundled_document_set_does_nothing_if_no_document_files(self, is
self.assertEqual(0, decompress.call_count)
self.assertEqual(0, prepare_file_offset_table.call_count)

def test_used_corpora(self):
    """used_corpora() must return only the corpora that the selected challenge
    references, merging the document sets of same-named corpora across tasks."""
    cfg = config.Config()
    cfg.add(config.Scope.application, "track", "challenge.name", "default-challenge")
    # Two corpora over the same three indices: the challenge below uses two doc
    # sets of "http_logs" and one doc set of "http_logs_unparsed".
    track_specification = {
        "description": "description for unit test",
        "indices": [
            {"name": "logs-181998"},
            {"name": "logs-191998"},
            {"name": "logs-201998"},
        ],
        "corpora": [
            {
                "name": "http_logs_unparsed",
                "target-type": "type",
                "documents": [
                    {
                        "target-index": "logs-181998",
                        "source-file": "documents-181998.unparsed.json.bz2",
                        "document-count": 2708746,
                        "compressed-bytes": 13064317,
                        "uncompressed-bytes": 303920342
                    },
                    {
                        "target-index": "logs-191998",
                        "source-file": "documents-191998.unparsed.json.bz2",
                        "document-count": 9697882,
                        "compressed-bytes": 47211781,
                        "uncompressed-bytes": 1088378738
                    },
                    {
                        "target-index": "logs-201998",
                        "source-file": "documents-201998.unparsed.json.bz2",
                        "document-count": 13053463,
                        "compressed-bytes": 63174979,
                        "uncompressed-bytes": 1456836090
                    }
                ]
            },
            {
                "name": "http_logs",
                "target-type": "type",
                "documents": [
                    {
                        "target-index": "logs-181998",
                        "source-file": "documents-181998.json.bz2",
                        "document-count": 2708746,
                        "compressed-bytes": 13815456,
                        "uncompressed-bytes": 363512754
                    },
                    {
                        "target-index": "logs-191998",
                        "source-file": "documents-191998.json.bz2",
                        "document-count": 9697882,
                        "compressed-bytes": 49439633,
                        "uncompressed-bytes": 1301732149
                    },
                    {
                        "target-index": "logs-201998",
                        "source-file": "documents-201998.json.bz2",
                        "document-count": 13053463,
                        "compressed-bytes": 65623436,
                        "uncompressed-bytes": 1744012279
                    }
                ]
            }
        ],
        "operations": [
            {
                "name": "bulk-index-1",
                "operation-type": "bulk",
                "corpora": ["http_logs"],
                "indices": ["logs-181998"],
                "bulk-size": 500
            },
            {
                "name": "bulk-index-2",
                "operation-type": "bulk",
                "corpora": ["http_logs"],
                "indices": ["logs-191998"],
                "bulk-size": 500
            },
            {
                "name": "bulk-index-3",
                "operation-type": "bulk",
                "corpora": ["http_logs_unparsed"],
                "indices": ["logs-201998"],
                "bulk-size": 500
            },
            {
                # does not use any corpus; must not contribute to the result
                "name": "node-stats",
                "operation-type": "node-stats"
            },
        ],
        "challenges": [
            {
                "name": "default-challenge",
                "schedule": [
                    {
                        "parallel": {
                            "tasks": [
                                {
                                    "name": "index-1",
                                    "operation": "bulk-index-1",
                                },
                                {
                                    "name": "index-2",
                                    "operation": "bulk-index-2",
                                },
                                {
                                    "name": "index-3",
                                    "operation": "bulk-index-3",
                                },
                            ]
                        }
                    },
                    {
                        "operation": "node-stats"
                    }
                ]
            }
        ]
    }
    reader = loader.TrackSpecificationReader()
    full_track = reader("unittest", track_specification, "/mappings")
    used_corpora = sorted(loader.used_corpora(full_track, cfg), key=lambda c: c.name)
    self.assertEqual(2, len(used_corpora))
    self.assertEqual("http_logs", used_corpora[0].name)
    # each bulk operation requires a different data file but they should have been merged properly.
    self.assertEqual({"documents-181998.json.bz2", "documents-191998.json.bz2"},
                     {d.document_archive for d in used_corpora[0].documents})

    self.assertEqual("http_logs_unparsed", used_corpora[1].name)
    self.assertEqual({"documents-201998.unparsed.json.bz2"}, {d.document_archive for d in used_corpora[1].documents})

@mock.patch("esrally.utils.io.prepare_file_offset_table")
@mock.patch("esrally.utils.io.decompress")
@mock.patch("os.path.getsize")
Expand Down
29 changes: 29 additions & 0 deletions tests/track/track_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,32 @@ def test_filter_documents_by_format_and_indices(self):
self.assertEqual(2, len(filtered_corpus.documents))
self.assertEqual("logs-01", filtered_corpus.documents[0].target_index)
self.assertEqual("logs-02", filtered_corpus.documents[1].target_index)

def test_union_document_corpus_is_reflexive(self):
    """Unioning a corpus with itself must yield the identical object, not a copy."""
    doc_sets = [
        track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
                        number_of_documents=doc_count, target_index=index)
        for doc_count, index in ((5, "logs-01"), (6, "logs-02"), (7, "logs-03"), (8, None))
    ]
    corpus = track.DocumentCorpus("test", documents=doc_sets)
    self.assertTrue(corpus.union(corpus) is corpus)

def test_union_document_corpora_is_symmetric(self):
    """a.union(b) and b.union(a) must produce equal corpora containing both doc sets."""
    first = track.DocumentCorpus("test", documents=[
        track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
                        number_of_documents=5, target_index="logs-01"),
    ])
    second = track.DocumentCorpus("test", documents=[
        track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
                        number_of_documents=5, target_index="logs-02"),
    ])
    self.assertEqual(second.union(first), first.union(second))
    self.assertEqual(2, len(first.union(second).documents))

def test_cannot_union_mixed_document_corpora(self):
    """Corpora with different names must refuse to union with an assertion error."""
    left = track.DocumentCorpus("test", documents=[
        track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
                        number_of_documents=5, target_index="logs-01"),
    ])
    right = track.DocumentCorpus("other", documents=[
        track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
                        number_of_documents=5, target_index="logs-02"),
    ])
    with self.assertRaisesRegex(exceptions.RallyAssertionError, "Both document corpora must have the same name"):
        left.union(right)