-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement administrative and leads user roles (#39)
- Loading branch information
Showing
6 changed files
with
169 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
# Current scopes: | ||
# read: | ||
# all | ||
# projects | ||
# events | ||
# tags | ||
# write: | ||
# all | ||
# events | ||
# projects | ||
# --------------- | ||
# And current roles: admin, leads | ||
import functools | ||
import inspect | ||
from typing import Any, Callable, Coroutine, Optional, TypeVar | ||
|
||
from supertokens_python.exceptions import GeneralError | ||
from supertokens_python.recipe.session import SessionContainer | ||
from supertokens_python.recipe.session.exceptions import ( | ||
ClaimValidationError, | ||
InvalidClaimsError, | ||
) | ||
from supertokens_python.recipe.userroles import UserRoleClaim | ||
|
||
T = TypeVar("T") | ||
|
||
Coro = Coroutine[Any, Any, T] | ||
CoroFunc = Callable[..., Coro[Any]] | ||
|
||
|
||
def validate_parameters(func: CoroFunc): | ||
sig = inspect.signature(func) | ||
if not sig.parameters.get("session"): | ||
raise GeneralError( | ||
f"No <session> argument found within function <{func.__name__}>" | ||
) | ||
|
||
|
||
def has_role(item: str, /): | ||
def decorator(func: CoroFunc) -> CoroFunc: | ||
validate_parameters(func) | ||
|
||
@functools.wraps(func) | ||
async def wrapper( | ||
session: Optional[SessionContainer], *args, **kwargs | ||
) -> CoroFunc: | ||
if not session: | ||
raise GeneralError("Must have valid session") | ||
|
||
roles = await session.get_claim_value(UserRoleClaim) | ||
if not roles or item not in roles: | ||
raise InvalidClaimsError( | ||
f"User does not have role <{item}>", | ||
[ClaimValidationError(UserRoleClaim.key, None)], | ||
) | ||
|
||
return await func(*args, **kwargs) | ||
|
||
return wrapper | ||
|
||
return decorator | ||
|
||
|
||
def has_any_role(*items: str): | ||
def decorator(func: CoroFunc) -> CoroFunc: | ||
validate_parameters(func) | ||
|
||
@functools.wraps(func) | ||
async def wrapper( | ||
session: Optional[SessionContainer], *args, **kwargs | ||
) -> CoroFunc: | ||
if not session: | ||
raise GeneralError("Must have valid session") | ||
|
||
user_roles = await session.get_claim_value(UserRoleClaim) | ||
|
||
if not user_roles: | ||
raise InvalidClaimsError( | ||
f"User does not any roles listed: {', '.join(role for role in items).rstrip()}", | ||
[ClaimValidationError(UserRoleClaim.key, None)], | ||
) | ||
if not any(role in user_roles for role in items): | ||
# May need to be tested more | ||
raise InvalidClaimsError( | ||
f"Missing Roles: {', '.join(role for role in items if role not in user_roles).rstrip()}", | ||
[ClaimValidationError(UserRoleClaim.key, None)], | ||
) | ||
|
||
return await func(*args, **kwargs) | ||
|
||
return wrapper | ||
|
||
return decorator | ||
|
||
|
||
def has_admin_role(): | ||
return has_role("admin") | ||
|
||
|
||
def has_leads_role(): | ||
return has_role("leads") |