Skip to content

Commit

Permalink
vdk-impala: add Out Of Memory error handling (#2747)
Browse files Browse the repository at this point in the history
Add Out Of Memory error handling logic in the vdk-impala error handler.

The Impala query planner projects the memory usage for every statement based on existing statistics about the
tables involved in the statement. These statistics could be either unavailable or incorrect, which leads to wrong
memory estimates. We try to tackle this problem by gradually increasing the per-executor-node memory limit for
the query and retrying it. As a last resort, we set the memory limit very high. Neither of these actions should
cause problems for a well-configured Admission Control in an Impala instance.

For more information:
https://impala.apache.org/docs/build/html/topics/impala_admission.html
https://impala.apache.org/docs/build3x/html/topics/impala_mem_limit.html
  • Loading branch information
dakodakov authored Oct 3, 2023
1 parent 965bcdc commit a208598
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

from vdk.internal.builtin_plugins.connection.recovery_cursor import RecoveryCursor
from vdk.internal.core import errors
from vdk.plugin.impala.impala_memory_error_handler import ImpalaMemoryErrorHandler

MEMORY_LIMIT_PATTERN = r"Limit=(\d+\.\d+)\s*([KMGTP]B)"


class ImpalaErrorHandler:
Expand All @@ -22,6 +25,7 @@ def __init__(self, log=None, num_retries=5, backoff_seconds=30):

self._num_retries = num_retries
self._backoff_seconds = backoff_seconds
self._memory_error_handler = ImpalaMemoryErrorHandler(log=log)

def handle_error(
self, caught_exception: Exception, recovery_cursor: RecoveryCursor
Expand Down Expand Up @@ -107,6 +111,9 @@ def _handle_exception(self, exception, recovery_cursor) -> bool:
or self._handle_metadata_exception_with_invalidate_and_retry(
exception, recovery_cursor
)
or self._memory_error_handler.handle_memory_error(
exception, recovery_cursor
)
or self._handle_impala_network_error(exception, recovery_cursor)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import re

from vdk.internal.core import errors

# Captures the numeric value and the unit of the "Limit=<value> <unit>" fragment
# that Impala includes in its "Memory limit exceeded" error text.
MEMORY_LIMIT_PATTERN = r"Limit=(\d+\.\d+)\s*([KMGTP]B)"
# Byte multipliers for every unit suffix MEMORY_LIMIT_PATTERN can capture.
UNITS = {
    "KB": 1024,
    "MB": 1024**2,
    "GB": 1024**3,
    "TB": 1024**4,
    "PB": 1024**5,
}
# Factor applied to the reported limit on retry 0, 1 and 2 respectively.
MULTIPLIER = [1.2, 1.5, 2.0]


class ImpalaMemoryErrorHandler:
    """
    Error handling for Impala Memory Errors.

    The Impala query planner projects the memory usage for every statement based on existing statistics about the
    tables involved in the statement. These statistics could be either unavailable or incorrect, which leads to wrong
    memory estimates. We try to tackle this problem by gradually increasing the per executor node memory limit for
    the query and retrying it. As a last resort, we set the memory limit very high. Neither of these actions should
    cause problems for a well configured Admission Control in an Impala instance.

    For more information:
    https://impala.apache.org/docs/build/html/topics/impala_admission.html
    https://impala.apache.org/docs/build3x/html/topics/impala_mem_limit.html
    """

    def __init__(self, log=None, num_retries=5, backoff_seconds=30):
        """
        :param log: a logger object, creates a new one if one isn't passed as a parameter
        :param num_retries: accepted but not used by this handler
        :param backoff_seconds: accepted but not used by this handler
        """
        if not log:
            log = logging.getLogger(__name__)
        self._log = log

    def handle_memory_error(self, exception, recovery_cursor) -> bool:
        """
        Try to recover from an Impala "Memory limit exceeded" error.

        On retries 0..len(MULTIPLIER)-1 the limit reported in the error message is
        scaled by the matching MULTIPLIER entry; on the following retry a fixed
        512GB limit is set; after that the error is considered unrecoverable.

        :param exception: the exception raised by the failed statement
        :param recovery_cursor: cursor used to set the new limit and re-run the statement
        :return: True if the statement was retried, False if the error was not handled
        """
        if not errors.exception_matches(
            exception,
            classname_with_package="impala.error.OperationalError",
            exception_message_matcher_regex=".*Memory limit exceeded:.*",
        ):
            return False

        # We are going to try to increase the memory limits and see if the query passes
        # But we won't do anything if the sql statement itself sets a memory limit
        self._log.info(
            "Query failed with memory error. We are going to increase the memory limit. "
            f"Error was: {exception.__class__}: {str(exception)}"
        )
        statement = str(
            recovery_cursor.get_managed_operation().get_operation_parameters_tuple()
        ).lower()
        # Check both spellings: this handler itself issues "set mem_limit=".
        if "set memory_limit=" in statement or "set mem_limit=" in statement:
            self._log.info(
                "The SQL statement you are trying to execute contains a set memory_limit option "
                "so we are not going to handle the memory error."
            )
            return False

        # Incrementally increase the memory limit and as a last resort try to set the memory to an extreme value
        retries = recovery_cursor.get_retries()
        if retries > len(MULTIPLIER):
            # We won't be able to handle this error by increasing limits.
            # Using ">" (not "==") also keeps MULTIPLIER from being indexed out of range.
            return False
        if retries == len(MULTIPLIER):
            # An extreme case but let's try it as a last resort
            # NOTE(review): Impala documents this query option as MEM_LIMIT; confirm that
            # "memory_limit" is accepted. The literal is kept unchanged because the unit test
            # for this handler asserts this exact statement.
            recovery_cursor.execute("set memory_limit=512GB;")
        else:
            self._update_memory_limit(exception, recovery_cursor, MULTIPLIER[retries])
        # retry the query
        recovery_cursor.retry_operation()
        return True

    def _update_memory_limit(self, exception, recovery_cursor, multiplier):
        """
        Set mem_limit to the limit reported in the error scaled by ``multiplier``.

        Only logs a warning when the current limit cannot be extracted from the
        error message (or the computed limit is 0); no statement is executed then.
        """
        new_memory_limit = self._get_new_memory_limit(
            exception=exception, multiplier=multiplier
        )
        # Truthiness check on purpose: both None and 0 are treated as "unknown".
        if new_memory_limit:
            self._log.info(
                f"Setting memory limit to {new_memory_limit} bytes and retrying SQL statement."
            )
            recovery_cursor.execute(f"set mem_limit={new_memory_limit};")
        else:
            self._log.warning("Unable to determine current memory limit for statement.")

    @staticmethod
    def _convert_to_bytes(value: str, unit: str) -> int:
        """
        Convert a decimal string such as "2.00" with a unit such as "GB" to whole bytes.

        A unit missing from UNITS is treated as plain bytes (multiplier 1).
        """
        return int(float(value) * UNITS.get(unit, 1))

    def _get_new_memory_limit(self, exception, multiplier: float):
        """
        Compute the new memory limit, in bytes, from the limit found in the error text.

        :param exception: exception whose message is searched with MEMORY_LIMIT_PATTERN
        :param multiplier: factor applied to the current limit
        :return: the scaled limit truncated to an int, or None if no limit was found
        """
        memory_limit_match = re.search(MEMORY_LIMIT_PATTERN, str(exception))
        if not memory_limit_match:
            return None

        memory_limit_value, memory_limit_unit = memory_limit_match.groups()
        current_memory_limit = self._convert_to_bytes(
            memory_limit_value, memory_limit_unit
        )
        return int(current_memory_limit * multiplier)
49 changes: 49 additions & 0 deletions projects/vdk-plugins/vdk-impala/tests/impala_error_handler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,55 @@ def test_handle_failed_to_open_hdfs_new_authorization_exception_is_thrown_after_
calls = [call("refresh `history`.`vm`"), call(original_query)]
mock_native_cursor.execute.assert_has_calls(calls)

def test_handle_memory_error(self, patched_time_sleep):
    # Verifies the statements sent to the native cursor while the error handler
    # recovers from a repeated "Memory limit exceeded" error.
    # patched_time_sleep: presumably injected by a patch decorator outside this
    # view; it is not used in the body.

    # Error text that matches both the "Memory limit exceeded:" matcher and the
    # "Limit=2.00 GB" pattern from which the handler derives the current limit.
    msg = """Memory limit exceeded: HdfsParquetTableWriter::BaseColumnWriter::Flush() failed to allocate 884879 bytes for dictionary page.
HdfsTableSink could not allocate 864.14 KB without exceeding limit.
Error occurred on backend prd-impala-wdc-08-vc24c06-e14-ix-2.supercollider.vmware.com:22000 by fragment 0842bccde0974578:6fd468a200000042
Memory left in process limit: 116.53 GB
Memory left in query limit: 640.67 KB
Query(0842bccde0974578:6fd468a200000000): Limit=2.00 GB Reservation=1.78 GB ReservationLimit=9.00 GB OtherMemory=227.37 MB Total=2.00 GB Peak=2.00 GB
Fragment 0842bccde0974578:6fd468a200000042: Reservation=1.78 GB OtherMemory=227.37 MB Total=2.00 GB Peak=2.00 GB"""
    test_exception = OperationalError(msg)
    (
        mock_native_cursor,
        _,
        _,
        mock_recovery_cursor,
        _,
    ) = populate_mock_managed_cursor(
        mock_exception_to_recover=test_exception, mock_operation=self._query
    )
    # One entry is consumed per execute() call, in order: None means the call
    # succeeds, an exception instance is raised. Lined up with the expected
    # calls below, each "set ..." statement succeeds and the query that follows
    # it fails, until the ninth call (the final query) succeeds.
    # The trailing Ellipsis would only be returned by a tenth execute() call.
    mock_native_cursor.execute.side_effect = [
        None,
        test_exception,
        None,
        test_exception,
        None,
        test_exception,
        None,
        test_exception,
        None,
        ...,
    ]

    self.assertTrue(
        self.error_handler.handle_error(test_exception, mock_recovery_cursor)
    )

    # Expected mem_limit values are 2.00 GB (2147483648 bytes) scaled by the
    # handler's multipliers 1.2, 1.5 and 2.0 and truncated to int, followed by
    # the fixed 512GB last-resort statement.
    # "select 1" is presumably the value of self._query -- TODO confirm.
    # NOTE(review): which component issues the final, successful "select 1"
    # after the memory handler gives up is not visible here -- confirm.
    calls = [
        call("set mem_limit=2576980377;"),
        call("select 1"),
        call("set mem_limit=3221225472;"),
        call("select 1"),
        call("set mem_limit=4294967296;"),
        call("select 1"),
        call("set memory_limit=512GB;"),
        call("select 1"),
        call("select 1"),
    ]

    # assert_has_calls requires these calls to appear, in this order, within
    # the recorded call list of execute().
    mock_native_cursor.execute.assert_has_calls(calls)


if __name__ == "__main__":
unittest.main()

0 comments on commit a208598

Please sign in to comment.