Refactor our analytics provider
jonathangreen committed Mar 7, 2025
1 parent 7027932 commit 20a6c64
Showing 14 changed files with 528 additions and 559 deletions.
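
The heart of the refactor is visible in the first two files below: the ad-hoc CirculationEventData dataclass in the Celery task module is removed in favor of the shared AnalyticsEventData value object, and Analytics gains a collect(event, session) method that dispatches a prepared event to every configured provider. A minimal sketch of the new calling pattern follows; it is based on the calls visible in this diff, the helper name is illustrative, and the patron argument is assumed from the removed dataclass (the corresponding line is cut off in the hunk shown here):

from sqlalchemy.orm import Session

from palace.manager.service.analytics.analytics import Analytics
from palace.manager.service.analytics.eventdata import AnalyticsEventData
from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent
from palace.manager.sqlalchemy.model.patron import Hold


def report_expired_hold(hold: Hold, analytics: Analytics, session: Session) -> None:
    # Build a detached snapshot of the event while the ORM objects are still
    # attached to the session. The providers work from this snapshot, so no
    # ORM objects have to be merged back into a session later.
    event = AnalyticsEventData.create(
        library=hold.library,
        license_pool=hold.license_pool,
        event_type=CirculationEvent.CM_HOLD_EXPIRED,
        patron=hold.patron,  # assumed: this argument is truncated in the hunk below
    )
    # Hand the snapshot to the dispatcher, which fans it out to the local
    # and S3 providers.
    analytics.collect(event, session)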
66 changes: 24 additions & 42 deletions src/palace/manager/celery/tasks/opds_odl.py
@@ -1,5 +1,4 @@
import datetime
from dataclasses import dataclass

from celery import shared_task
from sqlalchemy import delete, select
@@ -9,29 +8,21 @@
from palace.manager.api.odl.api import OPDS2WithODLApi
from palace.manager.celery.task import Task
from palace.manager.service.analytics.analytics import Analytics
from palace.manager.service.analytics.eventdata import AnalyticsEventData
from palace.manager.service.celery.celery import QueueNames
from palace.manager.service.redis.models.lock import RedisLock
from palace.manager.service.redis.redis import Redis
from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.library import Library
from palace.manager.sqlalchemy.model.licensing import License, LicensePool
from palace.manager.sqlalchemy.model.patron import Hold, Patron
from palace.manager.sqlalchemy.model.patron import Hold
from palace.manager.util.datetime_helpers import utc_now


@dataclass
class CirculationEventData:
library: Library
license_pool: LicensePool
event_type: str
patron: Patron


def remove_expired_holds_for_collection(
def _remove_expired_holds_for_collection(
db: Session,
collection_id: int,
) -> list[CirculationEventData]:
) -> list[AnalyticsEventData]:
"""
Remove expired holds from the database for this collection.
"""
@@ -44,10 +35,10 @@ def remove_expired_holds_for_collection(
)

expired_holds = db.scalars(select_query).all()
expired_hold_events: list[CirculationEventData] = []
expired_hold_events = []
for hold in expired_holds:
expired_hold_events.append(
CirculationEventData(
AnalyticsEventData.create(
library=hold.library,
license_pool=hold.license_pool,
event_type=CirculationEvent.CM_HOLD_EXPIRED,
@@ -67,7 +58,7 @@ def remove_expired_holds_for_collection(
return expired_hold_events


def licensepool_ids_with_holds(
def _licensepool_ids_with_holds(
db: Session, collection_id: int, batch_size: int, after_id: int | None
) -> list[int]:
query = (
@@ -85,7 +76,7 @@ def licensepool_ids_with_holds(
return db.scalars(query).all()


def lock_licenses(license_pool: LicensePool) -> None:
def _lock_licenses(license_pool: LicensePool) -> None:
"""
Acquire a row level lock on all the licenses for a license pool.
@@ -100,14 +91,14 @@ def lock_licenses(license_pool: LicensePool) -> None:
).all()


def recalculate_holds_for_licensepool(
def _recalculate_holds_for_licensepool(
license_pool: LicensePool,
reservation_period: datetime.timedelta,
) -> tuple[int, list[CirculationEventData]]:
) -> tuple[int, list[AnalyticsEventData]]:
# We take out row level locks on all the licenses and holds for this license pool, so that
# everything is in a consistent state while we update the hold queue. This means we should
# commit the transaction quickly, to avoid contention or deadlocks.
lock_licenses(license_pool)
_lock_licenses(license_pool)
holds = license_pool.get_active_holds(for_update=True)

license_pool.update_availability_from_licenses()
@@ -117,7 +108,7 @@ def recalculate_holds_for_licensepool(
waiting = holds[reserved:]
updated = 0

events: list[CirculationEventData] = []
events = []

# These holds have a copy reserved for them.
for hold in ready:
@@ -128,7 +119,7 @@ def recalculate_holds_for_licensepool(
hold.end = utc_now() + reservation_period
updated += 1
events.append(
CirculationEventData(
AnalyticsEventData.create(
library=hold.library,
license_pool=hold.license_pool,
event_type=CirculationEvent.CM_HOLD_READY_FOR_CHECKOUT,
@@ -156,7 +147,7 @@ def remove_expired_holds_for_collection_task(task: Task, collection_id: int) ->

with task.transaction() as session:
collection = Collection.by_id(session, collection_id)
events = remove_expired_holds_for_collection(
events = _remove_expired_holds_for_collection(
session,
collection_id,
)
@@ -166,7 +157,8 @@ def remove_expired_holds_for_collection_task(task: Task, collection_id: int) ->
f"Removed {len(events)} expired holds for collection {collection_name} ({collection_id})."
)

collect_events(task, events, analytics)
with task.transaction() as session:
_collect_events(session, events, analytics)


@shared_task(queue=QueueNames.default, bind=True)
@@ -208,27 +200,16 @@ def _redis_lock_recalculate_holds(client: Redis, collection_id: int) -> RedisLoc
)


def collect_events(
task: Task, events: list[CirculationEventData], analytics: Analytics
def _collect_events(
session: Session, events: list[AnalyticsEventData], analytics: Analytics
) -> None:
"""
Collect events after the database transaction has been successfully committed and any row locks are released.
We perform this operation after completing the transaction to ensure that any row locks
are held for the shortest possible duration, in case writing to the S3 analytics provider is slow.
"""

for e in events:
with task.transaction() as session:
# one transaction per event to minimize possible database lock durations
library = session.merge(e.library)
license_pool = session.merge(e.license_pool)
patron = session.merge(e.patron)
analytics.collect_event(
event_type=e.event_type,
library=library,
license_pool=license_pool,
patron=patron,
)
for event in events:
analytics.collect(event, session)


@shared_task(queue=QueueNames.default, bind=True)
@@ -257,7 +238,7 @@ def recalculate_hold_queue_collection(
f"Recalculating hold queue for collection {collection_name} ({collection_id})."
)

license_pool_ids = licensepool_ids_with_holds(
license_pool_ids = _licensepool_ids_with_holds(
session, collection_id, batch_size, after_id
)

@@ -277,7 +258,7 @@ def recalculate_hold_queue_collection(
)
continue

updated, events = recalculate_holds_for_licensepool(
updated, events = _recalculate_holds_for_licensepool(
license_pool,
reservation_period,
)
Expand All @@ -289,7 +270,8 @@ def recalculate_hold_queue_collection(
f"{updated} holds out of date."
)

collect_events(task, events, analytics)
with task.transaction() as session:
_collect_events(session, events, analytics)

if len(license_pool_ids) == batch_size:
# We are done with this batch, but there is probably more work to do, so we queue up the next batch.
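
The comment at the top of _recalculate_holds_for_licensepool and the docstring on _collect_events describe the same constraint: the SELECT ... FOR UPDATE row locks should be held for as short a time as possible, so analytics are only written after the transaction that touched the holds has committed. A rough sketch of that two-transaction shape, assuming the task.transaction() context manager used above; the helper name and elided body are illustrative only:

from palace.manager.celery.task import Task
from palace.manager.service.analytics.analytics import Analytics
from palace.manager.service.analytics.eventdata import AnalyticsEventData


def recalculate_and_report(task: Task, analytics: Analytics) -> None:
    # Transaction 1: take the row locks, update the hold queue, and capture
    # plain AnalyticsEventData snapshots that outlive the session.
    with task.transaction() as session:
        events: list[AnalyticsEventData] = []
        ...  # lock licenses and holds, update hold state, append events
    # Leaving the block commits the transaction and releases the row locks.

    # Transaction 2: a fresh, short-lived transaction used only for reporting,
    # so a slow S3 analytics write cannot lengthen how long the locks were held.
    with task.transaction() as session:
        for event in events:
            analytics.collect(event, session)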
52 changes: 25 additions & 27 deletions src/palace/manager/service/analytics/analytics.py
@@ -3,29 +3,30 @@
from datetime import datetime
from typing import TYPE_CHECKING

import flask
from sqlalchemy.orm import Session

from palace.manager.service.analytics.eventdata import AnalyticsEventData
from palace.manager.service.analytics.local import LocalAnalyticsProvider
from palace.manager.service.analytics.provider import AnalyticsProvider
from palace.manager.service.analytics.s3 import S3AnalyticsProvider
from palace.manager.sqlalchemy.model.library import Library
from palace.manager.sqlalchemy.model.licensing import LicensePool
from palace.manager.sqlalchemy.model.patron import Patron
from palace.manager.util.datetime_helpers import utc_now
from palace.manager.util.log import LoggerMixin

if TYPE_CHECKING:
from palace.manager.service.storage.s3 import S3Service


class Analytics(LoggerMixin):
class Analytics(LoggerMixin, AnalyticsProvider):
"""Dispatches methods for analytics providers."""

def __init__(
self,
s3_analytics_enabled: bool = False,
s3_service: S3Service | None = None,
) -> None:
self.providers = [LocalAnalyticsProvider()]
self.providers: list[AnalyticsProvider] = [LocalAnalyticsProvider()]

if s3_analytics_enabled:
if s3_service is not None:
@@ -35,6 +36,14 @@ def __init__(
"S3 analytics is not configured: No analytics bucket was specified."
)

def collect(
self,
event: AnalyticsEventData,
session: Session | None = None,
) -> None:
for provider in self.providers:
provider.collect(event, session)

def collect_event(
self,
library: Library,
@@ -46,29 +55,18 @@ def collect_event(
patron: Patron | None = None,
neighborhood: str | None = None,
) -> None:
if not time:
time = utc_now()

user_agent: str | None = None
try:
user_agent = flask.request.user_agent.string
if user_agent == "":
user_agent = None
except Exception as e:
self.log.warning(f"Unable to resolve the user_agent: {repr(e)}")

for provider in self.providers:
provider.collect_event(
library,
license_pool,
event_type,
time,
old_value=old_value,
new_value=new_value,
user_agent=user_agent,
patron=patron,
neighborhood=neighborhood,
)
session = Session.object_session(library)
event = AnalyticsEventData.create(
library,
license_pool,
event_type,
time,
old_value,
new_value,
patron,
neighborhood,
)
self.collect(event, session=session)

def is_configured(self) -> bool:
return len(self.providers) > 0
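
After this change Analytics is itself an AnalyticsProvider: collect() fans a prepared event out to every configured provider, while the older collect_event() signature survives as a thin adapter that builds an AnalyticsEventData and resolves the session from the library object. A stripped-down sketch of that dispatcher shape; the classes below are simplified stand-ins, not the real palace implementations:

from dataclasses import dataclass


@dataclass
class FakeEvent:
    # Stand-in for AnalyticsEventData: a plain value object with no ORM ties.
    event_type: str
    library_name: str


class FakeProvider:
    # Stand-in for a concrete provider such as the local or S3 provider:
    # each one receives the same event and an optional database session.
    def __init__(self, name: str) -> None:
        self.name = name
        self.seen: list[FakeEvent] = []

    def collect(self, event: FakeEvent, session=None) -> None:
        self.seen.append(event)


class Dispatcher:
    # Stand-in for Analytics: it exposes the same collect() interface as the
    # providers it wraps, so callers never care how many sinks are configured.
    def __init__(self, providers: list[FakeProvider]) -> None:
        self.providers = providers

    def collect(self, event: FakeEvent, session=None) -> None:
        for provider in self.providers:
            provider.collect(event, session)


if __name__ == "__main__":
    dispatcher = Dispatcher([FakeProvider("local"), FakeProvider("s3")])
    dispatcher.collect(FakeEvent("CM_HOLD_EXPIRED", "Main Library"))
    print([len(p.seen) for p in dispatcher.providers])  # prints [1, 1]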