-
Notifications
You must be signed in to change notification settings - Fork 206
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New feature: File download support #702
Open
EnotShow
wants to merge
11
commits into
aminalaee:main
Choose a base branch
from
EnotShow:works_with_files
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
c403a30
File download support
EnotShow c6ad417
Merge branch 'main' into works_with_files
aminalaee aa56f69
File read update
EnotShow 9a7b32a
Merge branch 'works_with_files' of https://github.com/EnotShow/sqladm…
EnotShow 515b621
update test for files view
EnotShow 8f5fd29
update mypy types
EnotShow ac2c6d5
file read/write tests
EnotShow 759a60e
tests check fix
EnotShow 2b05618
test format
EnotShow c10690b
Merge branch 'aminalaee:main' into works_with_files
EnotShow 18cd6c0
Merge branch 'main' into works_with_files
EnotShow File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
import inspect | ||
import io | ||
import logging | ||
from pathlib import Path | ||
from types import MethodType | ||
from typing import ( | ||
TYPE_CHECKING, | ||
|
@@ -27,7 +28,12 @@ | |
from starlette.exceptions import HTTPException | ||
from starlette.middleware import Middleware | ||
from starlette.requests import Request | ||
from starlette.responses import JSONResponse, RedirectResponse, Response | ||
from starlette.responses import ( | ||
FileResponse, | ||
JSONResponse, | ||
RedirectResponse, | ||
Response, | ||
) | ||
from starlette.routing import Mount, Route | ||
from starlette.staticfiles import StaticFiles | ||
|
||
|
@@ -37,9 +43,11 @@ | |
from sqladmin.authentication import AuthenticationBackend, login_required | ||
from sqladmin.forms import WTFORMS_ATTRS, WTFORMS_ATTRS_REVERSED | ||
from sqladmin.helpers import ( | ||
get_filename_from_path, | ||
get_object_identifier, | ||
is_async_session_maker, | ||
slugify_action_name, | ||
value_is_filepath, | ||
) | ||
from sqladmin.models import BaseView, ModelView | ||
from sqladmin.templating import Jinja2Templates | ||
|
@@ -116,6 +124,8 @@ | |
templates.env.globals["admin"] = self | ||
templates.env.globals["is_list"] = lambda x: isinstance(x, list) | ||
templates.env.globals["get_object_identifier"] = get_object_identifier | ||
templates.env.globals["value_is_filepath"] = value_is_filepath | ||
templates.env.globals["get_filename_from_path"] = get_filename_from_path | ||
|
||
return templates | ||
|
||
|
@@ -311,6 +321,21 @@ | |
if request.path_params["export_type"] not in model_view.export_types: | ||
raise HTTPException(status_code=404) | ||
|
||
async def _get_file(self, request: Request) -> Path: | ||
"""Get file path""" | ||
|
||
identity = request.path_params["identity"] | ||
identifier = request.path_params["pk"] | ||
column_name = request.path_params["column_name"] | ||
|
||
model_view = self._find_model_view(identity) | ||
file_path = await model_view.get_object_filepath(identifier, column_name) | ||
|
||
request_path = Path(file_path) | ||
if not request_path.is_file(): | ||
raise HTTPException(status_code=404) | ||
return request_path | ||
|
||
|
||
class Admin(BaseAdminView): | ||
"""Main entrypoint to admin interface. | ||
|
@@ -417,6 +442,18 @@ | |
), | ||
Route("/login", endpoint=self.login, name="login", methods=["GET", "POST"]), | ||
Route("/logout", endpoint=self.logout, name="logout", methods=["GET"]), | ||
Route( | ||
"/{identity}/{pk:path}/{column_name}/download/", | ||
endpoint=self.download_file, | ||
name="file_download", | ||
methods=["GET"], | ||
), | ||
Route( | ||
"/{identity}/{pk:path}/{column_name}/read/", | ||
endpoint=self.reed_file, | ||
name="file_read", | ||
methods=["GET"], | ||
), | ||
] | ||
|
||
self.admin.router.routes = routes | ||
|
@@ -490,7 +527,6 @@ | |
@login_required | ||
async def create(self, request: Request) -> Response: | ||
"""Create model endpoint.""" | ||
|
||
await self._create(request) | ||
|
||
identity = request.path_params["identity"] | ||
|
@@ -626,6 +662,18 @@ | |
await self.authentication_backend.logout(request) | ||
return RedirectResponse(request.url_for("admin:index"), status_code=302) | ||
|
||
async def download_file(self, request: Request) -> Response: | ||
"""Download file endpoint.""" | ||
request_path = await self._get_file(request) | ||
return FileResponse(request_path, filename=request_path.name) | ||
|
||
async def reed_file(self, request: Request) -> Response: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need to add deco |
||
"""Read file endpoint.""" | ||
request_path = await self._get_file(request) | ||
return FileResponse( | ||
request_path, filename=request_path.name, content_disposition_type="inline" | ||
) | ||
|
||
async def ajax_lookup(self, request: Request) -> Response: | ||
"""Ajax lookup route.""" | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
import io | ||
import re | ||
from typing import Any, AsyncGenerator | ||
|
||
import pytest | ||
from fastapi_storages import FileSystemStorage, StorageFile | ||
from fastapi_storages.integrations.sqlalchemy import FileType | ||
from httpx import AsyncClient | ||
from sqlalchemy import Column, Integer, select | ||
from sqlalchemy.ext.asyncio import AsyncSession | ||
from sqlalchemy.orm import declarative_base, sessionmaker | ||
from starlette.applications import Starlette | ||
from starlette.datastructures import UploadFile | ||
|
||
from sqladmin import Admin, ModelView | ||
from tests.common import async_engine as engine | ||
|
||
pytestmark = pytest.mark.anyio | ||
|
||
Base = declarative_base() # type: Any | ||
session_maker = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False) | ||
|
||
app = Starlette() | ||
admin = Admin(app=app, engine=engine) | ||
|
||
storage = FileSystemStorage(path=".uploads") | ||
|
||
|
||
class User(Base): | ||
__tablename__ = "users" | ||
|
||
id = Column(Integer, primary_key=True) | ||
file = Column(FileType(FileSystemStorage(".uploads"))) | ||
|
||
|
||
@pytest.fixture | ||
async def prepare_database() -> AsyncGenerator[None, None]: | ||
async with engine.begin() as conn: | ||
await conn.run_sync(Base.metadata.create_all) | ||
yield | ||
async with engine.begin() as conn: | ||
await conn.run_sync(Base.metadata.drop_all) | ||
|
||
await engine.dispose() | ||
|
||
|
||
@pytest.fixture | ||
async def client(prepare_database: Any) -> AsyncGenerator[AsyncClient, None]: | ||
async with AsyncClient(app=app, base_url="http://testserver") as c: | ||
yield c | ||
|
||
|
||
class UserAdmin(ModelView, model=User): | ||
column_list = [User.id, User.file] | ||
|
||
|
||
admin.add_view(UserAdmin) | ||
|
||
|
||
async def _query_user() -> Any: | ||
stmt = select(User).limit(1) | ||
async with session_maker() as s: | ||
result = await s.execute(stmt) | ||
return result.scalar_one() | ||
|
||
|
||
async def test_detail_view(client: AsyncClient) -> None: | ||
async with session_maker() as session: | ||
user = User(file=UploadFile(filename="upload.txt", file=io.BytesIO(b"abc"))) | ||
session.add(user) | ||
await session.commit() | ||
|
||
response = await client.get("/admin/user/details/1") | ||
|
||
user = await _query_user() | ||
|
||
assert response.status_code == 200 | ||
assert isinstance(user.file, StorageFile) is True | ||
assert user.file.name == "upload.txt" | ||
assert user.file.path == ".uploads/upload.txt" | ||
assert user.file.open().read() == b"abc" | ||
|
||
assert ( | ||
'<span class="me-1"><i class="fa-solid fa-download"></i></span>' | ||
in response.text | ||
) | ||
assert '<a href="http://testserver/admin/user/1/file/read/">' in response.text | ||
assert '<a href="http://testserver/admin/user/1/file/download/">' in response.text | ||
|
||
|
||
async def test_list_view(client: AsyncClient) -> None: | ||
async with session_maker() as session: | ||
for i in range(10): | ||
user = User(file=UploadFile(filename="upload.txt", file=io.BytesIO(b"abc"))) | ||
session.add(user) | ||
await session.commit() | ||
|
||
response = await client.get("/admin/user/list") | ||
|
||
user = await _query_user() | ||
|
||
assert response.status_code == 200 | ||
assert isinstance(user.file, StorageFile) is True | ||
assert user.file.name == "upload.txt" | ||
assert user.file.path == ".uploads/upload.txt" | ||
assert user.file.open().read() == b"abc" | ||
|
||
pattern_span = re.compile( | ||
r'<span class="me-1"><i class="fa-solid fa-download"></i></span>' | ||
) | ||
pattern_a_read = re.compile( | ||
r'<a href="http://testserver/admin/user/\d+/file/read/">' | ||
) | ||
pattern_a_download = re.compile( | ||
r'<a href="http://testserver/admin/user/\d+/file/download/">' | ||
) | ||
|
||
count_span = len(pattern_span.findall(response.text)) | ||
count_a_read = len(pattern_a_read.findall(response.text)) | ||
count_a_download = len(pattern_a_download.findall(response.text)) | ||
|
||
assert count_span == count_a_read == count_a_download == 10 | ||
|
||
|
||
async def test_file_download(client: AsyncClient) -> None: | ||
async with session_maker() as session: | ||
for i in range(10): | ||
user = User(file=UploadFile(filename="upload.txt", file=io.BytesIO(b"abc"))) | ||
session.add(user) | ||
await session.commit() | ||
|
||
response = await client.get("/admin/user/1/file/download/") | ||
|
||
assert response.status_code == 200 | ||
|
||
with open(".uploads/download.txt", "wb") as local_file: | ||
local_file.write(response.content) | ||
|
||
assert open(".uploads/download.txt", "rb").read() == b"abc" | ||
|
||
|
||
async def test_file_read(client: AsyncClient) -> None: | ||
async with session_maker() as session: | ||
for i in range(10): | ||
user = User(file=UploadFile(filename="upload.txt", file=io.BytesIO(b"abc"))) | ||
session.add(user) | ||
await session.commit() | ||
|
||
response = await client.get("/admin/user/1/file/read/") | ||
assert response.status_code == 200 | ||
assert response.text == "abc" |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to add deco
@login_required
to check authorization