From 6e3ecd42bb0d25498e9de87f83eebf9c6a80d692 Mon Sep 17 00:00:00 2001 From: Gohar Irfan Chaudhry Date: Fri, 1 Oct 2021 18:32:22 -0700 Subject: [PATCH] Using the cache for function bindings and not deleting shared memory resources upon request to close (#844) * Separate field in the message to close shared memory maps which indicate if the maps should be deleted or just the reference should be dropped * Check for function data cache capability from host * Update azure_functions_worker/bindings/meta.py * Addressing comments for cleaning up shared memory usage checks in meta.py --- azure_functions_worker/bindings/meta.py | 44 ++++++++++++++++--- .../shared_memory_manager.py | 9 ++-- azure_functions_worker/constants.py | 1 + azure_functions_worker/dispatcher.py | 26 ++++++++--- tests/unittests/test_shared_memory_manager.py | 22 ++++++++++ 5 files changed, 88 insertions(+), 14 deletions(-) diff --git a/azure_functions_worker/bindings/meta.py b/azure_functions_worker/bindings/meta.py index cbf95fcb..52867a00 100644 --- a/azure_functions_worker/bindings/meta.py +++ b/azure_functions_worker/bindings/meta.py @@ -7,6 +7,7 @@ from . import datumdef from . 
import generic +from .shared_memory_data_transfer import SharedMemoryManager PB_TYPE = 'rpc_data' PB_TYPE_DATA = 'data' @@ -62,7 +63,7 @@ def from_incoming_proto( pb: protos.ParameterBinding, *, pytype: typing.Optional[type], trigger_metadata: typing.Optional[typing.Dict[str, protos.TypedData]], - shmem_mgr) -> typing.Any: + shmem_mgr: SharedMemoryManager) -> typing.Any: binding = get_binding(binding) if trigger_metadata: metadata = { @@ -111,6 +112,39 @@ def get_datum(binding: str, obj: typing.Any, return datum +def _does_datatype_support_caching(datum: datumdef.Datum): + supported_datatypes = ('bytes', 'string') + return datum.type in supported_datatypes + + +def _can_transfer_over_shmem(shmem_mgr: SharedMemoryManager, + is_function_data_cache_enabled: bool, + datum: datumdef.Datum): + """ + If shared memory is enabled and supported for the given datum, try to + transfer to host over shared memory as a default. + If caching is enabled, then also check if this type is supported - if so, + transfer over shared memory. + In case of caching, some conditions like object size may not be + applicable since even small objects are also allowed to be cached. + """ + if not shmem_mgr.is_enabled(): + # If shared memory usage is not enabled, no further checks required + return False + if shmem_mgr.is_supported(datum): + # If transferring this object over shared memory is supported, do so. + return True + if is_function_data_cache_enabled and _does_datatype_support_caching(datum): + # If caching is enabled and this object can be cached, transfer over + # shared memory (since the cache uses shared memory). + # In this case, some requirements (like object size) for using shared + # memory may be ignored since we want to support caching of small + # objects (those that have sizes smaller than the minimum we transfer + # over shared memory when the cache is not enabled) as well.
+ return True + return False + + def to_outgoing_proto(binding: str, obj: typing.Any, *, pytype: typing.Optional[type]) -> protos.TypedData: datum = get_datum(binding, obj, pytype) @@ -120,13 +154,13 @@ def to_outgoing_proto(binding: str, obj: typing.Any, *, def to_outgoing_param_binding(binding: str, obj: typing.Any, *, pytype: typing.Optional[type], out_name: str, - shmem_mgr) \ + shmem_mgr: SharedMemoryManager, + is_function_data_cache_enabled: bool) \ -> protos.ParameterBinding: datum = get_datum(binding, obj, pytype) shared_mem_value = None - # If shared memory is enabled and supported for the given datum, try to - # transfer to host over shared memory as a default - if shmem_mgr.is_enabled() and shmem_mgr.is_supported(datum): + if _can_transfer_over_shmem(shmem_mgr, is_function_data_cache_enabled, + datum): shared_mem_value = datumdef.Datum.to_rpc_shared_memory(datum, shmem_mgr) # Check if data was written into shared memory if shared_mem_value is not None: diff --git a/azure_functions_worker/bindings/shared_memory_data_transfer/shared_memory_manager.py b/azure_functions_worker/bindings/shared_memory_data_transfer/shared_memory_manager.py index b4cba544..b5fc1ed0 100644 --- a/azure_functions_worker/bindings/shared_memory_data_transfer/shared_memory_manager.py +++ b/azure_functions_worker/bindings/shared_memory_data_transfer/shared_memory_manager.py @@ -157,10 +157,11 @@ def get_string(self, mem_map_name: str, offset: int, count: int) \ content_str = content_bytes.decode('utf-8') return content_str - def free_mem_map(self, mem_map_name: str) -> bool: + def free_mem_map(self, mem_map_name: str, + to_delete_backing_resources: bool = True) -> bool: """ - Frees the memory map and any backing resources (e.g. file in the case of - Unix) associated with it. + Frees the memory map and, if specified, any backing resources (e.g. + file in the case of Unix) associated with it. If there is no memory map with the given name being tracked, then no action is performed. 
Returns True if the memory map was freed successfully, False otherwise. @@ -170,7 +171,7 @@ def free_mem_map(self, mem_map_name: str) -> bool: f'Cannot find memory map in list of allocations {mem_map_name}') return False shared_mem_map = self.allocated_mem_maps[mem_map_name] - success = shared_mem_map.dispose() + success = shared_mem_map.dispose(to_delete_backing_resources) del self.allocated_mem_maps[mem_map_name] return success diff --git a/azure_functions_worker/constants.py b/azure_functions_worker/constants.py index bad05040..7f7c32a5 100644 --- a/azure_functions_worker/constants.py +++ b/azure_functions_worker/constants.py @@ -10,6 +10,7 @@ RPC_HTTP_TRIGGER_METADATA_REMOVED = "RpcHttpTriggerMetadataRemoved" WORKER_STATUS = "WorkerStatus" SHARED_MEMORY_DATA_TRANSFER = "SharedMemoryDataTransfer" +FUNCTION_DATA_CACHE = "FunctionDataCache" # Debug Flags PYAZURE_WEBHOST_DEBUG = "PYAZURE_WEBHOST_DEBUG" diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 694e5fc9..81c3f6d8 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -74,6 +74,7 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int, self._port = port self._request_id = request_id self._worker_id = worker_id + self._function_data_cache_enabled = False self._functions = functions.Registry() self._shmem_mgr = SharedMemoryManager() @@ -262,6 +263,12 @@ async def _handle__worker_init_request(self, req): 'python version %s, worker version %s, request ID %s', sys.version, __version__, self.request_id) + worker_init_request = req.worker_init_request + host_capabilities = worker_init_request.capabilities + if constants.FUNCTION_DATA_CACHE in host_capabilities: + val = host_capabilities[constants.FUNCTION_DATA_CACHE] + self._function_data_cache_enabled = val == _TRUE + capabilities = { constants.RAW_HTTP_BODY_BYTES: _TRUE, constants.TYPED_DATA_COLLECTION: _TRUE, @@ -408,6 +415,7 @@ async def 
_handle__invocation_request(self, req): 'binding returned a non-None value') output_data = [] + cache_enabled = self._function_data_cache_enabled if fi.output_types: for out_name, out_type_info in fi.output_types.items(): val = args[out_name].get() @@ -419,7 +427,8 @@ async def _handle__invocation_request(self, req): param_binding = bindings.to_outgoing_param_binding( out_type_info.binding_name, val, pytype=out_type_info.pytype, - out_name=out_name, shmem_mgr=self._shmem_mgr) + out_name=out_name, shmem_mgr=self._shmem_mgr, + is_function_data_cache_enabled=cache_enabled) output_data.append(param_binding) return_value = None @@ -517,6 +526,10 @@ async def _handle__close_shared_memory_resources_request(self, req): invocation. This is called after the functions host is done reading the output from the worker and wants the worker to free up those resources. + If the cache is enabled, let the host decide when to delete the + resources. Just drop the reference from the worker. + If the cache is not enabled, the worker should free the resources as at + this point the host has read the memory maps and does not need them. 
""" close_request = req.close_shared_memory_resources_request map_names = close_request.map_names @@ -526,12 +539,15 @@ async def _handle__close_shared_memory_resources_request(self, req): results = {mem_map_name: False for mem_map_name in map_names} try: - for mem_map_name in map_names: + for map_name in map_names: try: - success = self._shmem_mgr.free_mem_map(mem_map_name) - results[mem_map_name] = success + to_delete_resources = \ + False if self._function_data_cache_enabled else True + success = self._shmem_mgr.free_mem_map(map_name, + to_delete_resources) + results[map_name] = success except Exception as e: - logger.error(f'Cannot free memory map {mem_map_name} - {e}', + logger.error(f'Cannot free memory map {map_name} - {e}', exc_info=True) finally: response = protos.CloseSharedMemoryResourcesResponse( diff --git a/tests/unittests/test_shared_memory_manager.py b/tests/unittests/test_shared_memory_manager.py index 7a3999c3..0cdb7c23 100644 --- a/tests/unittests/test_shared_memory_manager.py +++ b/tests/unittests/test_shared_memory_manager.py @@ -339,6 +339,28 @@ def test_allocated_mem_maps(self): self.assertFalse(is_mem_map_found) self.assertEqual(0, len(manager.allocated_mem_maps.keys())) + def test_do_not_free_resources_on_dispose(self): + """ + Verify that when the allocated shared memory maps are freed, + their backing resources are not freed. + Note: The shared memory map should no longer be tracked by the + SharedMemoryManager, though. 
+ """ + manager = SharedMemoryManager() + content_size = consts.MIN_BYTES_FOR_SHARED_MEM_TRANSFER + 10 + content = self.get_random_bytes(content_size) + shared_mem_meta = manager.put_bytes(content) + self.assertIsNotNone(shared_mem_meta) + mem_map_name = shared_mem_meta.mem_map_name + is_mem_map_found = mem_map_name in manager.allocated_mem_maps + self.assertTrue(is_mem_map_found) + self.assertEqual(1, len(manager.allocated_mem_maps.keys())) + free_success = manager.free_mem_map(mem_map_name, False) + self.assertTrue(free_success) + is_mem_map_found = mem_map_name in manager.allocated_mem_maps + self.assertFalse(is_mem_map_found) + self.assertEqual(0, len(manager.allocated_mem_maps.keys())) + def test_invalid_put_allocated_mem_maps(self): """ Verify that after an invalid put operation, no shared memory maps were