Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved the reusability of the middleware by passing all headers ins… #3

Merged
merged 3 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/docs/examples/multiple.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ elegant solution to this. But the current solution is to mount multiple apps ins
**multiple.py**

```python
from typing import Tuple, List
from typing import Tuple, List, Dict
import uvicorn
from fastapi import FastAPI
from starlette.requests import Request
Expand All @@ -16,14 +16,14 @@ from fastapi_auth_middleware import AuthMiddleware, FastAPIUser


# The method you have to provide
def verify_authorization_header(auth_header: str) -> Tuple[List[str], FastAPIUser]:
def verify_header(headers: Dict) -> Tuple[List[str], FastAPIUser]:
user = FastAPIUser(first_name="Code", last_name="Specialist", user_id=1) # Usually you would decode the JWT here and verify its signature to extract the 'sub'
scopes = ["admin"] # You could for instance use the scopes provided in the JWT or request them by looking up the scopes with the 'sub' somewhere
return scopes, user


users_app = FastAPI()
users_app.add_middleware(AuthMiddleware, verify_authorization_header=verify_authorization_header) # Add the middleware with your verification method to the whole application
users_app.add_middleware(AuthMiddleware, verify_header=verify_header) # Add the middleware with your verification method to the whole application


@users_app.get('/') # Sample endpoint (secured)
Expand Down
6 changes: 3 additions & 3 deletions docs/docs/examples/simple_with_scopes.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Basic Example with Scopes

```python
from typing import Tuple, List
from typing import Tuple, List, Dict
import uvicorn
from fastapi import FastAPI
from starlette.authentication import requires
Expand All @@ -11,14 +11,14 @@ from fastapi_auth_middleware import AuthMiddleware, FastAPIUser


# The method you have to provide
def verify_authorization_header(auth_header: str) -> Tuple[List[str], FastAPIUser]:
def verify_header(headers: Dict) -> Tuple[List[str], FastAPIUser]:
user = FastAPIUser(first_name="Code", last_name="Specialist", user_id=1) # Usually you would decode the JWT here and verify its signature to extract the 'sub'
scopes = ["admin"] # You could for instance use the scopes provided in the JWT or request them by looking up the scopes with the 'sub' somewhere
return scopes, user


app = FastAPI()
app.add_middleware(AuthMiddleware, verify_authorization_header=verify_authorization_header) # Add the middleware with your verification method to the whole application
app.add_middleware(AuthMiddleware, verify_header=verify_header) # Add the middleware with your verification method to the whole application


@app.get('/home') # Sample endpoint (secured)
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ from starlette.authentication import BaseUser

...
# Takes a string that will look like 'Bearer eyJhbGc...'
def verify_authorization_header(auth_header: str) -> Tuple[List[str], BaseUser]: # Returns a Tuple of a List of scopes (string) and a BaseUser
def verify_header(headers: List[str]) -> Tuple[List[str], BaseUser]: # Returns a Tuple of a List of scopes (string) and a BaseUser
user = FastAPIUser(first_name="Code", last_name="Specialist", user_id=1) # Usually you would decode the JWT here and verify its signature to extract the 'sub'
scopes = [] # You could for instance use the scopes provided in the JWT or request them by looking up the scopes with the 'sub' somewhere
return scopes, user
Expand All @@ -53,7 +53,7 @@ from fastapi_auth_middleware import AuthMiddleware
...

app = FastAPI()
app.add_middleware(AuthMiddleware, verify_authorization_header=verify_authorization_header)
app.add_middleware(AuthMiddleware, verify_header=verify_header)
```

After adding this middleware, all requests will pass the `verify_authorization_header` function and contain the **scopes** as well as the **user object** as injected dependencies.
Expand Down
6 changes: 3 additions & 3 deletions examples/simple.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Tuple, List
from typing import Tuple, List, Dict
import uvicorn
from fastapi import FastAPI
from starlette.requests import Request
Expand All @@ -7,14 +7,14 @@


# The method you have to provide
def verify_authorization_header(auth_header: str) -> Tuple[List[str], FastAPIUser]:
def verify_header(headers: Dict) -> Tuple[List[str], FastAPIUser]:
user = FastAPIUser(first_name="Code", last_name="Specialist", user_id=1) # Usually you would decode the JWT here and verify its signature to extract the 'sub'
scopes = [] # You could for instance use the scopes provided in the JWT or request them by looking up the scopes with the 'sub' somewhere
return scopes, user


app = FastAPI()
app.add_middleware(AuthMiddleware, verify_authorization_header=verify_authorization_header) # Add the middleware with your verification method to the whole application
app.add_middleware(AuthMiddleware, verify_header=verify_header) # Add the middleware with your verification method to the whole application


@app.get('/') # Sample endpoint (secured)
Expand Down
6 changes: 3 additions & 3 deletions examples/simple_with_scopes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Tuple, List
from typing import Tuple, List, Dict
import uvicorn
from fastapi import FastAPI
from starlette.authentication import requires
Expand All @@ -8,14 +8,14 @@


# The method you have to provide
def verify_authorization_header(auth_header: str) -> Tuple[List[str], FastAPIUser]:
def verify_header(headers: Dict) -> Tuple[List[str], FastAPIUser]:
user = FastAPIUser(first_name="Code", last_name="Specialist", user_id=1) # Usually you would decode the JWT here and verify its signature to extract the 'sub'
scopes = ["admin"] # You could for instance use the scopes provided in the JWT or request them by looking up the scopes with the 'sub' somewhere
return scopes, user


app = FastAPI()
app.add_middleware(AuthMiddleware, verify_authorization_header=verify_authorization_header) # Add the middleware with your verification method to the whole application
app.add_middleware(AuthMiddleware, verify_header=verify_header) # Add the middleware with your verification method to the whole application


@app.get('/home') # Sample endpoint (secured)
Expand Down
21 changes: 10 additions & 11 deletions fastapi_auth_middleware/middleware.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import inspect
from typing import Tuple, Callable, List
from typing import Tuple, Callable, List, Dict

from fastapi import FastAPI
from starlette.authentication import AuthenticationBackend, AuthCredentials, AuthenticationError, BaseUser
Expand Down Expand Up @@ -46,13 +46,13 @@ def identity(self) -> str:
class FastAPIAuthBackend(AuthenticationBackend):
""" Auth Backend for FastAPI """

def __init__(self, verify_authorization_header: Callable[[str], Tuple[List[str], BaseUser]]):
def __init__(self, verify_header: Callable[[Dict], Tuple[List[str], BaseUser]]):
""" Auth Backend constructor. Part of an AuthenticationMiddleware as backend.

Args:
verify_authorization_header (callable): A function handle that returns a list of scopes and a BaseUser
verify_header (callable): A function handle that returns a list of scopes and a BaseUser
"""
self.verify_authorization_header = verify_authorization_header
self.verify_header = verify_header

async def authenticate(self, conn: HTTPConnection) -> Tuple[AuthCredentials, BaseUser]:
""" The 'magic' happens here. The authenticate method is invoked each time a route is called that the middleware is applied to.
Expand All @@ -67,11 +67,10 @@ async def authenticate(self, conn: HTTPConnection) -> Tuple[AuthCredentials, Bas
raise AuthenticationError("Authorization header missing")

try:
authorization_header: str = conn.headers["Authorization"]
if inspect.iscoroutinefunction(self.verify_authorization_header):
scopes, user = await self.verify_authorization_header(authorization_header)
if inspect.iscoroutinefunction(self.verify_header):
scopes, user = await self.verify_header(conn.headers)
else:
scopes, user = self.verify_authorization_header(authorization_header)
scopes, user = self.verify_header(conn.headers)

except Exception as exception:
raise AuthenticationError(exception) from None
Expand All @@ -82,15 +81,15 @@ async def authenticate(self, conn: HTTPConnection) -> Tuple[AuthCredentials, Bas
# noinspection PyPep8Naming
def AuthMiddleware(
app: FastAPI,
verify_authorization_header: Callable[[str], Tuple[List[str], BaseUser]],
verify_header: Callable[[str], Tuple[List[str], BaseUser]],
auth_error_handler: Callable[[Request, AuthenticationError], JSONResponse] = None
):
""" Factory method, returning an AuthenticationMiddleware
Intentionally not named with lower snake case convention as this is a factory method returning a class. Should feel like a class.

Args:
app (FastAPI): The FastAPI instance the middleware should be applied to. The `add_middleware` function of FastAPI adds the app as first argument by default.
verify_authorization_header (Callable[[str], Tuple[List[str], BaseUser]]): A function handle that returns a list of scopes and a BaseUser
verify_header (Callable[[str], Tuple[List[str], BaseUser]]): A function handle that returns a list of scopes and a BaseUser
auth_error_handler (Callable[[Request, Exception], JSONResponse]): Optional error handler for creating responses when an exception was raised in verify_authorization_header

Examples:
Expand All @@ -104,4 +103,4 @@ def verify_authorization_header(auth_header: str) -> Tuple[List[str], FastAPIUse
app.add_middleware(AuthMiddleware, verify_authorization_header=verify_authorization_header)
```
"""
return AuthenticationMiddleware(app, backend=FastAPIAuthBackend(verify_authorization_header), on_error=auth_error_handler)
return AuthenticationMiddleware(app, backend=FastAPIAuthBackend(verify_header), on_error=auth_error_handler)
18 changes: 9 additions & 9 deletions tests/test_middleware.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable
from typing import Callable, List, Dict

from _pytest.fixtures import fixture
from fastapi import FastAPI
Expand All @@ -11,13 +11,13 @@


# Sample verification function, does nothing of effect
def verify_authorization_header_basic(auth_header: str):
def verify_header(headers: Dict):
user = FastAPIUser(first_name="Code", last_name="Specialist", user_id=1)
scopes = ["authenticated"]
return scopes, user


async def verify_authorization_header_basic_admin_scope(auth_header: str):
async def verify_header_basic_admin_scope(headers: List[str]):
user = FastAPIUser(first_name="Code", last_name="Specialist", user_id=1)
scopes = ["admin"]
return scopes, user
Expand All @@ -28,9 +28,9 @@ def raise_exception_in_verify_authorization_header(_):


# Sample app with simple routes, takes a verify_authorization_header callable that is applied to the middleware
def fastapi_app(verify_authorization_header: Callable, auth_error_handler: Callable = None):
def fastapi_app(verify_header: Callable, auth_error_handler: Callable = None):
app = FastAPI()
app.add_middleware(AuthMiddleware, verify_authorization_header=verify_authorization_header, auth_error_handler=auth_error_handler)
app.add_middleware(AuthMiddleware, verify_header=verify_header, auth_error_handler=auth_error_handler)

@app.get("/")
def home():
Expand All @@ -57,12 +57,12 @@ class TestBasicBehaviour:

@fixture
def client(self) -> TestClient:
app = fastapi_app(verify_authorization_header_basic)
app = fastapi_app(verify_header)
return TestClient(app)

@fixture
def client_with_scopes(self) -> TestClient:
app = fastapi_app(verify_authorization_header_basic_admin_scope)
app = fastapi_app(verify_header_basic_admin_scope)
return TestClient(app)

def test_home_fail_no_header(self, client: TestClient):
Expand All @@ -81,7 +81,7 @@ def test_scopes(self, client: TestClient, client_with_scopes: TestClient):
assert client_with_scopes.get("/admin-scope", headers={"Authorization": "ey.."}).status_code == 200 # Contains the requested scope

def test_fail_auth_error(self):
app = fastapi_app(verify_authorization_header=raise_exception_in_verify_authorization_header)
app = fastapi_app(verify_header=raise_exception_in_verify_authorization_header)
client_with_auth_error = TestClient(app=app)

response = client_with_auth_error.get('/', headers={"Authorization": "ey.."})
Expand All @@ -92,7 +92,7 @@ def handle_auth_error(request: Request, exception: AuthenticationError):
assert isinstance(exception, AuthenticationError)
return JSONResponse(content={'message': str(exception)}, status_code=401)

app = fastapi_app(verify_authorization_header=raise_exception_in_verify_authorization_header, auth_error_handler=handle_auth_error)
app = fastapi_app(verify_header=raise_exception_in_verify_authorization_header, auth_error_handler=handle_auth_error)
client_with_auth_error = TestClient(app=app)

response = client_with_auth_error.get('/', headers={"Authorization": "ey.."})
Expand Down