Skip to content

Commit

Permalink
Merge branch 'master' into itservice-metrics-inc
Browse files Browse the repository at this point in the history
  • Loading branch information
alechenninger authored Nov 21, 2024
2 parents edf92ec + 966edb5 commit ff32aa0
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 37 deletions.
7 changes: 4 additions & 3 deletions rbac/internal/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,10 @@ def bootstrap_tenant(request):
org_id = request.GET.get("org_id")
if not org_id:
return HttpResponse('Invalid request, must supply the "org_id" query parameter.', status=400)
tenant = get_object_or_404(Tenant, org_id=org_id)
bootstrap_service = V2TenantBootstrapService(OutboxReplicator())
bootstrap_service.bootstrap_tenant(tenant)
with transaction.atomic():
tenant = get_object_or_404(Tenant, org_id=org_id)
bootstrap_service = V2TenantBootstrapService(OutboxReplicator())
bootstrap_service.bootstrap_tenant(tenant)
return HttpResponse(f"Bootstrap tenant with org_id {org_id} finished.", status=200)


Expand Down
5 changes: 4 additions & 1 deletion rbac/management/group/definer.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ def clone_default_group_in_public_schema(group, tenant) -> Optional[Group]:
if settings.V2_BOOTSTRAP_TENANT:
tenant_bootstrap_service = V2TenantBootstrapService(OutboxReplicator())
bootstrapped_tenant = tenant_bootstrap_service.bootstrap_tenant(tenant)
group_uuid = bootstrapped_tenant.mapping.default_group_uuid
mapping = bootstrapped_tenant.mapping
# Mapping is always present with V2
assert mapping is not None
group_uuid = mapping.default_group_uuid
else:
group_uuid = uuid4()

Expand Down
80 changes: 55 additions & 25 deletions rbac/management/principal/cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import xmltodict
from django.conf import settings
from django.db import connection, transaction
from management.principal.model import Principal
from management.principal.proxy import PrincipalProxy, external_principal_to_user
from management.relation_replicator.outbox_replicator import OutboxReplicator
Expand All @@ -43,6 +44,7 @@
PROXY = PrincipalProxy() # pylint: disable=invalid-name
CERT_LOC = "/opt/rbac/rbac/management/principal/umb_certs/cert.pem"
KEY_LOC = "/opt/rbac/rbac/management/principal/umb_certs/key.pem"
LOCK_ID = 42 # For Keith, with Love

METRIC_STOMP_MESSAGE_TOTAL = "stomp_messages_total"
umb_message_processed_count = Counter(
Expand Down Expand Up @@ -184,26 +186,41 @@ def retrieve_user_info(message) -> User:
return external_principal_to_user(user_data)


def process_umb_event(frame, umb_client: Stomp, bootstrap_service: TenantBootstrapService):
"""Process each umb frame."""
data_dict = xmltodict.parse(frame.body)
canonical_message = data_dict.get("CanonicalMessage")
if not canonical_message:
# Message is malformed.
# Ensure we dont block the entire queue by discarding it.
umb_client.ack(frame)
return
try:
user = retrieve_user_info(canonical_message)
except Exception as e: # Skip processing and leave the it to be processed later
logger.error("process_umb_event: Error retrieving user info: %s", str(e))
return

# By default, only process disabled users.
# If the setting is enabled, process all users.
if not user.is_active or settings.PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB:
bootstrap_service.update_user(user)
def process_umb_event(frame, umb_client: Stomp, bootstrap_service: TenantBootstrapService) -> bool:
    """
    Handle a single UMB frame.

    Return True when the caller should keep listening for more frames, and
    False when it should stop because another listener already holds the lock.
    """
    with transaction.atomic():
        # The advisory lock is taken per transaction so that two listener
        # processes never handle events concurrently.
        if not _lock_listener():
            # Another listener is active: let it do the work and stop this one.
            logger.info("process_umb_event: Another listener is running. Aborting.")
            return False

        canonical_message = xmltodict.parse(frame.body).get("CanonicalMessage")
        if canonical_message:
            try:
                user = retrieve_user_info(canonical_message)
            except Exception as err:
                # Return before the ack so the frame is left to be processed later.
                logger.error("process_umb_event: Error retrieving user info: %s", str(err))
                return True

            # Disabled users are always processed; active users only when the
            # PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB setting is turned on.
            if not user.is_active or settings.PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB:
                bootstrap_service.update_user(user)
        # Otherwise the message is malformed (no CanonicalMessage): it is simply
        # acked below so that it does not block the rest of the queue.
        # TODO: this is not the only way a message can be malformed

    # NOTE(review): indentation is not visible in this view; the ack is assumed
    # to run after the atomic block has exited -- confirm against the file.
    umb_client.ack(frame)
    umb_message_processed_count.inc()
    return True


def process_principal_events_from_umb(bootstrap_service: Optional[TenantBootstrapService] = None):
Expand All @@ -218,9 +235,22 @@ def process_principal_events_from_umb(bootstrap_service: Optional[TenantBootstra
if not str(e).startswith(("Already connected", "Already subscribed")):
raise e

while UMB_CLIENT.canRead(2): # Check if queue is empty, two sec timeout
frame = UMB_CLIENT.receiveFrame()
process_umb_event(frame, UMB_CLIENT, bootstrap_service)
umb_message_processed_count.inc()
UMB_CLIENT.disconnect()
logger.info("process_tenant_principal_events: Principal event processing finished.")
try:
while UMB_CLIENT.canRead(15): # Check if queue is empty, 15 sec timeout
frame = UMB_CLIENT.receiveFrame()
logger.info("process_tenant_principal_events: Processing frame. info=%s", frame.info())
if not process_umb_event(frame, UMB_CLIENT, bootstrap_service):
break
finally:
UMB_CLIENT.disconnect()
logger.info("process_tenant_principal_events: Principal event processing finished.")


def _lock_listener() -> bool:
    """
    Try to take the listener advisory lock for the current transaction.

    Return True if the lock was acquired, False if it was not.
    Raise Exception if the query unexpectedly yields no row.
    """
    with connection.cursor() as cursor:
        cursor.execute("SELECT pg_try_advisory_xact_lock(%s);", [LOCK_ID])
        row = cursor.fetchone()
    if row is None:
        raise Exception("Advisory lock returned none, expected bool.")
    # The single selected column is the boolean outcome of the lock attempt.
    acquired = row[0]
    return acquired
5 changes: 0 additions & 5 deletions rbac/management/tenant_service/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import List, Optional

from django.conf import settings
from django.db import transaction
from django.db.models import Prefetch, Q
from kessel.relations.v1beta1.common_pb2 import Relationship
from management.group.model import Group
Expand Down Expand Up @@ -38,13 +37,11 @@ def __init__(self, replicator: RelationReplicator, public_tenant: Optional[Tenan
self._replicator = replicator
self._public_tenant = public_tenant

@transaction.atomic
def new_bootstrapped_tenant(self, org_id: str, account_number: Optional[str] = None) -> BootstrappedTenant:
"""Create a new tenant."""
tenant = Tenant.objects.create(org_id=org_id, account_id=account_number)
return self._bootstrap_tenant(tenant)

@transaction.atomic
def bootstrap_tenant(self, tenant: Tenant) -> BootstrappedTenant:
"""Bootstrap an existing tenant."""
try:
Expand All @@ -53,7 +50,6 @@ def bootstrap_tenant(self, tenant: Tenant) -> BootstrappedTenant:
except TenantMapping.DoesNotExist:
return self._bootstrap_tenant(tenant)

@transaction.atomic
def update_user(
self, user: User, upsert: bool = False, bootstrapped_tenant: Optional[BootstrappedTenant] = None
) -> Optional[BootstrappedTenant]:
Expand Down Expand Up @@ -105,7 +101,6 @@ def update_user(

return bootstrapped_tenant

@transaction.atomic
def import_bulk_users(self, users: list[User]):
"""
Bootstrap multiple users in a tenant.
Expand Down
5 changes: 3 additions & 2 deletions rbac/rbac/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from django.conf import settings
from django.core.handlers.wsgi import WSGIRequest
from django.db import IntegrityError
from django.db import IntegrityError, transaction
from django.http import Http404, HttpResponse, QueryDict
from django.urls import resolve
from django.utils.deprecation import MiddlewareMixin
Expand Down Expand Up @@ -121,7 +121,8 @@ def get_tenant(self, model, hostname, request):
# Tenants are normally bootstrapped via principal job,
# but there is a race condition where the user can use the service before the message is processed.
try:
bootstrap = self.bootstrap_service.update_user(request.user, upsert=True)
with transaction.atomic():
bootstrap = self.bootstrap_service.update_user(request.user, upsert=True)
if bootstrap is None:
# User is inactive. Should never happen but just in case...
raise Http404()
Expand Down
67 changes: 66 additions & 1 deletion tests/management/principal/test_cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@
#
"""Test the principal cleaner."""
from functools import partial
from threading import Event, Thread
import uuid

from unittest.mock import MagicMock, patch

from django.db import connections, transaction
from django.test import override_settings
from prometheus_client import REGISTRY
from rest_framework import status

from management.group.definer import seed_group
from management.group.model import Group
from management.policy.model import Policy
from management.principal.cleaner import clean_tenant_principals
from management.principal.cleaner import LOCK_ID, clean_tenant_principals
from management.principal.model import Principal
from management.principal.cleaner import process_principal_events_from_umb, METRIC_STOMP_MESSAGE_TOTAL
from management.principal.proxy import external_principal_to_user
Expand Down Expand Up @@ -504,6 +506,69 @@ def test_principal_creation_event_updates_existing_principal(self, client_mock,
self.assertTrue(Tenant.objects.filter(org_id="17685860").exists())
self.assertTrue(Principal.objects.filter(user_id=self.principal_user_id).exists())

@patch(
    "management.principal.proxy.PrincipalProxy.request_filtered_principals",
    return_value={
        "status_code": 200,
        "data": [
            {
                "user_id": 56780000,
                "org_id": "17685860",
                "username": "principal-test",
                "email": "[email protected]",
                "first_name": "user",
                "last_name": "test",
                "is_org_admin": False,
                "is_active": True,
            }
        ],
    },
)
@patch("management.principal.cleaner.UMB_CLIENT")
def test_does_not_update_user_if_lock_not_acquired(self, client_mock, proxy_mock):
    """Test that a frame is neither processed nor acked while another connection holds the listener lock."""

    def acquire_lock(ready: Event, close: Event):
        """Simulate another process: take the lock, signal `ready`, hold it until `close` is set."""
        # Connect before the try so the finally clause never touches an unbound name
        # (a failed connect would otherwise surface as UnboundLocalError).
        conn = connections.create_connection("default")
        try:
            # Without autocommit the transaction-level lock stays held until the connection closes.
            conn.set_autocommit(False)
            with conn.cursor() as cursor:
                cursor.execute("SELECT pg_try_advisory_xact_lock(%s);", [LOCK_ID])
                assert cursor.fetchone()[0] is True
                ready.set()
                close.wait(30)
        finally:
            conn.close()

    ready = Event()
    close = Event()

    # Start the parallel "process" before the try block so the cleanup below
    # only ever joins a thread that was actually started.
    other_thread = Thread(target=acquire_lock, args=(ready, close))
    other_thread.start()

    try:
        # Fail fast with a clear message if the other connection never got the lock.
        self.assertTrue(ready.wait(30), "Timed out waiting for the other connection to acquire the lock.")

        client_mock.canRead.side_effect = [True, False]
        client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY_CREATION)

        public_tenant = Tenant.objects.get(tenant_name="public")
        Group.objects.create(name="default", platform_default=True, tenant=public_tenant)
        tenant = Tenant.objects.get(org_id="17685860")
        Principal.objects.create(tenant=tenant, username="principal-test")

        process_principal_events_from_umb()

        # One frame was read, the listener gave up without acking, and the client was disconnected.
        client_mock.receiveFrame.assert_called_once()
        client_mock.disconnect.assert_called_once()
        client_mock.ack.assert_not_called()

        self.assertFalse(Principal.objects.filter(user_id=self.principal_user_id).exists())
    finally:
        # Release the simulated lock holder and wait for its connection to close.
        close.set()
        other_thread.join()


@override_settings(V2_BOOTSTRAP_TENANT=True, PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB=True)
class PrincipalUMBTestsWithV2TenantBootstrap(PrincipalUMBTests):
Expand Down

0 comments on commit ff32aa0

Please sign in to comment.