-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds user model, registration process, validators and schemas. Prepared code for Authentication integration and Accouunt email verification
- Loading branch information
1 parent
4d14a58
commit 4f879a7
Showing
7 changed files
with
303 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
from fastapi import HTTPException | ||
from tortoise import models, fields | ||
|
||
from ezymart_by_evoq.service.base_model import ( | ||
BaseDateCreatedModifiedModel, | ||
AbstractBaseModel, | ||
) | ||
from ezymart_by_evoq.service.constants import NULLABLE | ||
from ezymart_by_evoq.users.utils import hash_password, compare_passwords | ||
from ezymart_by_evoq.users.validatiors import EmailValidator, PasswordValidator | ||
|
||
|
||
class User(AbstractBaseModel, BaseDateCreatedModifiedModel, models.Model): | ||
email = fields.CharField( | ||
unique=True, | ||
max_length=255, | ||
validators=[EmailValidator()] | ||
) | ||
password = fields.CharField( | ||
max_length=128, | ||
validators=[PasswordValidator()] | ||
) | ||
|
||
first_name = fields.CharField(max_length=150, **NULLABLE) | ||
last_name = fields.CharField(max_length=150, **NULLABLE) | ||
telephone = fields.CharField(max_length=50, **NULLABLE) | ||
telegram_user_id = fields.IntField(**NULLABLE) | ||
country = fields.CharField( | ||
max_length=50, | ||
**NULLABLE | ||
) | ||
city = fields.CharField( | ||
max_length=50, | ||
**NULLABLE | ||
) | ||
|
||
last_login = fields.DatetimeField(**NULLABLE) | ||
|
||
is_superuser = fields.BooleanField(default=False) | ||
is_staff = fields.BooleanField(default=False) | ||
is_active = fields.BooleanField(default=False) | ||
is_verified = fields.BooleanField(default=False) | ||
|
||
def __str__(self): | ||
return self.email | ||
|
||
def verify_password(self, password): | ||
return compare_passwords(self.password, password) | ||
|
||
def full_name(self) -> str: | ||
""" | ||
Returns the best name | ||
""" | ||
if self.first_name or self.last_name: | ||
return (f"{self.first_name.title() or ''} " | ||
f"{self.last_name.title() or ''}").strip() | ||
return self.email | ||
|
||
class PydanticMeta: | ||
computed = ["full_name"] | ||
exclude = ["password"] | ||
|
||
@classmethod | ||
def create_superuser(cls): | ||
email = input("Enter email: ") | ||
password = input("Enter password: ") | ||
password2 = input("Enter password again: ") | ||
if password != password2: | ||
raise HTTPException( | ||
status_code=400, | ||
detail="Passwords do not match" | ||
) | ||
|
||
user = cls( | ||
email=email, | ||
is_superuser=True, | ||
is_staff=True, | ||
is_active=True | ||
) | ||
|
||
hashed_password = hash_password(password) | ||
user.password = hashed_password | ||
user.save() | ||
print("Superuser created successfully") | ||
return user |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
from typing import Annotated | ||
|
||
from fastapi import APIRouter, status, HTTPException, Depends | ||
|
||
from ezymart_by_evoq.auth.schemas import oauth2_scheme | ||
from ezymart_by_evoq.users import schemas | ||
from ezymart_by_evoq.users.models import User | ||
from ezymart_by_evoq.users.services import get_current_user | ||
from ezymart_by_evoq.users.utils import hash_password | ||
|
||
user_router = APIRouter(prefix="/users", tags=["Users"]) | ||
|
||
|
||
@user_router.post( | ||
"/register/", | ||
status_code=status.HTTP_201_CREATED, | ||
response_model=schemas.User_Pydantic | ||
) | ||
async def register( | ||
user_data: schemas.UserInPydantic | ||
): | ||
user_data.validate_password() | ||
|
||
user_data = user_data.model_dump( | ||
exclude_unset=True, | ||
exclude={"password2"} | ||
) | ||
|
||
user_data["password"] = hash_password(user_data.get("password")) | ||
user = await User.create( | ||
**user_data | ||
) | ||
|
||
return await schemas.User_Pydantic.from_tortoise_orm(user) | ||
|
||
|
||
@user_router.get("/") | ||
def get_user( | ||
user: Annotated[schemas.User_Pydantic, Depends(get_current_user)] | ||
): | ||
return user | ||
|
||
# @user_router.get("/", response_model=List[User_Pydantic]) | ||
# async def get_users(): | ||
# return await User_Pydantic.from_queryset(User.all()) | ||
|
||
|
||
# @user_router.post("/", response_model=User_Pydantic) | ||
# async def create_user(user: UserIn_Pydantic): | ||
# user_obj = await User.create(**user.model_dump(exclude_unset=True)) | ||
# return await User_Pydantic.from_tortoise_orm(user_obj) | ||
|
||
# | ||
# @user_router.get("/{user_id}", response_model=User_Pydantic) | ||
# async def get_user(user_id: int): | ||
# return await User_Pydantic.from_queryset_single(User.get(id=user_id)) | ||
# | ||
# | ||
# @user_router.put("/{user_id}", response_model=User_Pydantic) | ||
# async def update_user(user_id: int, user: UserIn_Pydantic): | ||
# await User.filter(id=user_id).update(**user.model_dump( | ||
# exclude_unset=True)) | ||
# return await User_Pydantic.from_queryset_single(User.get(id=user_id)) | ||
# | ||
# | ||
# @user_router.delete("/{user_id}", response_model=Status) | ||
# async def delete_user(user_id: int): | ||
# deleted_count = await User.filter(id=user_id).delete() | ||
# if not deleted_count: | ||
# raise HTTPException(status_code=404, detail=f"User {user_id} not | ||
# found") | ||
# return Status(message=f"Deleted user {user_id}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from fastapi import HTTPException, status | ||
from pydantic import BaseModel, Field, field_validator, EmailStr | ||
from tortoise.contrib.pydantic import pydantic_model_creator | ||
|
||
from ezymart_by_evoq.users.models import User | ||
from ezymart_by_evoq.users.validatiors import PasswordValidator | ||
|
||
User_Pydantic = pydantic_model_creator( | ||
User, | ||
name="User", | ||
exclude=("password",) | ||
) | ||
|
||
|
||
class UserInPydantic(BaseModel): | ||
email: EmailStr = Field(..., max_length=255) | ||
password: str = Field(..., max_length=255) | ||
password2: str = Field(..., max_length=255) | ||
first_name: str | None = Field(None, max_length=255) | ||
last_name: str | None = Field(None, max_length=255) | ||
telephone: str | None = Field(None, max_length=255) | ||
telegram_user_id: int | None = Field(None) | ||
country: str | None = Field(None, max_length=255) | ||
city: str | None = Field(None, max_length=255) | ||
|
||
def validate_password(self): | ||
if self.password != self.password2: | ||
raise HTTPException( | ||
status_code=status.HTTP_400_BAD_REQUEST, | ||
detail="Passwords do not match" | ||
) | ||
validator = PasswordValidator() | ||
validator(self.password) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
from typing import Annotated | ||
|
||
from fastapi import Depends | ||
|
||
from ezymart_by_evoq.auth.schemas import oauth2_scheme | ||
from ezymart_by_evoq.auth.services import get_email_from_token | ||
from ezymart_by_evoq.users import schemas | ||
from ezymart_by_evoq.users.models import User | ||
|
||
|
||
async def get_current_user( | ||
access_token: Annotated[str, Depends(oauth2_scheme)] | ||
): | ||
email = get_email_from_token(access_token) | ||
user = await User.filter(email=email).first() | ||
return await schemas.User_Pydantic.from_tortoise_orm(user) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import hashlib | ||
import hmac | ||
|
||
from ezymart_by_evoq.service.constants import ( | ||
PWD_HASH_ITERATIONS, | ||
PWD_HASH_SALT, | ||
CRYPTOGRAPHIC_HASH_FUNCTION, | ||
) | ||
|
||
|
||
def hash_password(password): | ||
""" | ||
Hash a password using a cryptographic hash function. | ||
:param password: The password to hash. | ||
:return: The hashed password. | ||
""" | ||
return hashlib.pbkdf2_hmac( | ||
CRYPTOGRAPHIC_HASH_FUNCTION, | ||
password.encode('utf-8'), | ||
PWD_HASH_SALT, | ||
PWD_HASH_ITERATIONS | ||
) | ||
|
||
|
||
async def compare_passwords(db_pwd, received_pwd) -> bool: | ||
""" | ||
Compares two passwords for equality. | ||
:param db_pwd: A base64 encoded hashed password string from | ||
the database. | ||
:param received_pwd: A plain text password string received from | ||
the user. | ||
:return: A boolean indicating whether the passwords match. | ||
""" | ||
db_password = db_pwd | ||
received_password = str(hashlib.pbkdf2_hmac( | ||
CRYPTOGRAPHIC_HASH_FUNCTION, | ||
received_pwd.encode('utf-8'), | ||
PWD_HASH_SALT, | ||
PWD_HASH_ITERATIONS | ||
)) | ||
is_equal = hmac.compare_digest( | ||
db_password, | ||
received_password | ||
) | ||
return is_equal |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
import re | ||
|
||
from fastapi import HTTPException | ||
from starlette import status | ||
from tortoise.validators import Validator | ||
|
||
|
||
class EmailValidator(Validator): | ||
""" | ||
Validate an email address for basic format compliance. | ||
This function checks if the given email address follows a basic format: | ||
- It contains only valid characters including letters, numbers, and | ||
certain special characters. | ||
- It has the "@" symbol followed by a domain name. | ||
Args: | ||
email_address (str): The email address to validate. | ||
Returns: | ||
bool: True if the email address is valid, False otherwise. | ||
""" | ||
|
||
def __call__(self, email_address: str): | ||
if not re.search( | ||
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", | ||
email_address | ||
): | ||
raise HTTPException( | ||
status_code=status.HTTP_401_UNAUTHORIZED, | ||
detail=f"The email address {email_address} is not valid" | ||
|
||
) | ||
|
||
|
||
class PasswordValidator(Validator): | ||
|
||
def __call__(self, password: str): | ||
if not re.search( | ||
r"^(?=.*[A-Z])(?=.*[$%&!]).{8,}$", | ||
password | ||
): | ||
raise HTTPException( | ||
status_code=status.HTTP_401_UNAUTHORIZED, | ||
detail="The password is not valid" | ||
|
||
) |