Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solution #5

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:

- name: Run flake8
run: |
poetry run flake8 src
poetry run flake8 src --exclude=src/database/migrations/*

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flake8 command in the test-accounts job excludes the src/database/migrations/* directory. Ensure that this exclusion is intentional and aligns with your project's linting requirements. If you want to lint the entire src directory without exclusions, remove the --exclude=src/database/migrations/* option.


- name: Run accounts tests
run: |
Expand All @@ -50,7 +50,7 @@ jobs:

- name: Run flake8
run: |
poetry run flake8 src
poetry run flake8 src --exclude=src/database/migrations/*

- name: Run movies tests
run: |
Expand Down
1 change: 1 addition & 0 deletions src/database/populate.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def seed(self):
print(f"Unexpected error: {e}")
raise


def main():
settings = get_settings()
with get_db_contextmanager() as db_session:
Expand Down
253 changes: 252 additions & 1 deletion src/routes/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,259 @@
RefreshTokenModel
)
from exceptions import BaseSecurityError
from schemas import (
UserRegistrationRequestSchema,
UserRegistrationResponseSchema,
UserActivationRequestSchema,
MessageResponseSchema,
PasswordResetRequestSchema,
PasswordResetCompleteRequestSchema,
UserLoginRequestSchema,
UserLoginResponseSchema,
TokenRefreshRequestSchema,
TokenRefreshResponseSchema
)
from security.interfaces import JWTAuthManagerInterface


router = APIRouter()

# Write your code here

@router.post(
"/register",
response_model=UserRegistrationResponseSchema,
status_code=status.HTTP_201_CREATED
)
def register(
user: UserRegistrationRequestSchema,
db: Session = Depends(get_db)
):
db_user = db.query(UserModel).filter(UserModel.email == user.email).first()
if db_user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"A user with this email {user.email} already exists."
)

user_group = db.query(UserGroupModel).filter(UserGroupModel.name == UserGroupEnum.USER).first()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure that user_group is not None before accessing user_group.id. If the UserGroupEnum.USER group does not exist, this will raise an AttributeError. Consider adding a check to handle this case.

try:
user = UserModel.create(email=user.email, raw_password=user.password, group_id=user_group.id)
db.add(user)
db.flush()
db.refresh(user)

activation_token = ActivationTokenModel(user_id=user.id)
db.add(activation_token)
db.commit()

return user
except SQLAlchemyError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An error occurred during user creation."
)


@router.post(
"/activate",
response_model=MessageResponseSchema
)
def activate(
user: UserActivationRequestSchema,
db: Session = Depends(get_db)
):
db_user = db.query(UserModel).filter(UserModel.email == user.email).first()
if db_user.is_active:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure that db_user is not None before accessing db_user.is_active. If the user does not exist, this will raise an AttributeError. Consider adding a check to handle this case.

raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User account is already active."
)

token = db.query(ActivationTokenModel).filter(ActivationTokenModel.token == user.token).first()
if not token:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired activation token."
)

expires_at = cast(datetime, token.expires_at).replace(tzinfo=timezone.utc)
if expires_at < datetime.now(timezone.utc):
db.delete(token)
db.commit()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired activation token.",
)

db_user.is_active = True
db.delete(token)
db.commit()
return MessageResponseSchema(message="User account activated successfully.")


@router.post(
"/password-reset/request",
response_model=MessageResponseSchema
)
def password_reset_request(
user: PasswordResetRequestSchema,
db: Session = Depends(get_db)
):
db_user = db.query(UserModel).filter(UserModel.email == user.email).first()
if db_user and db_user.is_active:
token = (
db.query(PasswordResetTokenModel).filter(PasswordResetTokenModel.user_id == db_user.id).first()
)
if token:
db.delete(token)

reset_token = PasswordResetTokenModel(user_id=cast(int, db_user.id))
db.add(reset_token)
db.commit()

return MessageResponseSchema(
message="If you are registered, you will receive an email with instructions."
)


@router.post(
"/reset-password/complete",
response_model=MessageResponseSchema
)
def password_reset_complete(
user: PasswordResetCompleteRequestSchema,
db: Session = Depends(get_db)
):
db_user = db.query(UserModel).filter(UserModel.email == user.email).first()
if not db_user or not db_user.is_active:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition if not db_user or not db_user.is_active: might be misleading. If db_user is None, accessing db_user.is_active will raise an AttributeError. Consider separating the checks for db_user existence and is_active status.

token = db.query(PasswordResetTokenModel).filter(PasswordResetTokenModel.token == user.token).first()
if token:
db.delete(token)
db.commit()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid email or token."
)

reset_token = db.query(PasswordResetTokenModel).filter(PasswordResetTokenModel.token == user.token).first()
if not reset_token:
token = db.query(PasswordResetTokenModel).filter(PasswordResetTokenModel.user_id == db_user.id).first()
if token:
db.delete(token)
db.commit()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid email or token."
)

expires_at = cast(datetime, reset_token.expires_at).replace(tzinfo=timezone.utc)
if expires_at < datetime.now(timezone.utc):
db.delete(reset_token)
db.commit()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid email or token."
)

try:
db_user.password = user.password
db.delete(reset_token)
db.commit()

return MessageResponseSchema(message="Password reset successfully.")
except SQLAlchemyError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An error occurred while resetting the password."
)


@router.post(
"/login",
response_model=UserLoginResponseSchema,
status_code=status.HTTP_201_CREATED
)
def login(
user_data: UserLoginRequestSchema,
jwt_auth_manager: JWTAuthManagerInterface = Depends(get_jwt_auth_manager),
settings: BaseAppSettings = Depends(get_settings),
db: Session = Depends(get_db)
):
db_user = db.query(UserModel).filter_by(email=user_data.email).first()

if not db_user or not db_user.verify_password(raw_password=user_data.password):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure that db_user is not None before calling db_user.verify_password. If the user does not exist, this will raise an AttributeError. Consider adding a check to handle this case.

raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password."
)

if not db_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User account is not activated."
)

try:
access_token = jwt_auth_manager.create_access_token(
{"user_id": db_user.id}
)
refresh_token = jwt_auth_manager.create_refresh_token(
{"user_id": db_user.id}
)
db_refresh = RefreshTokenModel.create(
user_id=cast(int, db_user.id),
days_valid=settings.LOGIN_TIME_DAYS,
token=refresh_token
)
db.add(db_refresh)
db.commit()

return UserLoginResponseSchema(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer"
)
except SQLAlchemyError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An error occurred while processing the request."
)


@router.post(
"/refresh",
response_model=TokenRefreshResponseSchema
)
def refresh(
token_data: TokenRefreshRequestSchema,
jwt_auth_manager: JWTAuthManagerInterface = Depends(get_jwt_auth_manager),
db: Session = Depends(get_db)
):
try:
token_decode = jwt_auth_manager.decode_refresh_token(token_data.refresh_token)
except BaseSecurityError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Token has expired."
)

refresh_token = (
db.query(RefreshTokenModel).filter(RefreshTokenModel.token == token_data.refresh_token).first()
)
if not refresh_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token not found."
)

user = db.query(UserModel).filter(UserModel.id == token_decode["user_id"]).first()

if not user:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure that user is not None before accessing user.id. If the user does not exist, this will raise an AttributeError. Consider adding a check to handle this case.

raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found.")

access_token = jwt_auth_manager.create_access_token({"user_id": user.id})

return TokenRefreshResponseSchema(access_token=access_token)
4 changes: 2 additions & 2 deletions src/routes/movies.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def create_movie(
return MovieDetailSchema.model_validate(movie)
except IntegrityError:
db.rollback()
raise HTTPException(status_code=400, detail=f"Invalid input data.")
raise HTTPException(status_code=400, detail="Invalid input data.")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message 'Invalid input data.' is quite generic. Consider providing more specific details about what might be wrong with the input data to help clients debug their requests.



@router.get(
Expand Down Expand Up @@ -394,6 +394,6 @@ def update_movie(
db.refresh(movie)
except IntegrityError:
db.rollback()
raise HTTPException(status_code=400, detail=f"Invalid input data.")
raise HTTPException(status_code=400, detail="Invalid input data.")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the create_movie function, the error message 'Invalid input data.' is generic. Consider providing more specific details about what might be wrong with the input data to help clients debug their requests.

else:
return {"detail": "Movie updated successfully."}
63 changes: 61 additions & 2 deletions src/schemas/accounts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,64 @@
from pydantic import BaseModel, EmailStr, field_validator
from pydantic import BaseModel, field_validator, EmailStr

from database import accounts_validators

# Write your code here

class UserRegistrationRequestSchema(BaseModel):
email: EmailStr
password: str

@field_validator("email")
@classmethod
def validate_email(cls, value: str) -> str:
return accounts_validators.validate_email(value)

@field_validator("password")
@classmethod
def validate_password(cls, value: str) -> str:
return accounts_validators.validate_password_strength(value)


class UserRegistrationResponseSchema(BaseModel):
id: int
email: str

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using EmailStr for the email field in UserRegistrationResponseSchema to maintain consistency with the UserRegistrationRequestSchema and ensure that the email format is validated.

model_config = {"from_attributes": True}


class UserActivationRequestSchema(BaseModel):
email: str
token: str

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using EmailStr for the email field in UserActivationRequestSchema to ensure that the email format is validated.



class MessageResponseSchema(BaseModel):
message: str


class PasswordResetRequestSchema(BaseModel):
email: str

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using EmailStr for the email field in PasswordResetRequestSchema to ensure that the email format is validated.


class PasswordResetCompleteRequestSchema(UserRegistrationRequestSchema):
token: str


class UserLoginRequestSchema(UserRegistrationRequestSchema):
pass


class UserLoginResponseSchema(BaseModel):
access_token: str
refresh_token: str
token_type: str

model_config = {"from_attributes": True}


class TokenRefreshRequestSchema(BaseModel):
refresh_token: str


class TokenRefreshResponseSchema(BaseModel):
access_token: str

model_config = {"from_attributes": True}
1 change: 1 addition & 0 deletions src/schemas/movies.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class MovieListResponseSchema(BaseModel):
}
}


class MovieCreateSchema(BaseModel):
name: str
date: date
Expand Down
Loading