Skip to content

Commit

Permalink
Merge pull request #160 from lsst-ts/tickets/LOVE-118
Browse files Browse the repository at this point in the history
LDAP Implementation
  • Loading branch information
areyesd14 authored Nov 8, 2022
2 parents fd62bcc + f86facb commit 11b70ca
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 17 deletions.
140 changes: 139 additions & 1 deletion manager/api/tests/tests_auth_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,55 @@
import json
from django.test import TestCase
from django.urls import reverse
from django.contrib.auth.models import User, Permission
from django.contrib.auth.models import User, Permission, Group
from freezegun import freeze_time
from rest_framework.test import APIClient
from rest_framework import status
from api.models import ConfigFile, Token
from django.conf import settings
from django.core.files.base import ContentFile
from unittest.mock import patch
from manager import utils
import ldap

LDAP_USERNAME = "ldap_user"
LDAP_USERNAME_NON_COMMANDS = "ldap_user_non_commands"
LDAP_USERNAME_EXISTENT = "ldap_user_existent"
LDAP_SEARCH_RESPONSE = [
[
None,
{"memberUid": [bytes(LDAP_USERNAME, encoding="utf-8"), b"user2", b"user3"]},
],
]


class MockLDAPUser:
_username = LDAP_USERNAME

def authenticate(self, password):
aux_user = User.objects.filter(username=self._username).first()
if aux_user is None:
ldap_user = User.objects.create_user(
username=self._username,
password=password,
email=f"{self._username}@user.cl",
first_name="First",
last_name="Last",
)
return ldap_user
return aux_user


class MockLDAPUserCommands(MockLDAPUser):
_username = LDAP_USERNAME


class MockLDAPUserNonCommands(MockLDAPUser):
_username = LDAP_USERNAME_NON_COMMANDS


class MockLDAPUserExistent(MockLDAPUser):
_username = LDAP_USERNAME_EXISTENT


class AuthApiTestCase(TestCase):
Expand Down Expand Up @@ -42,8 +83,20 @@ def setUp(self):
first_name="First2",
last_name="Last2",
)
self.user_ldap = User.objects.create_user(
username=LDAP_USERNAME_EXISTENT,
password="password",
email="[email protected]",
first_name="First LDAP",
last_name="Last LDAP",
)
self.user.user_permissions.add(Permission.objects.get(name="Execute Commands"))
self.user2.user_permissions.add(Permission.objects.get(name="Execute Commands"))

cmd_group = Group.objects.create(name="cmd")
cmd_group.permissions.add(Permission.objects.get(name="Execute Commands"))
cmd_group.user_set.add(self.user_ldap)

self.login_url = reverse("login")
self.validate_token_url = reverse("validate-token")
self.validate_token_no_config_url = reverse(
Expand Down Expand Up @@ -110,6 +163,91 @@ def test_user_login(self):
"The config was not requested",
)

@patch("django_auth_ldap.backend._LDAPUser", return_value=MockLDAPUserCommands())
@patch("ldap.initialize", return_value=ldap.ldapobject.LDAPObject("ldap://test/"))
@patch("ldap.ldapobject.LDAPObject.search_s", return_value=LDAP_SEARCH_RESPONSE)
def test_ldap_nonexistent_cmd_user_login(
self, mockLDAPObject, mockLDAPInitialize, mockLDAPUser
):
# Arrange:
data = {"username": LDAP_USERNAME, "password": "password"}
total_users_before = User.objects.count()

# Act:
with self.settings(
AUTHENTICATION_BACKENDS=[
"api.views.IPABackend1",
"django.contrib.auth.backends.ModelBackend",
]
):
response = self.client.post(self.login_url, data, format="json")
user = User.objects.filter(username=LDAP_USERNAME).first()
user_group = user.groups.filter(name="cmd").first()

total_users_after = User.objects.count()

# Assert:
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(total_users_before + 1, total_users_after)
self.assertEqual(user_group.name, "cmd")

@patch("django_auth_ldap.backend._LDAPUser", return_value=MockLDAPUserExistent())
@patch("ldap.initialize", return_value=ldap.ldapobject.LDAPObject("ldap://test/"))
@patch("ldap.ldapobject.LDAPObject.search_s", return_value=LDAP_SEARCH_RESPONSE)
def test_ldap_existent_cmd_user_login(
self, mockLDAPObject, mockLDAPInitialize, mockLDAPUser
):
# Arrange:
data = {"username": LDAP_USERNAME_EXISTENT, "password": "password"}
total_users_before = User.objects.count()

# Act:
with self.settings(
AUTHENTICATION_BACKENDS=[
"api.views.IPABackend1",
"django.contrib.auth.backends.ModelBackend",
]
):
response = self.client.post(self.login_url, data, format="json")
user = User.objects.filter(username=LDAP_USERNAME_EXISTENT).first()
user_group = user.groups.filter(name="cmd").first()

total_users_after = User.objects.count()

# Assert:
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(total_users_before, total_users_after)
self.assertEqual(user_group.name, "cmd")

@patch("django_auth_ldap.backend._LDAPUser", return_value=MockLDAPUserNonCommands())
@patch("ldap.initialize", return_value=ldap.ldapobject.LDAPObject("ldap://test/"))
@patch(
"ldap.ldapobject.LDAPObject.search_s", return_value=LDAP_SEARCH_RESPONSE,
)
def test_ldap_nonexistent_non_cmd_user_login(
self, mockLDAPObject, mockLDAPInitialize, mockLDAPUserNonCmd
):
# Arrange:
data = {"username": LDAP_USERNAME_NON_COMMANDS, "password": "password"}
total_users_before = User.objects.count()

# Act:
with self.settings(
AUTHENTICATION_BACKENDS=[
"api.views.IPABackend1",
"django.contrib.auth.backends.ModelBackend",
]
):
response = self.client.post(self.login_url, data, format="json")
user = User.objects.filter(username=LDAP_USERNAME_NON_COMMANDS).first()

total_users_after = User.objects.count()

# Assert:
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(total_users_before + 1, total_users_after)
self.assertEqual(user.groups.count(), 0)

def test_user_login_failed(self):
"""Test that a user cannot request a token if the credentials are invalid."""
# Arrange:
Expand Down
2 changes: 1 addition & 1 deletion manager/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
router = DefaultRouter()

urlpatterns = [
path("get-token/", api.views.CustomObtainAuthToken.as_view(), name="login"),
path("get-token/", api.views.LDAPLogin.as_view(), name="login"),
path("validate-token/", api.views.validate_token, name="validate-token"),
path("validate-token/<flags>/", api.views.validate_token, name="validate-token"),
path(
Expand Down
103 changes: 102 additions & 1 deletion manager/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@
import yaml
import jsonschema
import collections
import ldap
from background_task import background
from django.core.exceptions import PermissionDenied
from django.utils import timezone
from django.db.models.query_utils import Q
from django.contrib.auth import authenticate
from django.contrib.auth.models import Group, User
from django_auth_ldap.backend import LDAPBackend
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework.authtoken.views import ObtainAuthToken
from rest_framework.decorators import api_view
from rest_framework.decorators import permission_classes
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework.response import Response
from rest_framework import viewsets, status, mixins
from rest_framework.views import APIView
from api.models import (
Token,
ConfigFile,
Expand All @@ -34,6 +39,11 @@
CSCAuthorizationRequestUpdateSerializer,
)
from .schema_validator import DefaultingValidator
from manager.settings import (
AUTH_LDAP_1_SERVER_URI,
AUTH_LDAP_2_SERVER_URI,
AUTH_LDAP_3_SERVER_URI,
)

valid_response = openapi.Response("Valid token", TokenSerializer)
invalid_response = openapi.Response("Invalid token")
Expand Down Expand Up @@ -106,6 +116,97 @@ def logout(request):
)


class IPABackend1(LDAPBackend):
settings_prefix = "AUTH_LDAP_1_"
successful_login = False

def authenticate_ldap_user(self, ldap_user, password):
user = ldap_user.authenticate(password)
if user:
IPABackend1.successful_login = True
return user


class IPABackend2(LDAPBackend):
settings_prefix = "AUTH_LDAP_2_"
successful_login = False

def authenticate_ldap_user(self, ldap_user, password):
user = ldap_user.authenticate(password)
if user:
IPABackend2.successful_login = True
return user


class IPABackend3(LDAPBackend):
settings_prefix = "AUTH_LDAP_3_"
successful_login = False

def authenticate_ldap_user(self, ldap_user, password):
user = ldap_user.authenticate(password)
if user:
IPABackend3.successful_login = True
return user


class LDAPLogin(APIView):
"""
Class to authenticate a user via LDAP and
then creating a login session
"""

authentication_classes = ()

permission_classes = [AllowAny]

def post(self, request):
"""
Api to login a user:
1. It authenticates to an LDAP server, if none is found, the default
login is used.
2. It searches for the 'love_ops' group and if the authenticate user
is present, command permissions are added.
"""

username = request.data["username"]
password = request.data["password"]
user_aux = User.objects.filter(username=username).first()
user_obj = authenticate(username=username, password=password)
if user_obj is None:
data = {"detail": "Login failed."}
return Response(data, status=400)

ldap_result = None
if user_aux is None:
if IPABackend1.successful_login:
ldap_result = ldap.initialize(AUTH_LDAP_1_SERVER_URI)
elif IPABackend2.successful_login:
ldap_result = ldap.initialize(AUTH_LDAP_2_SERVER_URI)
elif IPABackend3.successful_login:
ldap_result = ldap.initialize(AUTH_LDAP_3_SERVER_URI)

baseDN = "cn=love_ops,cn=groups,cn=compat,dc=lsst,dc=cloud"
searchScope = ldap.SCOPE_SUBTREE

if ldap_result is not None:
try:
ldap_result = ldap_result.search_s(baseDN, searchScope)
ops_users = list(
map(lambda u: u.decode(), ldap_result[0][1]["memberUid"])
)
if username in ops_users:
group = Group.objects.filter(name="cmd").first()
group.user_set.add(user_obj)
except Exception:
data = {"detail": "Login failed, add cmd permission error."}
return Response(data, status=400)

token = Token.objects.create(user=user_obj)
return Response(TokenSerializer(token).data)


class CustomObtainAuthToken(ObtainAuthToken):
"""API endpoint to obtain authorization tokens."""

Expand Down
50 changes: 37 additions & 13 deletions manager/manager/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,24 +236,48 @@
"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"},
}

AUTHENTICATION_BACKENDS = [
"django.contrib.auth.backends.ModelBackend",
]

# LDAP
# Baseline configuration:
AUTH_LDAP_SERVER_URI = os.environ.get("AUTH_LDAP_SERVER_URI", False)
AUTH_LDAP_1_SERVER_URI = os.environ.get("AUTH_LDAP_1_SERVER_URI")
AUTH_LDAP_2_SERVER_URI = os.environ.get("AUTH_LDAP_2_SERVER_URI")
AUTH_LDAP_3_SERVER_URI = os.environ.get("AUTH_LDAP_3_SERVER_URI")
"""URL for the LDAP server. Read from `AUTH_LDAP_SERVER_URI` environment variable (`bool`)"""

# Only use LDAP activation backend if there is an AUTH_LDAP_SERVER_URI
# configured in the OS ENV:
if AUTH_LDAP_SERVER_URI and not TESTING:
AUTHENTICATION_BACKENDS = [
"django_auth_ldap.backend.LDAPBackend",
]

AUTH_LDAP_BIND_DN = ""
AUTH_LDAP_BIND_PASSWORD = ""

AUTH_LDAP_USER_SEARCH = LDAPSearch(
"ou=people,dc=planetexpress,dc=com", ldap.SCOPE_SUBTREE, "(uid=%(user)s)",
)
AUTH_LDAP_BIND_DN = "uid=svc_love,cn=users,cn=accounts,dc=lsst,dc=cloud"
AUTH_LDAP_BIND_PASSWORD = os.environ.get("AUTH_LDAP_BIND_PASSWORD")
AUTH_LDAP_USER_SEARCH = LDAPSearch(
"cn=users,cn=accounts,dc=lsst,dc=cloud", ldap.SCOPE_SUBTREE, "(uid=%(user)s)",
)
AUTH_LDAP_USER_ATTR_MAP = {
"first_name": "givenname",
"last_name": "sn",
"email": "mail",
}
if AUTH_LDAP_3_SERVER_URI:
AUTHENTICATION_BACKENDS.insert(0, "api.views.IPABackend3")
AUTH_LDAP_3_BIND_DN = AUTH_LDAP_BIND_DN
AUTH_LDAP_3_BIND_PASSWORD = AUTH_LDAP_BIND_PASSWORD
AUTH_LDAP_3_USER_SEARCH = AUTH_LDAP_USER_SEARCH
AUTH_LDAP_3_USER_ATTR_MAP = AUTH_LDAP_USER_ATTR_MAP

if AUTH_LDAP_2_SERVER_URI:
AUTHENTICATION_BACKENDS.insert(0, "api.views.IPABackend2")
AUTH_LDAP_2_BIND_DN = AUTH_LDAP_BIND_DN
AUTH_LDAP_2_BIND_PASSWORD = AUTH_LDAP_BIND_PASSWORD
AUTH_LDAP_2_USER_SEARCH = AUTH_LDAP_USER_SEARCH
AUTH_LDAP_2_USER_ATTR_MAP = AUTH_LDAP_USER_ATTR_MAP

if AUTH_LDAP_1_SERVER_URI:
AUTHENTICATION_BACKENDS.insert(0, "api.views.IPABackend1")
AUTH_LDAP_1_BIND_DN = AUTH_LDAP_BIND_DN
AUTH_LDAP_1_BIND_PASSWORD = AUTH_LDAP_BIND_PASSWORD
AUTH_LDAP_1_USER_SEARCH = AUTH_LDAP_USER_SEARCH
AUTH_LDAP_1_USER_ATTR_MAP = AUTH_LDAP_USER_ATTR_MAP

TRACE_TIMESTAMPS = True
"""Define wether or not to add tracing timestamps to websocket messages.
Expand Down
2 changes: 1 addition & 1 deletion manager/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ coreschema==0.0.4
cryptography==3.3.2
daphne==3.0.1
Django==3.1.14
django-auth-ldap==2.1.0
django-auth-ldap==4.1.0
django-cors-headers==3.2.1
django-webpack-loader==0.7.0
djangorestframework==3.11.2
Expand Down

0 comments on commit 11b70ca

Please sign in to comment.