Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LDAP Implementation #160

Merged
merged 16 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 139 additions & 1 deletion manager/api/tests/tests_auth_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,55 @@
import json
from django.test import TestCase
from django.urls import reverse
from django.contrib.auth.models import User, Permission
from django.contrib.auth.models import User, Permission, Group
from freezegun import freeze_time
from rest_framework.test import APIClient
from rest_framework import status
from api.models import ConfigFile, Token
from django.conf import settings
from django.core.files.base import ContentFile
from unittest.mock import patch
from manager import utils
import ldap

LDAP_USERNAME = "ldap_user"
LDAP_USERNAME_NON_COMMANDS = "ldap_user_non_commands"
LDAP_USERNAME_EXISTENT = "ldap_user_existent"
LDAP_SEARCH_RESPONSE = [
[
None,
{"memberUid": [bytes(LDAP_USERNAME, encoding="utf-8"), b"user2", b"user3"]},
],
]


class MockLDAPUser:
_username = LDAP_USERNAME

def authenticate(self, password):
aux_user = User.objects.filter(username=self._username).first()
if aux_user is None:
ldap_user = User.objects.create_user(
username=self._username,
password=password,
email=f"{self._username}@user.cl",
first_name="First",
last_name="Last",
)
return ldap_user
return aux_user


class MockLDAPUserCommands(MockLDAPUser):
_username = LDAP_USERNAME


class MockLDAPUserNonCommands(MockLDAPUser):
_username = LDAP_USERNAME_NON_COMMANDS


class MockLDAPUserExistent(MockLDAPUser):
_username = LDAP_USERNAME_EXISTENT


class AuthApiTestCase(TestCase):
Expand Down Expand Up @@ -42,8 +83,20 @@ def setUp(self):
first_name="First2",
last_name="Last2",
)
self.user_ldap = User.objects.create_user(
username=LDAP_USERNAME_EXISTENT,
password="password",
email="[email protected]",
first_name="First LDAP",
last_name="Last LDAP",
)
self.user.user_permissions.add(Permission.objects.get(name="Execute Commands"))
self.user2.user_permissions.add(Permission.objects.get(name="Execute Commands"))

cmd_group = Group.objects.create(name="cmd")
cmd_group.permissions.add(Permission.objects.get(name="Execute Commands"))
cmd_group.user_set.add(self.user_ldap)

self.login_url = reverse("login")
self.validate_token_url = reverse("validate-token")
self.validate_token_no_config_url = reverse(
Expand Down Expand Up @@ -110,6 +163,91 @@ def test_user_login(self):
"The config was not requested",
)

@patch("django_auth_ldap.backend._LDAPUser", return_value=MockLDAPUserCommands())
@patch("ldap.initialize", return_value=ldap.ldapobject.LDAPObject("ldap://test/"))
@patch("ldap.ldapobject.LDAPObject.search_s", return_value=LDAP_SEARCH_RESPONSE)
def test_ldap_nonexistent_cmd_user_login(
self, mockLDAPObject, mockLDAPInitialize, mockLDAPUser
):
# Arrange:
data = {"username": LDAP_USERNAME, "password": "password"}
total_users_before = User.objects.count()

# Act:
with self.settings(
AUTHENTICATION_BACKENDS=[
"api.views.IPABackend1",
"django.contrib.auth.backends.ModelBackend",
]
):
response = self.client.post(self.login_url, data, format="json")
user = User.objects.filter(username=LDAP_USERNAME).first()
user_group = user.groups.filter(name="cmd").first()

total_users_after = User.objects.count()

# Assert:
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(total_users_before + 1, total_users_after)
self.assertEqual(user_group.name, "cmd")

@patch("django_auth_ldap.backend._LDAPUser", return_value=MockLDAPUserExistent())
@patch("ldap.initialize", return_value=ldap.ldapobject.LDAPObject("ldap://test/"))
@patch("ldap.ldapobject.LDAPObject.search_s", return_value=LDAP_SEARCH_RESPONSE)
def test_ldap_existent_cmd_user_login(
self, mockLDAPObject, mockLDAPInitialize, mockLDAPUser
):
# Arrange:
data = {"username": LDAP_USERNAME_EXISTENT, "password": "password"}
total_users_before = User.objects.count()

# Act:
with self.settings(
AUTHENTICATION_BACKENDS=[
"api.views.IPABackend1",
"django.contrib.auth.backends.ModelBackend",
]
):
response = self.client.post(self.login_url, data, format="json")
user = User.objects.filter(username=LDAP_USERNAME_EXISTENT).first()
user_group = user.groups.filter(name="cmd").first()

total_users_after = User.objects.count()

# Assert:
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(total_users_before, total_users_after)
self.assertEqual(user_group.name, "cmd")

@patch("django_auth_ldap.backend._LDAPUser", return_value=MockLDAPUserNonCommands())
@patch("ldap.initialize", return_value=ldap.ldapobject.LDAPObject("ldap://test/"))
@patch(
"ldap.ldapobject.LDAPObject.search_s", return_value=LDAP_SEARCH_RESPONSE,
)
def test_ldap_nonexistent_non_cmd_user_login(
self, mockLDAPObject, mockLDAPInitialize, mockLDAPUserNonCmd
):
# Arrange:
data = {"username": LDAP_USERNAME_NON_COMMANDS, "password": "password"}
total_users_before = User.objects.count()

# Act:
with self.settings(
AUTHENTICATION_BACKENDS=[
"api.views.IPABackend1",
"django.contrib.auth.backends.ModelBackend",
]
):
response = self.client.post(self.login_url, data, format="json")
user = User.objects.filter(username=LDAP_USERNAME_NON_COMMANDS).first()

total_users_after = User.objects.count()

# Assert:
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(total_users_before + 1, total_users_after)
self.assertEqual(user.groups.count(), 0)

def test_user_login_failed(self):
"""Test that a user cannot request a token if the credentials are invalid."""
# Arrange:
Expand Down
2 changes: 1 addition & 1 deletion manager/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
router = DefaultRouter()

urlpatterns = [
path("get-token/", api.views.CustomObtainAuthToken.as_view(), name="login"),
path("get-token/", api.views.LDAPLogin.as_view(), name="login"),
path("validate-token/", api.views.validate_token, name="validate-token"),
path("validate-token/<flags>/", api.views.validate_token, name="validate-token"),
path(
Expand Down
103 changes: 102 additions & 1 deletion manager/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@
import yaml
import jsonschema
import collections
import ldap
from background_task import background
from django.core.exceptions import PermissionDenied
from django.utils import timezone
from django.db.models.query_utils import Q
from django.contrib.auth import authenticate
from django.contrib.auth.models import Group, User
from django_auth_ldap.backend import LDAPBackend
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework.authtoken.views import ObtainAuthToken
from rest_framework.decorators import api_view
from rest_framework.decorators import permission_classes
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework.response import Response
from rest_framework import viewsets, status, mixins
from rest_framework.views import APIView
from api.models import (
Token,
ConfigFile,
Expand All @@ -34,6 +39,11 @@
CSCAuthorizationRequestUpdateSerializer,
)
from .schema_validator import DefaultingValidator
from manager.settings import (
AUTH_LDAP_1_SERVER_URI,
AUTH_LDAP_2_SERVER_URI,
AUTH_LDAP_3_SERVER_URI,
)

valid_response = openapi.Response("Valid token", TokenSerializer)
invalid_response = openapi.Response("Invalid token")
Expand Down Expand Up @@ -106,6 +116,97 @@ def logout(request):
)


class IPABackend1(LDAPBackend):
settings_prefix = "AUTH_LDAP_1_"
successful_login = False

def authenticate_ldap_user(self, ldap_user, password):
user = ldap_user.authenticate(password)
if user:
IPABackend1.successful_login = True
return user


class IPABackend2(LDAPBackend):
settings_prefix = "AUTH_LDAP_2_"
successful_login = False

def authenticate_ldap_user(self, ldap_user, password):
user = ldap_user.authenticate(password)
if user:
IPABackend2.successful_login = True
return user


class IPABackend3(LDAPBackend):
settings_prefix = "AUTH_LDAP_3_"
successful_login = False

def authenticate_ldap_user(self, ldap_user, password):
user = ldap_user.authenticate(password)
if user:
IPABackend3.successful_login = True
return user


class LDAPLogin(APIView):
"""
Class to authenticate a user via LDAP and
then creating a login session
"""

authentication_classes = ()

permission_classes = [AllowAny]

def post(self, request):
"""
Api to login a user:

1. It authenticates to an LDAP server, if none is found, the default
login is used.

2. It searches for the 'love_ops' group and if the authenticate user
is present, command permissions are added.
"""

username = request.data["username"]
password = request.data["password"]
user_aux = User.objects.filter(username=username).first()
user_obj = authenticate(username=username, password=password)
if user_obj is None:
data = {"detail": "Login failed."}
return Response(data, status=400)

ldap_result = None
if user_aux is None:
if IPABackend1.successful_login:
ldap_result = ldap.initialize(AUTH_LDAP_1_SERVER_URI)
elif IPABackend2.successful_login:
ldap_result = ldap.initialize(AUTH_LDAP_2_SERVER_URI)
elif IPABackend3.successful_login:
ldap_result = ldap.initialize(AUTH_LDAP_3_SERVER_URI)

baseDN = "cn=love_ops,cn=groups,cn=compat,dc=lsst,dc=cloud"
searchScope = ldap.SCOPE_SUBTREE

if ldap_result is not None:
try:
ldap_result = ldap_result.search_s(baseDN, searchScope)
ops_users = list(
map(lambda u: u.decode(), ldap_result[0][1]["memberUid"])
)
if username in ops_users:
group = Group.objects.filter(name="cmd").first()
group.user_set.add(user_obj)
except Exception:
data = {"detail": "Login failed, add cmd permission error."}
return Response(data, status=400)

token = Token.objects.create(user=user_obj)
return Response(TokenSerializer(token).data)


class CustomObtainAuthToken(ObtainAuthToken):
"""API endpoint to obtain authorization tokens."""

Expand Down
50 changes: 37 additions & 13 deletions manager/manager/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,24 +236,48 @@
"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"},
}

AUTHENTICATION_BACKENDS = [
"django.contrib.auth.backends.ModelBackend",
]

# LDAP
# Baseline configuration:
AUTH_LDAP_SERVER_URI = os.environ.get("AUTH_LDAP_SERVER_URI", False)
AUTH_LDAP_1_SERVER_URI = os.environ.get("AUTH_LDAP_1_SERVER_URI")
AUTH_LDAP_2_SERVER_URI = os.environ.get("AUTH_LDAP_2_SERVER_URI")
AUTH_LDAP_3_SERVER_URI = os.environ.get("AUTH_LDAP_3_SERVER_URI")
"""URL for the LDAP server. Read from `AUTH_LDAP_SERVER_URI` environment variable (`bool`)"""

# Only use LDAP activation backend if there is an AUTH_LDAP_SERVER_URI
# configured in the OS ENV:
if AUTH_LDAP_SERVER_URI and not TESTING:
AUTHENTICATION_BACKENDS = [
"django_auth_ldap.backend.LDAPBackend",
]

AUTH_LDAP_BIND_DN = ""
AUTH_LDAP_BIND_PASSWORD = ""

AUTH_LDAP_USER_SEARCH = LDAPSearch(
"ou=people,dc=planetexpress,dc=com", ldap.SCOPE_SUBTREE, "(uid=%(user)s)",
)
AUTH_LDAP_BIND_DN = "uid=svc_love,cn=users,cn=accounts,dc=lsst,dc=cloud"
AUTH_LDAP_BIND_PASSWORD = os.environ.get("AUTH_LDAP_BIND_PASSWORD")
AUTH_LDAP_USER_SEARCH = LDAPSearch(
"cn=users,cn=accounts,dc=lsst,dc=cloud", ldap.SCOPE_SUBTREE, "(uid=%(user)s)",
)
AUTH_LDAP_USER_ATTR_MAP = {
"first_name": "givenname",
"last_name": "sn",
"email": "mail",
}
if AUTH_LDAP_3_SERVER_URI:
AUTHENTICATION_BACKENDS.insert(0, "api.views.IPABackend3")
AUTH_LDAP_3_BIND_DN = AUTH_LDAP_BIND_DN
AUTH_LDAP_3_BIND_PASSWORD = AUTH_LDAP_BIND_PASSWORD
AUTH_LDAP_3_USER_SEARCH = AUTH_LDAP_USER_SEARCH
AUTH_LDAP_3_USER_ATTR_MAP = AUTH_LDAP_USER_ATTR_MAP

if AUTH_LDAP_2_SERVER_URI:
AUTHENTICATION_BACKENDS.insert(0, "api.views.IPABackend2")
AUTH_LDAP_2_BIND_DN = AUTH_LDAP_BIND_DN
AUTH_LDAP_2_BIND_PASSWORD = AUTH_LDAP_BIND_PASSWORD
AUTH_LDAP_2_USER_SEARCH = AUTH_LDAP_USER_SEARCH
AUTH_LDAP_2_USER_ATTR_MAP = AUTH_LDAP_USER_ATTR_MAP

if AUTH_LDAP_1_SERVER_URI:
AUTHENTICATION_BACKENDS.insert(0, "api.views.IPABackend1")
AUTH_LDAP_1_BIND_DN = AUTH_LDAP_BIND_DN
AUTH_LDAP_1_BIND_PASSWORD = AUTH_LDAP_BIND_PASSWORD
AUTH_LDAP_1_USER_SEARCH = AUTH_LDAP_USER_SEARCH
AUTH_LDAP_1_USER_ATTR_MAP = AUTH_LDAP_USER_ATTR_MAP

TRACE_TIMESTAMPS = True
"""Define wether or not to add tracing timestamps to websocket messages.
Expand Down
2 changes: 1 addition & 1 deletion manager/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ coreschema==0.0.4
cryptography==3.3.2
daphne==3.0.1
Django==3.1.14
django-auth-ldap==2.1.0
django-auth-ldap==4.1.0
django-cors-headers==3.2.1
django-webpack-loader==0.7.0
djangorestframework==3.11.2
Expand Down