From 56748f8ebd9d2e104fa2bc6a357850fe3cee7814 Mon Sep 17 00:00:00 2001 From: Alec Henninger <alechenninger@gmail.com> Date: Wed, 20 Nov 2024 15:33:08 -0500 Subject: [PATCH 1/8] Only commit to outbox from user listener while advisory lock acquired --- rbac/internal/views.py | 7 +-- rbac/management/principal/cleaner.py | 65 +++++++++++++++++++--------- rbac/management/tenant_service/v2.py | 5 --- rbac/rbac/middleware.py | 5 ++- 4 files changed, 51 insertions(+), 31 deletions(-) diff --git a/rbac/internal/views.py b/rbac/internal/views.py index a4832d978..c4d0a40f2 100644 --- a/rbac/internal/views.py +++ b/rbac/internal/views.py @@ -516,9 +516,10 @@ def bootstrap_tenant(request): org_id = request.GET.get("org_id") if not org_id: return HttpResponse('Invalid request, must supply the "org_id" query parameter.', status=400) - tenant = get_object_or_404(Tenant, org_id=org_id) - bootstrap_service = V2TenantBootstrapService(OutboxReplicator()) - bootstrap_service.bootstrap_tenant(tenant) + with transaction.atomic(): + tenant = get_object_or_404(Tenant, org_id=org_id) + bootstrap_service = V2TenantBootstrapService(OutboxReplicator()) + bootstrap_service.bootstrap_tenant(tenant) return HttpResponse(f"Bootstrap tenant with org_id {org_id} finished.", status=200) diff --git a/rbac/management/principal/cleaner.py b/rbac/management/principal/cleaner.py index a8e4af345..4f47cd716 100644 --- a/rbac/management/principal/cleaner.py +++ b/rbac/management/principal/cleaner.py @@ -23,6 +23,7 @@ import xmltodict from django.conf import settings +from django.db import connection, transaction from management.principal.model import Principal from management.principal.proxy import PrincipalProxy, external_principal_to_user from management.relation_replicator.outbox_replicator import OutboxReplicator @@ -42,6 +43,7 @@ PROXY = PrincipalProxy() # pylint: disable=invalid-name CERT_LOC = "/opt/rbac/rbac/management/principal/umb_certs/cert.pem" KEY_LOC = "/opt/rbac/rbac/management/principal/umb_certs/key.pem" +LOCK_ID = hash(settings.ENV_NAME) def clean_tenant_principals(tenant): @@ -177,26 +179,38 @@ def retrieve_user_info(message) -> User: return external_principal_to_user(user_data) -def process_umb_event(frame, umb_client: Stomp, bootstrap_service: TenantBootstrapService): - """Process each umb frame.""" - data_dict = xmltodict.parse(frame.body) - canonical_message = data_dict.get("CanonicalMessage") - if not canonical_message: - # Message is malformed. - # Ensure we dont block the entire queue by discarding it. - umb_client.ack(frame) - return - try: - user = retrieve_user_info(canonical_message) - except Exception as e: # Skip processing and leave the it to be processed later - logger.error("process_umb_event: Error retrieving user info: %s", str(e)) - return - - # By default, only process disabled users. - # If the setting is enabled, process all users. - if not user.is_active or settings.PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB: - bootstrap_service.update_user(user) +def process_umb_event(frame, umb_client: Stomp, bootstrap_service: TenantBootstrapService) -> bool: + """ + Process each umb frame. + + If the process should continue to listen for more frames, return True. Otherwise, return False. + """ + with transaction.atomic(): + # This is locked per transaction to ensure another listener process does not run concurrently. + if not _lock_listener(): + # If there is another listener, let it run and abort this one. + return False + + data_dict = xmltodict.parse(frame.body) + canonical_message = data_dict.get("CanonicalMessage") + if not canonical_message: + # Message is malformed. + # Ensure we dont block the entire queue by discarding it. + umb_client.ack(frame) + return True + try: + user = retrieve_user_info(canonical_message) + except Exception as e: # Skip processing and leave the it to be processed later + logger.error("process_umb_event: Error retrieving user info: %s", str(e)) + return True + + # By default, only process disabled users. + # If the setting is enabled, process all users. + if not user.is_active or settings.PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB: + bootstrap_service.update_user(user) + umb_client.ack(frame) + return True def process_principal_events_from_umb(bootstrap_service: Optional[TenantBootstrapService] = None): @@ -211,8 +225,17 @@ def process_principal_events_from_umb(bootstrap_service: Optional[TenantBootstra if not str(e).startswith(("Already connected", "Already subscribed")): raise e - while UMB_CLIENT.canRead(2): # Check if queue is empty, two sec timeout + while UMB_CLIENT.canRead(15): # Check if queue is empty, 15 sec timeout frame = UMB_CLIENT.receiveFrame() - process_umb_event(frame, UMB_CLIENT, bootstrap_service) + if not process_umb_event(frame, UMB_CLIENT, bootstrap_service): + return UMB_CLIENT.disconnect() logger.info("process_tenant_principal_events: Principal event processing finished.") + + +def _lock_listener() -> bool: + """Attempt to acquire a lock for the listener and if acquired return True, else False.""" + with connection.cursor() as cursor: + cursor.execute("SELECT pg_try_advisory_xact_lock(%s);", [LOCK_ID]) + result = cursor.fetchone() + return result[0] # Returns True if lock acquired, False otherwise diff --git a/rbac/management/tenant_service/v2.py b/rbac/management/tenant_service/v2.py index e98567163..a8de0e643 100644 --- a/rbac/management/tenant_service/v2.py +++ b/rbac/management/tenant_service/v2.py @@ -3,7 +3,6 @@ from typing import List, Optional from django.conf import settings -from django.db import transaction from django.db.models import Prefetch, Q from kessel.relations.v1beta1.common_pb2 import Relationship from management.group.model import Group @@ -38,13 +37,11 @@ def __init__(self, replicator: RelationReplicator, public_tenant: Optional[Tenan self._replicator = replicator self._public_tenant = public_tenant - @transaction.atomic def new_bootstrapped_tenant(self, org_id: str, account_number: Optional[str] = None) -> BootstrappedTenant: """Create a new tenant.""" tenant = Tenant.objects.create(org_id=org_id, account_id=account_number) return self._bootstrap_tenant(tenant) - @transaction.atomic def bootstrap_tenant(self, tenant: Tenant) -> BootstrappedTenant: """Bootstrap an existing tenant.""" try: @@ -53,7 +50,6 @@ def bootstrap_tenant(self, tenant: Tenant) -> BootstrappedTenant: except TenantMapping.DoesNotExist: return self._bootstrap_tenant(tenant) - @transaction.atomic def update_user( self, user: User, upsert: bool = False, bootstrapped_tenant: Optional[BootstrappedTenant] = None ) -> Optional[BootstrappedTenant]: @@ -105,7 +101,6 @@ def update_user( return bootstrapped_tenant - @transaction.atomic def import_bulk_users(self, users: list[User]): """ Bootstrap multiple users in a tenant. diff --git a/rbac/rbac/middleware.py b/rbac/rbac/middleware.py index 187040367..e315ff1ca 100644 --- a/rbac/rbac/middleware.py +++ b/rbac/rbac/middleware.py @@ -23,7 +23,7 @@ from django.conf import settings from django.core.handlers.wsgi import WSGIRequest -from django.db import IntegrityError +from django.db import IntegrityError, transaction from django.http import Http404, HttpResponse, QueryDict from django.urls import resolve from django.utils.deprecation import MiddlewareMixin @@ -121,7 +121,8 @@ def get_tenant(self, model, hostname, request): # Tenants are normally bootstrapped via principal job, # but there is a race condition where the user can use the service before the message is processed. try: - bootstrap = self.bootstrap_service.update_user(request.user, upsert=True) + with transaction.atomic(): + bootstrap = self.bootstrap_service.update_user(request.user, upsert=True) if bootstrap is None: # User is inactive. Should never happen but just in case... raise Http404() From f677d1aa398c6b261f7775a0d08a29cd113da65c Mon Sep 17 00:00:00 2001 From: Alec Henninger <alechenninger@gmail.com> Date: Wed, 20 Nov 2024 20:10:19 -0500 Subject: [PATCH 2/8] Ensure lock ID is stable; fix lint issues --- rbac/management/group/definer.py | 5 ++++- rbac/management/principal/cleaner.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/rbac/management/group/definer.py b/rbac/management/group/definer.py index b6001df40..618f22de9 100644 --- a/rbac/management/group/definer.py +++ b/rbac/management/group/definer.py @@ -97,7 +97,10 @@ def clone_default_group_in_public_schema(group, tenant) -> Optional[Group]: if settings.V2_BOOTSTRAP_TENANT: tenant_bootstrap_service = V2TenantBootstrapService(OutboxReplicator()) bootstrapped_tenant = tenant_bootstrap_service.bootstrap_tenant(tenant) - group_uuid = bootstrapped_tenant.mapping.default_group_uuid + mapping = bootstrapped_tenant.mapping + # Mapping is always present with V2 + assert mapping is not None + group_uuid = mapping.default_group_uuid else: group_uuid = uuid4() diff --git a/rbac/management/principal/cleaner.py b/rbac/management/principal/cleaner.py index 4f47cd716..074fc8955 100644 --- a/rbac/management/principal/cleaner.py +++ b/rbac/management/principal/cleaner.py @@ -43,7 +43,7 @@ PROXY = PrincipalProxy() # pylint: disable=invalid-name CERT_LOC = "/opt/rbac/rbac/management/principal/umb_certs/cert.pem" KEY_LOC = "/opt/rbac/rbac/management/principal/umb_certs/key.pem" -LOCK_ID = hash(settings.ENV_NAME) +LOCK_ID = 42 # For Keith, with Love def clean_tenant_principals(tenant): From 889f3caa0a1ce80d97023f31bd95c5ede678088f Mon Sep 17 00:00:00 2001 From: Alec Henninger <alechenninger@gmail.com> Date: Wed, 20 Nov 2024 20:17:12 -0500 Subject: [PATCH 3/8] Ensure task disconnects properly --- rbac/management/principal/cleaner.py | 2 +- tests/management/principal/test_cleaner.py | 39 ++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/rbac/management/principal/cleaner.py b/rbac/management/principal/cleaner.py index 074fc8955..9b80c5876 100644 --- a/rbac/management/principal/cleaner.py +++ b/rbac/management/principal/cleaner.py @@ -228,7 +228,7 @@ def process_principal_events_from_umb(bootstrap_service: Optional[TenantBootstra while UMB_CLIENT.canRead(15): # Check if queue is empty, 15 sec timeout frame = UMB_CLIENT.receiveFrame() if not process_umb_event(frame, UMB_CLIENT, bootstrap_service): - return + break UMB_CLIENT.disconnect() logger.info("process_tenant_principal_events: Principal event processing finished.") diff --git a/tests/management/principal/test_cleaner.py b/tests/management/principal/test_cleaner.py index 24ed617cb..6efd467f4 100644 --- a/tests/management/principal/test_cleaner.py +++ b/tests/management/principal/test_cleaner.py @@ -497,6 +497,45 @@ def test_principal_creation_event_updates_existing_principal(self, client_mock, self.assertTrue(Tenant.objects.filter(org_id="17685860").exists()) self.assertTrue(Principal.objects.filter(user_id=self.principal_user_id).exists()) + @patch( + "management.principal.proxy.PrincipalProxy.request_filtered_principals", + return_value={ + "status_code": 200, + "data": [ + { + "user_id": 56780000, + "org_id": "17685860", + "username": "principal-test", + "email": "test_user@email.com", + "first_name": "user", + "last_name": "test", + "is_org_admin": False, + "is_active": True, + } + ], + }, + ) + @patch("management.principal.cleaner.UMB_CLIENT") + @patch("management.principal.cleaner._lock_listener") + def test_does_not_update_user_if_lock_not_acquired(self, lock, client_mock, proxy_mock): + """Test that we can run principal creation event.""" + lock.return_value = False + client_mock.canRead.return_value = True + client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY_CREATION) + + public_tenant = Tenant.objects.get(tenant_name="public") + Group.objects.create(name="default", platform_default=True, tenant=public_tenant) + tenant = Tenant.objects.get(org_id="17685860") + Principal.objects.create(tenant=tenant, username="principal-test") + + process_principal_events_from_umb() + + client_mock.receiveFrame.assert_called_once() + client_mock.disconnect.assert_called_once() + client_mock.ack.assert_not_called() + + self.assertFalse(Principal.objects.filter(user_id=self.principal_user_id).exists()) + @override_settings(V2_BOOTSTRAP_TENANT=True, PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB=True) class PrincipalUMBTestsWithV2TenantBootstrap(PrincipalUMBTests): From a469c2ee44fa3f944b5f1d5ffb8305d59e04a129 Mon Sep 17 00:00:00 2001 From: Alec Henninger <alechenninger@gmail.com> Date: Wed, 20 Nov 2024 20:55:48 -0500 Subject: [PATCH 4/8] Improve the test case to actually excercise the lock --- tests/management/principal/test_cleaner.py | 56 ++++++++++++++++------ 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/tests/management/principal/test_cleaner.py b/tests/management/principal/test_cleaner.py index 6efd467f4..32c566e4d 100644 --- a/tests/management/principal/test_cleaner.py +++ b/tests/management/principal/test_cleaner.py @@ -16,17 +16,19 @@ # """Test the principal cleaner.""" from functools import partial +from threading import Event, Thread import uuid from unittest.mock import MagicMock, patch +from django.db import connections, transaction from django.test import override_settings from rest_framework import status from management.group.definer import seed_group from management.group.model import Group from management.policy.model import Policy -from management.principal.cleaner import clean_tenant_principals +from management.principal.cleaner import LOCK_ID, clean_tenant_principals from management.principal.model import Principal from management.principal.cleaner import process_principal_events_from_umb from management.principal.proxy import external_principal_to_user @@ -516,25 +518,49 @@ def test_principal_creation_event_updates_existing_principal(self, client_mock, }, ) @patch("management.principal.cleaner.UMB_CLIENT") - @patch("management.principal.cleaner._lock_listener") - def test_does_not_update_user_if_lock_not_acquired(self, lock, client_mock, proxy_mock): + def test_does_not_update_user_if_lock_not_acquired(self, client_mock, proxy_mock): """Test that we can run principal creation event.""" - lock.return_value = False - client_mock.canRead.return_value = True - client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY_CREATION) - public_tenant = Tenant.objects.get(tenant_name="public") - Group.objects.create(name="default", platform_default=True, tenant=public_tenant) - tenant = Tenant.objects.get(org_id="17685860") - Principal.objects.create(tenant=tenant, username="principal-test") + def acquire_lock(ready: Event, close: Event): + """Simulates another process acquiring the lock.""" + try: + conn = connections.create_connection("default") + conn.set_autocommit(False) + with conn.cursor() as cursor: + cursor.execute("SELECT pg_try_advisory_xact_lock(%s);", [LOCK_ID]) + assert cursor.fetchone()[0] is True + ready.set() + close.wait(30) + finally: + conn.close() + + ready = Event() + close = Event() - process_principal_events_from_umb() + try: + # Start the parallel "process" + other_thread = Thread(target=acquire_lock, args=(ready, close)) + other_thread.start() + ready.wait(30) - client_mock.receiveFrame.assert_called_once() - client_mock.disconnect.assert_called_once() - client_mock.ack.assert_not_called() + client_mock.canRead.side_effect = [True, False] + client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY_CREATION) - self.assertFalse(Principal.objects.filter(user_id=self.principal_user_id).exists()) + public_tenant = Tenant.objects.get(tenant_name="public") + Group.objects.create(name="default", platform_default=True, tenant=public_tenant) + tenant = Tenant.objects.get(org_id="17685860") + Principal.objects.create(tenant=tenant, username="principal-test") + + process_principal_events_from_umb() + + client_mock.receiveFrame.assert_called_once() + client_mock.disconnect.assert_called_once() + client_mock.ack.assert_not_called() + + self.assertFalse(Principal.objects.filter(user_id=self.principal_user_id).exists()) + finally: + close.set() + other_thread.join() @override_settings(V2_BOOTSTRAP_TENANT=True, PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB=True) From 2717337a96b1e3ecc42229faf033e9088c61d74e Mon Sep 17 00:00:00 2001 From: Alec Henninger <alechenninger@gmail.com> Date: Thu, 21 Nov 2024 07:56:05 -0500 Subject: [PATCH 5/8] Harden listener --- rbac/management/principal/cleaner.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/rbac/management/principal/cleaner.py b/rbac/management/principal/cleaner.py index 9b80c5876..4590d368d 100644 --- a/rbac/management/principal/cleaner.py +++ b/rbac/management/principal/cleaner.py @@ -21,6 +21,7 @@ import ssl from typing import Optional +from py import log import xmltodict from django.conf import settings from django.db import connection, transaction @@ -189,6 +190,7 @@ def process_umb_event(frame, umb_client: Stomp, bootstrap_service: TenantBootstr # This is locked per transaction to ensure another listener process does not run concurrently. if not _lock_listener(): # If there is another listener, let it run and abort this one. + logger.info("process_umb_event: Another listener is running. Aborting.") return False data_dict = xmltodict.parse(frame.body) @@ -225,12 +227,14 @@ def process_principal_events_from_umb(bootstrap_service: Optional[TenantBootstra if not str(e).startswith(("Already connected", "Already subscribed")): raise e - while UMB_CLIENT.canRead(15): # Check if queue is empty, 15 sec timeout - frame = UMB_CLIENT.receiveFrame() - if not process_umb_event(frame, UMB_CLIENT, bootstrap_service): - break - UMB_CLIENT.disconnect() - logger.info("process_tenant_principal_events: Principal event processing finished.") + try: + while UMB_CLIENT.canRead(15): # Check if queue is empty, 15 sec timeout + frame = UMB_CLIENT.receiveFrame() + if not process_umb_event(frame, UMB_CLIENT, bootstrap_service): + break + finally: + UMB_CLIENT.disconnect() + logger.info("process_tenant_principal_events: Principal event processing finished.") def _lock_listener() -> bool: @@ -238,4 +242,6 @@ def _lock_listener() -> bool: with connection.cursor() as cursor: cursor.execute("SELECT pg_try_advisory_xact_lock(%s);", [LOCK_ID]) result = cursor.fetchone() + if result is None: + raise Exception("Advisory lock returned none, expected bool.") return result[0] # Returns True if lock acquired, False otherwise From 37bb4fa30b9f4a12dbca0bb761955dbff477c620 Mon Sep 17 00:00:00 2001 From: Alec Henninger <alechenninger@gmail.com> Date: Thu, 21 Nov 2024 07:58:56 -0500 Subject: [PATCH 6/8] Remove extra import --- rbac/management/principal/cleaner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/rbac/management/principal/cleaner.py b/rbac/management/principal/cleaner.py index 4590d368d..e91602979 100644 --- a/rbac/management/principal/cleaner.py +++ b/rbac/management/principal/cleaner.py @@ -21,7 +21,6 @@ import ssl from typing import Optional -from py import log import xmltodict from django.conf import settings from django.db import connection, transaction From d2c424785cd616fcc853d35eb15eca2dc985f74b Mon Sep 17 00:00:00 2001 From: Alec Henninger <alechenninger@gmail.com> Date: Thu, 21 Nov 2024 15:47:31 -0500 Subject: [PATCH 7/8] Only increment message count if we actually processed a message --- rbac/management/principal/cleaner.py | 29 ++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/rbac/management/principal/cleaner.py b/rbac/management/principal/cleaner.py index d8b567e6a..1436492b2 100644 --- a/rbac/management/principal/cleaner.py +++ b/rbac/management/principal/cleaner.py @@ -201,23 +201,25 @@ def process_umb_event(frame, umb_client: Stomp, bootstrap_service: TenantBootstr data_dict = xmltodict.parse(frame.body) canonical_message = data_dict.get("CanonicalMessage") - if not canonical_message: + if canonical_message: + try: + user = retrieve_user_info(canonical_message) + except Exception as e: # Skip processing and leave the it to be processed later + logger.error("process_umb_event: Error retrieving user info: %s", str(e)) + return True + + # By default, only process disabled users. + # If the setting is enabled, process all users. + if not user.is_active or settings.PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB: + bootstrap_service.update_user(user) + else: # Message is malformed. # Ensure we dont block the entire queue by discarding it. - umb_client.ack(frame) - return True - try: - user = retrieve_user_info(canonical_message) - except Exception as e: # Skip processing and leave the it to be processed later - logger.error("process_umb_event: Error retrieving user info: %s", str(e)) - return True - - # By default, only process disabled users. - # If the setting is enabled, process all users. - if not user.is_active or settings.PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB: - bootstrap_service.update_user(user) + # TODO: this is not the only way a message can be malformed + pass umb_client.ack(frame) + umb_message_processed_count.inc() return True @@ -238,7 +240,6 @@ def process_principal_events_from_umb(bootstrap_service: Optional[TenantBootstra frame = UMB_CLIENT.receiveFrame() if not process_umb_event(frame, UMB_CLIENT, bootstrap_service): break - umb_message_processed_count.inc() finally: UMB_CLIENT.disconnect() logger.info("process_tenant_principal_events: Principal event processing finished.") From c49c230ebe544f639bffec8eaf8d83932fb53212 Mon Sep 17 00:00:00 2001 From: Alec Henninger <alechenninger@gmail.com> Date: Thu, 21 Nov 2024 15:47:42 -0500 Subject: [PATCH 8/8] Log frame info --- rbac/management/principal/cleaner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rbac/management/principal/cleaner.py b/rbac/management/principal/cleaner.py index 1436492b2..58d36b5f5 100644 --- a/rbac/management/principal/cleaner.py +++ b/rbac/management/principal/cleaner.py @@ -238,6 +238,7 @@ def process_principal_events_from_umb(bootstrap_service: Optional[TenantBootstra try: while UMB_CLIENT.canRead(15): # Check if queue is empty, 15 sec timeout frame = UMB_CLIENT.receiveFrame() + logger.info("process_tenant_principal_events: Processing frame. info=%s", frame.info()) if not process_umb_event(frame, UMB_CLIENT, bootstrap_service): break finally: