Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert more of the media code to async/await #7873

Merged
merged 8 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7873.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert more media code to async/await.
15 changes: 8 additions & 7 deletions synapse/rest/media/v1/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import os
import urllib

from twisted.internet import defer
from twisted.protocols.basic import FileSender

from synapse.api.errors import Codes, SynapseError, cs_error
Expand Down Expand Up @@ -77,8 +76,9 @@ def respond_404(request):
)


@defer.inlineCallbacks
def respond_with_file(request, media_type, file_path, file_size=None, upload_name=None):
async def respond_with_file(
request, media_type, file_path, file_size=None, upload_name=None
):
logger.debug("Responding with %r", file_path)

if os.path.isfile(file_path):
Expand All @@ -89,7 +89,7 @@ def respond_with_file(request, media_type, file_path, file_size=None, upload_nam
add_file_headers(request, media_type, file_size, upload_name)

with open(file_path, "rb") as f:
yield make_deferred_yieldable(FileSender().beginFileTransfer(f, request))
await make_deferred_yieldable(FileSender().beginFileTransfer(f, request))

finish_request(request)
else:
Expand Down Expand Up @@ -198,8 +198,9 @@ def _can_encode_filename_as_token(x):
return True


@defer.inlineCallbacks
def respond_with_responder(request, responder, media_type, file_size, upload_name=None):
async def respond_with_responder(
request, responder, media_type, file_size, upload_name=None
):
"""Responds to the request with given responder. If responder is None then
returns 404.
Expand All @@ -218,7 +219,7 @@ def respond_with_responder(request, responder, media_type, file_size, upload_nam
add_file_headers(request, media_type, file_size, upload_name)
try:
with responder:
yield responder.write_to_consumer(request)
await responder.write_to_consumer(request)
except Exception as e:
# The majority of the time this will be due to the client having gone
# away. Unfortunately, Twisted simply throws a generic exception at us
Expand Down
60 changes: 34 additions & 26 deletions synapse/rest/media/v1/media_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
# limitations under the License.

import contextlib
import inspect
import logging
import os
import shutil
from typing import Optional

from twisted.internet import defer
from twisted.protocols.basic import FileSender

from synapse.logging.context import defer_to_thread, make_deferred_yieldable
from synapse.util.file_consumer import BackgroundFileConsumer

from ._base import Responder
from ._base import FileInfo, Responder

logger = logging.getLogger(__name__)

Expand All @@ -46,25 +47,24 @@ def __init__(self, hs, local_media_directory, filepaths, storage_providers):
self.filepaths = filepaths
self.storage_providers = storage_providers

@defer.inlineCallbacks
def store_file(self, source, file_info):
async def store_file(self, source, file_info: FileInfo) -> str:
"""Write `source` to the on disk media store, and also any other
configured storage providers
Args:
source: A file like object that should be written
file_info (FileInfo): Info about the file to store
file_info: Info about the file to store
Returns:
Deferred[str]: the file path written to in the primary media store
the file path written to in the primary media store
"""

with self.store_into_file(file_info) as (f, fname, finish_cb):
# Write to the main repository
yield defer_to_thread(
await defer_to_thread(
self.hs.get_reactor(), _write_file_synchronously, source, f
)
yield finish_cb()
await finish_cb()

return fname

Expand All @@ -75,7 +75,7 @@ def store_into_file(self, file_info):
Actually yields a 3-tuple (file, fname, finish_cb), where file is a file
like object that can be written to, fname is the absolute path of file
on disk, and finish_cb is a function that returns a Deferred.
on disk, and finish_cb is a function that returns an awaitable.
fname can be used to read the contents from after upload, e.g. to
generate thumbnails.
Expand All @@ -91,7 +91,7 @@ def store_into_file(self, file_info):
with media_storage.store_into_file(info) as (f, fname, finish_cb):
# .. write into f ...
yield finish_cb()
await finish_cb()
"""

path = self._file_info_to_path(file_info)
Expand All @@ -103,10 +103,13 @@ def store_into_file(self, file_info):

finished_called = [False]

@defer.inlineCallbacks
def finish():
async def finish():
for provider in self.storage_providers:
yield provider.store_file(path, file_info)
# store_file is supposed to return an Awaitable, but guard
# against improper implementations.
result = provider.store_file(path, file_info)
if inspect.isawaitable(result):
await result

finished_called[0] = True

Expand All @@ -123,17 +126,15 @@ def finish():
if not finished_called:
raise Exception("Finished callback not called")

@defer.inlineCallbacks
def fetch_media(self, file_info):
async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
"""Attempts to fetch media described by file_info from the local cache
and configured storage providers.
Args:
file_info (FileInfo)
file_info
Returns:
Deferred[Responder|None]: Returns a Responder if the file was found,
otherwise None.
Returns a Responder if the file was found, otherwise None.
"""

path = self._file_info_to_path(file_info)
Expand All @@ -142,23 +143,26 @@ def fetch_media(self, file_info):
return FileResponder(open(local_path, "rb"))

for provider in self.storage_providers:
res = yield provider.fetch(path, file_info)
res = provider.fetch(path, file_info)
# Fetch is supposed to return an Awaitable, but guard against
# improper implementations.
if inspect.isawaitable(res):
res = await res
if res:
logger.debug("Streaming %s from %s", path, provider)
return res

return None

@defer.inlineCallbacks
def ensure_media_is_in_local_cache(self, file_info):
async def ensure_media_is_in_local_cache(self, file_info: FileInfo) -> str:
"""Ensures that the given file is in the local cache. Attempts to
download it from storage providers if it isn't.
Args:
file_info (FileInfo)
file_info
Returns:
Deferred[str]: Full path to local file
Full path to local file
"""
path = self._file_info_to_path(file_info)
local_path = os.path.join(self.local_media_directory, path)
Expand All @@ -170,14 +174,18 @@ def ensure_media_is_in_local_cache(self, file_info):
os.makedirs(dirname)

for provider in self.storage_providers:
res = yield provider.fetch(path, file_info)
res = provider.fetch(path, file_info)
# Fetch is supposed to return an Awaitable, but guard against
# improper implementations.
if inspect.isawaitable(res):
res = await res
if res:
with res:
consumer = BackgroundFileConsumer(
open(local_path, "wb"), self.hs.get_reactor()
)
yield res.write_to_consumer(consumer)
yield consumer.wait()
await res.write_to_consumer(consumer)
await consumer.wait()
return local_path

raise Exception("file could not be found")
Expand Down
5 changes: 4 additions & 1 deletion tests/rest/media/v1/test_media_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from parameterized import parameterized_class
from PIL import Image as Image

from twisted.internet import defer
from twisted.internet.defer import Deferred

from synapse.logging.context import make_deferred_yieldable
Expand Down Expand Up @@ -77,7 +78,9 @@ def test_ensure_media_is_in_local_cache(self):

# This uses a real blocking threadpool so we have to wait for it to be
# actually done :/
x = self.media_storage.ensure_media_is_in_local_cache(file_info)
x = defer.ensureDeferred(
self.media_storage.ensure_media_is_in_local_cache(file_info)
)

# Hotloop until the threadpool does its job...
self.wait_on_thread(x)
Expand Down