diff --git a/alexandria/core/api.py b/alexandria/core/api.py index f7131772..ef85062e 100644 --- a/alexandria/core/api.py +++ b/alexandria/core/api.py @@ -1,8 +1,10 @@ import logging +from os.path import splitext from typing import Optional, Tuple from django.conf import settings from django.core.files import File +from django.utils.translation import gettext from alexandria.core import models, presign_urls from alexandria.core.validations import validate_file @@ -35,6 +37,53 @@ def verify_signed_components( ) +def copy_document( + document: models.Document, user: str, group: str, category: models.Category +): + """ + Copy a document and all its original files to a new document. + + This function eases the copying of documents by automatically setting important fields. + Uses `create_file` to copy the original document files. + """ + + basename, ext = splitext(document.title) + document_title = gettext("{basename} (copy){ext}").format( + basename=basename, ext=ext + ) + new_document = models.Document.objects.create( + title=document_title, + description=document.description, + metainfo=document.metainfo, + category=category, + created_by_user=user, + created_by_group=group, + modified_by_user=user, + modified_by_group=group, + ) + + # Copying only the originals - create_file() will create the thumbnails + document_files = models.File.objects.filter( + document=document, variant=models.File.Variant.ORIGINAL.value + ).order_by("created_at") + new_files = [] + for document_file in document_files: + new_files.append( + create_file( + name=document_file.name, + document=new_document, + content=document_file.content, + mime_type=document_file.mime_type, + size=document_file.size, + user=document_file.created_by_user, + group=document_file.created_by_group, + metainfo=document_file.metainfo, + ) + ) + + return new_document + + def create_document_file( user: str, group: str, diff --git a/alexandria/core/serializers.py b/alexandria/core/serializers.py index ea7921e2..8b9d9af7 100644 --- a/alexandria/core/serializers.py +++ b/alexandria/core/serializers.py @@ -5,7 +5,6 @@ from django.db.transaction import atomic from django.template.defaultfilters import slugify from django.utils import translation -from django.utils.module_loading import import_string from generic_permissions.validation import ValidatorMixin from generic_permissions.visibilities import ( VisibilityResourceRelatedField, @@ -15,6 +14,8 @@ from rest_framework_json_api.relations import SerializerMethodResourceRelatedField from rest_framework_json_api.views import Serializer +from alexandria.core.utils import get_user_and_group_from_request + from . import models log = logging.getLogger(__name__) @@ -27,15 +28,10 @@ class BaseSerializer( created_at = serializers.DateTimeField(read_only=True) - def get_user_and_group(self): - return import_string(settings.ALEXANDRIA_GET_USER_AND_GROUP_FUNCTION)( - self.context.get("request") - ) - def is_valid(self, *args, **kwargs): # Prime data so the validators are called (and default values filled # if client didn't pass them.) - user, group = self.get_user_and_group() + user, group = get_user_and_group_from_request(self.context.get("request")) self.initial_data.setdefault("created_by_group", group) self.initial_data.setdefault("modified_by_group", group) self.initial_data.setdefault("created_by_user", user) @@ -45,7 +41,7 @@ def is_valid(self, *args, **kwargs): def validate(self, *args, **kwargs): validated_data = super().validate(*args, **kwargs) - user, group = self.get_user_and_group() + user, group = get_user_and_group_from_request(self.context.get("request")) validated_data["modified_by_user"] = user validated_data["modified_by_group"] = group @@ -323,7 +319,7 @@ def get_webdav_url(self, instance): request = self.context.get("request") host = request.get_host() if request else "localhost" scheme = request.scheme if request else "http" - user, group = self.get_user_and_group() + user, group = get_user_and_group_from_request(self.context.get("request")) return instance.get_latest_original().get_webdav_url( user, group, f"{scheme}://{host}" ) @@ -363,3 +359,23 @@ class Meta: "name", ) read_only_fields = fields + + +class CopyCategorySerializer(Serializer): + data = serializers.DictField() + + def validate(self, data): + validated_data = super().validate(data) + category_id = validated_data.get("data", {}).get("category") + + if category_id: + try: + category = models.Category.objects.get(pk=category_id) + except models.Category.DoesNotExist: + raise serializers.ValidationError("Category not found") + else: + category = None + + validated_data["category"] = category + + return validated_data diff --git a/alexandria/core/tests/test_api.py b/alexandria/core/tests/test_api.py index 31ad072b..4a5d90c2 100644 --- a/alexandria/core/tests/test_api.py +++ b/alexandria/core/tests/test_api.py @@ -4,6 +4,7 @@ from alexandria.core import api from alexandria.core.factories import FileData +from alexandria.core.models import File def test_create_document_file(db, category): @@ -25,6 +26,89 @@ def test_create_document_file(db, category): assert file.name == "Mee.pdf" +@pytest.mark.parametrize( + "same_category", + [ + True, + False, + ], +) +def test_copy_document_api(db, category, category_factory, same_category): + # initial document with one file + input_doc, first_file = api.create_document_file( + "Foo", + "Baz", + category, + "Bar.pdf", + "Mee.pdf", + SimpleUploadedFile( + name="test.png", + content=FileData.png, + content_type="png", + ), + "image/png", + 1, + ) + + # add an extra file to the document + extra_file = api.create_file( + input_doc, + "Foo2", + "Baz2", + "Mee2.pdf", + SimpleUploadedFile( + name="test2.jpg", + content=FileData.png, + content_type="jpg", + ), + "image/jpeg", + 2, + ) + + to_category = category if same_category else category_factory() + copied_doc = api.copy_document(input_doc, "CopyUser", "CopyGroup", to_category) + files = copied_doc.files.order_by("variant", "created_at") + + assert copied_doc.title == "Bar (copy).pdf" + assert copied_doc.category.pk == to_category.pk + # document copy will have the user/group of the user who copied it + assert copied_doc.created_by_user == "CopyUser" + assert copied_doc.created_by_group == "CopyGroup" + + # 2 copied files + 2 new thumbnails + assert len(files) == 4 + + # original 1 + assert first_file.pk != files[0].pk + assert files[0].document.pk == copied_doc.pk + assert files[0].variant == File.Variant.ORIGINAL + assert files[0].name == "Mee.pdf" + assert files[0].mime_type == "image/png" + assert files[0].size == 1 + # files will retain the user/group of the original document + assert files[0].created_by_user == "Foo" + assert files[0].created_by_group == "Baz" + + # new thumbnail for first file + assert files[1].document.pk == copied_doc.pk + assert files[1].variant == File.Variant.ORIGINAL + + # original 2 + assert extra_file.pk != files[1].pk + assert files[2].document.pk == copied_doc.pk + assert files[2].variant == File.Variant.THUMBNAIL + assert files[1].name == "Mee2.pdf" + assert files[1].mime_type == "image/jpeg" + assert files[1].size == 2 + # files will retain the user/group of the original document + assert files[1].created_by_user == "Foo2" + assert files[1].created_by_group == "Baz2" + + # new thumbnail for extra file + assert files[3].document.pk == copied_doc.pk + assert files[3].variant == File.Variant.THUMBNAIL + + def test_presigning_api(db, file): _, expires, signature = api.make_signature_components( file.pk, "testserver", download_path="/foo" diff --git a/alexandria/core/tests/test_views.py b/alexandria/core/tests/test_views.py index 28a8704d..9d57f30b 100644 --- a/alexandria/core/tests/test_views.py +++ b/alexandria/core/tests/test_views.py @@ -1,4 +1,5 @@ import io +import json import uuid import zipfile from pathlib import Path @@ -398,6 +399,70 @@ def test_move_document_to_new_category( assert response.status_code == HTTP_200_OK +@pytest.mark.parametrize( + "to_category,expected_status", + [ + ("same", HTTP_201_CREATED), + ("new", HTTP_201_CREATED), + ("not_defined", HTTP_201_CREATED), + ("non_existent", HTTP_400_BAD_REQUEST), + ("not_allowed", HTTP_400_BAD_REQUEST), + ], +) +def test_copy_document( + admin_client, + category_factory, + file_factory, + document_factory, + to_category, + expected_status, +): + category_not_allowed = category_factory.create(allowed_mime_types=["plain/text"]) + category = category_factory() + document = document_factory(category=category) + file_factory.create(document=document, name="Image.jpeg", mime_type="image/jpeg") + + if to_category == "non_existent": + target_category_pk = 999 + elif to_category == "new": + target_category_pk = category_factory().pk + elif to_category == "not_allowed": + target_category_pk = category_not_allowed.pk + else: + target_category_pk = document.category.pk + + request_body = { + "data": { + "category": str(target_category_pk), + "type": "documents", + "id": str(document.pk), + }, + } + + if to_category == "not_defined": + request_body["data"].pop("category") + + url = reverse("document-copy", args=[document.pk]) + response = admin_client.post( + url, + data=json.dumps(request_body), + content_type="application/json", + ) + + assert response.status_code == expected_status + if to_category == "not_allowed": + assert ( + response.json()["errors"][0]["detail"] + == f"File type image/jpeg is not allowed in category {category_not_allowed.pk}." + ) + + if expected_status == HTTP_201_CREATED: + assert ( + response.json()["data"]["relationships"]["category"]["data"]["id"] + == target_category_pk + ) + + @pytest.mark.parametrize( "presigned, expected_status", [(True, HTTP_200_OK), (False, HTTP_403_FORBIDDEN)], diff --git a/alexandria/core/utils.py b/alexandria/core/utils.py new file mode 100644 index 00000000..5afce543 --- /dev/null +++ b/alexandria/core/utils.py @@ -0,0 +1,8 @@ +from django.conf import settings +from django.utils.module_loading import import_string + + +def get_user_and_group_from_request(request): + """Return a 2-tuple of `user`, `group` from the given request.""" + getter_fn = import_string(settings.ALEXANDRIA_GET_USER_AND_GROUP_FUNCTION) + return getter_fn(request) diff --git a/alexandria/core/views.py b/alexandria/core/views.py index ee22ba22..5738a3c4 100644 --- a/alexandria/core/views.py +++ b/alexandria/core/views.py @@ -10,7 +10,6 @@ from django.core.exceptions import ValidationError as DjangoCoreValidationError from django.core.files.base import ContentFile from django.http import FileResponse -from django.utils.module_loading import import_string from django.utils.translation import gettext as _ from generic_permissions.permissions import AllowAny, PermissionViewMixin from generic_permissions.visibilities import VisibilityViewMixin @@ -38,8 +37,10 @@ RelatedMixin, ) +from alexandria.core.utils import get_user_and_group_from_request + from . import models, serializers -from .api import create_document_file +from .api import copy_document, create_document_file from .filters import ( CategoryFilterSet, DocumentFilterSet, @@ -114,6 +115,30 @@ def update(self, request, *args, **kwargs): return response + @action( + methods=["post"], + detail=True, + url_path="copy", + ) + def copy(self, request, pk=None): + document = self.get_object() + user, group = get_user_and_group_from_request(request) + + category_serializer = serializers.CopyCategorySerializer(data=request.data) + category_serializer.is_valid(raise_exception=True) + category = category_serializer.validated_data["category"] or document.category + + copied_document = copy_document( + document=document, + category=category, + user=user, + group=group, + ) + + serializer = self.get_serializer(copied_document) + headers = self.get_success_headers(serializer.data) + return Response(serializer.data, status=HTTP_201_CREATED, headers=headers) + @action(methods=["post"], detail=True) def convert(self, request, pk=None): if not settings.ALEXANDRIA_ENABLE_PDF_CONVERSION: @@ -134,9 +159,7 @@ def convert(self, request, pk=None): response.raise_for_status() - user, group = import_string(settings.ALEXANDRIA_GET_USER_AND_GROUP_FUNCTION)( - request - ) + user, group = get_user_and_group_from_request(request) file_name = f"{splitext(file.name)[0]}.pdf" document_title = f"{splitext(document.title)[0]}.pdf"