Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using the cache for function bindings and not deleting shared memory resources upon request to close #844

Merged
merged 17 commits into from
Oct 2, 2021
Merged
15 changes: 13 additions & 2 deletions azure_functions_worker/bindings/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ def get_datum(binding: str, obj: typing.Any,
return datum


def is_cache_supported(datum: datumdef.Datum):
gohar94 marked this conversation as resolved.
Show resolved Hide resolved
if datum.type == 'bytes':
return True
elif datum.type == 'string':
gohar94 marked this conversation as resolved.
Show resolved Hide resolved
return True
return False
gohar94 marked this conversation as resolved.
Show resolved Hide resolved


def to_outgoing_proto(binding: str, obj: typing.Any, *,
pytype: typing.Optional[type]) -> protos.TypedData:
datum = get_datum(binding, obj, pytype)
Expand All @@ -120,13 +128,16 @@ def to_outgoing_proto(binding: str, obj: typing.Any, *,
def to_outgoing_param_binding(binding: str, obj: typing.Any, *,
pytype: typing.Optional[type],
out_name: str,
shmem_mgr) \
shmem_mgr,
gohar94 marked this conversation as resolved.
Show resolved Hide resolved
is_function_data_cache_enabled: bool) \
-> protos.ParameterBinding:
datum = get_datum(binding, obj, pytype)
shared_mem_value = None
# If shared memory is enabled and supported for the given datum, try to
# transfer to host over shared memory as a default
if shmem_mgr.is_enabled() and shmem_mgr.is_supported(datum):
can_transfer_over_shmem = shmem_mgr.is_supported(datum) or \
gohar94 marked this conversation as resolved.
Show resolved Hide resolved
(is_function_data_cache_enabled and is_cache_supported(datum))
if shmem_mgr.is_enabled() and can_transfer_over_shmem:
shared_mem_value = datumdef.Datum.to_rpc_shared_memory(datum, shmem_mgr)
# Check if data was written into shared memory
if shared_mem_value is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,11 @@ def get_string(self, mem_map_name: str, offset: int, count: int) \
content_str = content_bytes.decode('utf-8')
return content_str

def free_mem_map(self, mem_map_name: str) -> bool:
def free_mem_map(self, mem_map_name: str,
is_delete_backing_resources: bool = True) -> bool:
"""
Frees the memory map and any backing resources (e.g. file in the case of
Unix) associated with it.
Frees the memory map and, if specified, any backing resources (e.g.
file in the case of Unix) associated with it.
If there is no memory map with the given name being tracked, then no
action is performed.
Returns True if the memory map was freed successfully, False otherwise.
Expand All @@ -170,7 +171,7 @@ def free_mem_map(self, mem_map_name: str) -> bool:
f'Cannot find memory map in list of allocations {mem_map_name}')
return False
shared_mem_map = self.allocated_mem_maps[mem_map_name]
success = shared_mem_map.dispose()
success = shared_mem_map.dispose(is_delete_backing_resources)
gohar94 marked this conversation as resolved.
Show resolved Hide resolved
del self.allocated_mem_maps[mem_map_name]
return success

Expand Down
1 change: 1 addition & 0 deletions azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
RPC_HTTP_TRIGGER_METADATA_REMOVED = "RpcHttpTriggerMetadataRemoved"
WORKER_STATUS = "WorkerStatus"
SHARED_MEMORY_DATA_TRANSFER = "SharedMemoryDataTransfer"
FUNCTION_DATA_CACHE = "FunctionDataCache"
gohar94 marked this conversation as resolved.
Show resolved Hide resolved

# Debug Flags
PYAZURE_WEBHOST_DEBUG = "PYAZURE_WEBHOST_DEBUG"
Expand Down
13 changes: 11 additions & 2 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
self._port = port
self._request_id = request_id
self._worker_id = worker_id
self._function_data_cache_enabled = False
self._functions = functions.Registry()
self._shmem_mgr = SharedMemoryManager()

Expand Down Expand Up @@ -258,6 +259,12 @@ async def _handle__worker_init_request(self, req):
logger.info('Received WorkerInitRequest, request ID %s',
self.request_id)

worker_init_request = req.worker_init_request
host_capabilities = worker_init_request.capabilities
gohar94 marked this conversation as resolved.
Show resolved Hide resolved
if constants.FUNCTION_DATA_CACHE in host_capabilities:
val = host_capabilities[constants.FUNCTION_DATA_CACHE]
self._function_data_cache_enabled = val == _TRUE

capabilities = {
constants.RAW_HTTP_BODY_BYTES: _TRUE,
constants.TYPED_DATA_COLLECTION: _TRUE,
Expand Down Expand Up @@ -392,6 +399,7 @@ async def _handle__invocation_request(self, req):
'binding returned a non-None value')

output_data = []
cache_enabled = self._function_data_cache_enabled
if fi.output_types:
for out_name, out_type_info in fi.output_types.items():
val = args[out_name].get()
Expand All @@ -403,7 +411,8 @@ async def _handle__invocation_request(self, req):
param_binding = bindings.to_outgoing_param_binding(
out_type_info.binding_name, val,
pytype=out_type_info.pytype,
out_name=out_name, shmem_mgr=self._shmem_mgr)
out_name=out_name, shmem_mgr=self._shmem_mgr,
is_function_data_cache_enabled=cache_enabled)
output_data.append(param_binding)

return_value = None
Expand Down Expand Up @@ -512,7 +521,7 @@ async def _handle__close_shared_memory_resources_request(self, req):
try:
for mem_map_name in map_names:
try:
success = self._shmem_mgr.free_mem_map(mem_map_name)
success = self._shmem_mgr.free_mem_map(mem_map_name, False)
gohar94 marked this conversation as resolved.
Show resolved Hide resolved
results[mem_map_name] = success
except Exception as e:
logger.error(f'Cannot free memory map {mem_map_name} - {e}',
Expand Down