
Using the cache for function bindings and not deleting shared memory resources upon request to close #844

Merged · 17 commits · Oct 2, 2021
44 changes: 39 additions & 5 deletions azure_functions_worker/bindings/meta.py
@@ -7,6 +7,7 @@

from . import datumdef
from . import generic
from .shared_memory_data_transfer import SharedMemoryManager

PB_TYPE = 'rpc_data'
PB_TYPE_DATA = 'data'
@@ -62,7 +63,7 @@ def from_incoming_proto(
pb: protos.ParameterBinding, *,
pytype: typing.Optional[type],
trigger_metadata: typing.Optional[typing.Dict[str, protos.TypedData]],
shmem_mgr) -> typing.Any:
shmem_mgr: SharedMemoryManager) -> typing.Any:
binding = get_binding(binding)
if trigger_metadata:
metadata = {
@@ -111,6 +112,39 @@ def get_datum(binding: str, obj: typing.Any,
return datum


def _does_datatype_support_caching(datum: datumdef.Datum):
supported_datatypes = ('bytes', 'string')
return datum.type in supported_datatypes


def _can_transfer_over_shmem(shmem_mgr: SharedMemoryManager,
is_function_data_cache_enabled: bool,
datum: datumdef.Datum):
"""
If shared memory is enabled and supported for the given datum, try to
transfer to host over shared memory as a default.
If caching is enabled, then also check if this type is supported - if so,
transfer over shared memory.
In case of caching, some conditions like object size may not be
applicable since even small objects are also allowed to be cached.
"""
if not shmem_mgr.is_enabled():
# If shared memory usage is not enabled, no further checks required
return False
if shmem_mgr.is_supported(datum):
# If transferring this object over shared memory is supported, do so.
return True
if is_function_data_cache_enabled and _does_datatype_support_caching(datum):
# If caching is enabled and this object can be cached, transfer over
# shared memory (since the cache uses shared memory).
# In this case, some requirements (like object size) for using shared
# memory may be ignored since we want to support caching of small
# objects (those smaller than the minimum size we transfer
# over shared memory when the cache is not enabled) as well.
return True
return False


def to_outgoing_proto(binding: str, obj: typing.Any, *,
pytype: typing.Optional[type]) -> protos.TypedData:
datum = get_datum(binding, obj, pytype)
@@ -120,13 +154,13 @@ def to_outgoing_proto(binding: str, obj: typing.Any, *,
def to_outgoing_param_binding(binding: str, obj: typing.Any, *,
pytype: typing.Optional[type],
out_name: str,
shmem_mgr) \
shmem_mgr: SharedMemoryManager,
is_function_data_cache_enabled: bool) \
-> protos.ParameterBinding:
datum = get_datum(binding, obj, pytype)
shared_mem_value = None
# If shared memory is enabled and supported for the given datum, try to
# transfer to host over shared memory as a default
if shmem_mgr.is_enabled() and shmem_mgr.is_supported(datum):
if _can_transfer_over_shmem(shmem_mgr, is_function_data_cache_enabled,
datum):
shared_mem_value = datumdef.Datum.to_rpc_shared_memory(datum, shmem_mgr)
# Check if data was written into shared memory
if shared_mem_value is not None:
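To make the new gating concrete, here is a small, self-contained sketch of the decision implemented by _can_transfer_over_shmem above. FakeDatum and FakeShmemMgr are hypothetical stand-ins for the worker's Datum and SharedMemoryManager (the 1 MB threshold is an assumed placeholder); only the three-step decision mirrors the diff.

from dataclasses import dataclass


@dataclass
class FakeDatum:
    # Hypothetical stand-in for datumdef.Datum: just a type tag and a value.
    type: str
    value: bytes


class FakeShmemMgr:
    # Hypothetical stand-in for SharedMemoryManager; is_supported() approximates
    # the real type/size check with an assumed 1 MB minimum.
    def __init__(self, enabled: bool, min_bytes: int = 1024 * 1024):
        self._enabled = enabled
        self._min_bytes = min_bytes

    def is_enabled(self) -> bool:
        return self._enabled

    def is_supported(self, datum: FakeDatum) -> bool:
        return datum.type in ('bytes', 'string') and len(datum.value) >= self._min_bytes


def can_transfer_over_shmem(shmem_mgr, cache_enabled: bool, datum: FakeDatum) -> bool:
    # Same three-step decision as _can_transfer_over_shmem in the diff above.
    if not shmem_mgr.is_enabled():
        return False   # shared memory turned off entirely
    if shmem_mgr.is_supported(datum):
        return True    # large enough to transfer regardless of the cache
    if cache_enabled and datum.type in ('bytes', 'string'):
        return True    # cacheable type: the size threshold is ignored
    return False


small = FakeDatum(type='bytes', value=b'x' * 10)
mgr = FakeShmemMgr(enabled=True)
print(can_transfer_over_shmem(mgr, cache_enabled=False, datum=small))  # False: too small
print(can_transfer_over_shmem(mgr, cache_enabled=True, datum=small))   # True: cacheable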
azure_functions_worker/bindings/shared_memory_data_transfer/shared_memory_manager.py
@@ -157,10 +157,11 @@ def get_string(self, mem_map_name: str, offset: int, count: int) \
content_str = content_bytes.decode('utf-8')
return content_str

def free_mem_map(self, mem_map_name: str) -> bool:
def free_mem_map(self, mem_map_name: str,
to_delete_backing_resources: bool = True) -> bool:
"""
Frees the memory map and any backing resources (e.g. file in the case of
Unix) associated with it.
Frees the memory map and, if specified, any backing resources (e.g.
file in the case of Unix) associated with it.
If there is no memory map with the given name being tracked, then no
action is performed.
Returns True if the memory map was freed successfully, False otherwise.
@@ -170,7 +171,7 @@ def free_mem_map(self, mem_map_name: str) -> bool:
f'Cannot find memory map in list of allocations {mem_map_name}')
return False
shared_mem_map = self.allocated_mem_maps[mem_map_name]
success = shared_mem_map.dispose()
success = shared_mem_map.dispose(to_delete_backing_resources)
del self.allocated_mem_maps[mem_map_name]
return success

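A short usage sketch of the new flag, assuming the SharedMemoryManager API shown in this diff (the import path is taken from the meta.py hunk above). The helper name release_after_host_read is hypothetical; it only illustrates the intended call pattern: delete the backing resources only when the function data cache is not enabled.

from azure_functions_worker.bindings.shared_memory_data_transfer import (
    SharedMemoryManager,
)


def release_after_host_read(mgr: SharedMemoryManager,
                            mem_map_name: str,
                            cache_enabled: bool) -> bool:
    # With the cache on, only drop the worker's reference; the host-managed
    # cache keeps the backing memory map/file alive and deletes it later.
    return mgr.free_mem_map(mem_map_name,
                            to_delete_backing_resources=not cache_enabled)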
1 change: 1 addition & 0 deletions azure_functions_worker/constants.py
@@ -10,6 +10,7 @@
RPC_HTTP_TRIGGER_METADATA_REMOVED = "RpcHttpTriggerMetadataRemoved"
WORKER_STATUS = "WorkerStatus"
SHARED_MEMORY_DATA_TRANSFER = "SharedMemoryDataTransfer"
FUNCTION_DATA_CACHE = "FunctionDataCache"

# Debug Flags
PYAZURE_WEBHOST_DEBUG = "PYAZURE_WEBHOST_DEBUG"
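The new FUNCTION_DATA_CACHE constant is consumed during worker initialization (see the dispatcher.py hunk below). A minimal sketch of that capability check, assuming the host advertises capabilities as a string-keyed map and that _TRUE is the literal 'true' used in dispatcher.py:

FUNCTION_DATA_CACHE = 'FunctionDataCache'
_TRUE = 'true'  # assumed to match the _TRUE constant in dispatcher.py


def parse_cache_capability(host_capabilities: dict) -> bool:
    # Treat any value other than the exact string 'true' as "cache disabled".
    return host_capabilities.get(FUNCTION_DATA_CACHE) == _TRUE


print(parse_cache_capability({FUNCTION_DATA_CACHE: 'true'}))  # True
print(parse_cache_capability({}))                             # False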
26 changes: 21 additions & 5 deletions azure_functions_worker/dispatcher.py
@@ -74,6 +74,7 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
self._port = port
self._request_id = request_id
self._worker_id = worker_id
self._function_data_cache_enabled = False
self._functions = functions.Registry()
self._shmem_mgr = SharedMemoryManager()

@@ -262,6 +263,12 @@ async def _handle__worker_init_request(self, req):
'python version %s, worker version %s, request ID %s',
sys.version, __version__, self.request_id)

worker_init_request = req.worker_init_request
host_capabilities = worker_init_request.capabilities
if constants.FUNCTION_DATA_CACHE in host_capabilities:
val = host_capabilities[constants.FUNCTION_DATA_CACHE]
self._function_data_cache_enabled = val == _TRUE

capabilities = {
constants.RAW_HTTP_BODY_BYTES: _TRUE,
constants.TYPED_DATA_COLLECTION: _TRUE,
@@ -408,6 +415,7 @@ async def _handle__invocation_request(self, req):
'binding returned a non-None value')

output_data = []
cache_enabled = self._function_data_cache_enabled
if fi.output_types:
for out_name, out_type_info in fi.output_types.items():
val = args[out_name].get()
@@ -419,7 +427,8 @@ async def _handle__invocation_request(self, req):
param_binding = bindings.to_outgoing_param_binding(
out_type_info.binding_name, val,
pytype=out_type_info.pytype,
out_name=out_name, shmem_mgr=self._shmem_mgr)
out_name=out_name, shmem_mgr=self._shmem_mgr,
is_function_data_cache_enabled=cache_enabled)
output_data.append(param_binding)

return_value = None
@@ -517,6 +526,10 @@ async def _handle__close_shared_memory_resources_request(self, req):
invocation.
This is called after the functions host is done reading the output from
the worker and wants the worker to free up those resources.
If the cache is enabled, the worker only drops its reference to the
memory maps and lets the host decide when to delete the backing resources.
If the cache is not enabled, the worker frees the resources, since at this
point the host has already read the memory maps and no longer needs them.
"""
close_request = req.close_shared_memory_resources_request
map_names = close_request.map_names
@@ -526,12 +539,15 @@ async def _handle__close_shared_memory_resources_request(self, req):
results = {mem_map_name: False for mem_map_name in map_names}

try:
for mem_map_name in map_names:
for map_name in map_names:
try:
success = self._shmem_mgr.free_mem_map(mem_map_name)
results[mem_map_name] = success
to_delete_resources = not self._function_data_cache_enabled
success = self._shmem_mgr.free_mem_map(map_name,
to_delete_resources)
results[map_name] = success
except Exception as e:
logger.error(f'Cannot free memory map {mem_map_name} - {e}',
logger.error(f'Cannot free memory map {map_name} - {e}',
exc_info=True)
finally:
response = protos.CloseSharedMemoryResourcesResponse(
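Putting the close-request policy together, here is a self-contained sketch of the handler logic above. FakeManager is a hypothetical stand-in for SharedMemoryManager so the snippet runs on its own; the real handler additionally wraps the results in a CloseSharedMemoryResourcesResponse proto.

import logging

logger = logging.getLogger(__name__)


class FakeManager:
    # Hypothetical stand-in for SharedMemoryManager, tracking two dummy maps.
    def __init__(self):
        self.allocated_mem_maps = {'map-1': object(), 'map-2': object()}

    def free_mem_map(self, name: str, to_delete_backing_resources: bool = True) -> bool:
        if name not in self.allocated_mem_maps:
            return False
        # The real implementation disposes the mmap and, optionally, deletes
        # its backing file; here we only drop the tracking entry.
        del self.allocated_mem_maps[name]
        return True


def handle_close_request(shmem_mgr, map_names, cache_enabled: bool) -> dict:
    results = {name: False for name in map_names}
    # With the cache enabled, keep the backing resources alive for the host.
    to_delete_resources = not cache_enabled
    for name in map_names:
        try:
            results[name] = shmem_mgr.free_mem_map(name, to_delete_resources)
        except Exception:
            logger.exception('Cannot free memory map %s', name)
    return results


print(handle_close_request(FakeManager(), ['map-1', 'missing'], cache_enabled=True))
# {'map-1': True, 'missing': False}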
22 changes: 22 additions & 0 deletions tests/unittests/test_shared_memory_manager.py
@@ -339,6 +339,28 @@ def test_allocated_mem_maps(self):
self.assertFalse(is_mem_map_found)
self.assertEqual(0, len(manager.allocated_mem_maps.keys()))

def test_do_not_free_resources_on_dispose(self):
"""
Verify that freeing allocated shared memory maps with
to_delete_backing_resources=False does not delete their backing resources.
The shared memory map should, however, no longer be tracked by the
SharedMemoryManager.
"""
manager = SharedMemoryManager()
content_size = consts.MIN_BYTES_FOR_SHARED_MEM_TRANSFER + 10
content = self.get_random_bytes(content_size)
shared_mem_meta = manager.put_bytes(content)
self.assertIsNotNone(shared_mem_meta)
mem_map_name = shared_mem_meta.mem_map_name
is_mem_map_found = mem_map_name in manager.allocated_mem_maps
self.assertTrue(is_mem_map_found)
self.assertEqual(1, len(manager.allocated_mem_maps.keys()))
free_success = manager.free_mem_map(mem_map_name, False)
self.assertTrue(free_success)
is_mem_map_found = mem_map_name in manager.allocated_mem_maps
self.assertFalse(is_mem_map_found)
self.assertEqual(0, len(manager.allocated_mem_maps.keys()))

def test_invalid_put_allocated_mem_maps(self):
"""
Verify that after an invalid put operation, no shared memory maps were