From 2949034da4b4f9556274646bc4eb7bd075dce703 Mon Sep 17 00:00:00 2001 From: Jack Keane Date: Mon, 18 Nov 2024 16:41:20 -0800 Subject: [PATCH 1/3] add check of field type before running json.loads --- flask_pydantic_spec/flask_backend.py | 2 +- flask_pydantic_spec/utils.py | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/flask_pydantic_spec/flask_backend.py b/flask_pydantic_spec/flask_backend.py index d10fa12..40bc247 100644 --- a/flask_pydantic_spec/flask_backend.py +++ b/flask_pydantic_spec/flask_backend.py @@ -137,7 +137,7 @@ def request_validation( ) -> None: raw_query = request.args or None if raw_query is not None: - req_query = parse_multi_dict(raw_query) + req_query = parse_multi_dict(raw_query, query) else: req_query = {} if request.content_type and "application/json" in request.content_type: diff --git a/flask_pydantic_spec/utils.py b/flask_pydantic_spec/utils.py index 2e38c31..ed9a5f8 100644 --- a/flask_pydantic_spec/utils.py +++ b/flask_pydantic_spec/utils.py @@ -13,10 +13,12 @@ List, Dict, Iterable, + Type, ) from werkzeug.datastructures import MultiDict from pydantic import BaseModel +from pydantic.fields import SHAPE_LIST, SHAPE_DICT from werkzeug.routing import Rule from .types import Response, RequestBase, Request @@ -197,14 +199,16 @@ def default_after_handler( ) -def parse_multi_dict(input: MultiDict) -> Dict[str, Any]: +def parse_multi_dict(input: MultiDict, schema: Optional[Type[BaseModel]] = None) -> Dict[str, Any]: result = {} for key, value in input.to_dict(flat=False).items(): if len(value) == 1: - try: - value_to_use = json.loads(value[0]) - except (TypeError, JSONDecodeError): - value_to_use = value[0] + value_to_use = value[0] + if not schema or any(shape == schema.__fields__[key].shape for shape in [SHAPE_LIST, SHAPE_DICT]): + try: + value_to_use = json.loads(value[0]) + except (TypeError, JSONDecodeError): + pass else: value_to_use = value result[key] = value_to_use From 822ac188c6a9ca0cb758a6104fa0cc242e9b8416 Mon Sep 17 00:00:00 2001 From: Jack Keane Date: Mon, 18 Nov 2024 16:46:10 -0800 Subject: [PATCH 2/3] run formatter --- flask_pydantic_spec/spec.py | 4 +++- flask_pydantic_spec/utils.py | 9 +++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/flask_pydantic_spec/spec.py b/flask_pydantic_spec/spec.py index b248162..5fb9a9d 100644 --- a/flask_pydantic_spec/spec.py +++ b/flask_pydantic_spec/spec.py @@ -217,7 +217,9 @@ def sync_validate(*args: Any, **kwargs: Any) -> FlaskResponse: "200": {"description": "ok"} } self.class_view_api_info[view_name][method]["no_api_key"] = no_api_key - self.class_view_api_info[view_name][method]["is_token_route"] = is_token_route + self.class_view_api_info[view_name][method][ + "is_token_route" + ] = is_token_route # register for name, model in zip( diff --git a/flask_pydantic_spec/utils.py b/flask_pydantic_spec/utils.py index ed9a5f8..fe4853f 100644 --- a/flask_pydantic_spec/utils.py +++ b/flask_pydantic_spec/utils.py @@ -199,12 +199,17 @@ def default_after_handler( ) -def parse_multi_dict(input: MultiDict, schema: Optional[Type[BaseModel]] = None) -> Dict[str, Any]: +def parse_multi_dict( + input: MultiDict, schema: Optional[Type[BaseModel]] = None +) -> Dict[str, Any]: result = {} for key, value in input.to_dict(flat=False).items(): if len(value) == 1: value_to_use = value[0] - if not schema or any(shape == schema.__fields__[key].shape for shape in [SHAPE_LIST, SHAPE_DICT]): + if not schema or any( + shape == schema.__fields__[key].shape + for shape in [SHAPE_LIST, SHAPE_DICT] + ): try: value_to_use = json.loads(value[0]) except (TypeError, JSONDecodeError): From cb7428fa6b385f3079888eafdf5fdc8a6a9261b9 Mon Sep 17 00:00:00 2001 From: Jack Keane Date: Mon, 18 Nov 2024 17:12:53 -0800 Subject: [PATCH 3/3] add typing annotation --- flask_pydantic_spec/spec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flask_pydantic_spec/spec.py b/flask_pydantic_spec/spec.py index 5fb9a9d..4c0265a 100644 --- a/flask_pydantic_spec/spec.py +++ b/flask_pydantic_spec/spec.py @@ -430,7 +430,7 @@ def _generate_spec(self) -> Mapping[str, Any]: return self._generate_spec_common(routes) - def _get_route_security(self, no_api_key, is_token_route): + def _get_route_security(self, no_api_key: bool, is_token_route: bool) -> dict: if no_api_key: return {"security": []} elif is_token_route: