authentication.py
from datetime import datetime, timedelta
import jwt
from django.conf import settings
from django.contrib.auth import get_user_model
from rest_framework import authentication
from rest_framework.exceptions import AuthenticationFailed, ParseError
# Resolve the project's active user model through Django rather than
# importing a concrete user class directly.
User = get_user_model()
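class JWTAuthentication(authentication.BaseAuthentication):
    """DRF authentication backend for HS256-signed JWTs.

    Tokens are read from the ``Authorization`` header, verified with
    ``settings.SECRET_KEY`` and mapped to a user through the
    ``user_identifier`` claim.
    """

    def authenticate(self, request):
        """Authenticate *request* from the JWT in its Authorization header.

        Returns:
            ``None`` when no Authorization header is present, otherwise a
            ``(user, payload)`` tuple.

        Raises:
            AuthenticationFailed: the signature is invalid, the
                ``user_identifier`` claim is missing, or no user matches it.
            ParseError: any other failure while decoding the token.
        """
        raw_header = request.META.get("HTTP_AUTHORIZATION")
        if raw_header is None:
            return None

        jwt_token = JWTAuthentication.get_the_token_from_header(raw_header)

        # The algorithm list is pinned, so the token's own header cannot
        # select a different (or no) signature algorithm.
        # NOTE(review): nothing here requires an ``exp`` claim to be present;
        # confirm whether tokens without one should be rejected.
        try:
            payload = jwt.decode(jwt_token, settings.SECRET_KEY, algorithms=["HS256"])
        except jwt.exceptions.InvalidSignatureError:
            raise AuthenticationFailed("Geçersiz imza.")
        except Exception:
            # NOTE(review): every other failure, including an expired token
            # (jwt.exceptions.ExpiredSignatureError), ends up here and is
            # reported as a malformed request instead of an authentication
            # failure. Consider handling expiry explicitly.
            raise ParseError()

        username_or_phone_number = payload.get("user_identifier")
        if username_or_phone_number is None:
            raise AuthenticationFailed(
                "JWT içerisinde kullanıcı tanımlayıcı bulunamadı."
            )

        # NOTE(review): the identifier is tried as a username first and as a
        # phone number second. Tokens from create_jwt always carry the
        # username, so the fallback means a token whose username no longer
        # exists can resolve to another account whose phone_number has the
        # same value. The user's is_active flag is not checked either.
        user = User.objects.filter(username=username_or_phone_number).first()
        if user is None:
            user = User.objects.filter(phone_number=username_or_phone_number).first()
        if user is None:
            raise AuthenticationFailed("Kullanıcı bulunamadı.")

        return user, payload

    def authenticate_header(self, request):
        """Return the scheme advertised in the WWW-Authenticate header."""
        return "Bearer"

    @classmethod
    def create_jwt(cls, user):
        """Issue a signed JWT for *user*.

        The lifetime comes from ``settings.JWT_CONF["TOKEN_LIFETIME_HOURS"]``.
        ``exp`` and ``iat`` are derived from a single clock reading so they
        describe the same instant of issue.
        """
        issued_at = datetime.now()
        expires_at = issued_at + timedelta(
            hours=settings.JWT_CONF["TOKEN_LIFETIME_HOURS"]
        )

        # NOTE(review): a JWT payload is signed, not encrypted, so anyone who
        # holds the token can read username and phone_number. Confirm that
        # clients actually need phone_number in the token.
        payload = {
            "user_identifier": user.username,
            "exp": int(expires_at.timestamp()),
            "iat": issued_at.timestamp(),
            "username": user.username,
            "phone_number": user.phone_number,
        }

        # NOTE(review): settings.SECRET_KEY is also Django's general signing
        # key; a dedicated JWT key could be rotated independently of it.
        return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")

    @classmethod
    def get_the_token_from_header(cls, token):
        """Return the bare token from an Authorization header value.

        Only a leading ``Bearer`` scheme is stripped. Removing the word
        wherever it occurs would corrupt a token whose encoded text happens
        to contain it. Spaces are removed afterwards, so ``"Bearer <token>"``,
        ``"Bearer<token>"`` and a bare token are all accepted.
        """
        scheme = "Bearer"
        value = token.lstrip(" ")
        if value.startswith(scheme):
            value = value[len(scheme):]
        return value.replace(" ", "")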