From 559d0bdb4466c7115db889b221a64de89a227dd0 Mon Sep 17 00:00:00 2001 From: Grzegorz Banasiak Date: Wed, 28 Feb 2024 19:44:41 +0100 Subject: [PATCH 1/4] Avoid immense terms in metrics meta data --- esrally/driver/runner.py | 25 ++++- esrally/resources/metrics-template.json | 74 +++++++------- tests/driver/runner_test.py | 125 ++++++++++++++++++++++++ 3 files changed, 186 insertions(+), 38 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index b5c0bf843..20a6bfd9b 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -659,13 +659,28 @@ def extract_error_details(self, error_details, data): error_details.add((data["status"], None)) def error_description(self, error_details): + """ + Generates error description with an arbitrary limit of 5 errors. + + :param error_details: accumulated error details + :return: error description + """ error_descriptions = [] - for status, reason in error_details: - if reason: - error_descriptions.append(f"HTTP status: {status}, message: {reason}") + is_truncated = False + for count, error_detail in enumerate(error_details): + status, reason = error_detail + if count < 5: + if reason: + error_descriptions.append(f"HTTP status: {status}, message: {reason}") + else: + error_descriptions.append(f"HTTP status: {status}") else: - error_descriptions.append(f"HTTP status: {status}") - return " | ".join(sorted(error_descriptions)) + is_truncated = True + break + description = " | ".join(sorted(error_descriptions)) + if is_truncated: + description = description + " | " + return description def __repr__(self, *args, **kwargs): return "bulk-index" diff --git a/esrally/resources/metrics-template.json b/esrally/resources/metrics-template.json index 7a8946c21..da781f307 100644 --- a/esrally/resources/metrics-template.json +++ b/esrally/resources/metrics-template.json @@ -16,7 +16,8 @@ "match": "*", "match_mapping_type": "string", "mapping": { - "type": "keyword" + "type": "keyword", + "ignore_above": 32766 } } } @@ -29,42 +30,18 @@ "type": "date", "format": "epoch_millis" }, - "relative-time": { - "type": "float" - }, - "race-id": { - "type": "keyword" - }, - "race-timestamp": { - "type": "date", - "format": "basic_date_time_no_millis", - "fields": { - "raw": { - "type": "keyword" - } - } - }, - "environment": { - "type": "keyword" - }, - "track": { + "car": { "type": "keyword" }, "challenge": { "type": "keyword" }, - "car": { + "environment": { "type": "keyword" }, - "name": { + "job": { "type": "keyword" }, - "value": { - "type": "float" - }, - "min": { - "type": "float" - }, "max": { "type": "float" }, @@ -74,23 +51,54 @@ "median": { "type": "float" }, - "unit": { + "meta": { + "properties": { + "error-description": { + "type": "wildcard" + } + } + }, + "min": { + "type": "float" + }, + "name": { "type": "keyword" }, + "operation": { + "type": "keyword" + }, + "operation-type": { + "type": "keyword" + }, + "race-id": { + "type": "keyword" + }, + "race-timestamp": { + "type": "date", + "format": "basic_date_time_no_millis", + "fields": { + "raw": { + "type": "keyword" + } + } + }, + "relative-time": { + "type": "float" + }, "sample-type": { "type": "keyword" }, "task": { "type": "keyword" }, - "operation": { + "track": { "type": "keyword" }, - "operation-type": { + "unit": { "type": "keyword" }, - "job": { - "type": "keyword" + "value": { + "type": "float" } } } diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 61abe0dc2..8c7258348 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -1243,6 +1243,131 @@ async def test_bulk_index_error_logs_warning_with_detailed_stats_body(self, es): es.bulk.assert_awaited_with(body=bulk_params["body"], params={}) + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_bulk_index_error_produces_detailed_stats_body_with_limit(self, es): + es.bulk = mock.AsyncMock( + return_value={ + "took": 5, + "errors": True, + "items": [ + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[1]: version conflict, document already exists (current version [1])", + }, + } + }, + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[2]: version conflict, document already exists (current version [1])", + }, + } + }, + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[3]: version conflict, document already exists (current version [1])", + }, + } + }, + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[4]: version conflict, document already exists (current version [1])", + }, + } + }, + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[5]: version conflict, document already exists (current version [1])", + }, + } + }, + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[6]: version conflict, document already exists (current version [1])", + }, + } + }, + ], + } + ) + + bulk = runner.BulkIndex() + + bulk_params = { + "body": _build_bulk_body( + '{ "index" : { "_index" : "test" } }', + '{"message" : "in a bottle #1"}', + '{ "index" : { "_index" : "test" } }', + '{"message" : "in a bottle #2"}', + '{ "index" : { "_index" : "test" } }', + '{"message" : "in a bottle #3"}', + '{ "index" : { "_index" : "test" } }', + '{"message" : "in a bottle #4"}', + '{ "index" : { "_index" : "test" } }', + '{"message" : "in a bottle #5"}', + '{ "index" : { "_index" : "test" } }', + '{"message" : "in a bottle #6"}', + ), + "action-metadata-present": True, + "bulk-size": 6, + "unit": "docs", + "detailed-results": True, + "index": "test", + } + + with mock.patch.object(bulk.logger, "warning") as mocked_warning_logger: + result = await bulk(es, bulk_params) + mocked_warning_logger.assert_has_calls([mock.call("Bulk request failed: [%s]", result["error-description"])]) + + assert result == { + "took": 5, + "index": "test", + "weight": 6, + "unit": "docs", + "success": False, + "success-count": 0, + "error-count": 6, + "error-type": "bulk", + "error-description": ( + "HTTP status: 409, message: [1]: version conflict, document already exists (current version [1]) | " + "HTTP status: 409, message: [2]: version conflict, document already exists (current version [1]) | " + "HTTP status: 409, message: [3]: version conflict, document already exists (current version [1]) | " + "HTTP status: 409, message: [4]: version conflict, document already exists (current version [1]) | " + "HTTP status: 409, message: [5]: version conflict, document already exists (current version [1]) | " + "" + ), + "ops": {"create": collections.Counter({"item-count": 6})}, + "shards_histogram": [], + "total-document-size-bytes": 180, + "bulk-request-size-bytes": 390, + } + + es.bulk.assert_awaited_with(body=bulk_params["body"], params={}) + @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_bulk_index_success_with_refresh_default(self, es): From c300043b79cd0325433a5270723938f571840022 Mon Sep 17 00:00:00 2001 From: Grzegorz Banasiak Date: Wed, 28 Feb 2024 20:28:56 +0100 Subject: [PATCH 2/4] Sort deterministically --- esrally/driver/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 20a6bfd9b..002342228 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -667,7 +667,7 @@ def error_description(self, error_details): """ error_descriptions = [] is_truncated = False - for count, error_detail in enumerate(error_details): + for count, error_detail in enumerate(sorted(error_details)): status, reason = error_detail if count < 5: if reason: @@ -677,7 +677,7 @@ def error_description(self, error_details): else: is_truncated = True break - description = " | ".join(sorted(error_descriptions)) + description = " | ".join(error_descriptions) if is_truncated: description = description + " | " return description From 9ae5cc2a8ee23e7015a7bff70f8e05e0293da3a5 Mon Sep 17 00:00:00 2001 From: Grzegorz Banasiak Date: Thu, 29 Feb 2024 09:37:26 +0100 Subject: [PATCH 3/4] Reduce ignore_above value to accommodate for UTF-8 characters --- esrally/resources/metrics-template.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esrally/resources/metrics-template.json b/esrally/resources/metrics-template.json index da781f307..8699ac73a 100644 --- a/esrally/resources/metrics-template.json +++ b/esrally/resources/metrics-template.json @@ -17,7 +17,7 @@ "match_mapping_type": "string", "mapping": { "type": "keyword", - "ignore_above": 32766 + "ignore_above": 8191 } } } From 6ce175adbd18fe48dc461ed8fc7356156d5dac3f Mon Sep 17 00:00:00 2001 From: Grzegorz Banasiak Date: Tue, 12 Mar 2024 16:40:36 +0100 Subject: [PATCH 4/4] Add status code summary on truncation --- esrally/driver/runner.py | 17 ++++++++++++++++- tests/driver/runner_test.py | 27 ++++++++++++++++++++------- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 002342228..137bf4a4d 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -658,6 +658,21 @@ def extract_error_details(self, error_details, data): else: error_details.add((data["status"], None)) + def _error_status_summary(self, error_details): + """ + Generates error status code summary. + + :param error_details: accumulated error details + :return: error status summary + """ + status_counts = {} + for status, _ in error_details: + status_counts[status] = status_counts.get(status, 0) + 1 + status_summaries = [] + for status in sorted(status_counts.keys()): + status_summaries.append(f"{status_counts[status]}x{status}") + return ", ".join(status_summaries) + def error_description(self, error_details): """ Generates error description with an arbitrary limit of 5 errors. @@ -679,7 +694,7 @@ def error_description(self, error_details): break description = " | ".join(error_descriptions) if is_truncated: - description = description + " | " + description = description + " | TRUNCATED " + self._error_status_summary(error_details) return description def __repr__(self, *args, **kwargs): diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 8c7258348..910c1de95 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -1311,6 +1311,17 @@ async def test_bulk_index_error_produces_detailed_stats_body_with_limit(self, es }, } }, + { + "create": { + "_index": "test", + "status": 429, + "error": { + "type": "cluster_block_exception", + "reason": "index [test] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded " + "flood-stage watermark, index has read-only-allow-delete block];", + }, + } + }, ], } ) @@ -1331,9 +1342,11 @@ async def test_bulk_index_error_produces_detailed_stats_body_with_limit(self, es '{"message" : "in a bottle #5"}', '{ "index" : { "_index" : "test" } }', '{"message" : "in a bottle #6"}', + '{ "index" : { "_index" : "test" } }', + '{"message" : "in a bottle #7"}', ), "action-metadata-present": True, - "bulk-size": 6, + "bulk-size": 7, "unit": "docs", "detailed-results": True, "index": "test", @@ -1346,11 +1359,11 @@ async def test_bulk_index_error_produces_detailed_stats_body_with_limit(self, es assert result == { "took": 5, "index": "test", - "weight": 6, + "weight": 7, "unit": "docs", "success": False, "success-count": 0, - "error-count": 6, + "error-count": 7, "error-type": "bulk", "error-description": ( "HTTP status: 409, message: [1]: version conflict, document already exists (current version [1]) | " @@ -1358,12 +1371,12 @@ async def test_bulk_index_error_produces_detailed_stats_body_with_limit(self, es "HTTP status: 409, message: [3]: version conflict, document already exists (current version [1]) | " "HTTP status: 409, message: [4]: version conflict, document already exists (current version [1]) | " "HTTP status: 409, message: [5]: version conflict, document already exists (current version [1]) | " - "" + "TRUNCATED 6x409, 1x429" ), - "ops": {"create": collections.Counter({"item-count": 6})}, + "ops": {"create": collections.Counter({"item-count": 7})}, "shards_histogram": [], - "total-document-size-bytes": 180, - "bulk-request-size-bytes": 390, + "total-document-size-bytes": 210, + "bulk-request-size-bytes": 455, } es.bulk.assert_awaited_with(body=bulk_params["body"], params={})