From d67c1ba239c17ee990837c4df9baea930b79948a Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 6 Mar 2025 10:41:48 -0400 Subject: [PATCH 01/11] Convert reapers to celery --- src/palace/manager/celery/tasks/reaper.py | 247 +++++++++ .../manager/sqlalchemy/model/credential.py | 4 +- src/palace/manager/sqlalchemy/model/patron.py | 26 +- tests/manager/celery/tasks/test_reaper.py | 522 ++++++++++++++++++ 4 files changed, 792 insertions(+), 7 deletions(-) create mode 100644 src/palace/manager/celery/tasks/reaper.py create mode 100644 tests/manager/celery/tasks/test_reaper.py diff --git a/src/palace/manager/celery/tasks/reaper.py b/src/palace/manager/celery/tasks/reaper.py new file mode 100644 index 0000000000..679958d224 --- /dev/null +++ b/src/palace/manager/celery/tasks/reaper.py @@ -0,0 +1,247 @@ +from datetime import timedelta + +from celery import shared_task +from sqlalchemy import and_, delete, func, select +from sqlalchemy.orm import Session, defer, lazyload +from sqlalchemy.sql import Delete +from sqlalchemy.sql.elements import or_ + +from palace.manager.celery.task import Task +from palace.manager.service.celery.celery import QueueNames +from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent +from palace.manager.sqlalchemy.model.collection import Collection +from palace.manager.sqlalchemy.model.credential import Credential +from palace.manager.sqlalchemy.model.licensing import LicensePool +from palace.manager.sqlalchemy.model.measurement import Measurement +from palace.manager.sqlalchemy.model.patron import Annotation, Hold, Loan, Patron +from palace.manager.sqlalchemy.model.work import Work +from palace.manager.util.datetime_helpers import utc_now + + +def _execute_delete(session: Session, deletion_query: Delete) -> int: + # The transactions in the reaper tasks are short lived, so we don't bother to do the + # extra work of synchronizing the session with the database after the delete. Once + # the transaction is committed, the session contents won't matter anyway. + # If we ever need to do something more complex with the session after the delete, + # we can revisit this. + # https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html#selecting-a-synchronization-strategy + result = session.execute( + deletion_query, execution_options={"synchronize_session": False} + ) + # We need the type ignores here because result doesn't always have + # a rowcount, but the sqlalchemy docs swear it will in the case of + # a delete statement. + # https://docs.sqlalchemy.org/en/20/tutorial/data_update.html#getting-affected-row-count-from-update-delete + return result.rowcount # type: ignore[attr-defined,no-any-return] + + +def _pluralize(count: int, singular: str, plural: str | None = None) -> str: + if plural is None: + plural = singular + "s" + return singular if count == 1 else plural + + +@shared_task(queue=QueueNames.default, bind=True) +def credential_reaper(task: Task) -> None: + """ + Remove Credentials that expired more than a day ago. + """ + cutoff = utc_now() - timedelta(days=1) + deletion_query = delete(Credential).where(Credential.expires < cutoff) + with task.transaction() as session: + rows_removed = _execute_delete(session, deletion_query) + task.log.info( + f"Deleted {rows_removed} expired {_pluralize(rows_removed, 'credential')}." + ) + + +@shared_task(queue=QueueNames.default, bind=True) +def patron_reaper(task: Task) -> None: + """ + Remove patron records that expired more than 60 days ago. + """ + cutoff = utc_now() - timedelta(days=60) + deletion_query = delete(Patron).where(Patron.authorization_expires < cutoff) + with task.transaction() as session: + rows_removed = _execute_delete(session, deletion_query) + task.log.info( + f"Deleted {rows_removed} expired patron {_pluralize(rows_removed, 'record')}." + ) + + +@shared_task(queue=QueueNames.default, bind=True) +def work_reaper(task: Task, batch_size: int = 1000) -> None: + """ + Remove Works that have no associated LicensePools. + + As soon as a Work loses its last LicensePool it can be removed. + """ + work_query = ( + select(Work) + .outerjoin(LicensePool) + .where(LicensePool.id == None) + .order_by(Work.id) + .limit(batch_size) + # We defer loading of any fields defined as large on the Work to speed up + # our query, and since we are loading works without license pools, we want + # to override the default joined eager loading of license_pools. + .options(*(defer(f) for f in Work.LARGE_FIELDS), lazyload(Work.license_pools)) + ) + search_index = task.services.search.index() + with task.transaction() as session: + works = session.execute(work_query).scalars().all() + for work in works: + task.log.info( + f"Deleting {work!r} because it has no associated LicensePools." + ) + work.delete(search_index=search_index) + + removed = len(works) + task.log.info( + f"Deleted {removed} {_pluralize(removed, 'Work')} with no associated LicensePools." + ) + if len(works) == batch_size: + task.log.info("There may be more Works to delete. Re-queueing the reaper.") + raise task.replace(work_reaper.s(batch_size=batch_size)) + + +@shared_task(queue=QueueNames.default, bind=True) +def collection_reaper(task: Task) -> None: + """ + Remove collections that have been marked for deletion. + """ + collection_query = ( + select(Collection) + .where(Collection.marked_for_deletion == True) + .order_by(Collection.id) + .limit(1) + ) + with task.transaction() as session: + collection = session.execute(collection_query).scalars().one_or_none() + if collection: + task.log.info(f"Deleting {collection!r}.") + collection.delete() + + with task.session() as session: + # Check if there are more collections to delete + collections_awaiting_delete = session.execute( + select(func.count(Collection.id)).where( + Collection.marked_for_deletion == True + ) + ).scalar() + + if collections_awaiting_delete > 0: + task.log.info( + f"{collections_awaiting_delete} {_pluralize(collections_awaiting_delete, 'collection')}" + f" waiting for delete. Re-queueing the reaper." + ) + raise task.replace(collection_reaper.s()) + + +@shared_task(queue=QueueNames.default, bind=True) +def measurement_reaper(task: Task) -> None: + """ + Remove measurements that are not the most recent + """ + deletion_query = delete(Measurement).where(Measurement.is_most_recent == False) + with task.transaction() as session: + rows_removed = _execute_delete(session, deletion_query) + task.log.info( + f"Deleted {rows_removed} outdated {_pluralize(rows_removed, 'measurement')}." + ) + + +@shared_task(queue=QueueNames.default, bind=True) +def annotation_reaper(task: Task) -> None: + """ + The annotation must have motivation=IDLING, must be at least 60 + days old (meaning there has been no attempt to read the book + for 60 days), and must not be associated with one of the + patron's active loans or holds. + """ + cutoff = utc_now() - timedelta(days=60) + + restrictions = [] + for t in Loan, Hold: + active_subquery = ( + select(Annotation.id) + .join(t, t.patron_id == Annotation.patron_id) + .join( + LicensePool, + and_( + LicensePool.id == t.license_pool_id, + LicensePool.identifier_id == Annotation.identifier_id, + ), + ) + ) + restrictions.append(~Annotation.id.in_(active_subquery)) + + deletion_query = delete(Annotation).where( + Annotation.timestamp < cutoff, + Annotation.motivation == Annotation.IDLING, + *restrictions, + ) + + with task.transaction() as session: + rows_removed = _execute_delete(session, deletion_query) + task.log.info( + f"Deleted {rows_removed} outdated idling {_pluralize(rows_removed, 'annotation')}." + ) + + +@shared_task(queue=QueueNames.default, bind=True) +def hold_reaper(task: Task, batch_size: int = 1000) -> None: + """ + Remove seemingly abandoned holds from the database. + """ + cutoff = utc_now() - timedelta(days=365) + query = ( + select(Hold) + .where(Hold.start < cutoff, or_(Hold.end == None, Hold.end < utc_now())) + .order_by(Hold.id) + .limit(batch_size) + ) + events_to_be_logged = [] + with task.transaction() as session: + for count, hold in enumerate(session.execute(query).scalars()): + event = dict( + library=hold.library, + license_pool=hold.license_pool, + event_type=CirculationEvent.CM_HOLD_EXPIRED, + patron=hold.patron, + ) + session.delete(hold) + events_to_be_logged.append(event) + + with task.transaction() as session: + for event in events_to_be_logged: + task.services.analytics.collect_event(**event) + + task.log.info(f"Deleted {count} expired holds.") + + if count == batch_size: + task.log.info("There may be more holds to delete. Re-queueing the reaper.") + raise task.replace(hold_reaper.s(batch_size=batch_size)) + + +@shared_task(queue=QueueNames.default, bind=True) +def loan_reaper(task: Task) -> None: + """ + Remove expired and abandoned loans from the database. + """ + + now = utc_now() + deletion_query = delete(Loan).where( + Loan.license_pool_id == LicensePool.id, + LicensePool.unlimited_access == False, # type: ignore[comparison-overlap] + LicensePool.open_access == False, + or_( + Loan.end < now, + and_(Loan.start < now - timedelta(days=90), Loan.end == None), + ), + ) + + with task.transaction() as session: + rows_removed = _execute_delete(session, deletion_query) + + task.log.info(f"Deleted {rows_removed} expired {_pluralize(rows_removed, 'loan')}.") diff --git a/src/palace/manager/sqlalchemy/model/credential.py b/src/palace/manager/sqlalchemy/model/credential.py index 7b992dd420..9150ef2cd8 100644 --- a/src/palace/manager/sqlalchemy/model/credential.py +++ b/src/palace/manager/sqlalchemy/model/credential.py @@ -30,7 +30,9 @@ class Credential(Base): data_source: Mapped[DataSource | None] = relationship( "DataSource", back_populates="credentials" ) - patron_id = Column(Integer, ForeignKey("patrons.id"), index=True) + patron_id = Column( + Integer, ForeignKey("patrons.id", ondelete="CASCADE"), index=True + ) patron: Mapped[Patron | None] = relationship("Patron", back_populates="credentials") collection_id = Column(Integer, ForeignKey("collections.id"), index=True) collection: Mapped[Collection | None] = relationship( diff --git a/src/palace/manager/sqlalchemy/model/patron.py b/src/palace/manager/sqlalchemy/model/patron.py index 88c7f5912c..aff2cdfb5b 100644 --- a/src/palace/manager/sqlalchemy/model/patron.py +++ b/src/palace/manager/sqlalchemy/model/patron.py @@ -168,7 +168,11 @@ class Patron(Base, RedisKeyMixin): neighborhood: str | None = None loans: Mapped[list[Loan]] = relationship( - "Loan", back_populates="patron", cascade="delete", uselist=True + "Loan", + back_populates="patron", + cascade="delete", + uselist=True, + passive_deletes=True, ) holds: Mapped[list[Hold]] = relationship( "Hold", @@ -176,6 +180,7 @@ class Patron(Base, RedisKeyMixin): cascade="delete", uselist=True, order_by="Hold.id", + passive_deletes=True, ) annotations: Mapped[list[Annotation]] = relationship( @@ -183,15 +188,16 @@ class Patron(Base, RedisKeyMixin): back_populates="patron", order_by="desc(Annotation.timestamp)", cascade="delete", + passive_deletes=True, ) # One Patron can have many associated Credentials. credentials: Mapped[list[Credential]] = relationship( - "Credential", back_populates="patron", cascade="delete" + "Credential", back_populates="patron", cascade="delete", passive_deletes=True ) device_tokens: Mapped[list[DeviceToken]] = relationship( - "DeviceToken", back_populates="patron", passive_deletes=True + "DeviceToken", back_populates="patron", cascade="delete", passive_deletes=True ) __table_args__ = ( @@ -519,7 +525,10 @@ class Loan(Base, LoanAndHoldMixin): id: Mapped[int] = Column(Integer, primary_key=True) patron_id: Mapped[int] = Column( - Integer, ForeignKey("patrons.id"), index=True, nullable=False + Integer, + ForeignKey("patrons.id", ondelete="CASCADE"), + index=True, + nullable=False, ) patron: Mapped[Patron] = relationship("Patron", back_populates="loans") @@ -573,7 +582,10 @@ class Hold(Base, LoanAndHoldMixin): __tablename__ = "holds" id: Mapped[int] = Column(Integer, primary_key=True) patron_id: Mapped[int] = Column( - Integer, ForeignKey("patrons.id"), index=True, nullable=False + Integer, + ForeignKey("patrons.id", ondelete="CASCADE"), + index=True, + nullable=False, ) patron: Mapped[Patron] = relationship( "Patron", back_populates="holds", lazy="joined" @@ -724,7 +736,9 @@ class Annotation(Base): __tablename__ = "annotations" id: Mapped[int] = Column(Integer, primary_key=True) - patron_id = Column(Integer, ForeignKey("patrons.id"), index=True) + patron_id = Column( + Integer, ForeignKey("patrons.id", ondelete="CASCADE"), index=True + ) patron: Mapped[Patron] = relationship("Patron", back_populates="annotations") identifier_id = Column(Integer, ForeignKey("identifiers.id"), index=True) diff --git a/tests/manager/celery/tasks/test_reaper.py b/tests/manager/celery/tasks/test_reaper.py new file mode 100644 index 0000000000..b2a26d5cf6 --- /dev/null +++ b/tests/manager/celery/tasks/test_reaper.py @@ -0,0 +1,522 @@ +import datetime +from unittest.mock import MagicMock + +import pytest + +from palace.manager.celery.tasks.reaper import ( + annotation_reaper, + collection_reaper, + credential_reaper, + hold_reaper, + loan_reaper, + measurement_reaper, + patron_reaper, + work_reaper, +) +from palace.manager.service.logging.configuration import LogLevel +from palace.manager.sqlalchemy.model.classification import Genre +from palace.manager.sqlalchemy.model.collection import Collection +from palace.manager.sqlalchemy.model.coverage import WorkCoverageRecord +from palace.manager.sqlalchemy.model.credential import Credential +from palace.manager.sqlalchemy.model.datasource import DataSource +from palace.manager.sqlalchemy.model.devicetokens import DeviceToken, DeviceTokenTypes +from palace.manager.sqlalchemy.model.edition import Edition +from palace.manager.sqlalchemy.model.licensing import LicensePool +from palace.manager.sqlalchemy.model.measurement import Measurement +from palace.manager.sqlalchemy.model.patron import Annotation, Hold, Loan, Patron +from palace.manager.sqlalchemy.model.work import Work +from palace.manager.sqlalchemy.util import get_one_or_create +from palace.manager.util.datetime_helpers import utc_now +from tests.fixtures.celery import CeleryFixture +from tests.fixtures.database import DatabaseTransactionFixture +from tests.fixtures.services import ServicesFixture + + +def test_credential_reaper( + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + caplog: pytest.LogCaptureFixture, +) -> None: + caplog.set_level(LogLevel.info) + + # Create four Credentials: two expired, two valid. + expired1 = db.credential() + expired2 = db.credential() + now = utc_now() + expiration_date = now - datetime.timedelta(days=1, seconds=1) + for e in [expired1, expired2]: + e.expires = expiration_date + + active = db.credential() + active.expires = now + + eternal = db.credential() + + # Run the reaper. + credential_reaper.delay().wait() + + # The expired credentials have been reaped; the others + # are still in the database. + remaining = set(db.session.query(Credential).all()) + assert {active, eternal} == remaining + + # The reaper logged its work. + assert "Deleted 2 expired credentials." in caplog.messages + + +def test_patron_reaper( + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + caplog: pytest.LogCaptureFixture, +) -> None: + caplog.set_level(LogLevel.info) + + # Patron that has expired. The patron has some related objects that should be deleted along with the patron. + expired = db.patron() + now = utc_now() + expired.authorization_expires = now - datetime.timedelta(days=61) + db.credential(patron=expired) + db.session.add(Annotation(patron=expired)) + DeviceToken.create(db.session, DeviceTokenTypes.FCM_ANDROID, "token", expired) + + # Patron that is about to expire + active = db.patron() + active.authorization_expires = now - datetime.timedelta(days=59) + + # Patron that has no expiration + no_expiration = db.patron() + no_expiration.authorization_expires = None + + # Run the reaper. + patron_reaper.delay().wait() + + # The expired patron has been reaped; the others are still in the database. + assert set(db.session.query(Patron).all()) == {active, no_expiration} + + # The reaper logged its work. + assert "Deleted 1 expired patron record." in caplog.messages + + +class TestWorkReaper: + def test_reap( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + services_fixture: ServicesFixture, + ) -> None: + # Set up our search mock to track calls to remove_work + removed = set() + mock_remove_work = services_fixture.search_fixture.index_mock.remove_work + mock_remove_work.side_effect = lambda x: removed.add(x.id) + + # First, create three works. + + # This work has a license pool. + has_license_pool = db.work(with_license_pool=True) + + # This work had a license pool and then lost it. + had_license_pool = db.work(with_license_pool=True) + db.session.delete(had_license_pool.license_pools[0]) + + # This work never had a license pool. + never_had_license_pool = db.work(with_license_pool=False) + + # Each work has a presentation edition -- keep track of these + # for later. + works = db.session.query(Work) + presentation_editions = [x.presentation_edition for x in works] + + # If and when Work gets database-level cascading deletes, this + # is where they will all be triggered, with no chance that an + # ORM-level delete is doing the work. So let's verify that all + # the cascades work. + + # First, set up some related items for each Work. + + # Each work is assigned to a genre. + genre, ignore = Genre.lookup(db.session, "Science Fiction") + for work in works: + work.genres = [genre] + + # Each work is on the same CustomList. + l, ignore = db.customlist("a list", num_entries=0) + for work in works: + l.add_entry(work) + + # Each work has a WorkCoverageRecord. + for work in works: + WorkCoverageRecord.add_for(work, operation="some operation") + + # Run the reaper. + work_reaper.delay().wait() + + # Search index was updated + assert len(removed) == 2 + assert has_license_pool.id not in removed + assert had_license_pool.id in removed + assert never_had_license_pool.id in removed + + # Only the work with a license pool remains. + assert db.session.query(Work).all() == [has_license_pool] + + # The presentation editions are still around, since they might + # theoretically be used by other parts of the system. + assert set(db.session.query(Edition).all()) == set(presentation_editions) + + # The surviving work is still assigned to the Genre, and still + # has WorkCoverageRecords. + assert genre.works == [has_license_pool] + surviving_records = db.session.query(WorkCoverageRecord) + assert surviving_records.count() > 0 + assert all(x.work == has_license_pool for x in surviving_records) + + # The CustomListEntries still exist, but two of them have lost + # their work. + assert len([x for x in l.entries if not x.work]) == 2 + assert [x.work for x in l.entries if x.work] == [has_license_pool] + + def test_batch( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + services_fixture: ServicesFixture, + ) -> None: + # Create some works that will be reaped. + [db.work(with_license_pool=False) for i in range(6)] + + # Run the reaper, with a batch size of 2, so it will have to + # requeue itself to fully process all the works. + work_reaper.delay(batch_size=2).wait() + + # Make sure the works were deleted + assert db.session.query(Work).all() == [] + + +class TestCollectionReaper: + def test_reap( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + services_fixture: ServicesFixture, + caplog: pytest.LogCaptureFixture, + ): + # End-to-end test + caplog.set_level(LogLevel.info) + + # Three collections: two marked for deletion (one active, and one inactive), one not. + c1 = db.collection() + c2 = db.collection(inactive=True) + c2.marked_for_deletion = True + c3 = db.collection(inactive=False) + c3.marked_for_deletion = True + + # Run reaper + collection_reaper.delay().wait() + + # The Collections marked for deletion have been deleted; the other + # one is unaffected. + assert [c1] == db.session.query(Collection).all() + assert f"Deleting {c2!r}." in caplog.messages + assert ( + f"1 collection waiting for delete. Re-queueing the reaper." + in caplog.messages + ) + assert f"Deleting {c3!r}." in caplog.messages + + def test_reaper_delete_calls_collection_delete( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + monkeypatch: pytest.MonkeyPatch, + ): + # The collection reaper should call the delete method on the collection + # rather than deleting the collection directly in the database. + collection = db.collection() + collection.marked_for_deletion = True + + mock_delete = MagicMock(side_effect=collection.delete) + monkeypatch.setattr(Collection, "delete", mock_delete) + + # Run reaper + collection_reaper.delay().wait() + + # Make sure we called the delete method on the collection + mock_delete.assert_called_once() + + +def test_measurement_reaper( + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + caplog: pytest.LogCaptureFixture, +): + # End-to-end test + caplog.set_level(LogLevel.info) + + recent_measurement, _ = get_one_or_create( + db.session, + Measurement, + quantity_measured="answer", + value=12, + is_most_recent=True, + ) + outdated_measurement, _ = get_one_or_create( + db.session, + Measurement, + quantity_measured="answer", + value=42, + is_most_recent=False, + ) + + measurement_reaper.delay().wait() + + assert db.session.query(Measurement).all() == [recent_measurement] + assert "Deleted 1 outdated measurement." in caplog.messages + + +def test_annotation_reaper( + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + caplog: pytest.LogCaptureFixture, +): + caplog.set_level(LogLevel.info) + + # Two books. + ignore, lp1 = db.edition(with_license_pool=True) + ignore, lp2 = db.edition(with_license_pool=True) + + # Two patrons who sync their annotations. + p1 = db.patron() + p2 = db.patron() + for p in [p1, p2]: + p.synchronize_annotations = True + now = utc_now() + not_that_old = now - datetime.timedelta(days=59) + very_old = now - datetime.timedelta(days=61) + + def _annotation( + patron: Patron, + pool: LicensePool, + content: str, + motivation: str = Annotation.IDLING, + timestamp: datetime.datetime = very_old, + ) -> Annotation: + annotation, _ = get_one_or_create( + db.session, + Annotation, + patron=patron, + identifier=pool.identifier, + motivation=motivation, + ) + annotation.timestamp = timestamp + annotation.content = content + return annotation + + # The first patron will not be affected by the + # reaper. Although their annotations are very old, they have + # an active loan for one book and a hold on the other. + loan = lp1.loan_to(p1) + old_loan = _annotation(p1, lp1, "old loan") + + hold = lp2.on_hold_to(p1) + old_hold = _annotation(p1, lp2, "old hold") + + # The second patron has a very old annotation for the first + # book. This is the only annotation that will be affected by + # the reaper. + should_be_reaped = _annotation(p2, lp1, "abandoned") + + # The second patron also has a very old non-idling annotation + # for the first book, which will not be reaped because only + # idling annotations are reaped. + not_idling = _annotation(p2, lp1, "not idling", motivation="some other motivation") + + # The second patron has a non-old idling annotation for the + # second book, which will not be reaped (even though there is + # no active loan or hold) because it's not old enough. + new_idling = _annotation(p2, lp2, "recent", timestamp=not_that_old) + + # Run the reaper + annotation_reaper.delay().wait() + + # The reaper logged its work. + assert "Deleted 1 outdated idling annotation." in caplog.messages + + # The annotation that should have been reaped is gone + assert db.session.query(Annotation).all() == [ + old_loan, + old_hold, + not_idling, + new_idling, + ] + + +def test_hold_reaper( + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + services_fixture: ServicesFixture, + caplog: pytest.LogCaptureFixture, +): + caplog.set_level(LogLevel.info) + + # This patron stopped using the circulation manager a long time + # ago. + inactive_patron = db.patron() + + # This patron is still using the circulation manager. + current_patron = db.patron() + + # We're going to give these patrons some loans and holds. + edition, open_access = db.edition( + with_license_pool=True, with_open_access_download=True + ) + + not_open_access_1 = db.licensepool( + edition, open_access=False, data_source_name=DataSource.BIBLIOTHECA + ) + not_open_access_2 = db.licensepool( + edition, open_access=False, data_source_name=DataSource.AXIS_360 + ) + not_open_access_3 = db.licensepool( + edition, open_access=False, data_source_name=DataSource.BIBBLIO + ) + + now = utc_now() + a_long_time_ago = now - datetime.timedelta(days=1000) + not_very_long_ago = now - datetime.timedelta(days=60) + even_longer = now - datetime.timedelta(days=2000) + the_future = now + datetime.timedelta(days=1) + + # This hold expired without ever becoming a loan (that we saw). + not_open_access_1.on_hold_to( + inactive_patron, start=even_longer, end=a_long_time_ago + ) + + # This hold has no end date and is older than a year. + not_open_access_2.on_hold_to( + inactive_patron, + start=a_long_time_ago, + end=None, + ) + + # This hold has not expired yet. + not_open_access_1.on_hold_to(current_patron, start=now, end=the_future) + + # This hold has no end date but is pretty recent. + not_open_access_3.on_hold_to(current_patron, start=not_very_long_ago, end=None) + + assert len(inactive_patron.holds) == 2 + assert len(current_patron.holds) == 2 + + # Now we fire up the hold reaper. + hold_reaper.delay().wait() + + # All the inactive patron's holds have been reaped + assert db.session.query(Hold).where(Hold.patron == inactive_patron).all() == [] + assert len(db.session.query(Hold).where(Hold.patron == current_patron).all()) == 2 + + # verify expected circ event count for hold reaper run + call_args_list = ( + services_fixture.analytics_fixture.analytics_mock.collect_event.call_args_list + ) + assert len(call_args_list) == 2 + event_types = [call_args.kwargs["event_type"] for call_args in call_args_list] + assert event_types == [ + CirculationEvent.CM_HOLD_EXPIRED, + CirculationEvent.CM_HOLD_EXPIRED, + ] + + +def test_loan_reaper( + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + caplog: pytest.LogCaptureFixture, +): + caplog.set_level(LogLevel.info) + + # This patron stopped using the circulation manager a long time + # ago. + inactive_patron = db.patron() + + # This patron is still using the circulation manager. + current_patron = db.patron() + + # We're going to give these patrons some loans and holds. + edition, open_access = db.edition( + with_license_pool=True, with_open_access_download=True + ) + + not_open_access_1 = db.licensepool( + edition, open_access=False, data_source_name=DataSource.OVERDRIVE + ) + not_open_access_2 = db.licensepool( + edition, open_access=False, data_source_name=DataSource.AXIS_360 + ) + not_open_access_3 = db.licensepool( + edition, open_access=False, data_source_name=DataSource.BIBBLIO + ) + unlimited_access = db.licensepool( + edition, unlimited_access=True, data_source_name=DataSource.AMAZON + ) + + now = utc_now() + a_long_time_ago = now - datetime.timedelta(days=1000) + not_very_long_ago = now - datetime.timedelta(days=60) + even_longer = now - datetime.timedelta(days=2000) + the_future = now + datetime.timedelta(days=1) + + # This loan has expired. + not_open_access_1.loan_to(inactive_patron, start=even_longer, end=a_long_time_ago) + + # This loan has no end date and is older than 90 days. + not_open_access_3.loan_to( + inactive_patron, + start=a_long_time_ago, + end=None, + ) + + # This loan has no end date, but it's for an open-access work. + open_access_loan, ignore = open_access.loan_to( + inactive_patron, + start=a_long_time_ago, + end=None, + ) + + # An unlimited loan should not get reaped regardless of age + unlimited_access_loan, ignore = unlimited_access.loan_to( + inactive_patron, start=a_long_time_ago, end=None + ) + + # This loan has not expired yet. + not_expired, _ = not_open_access_1.loan_to( + current_patron, start=now, end=the_future + ) + + # This loan has no end date but is pretty recent. + recent, _ = not_open_access_2.loan_to( + current_patron, start=not_very_long_ago, end=None + ) + + assert len(inactive_patron.loans) == 4 + assert len(current_patron.loans) == 2 + + # Now we fire up the loan reaper. + loan_reaper.delay().wait() + + # All the inactive patron's loans have been reaped, + # except for the loans for open-access works and unlimited-access works, + # which will never be reaped. + assert set(db.session.query(Loan).where(Loan.patron == inactive_patron).all()) == { + open_access_loan, + unlimited_access_loan, + } + + # The current patron's loans are unaffected, either + # because they have not expired or because they have no known + # expiration date and were created relatively recently. + assert set(db.session.query(Loan).where(Loan.patron == current_patron).all()) == { + not_expired, + recent, + } + + # The reaper logged its work. + assert "Deleted 2 expired loans." in caplog.messages From 52993644932bf6d1e002b00fdf299398b4eb98cc Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 6 Mar 2025 17:05:44 -0400 Subject: [PATCH 02/11] Reapers added and tested --- src/palace/manager/celery/tasks/reaper.py | 27 ++++++++++++++--------- tests/manager/celery/tasks/test_reaper.py | 7 +++--- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/palace/manager/celery/tasks/reaper.py b/src/palace/manager/celery/tasks/reaper.py index 679958d224..bdec4513de 100644 --- a/src/palace/manager/celery/tasks/reaper.py +++ b/src/palace/manager/celery/tasks/reaper.py @@ -7,6 +7,7 @@ from sqlalchemy.sql.elements import or_ from palace.manager.celery.task import Task +from palace.manager.service.analytics.eventdata import AnalyticsEventData from palace.manager.service.celery.celery import QueueNames from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent from palace.manager.sqlalchemy.model.collection import Collection @@ -190,11 +191,12 @@ def annotation_reaper(task: Task) -> None: @shared_task(queue=QueueNames.default, bind=True) -def hold_reaper(task: Task, batch_size: int = 1000) -> None: +def hold_reaper(task: Task, batch_size: int = 100) -> None: """ Remove seemingly abandoned holds from the database. """ cutoff = utc_now() - timedelta(days=365) + analytics_service = task.services.analytics.analytics() query = ( select(Hold) .where(Hold.start < cutoff, or_(Hold.end == None, Hold.end < utc_now())) @@ -203,21 +205,24 @@ def hold_reaper(task: Task, batch_size: int = 1000) -> None: ) events_to_be_logged = [] with task.transaction() as session: - for count, hold in enumerate(session.execute(query).scalars()): - event = dict( - library=hold.library, - license_pool=hold.license_pool, - event_type=CirculationEvent.CM_HOLD_EXPIRED, - patron=hold.patron, + holds = session.execute(query).scalars().all() + for hold in holds: + events_to_be_logged.append( + AnalyticsEventData.create( + library=hold.library, + license_pool=hold.license_pool, + event_type=CirculationEvent.CM_HOLD_EXPIRED, + patron=hold.patron, + ) ) session.delete(hold) - events_to_be_logged.append(event) + + count = len(holds) + task.log.info(f"Deleted {count} expired {_pluralize(count, 'hold')}.") with task.transaction() as session: for event in events_to_be_logged: - task.services.analytics.collect_event(**event) - - task.log.info(f"Deleted {count} expired holds.") + analytics_service.collect(event=event, session=session) if count == batch_size: task.log.info("There may be more holds to delete. Re-queueing the reaper.") diff --git a/tests/manager/celery/tasks/test_reaper.py b/tests/manager/celery/tasks/test_reaper.py index b2a26d5cf6..f4f53e28d5 100644 --- a/tests/manager/celery/tasks/test_reaper.py +++ b/tests/manager/celery/tasks/test_reaper.py @@ -14,6 +14,7 @@ work_reaper, ) from palace.manager.service.logging.configuration import LogLevel +from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent from palace.manager.sqlalchemy.model.classification import Genre from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.sqlalchemy.model.coverage import WorkCoverageRecord @@ -408,7 +409,7 @@ def test_hold_reaper( assert len(current_patron.holds) == 2 # Now we fire up the hold reaper. - hold_reaper.delay().wait() + hold_reaper.delay(batch_size=1).wait() # All the inactive patron's holds have been reaped assert db.session.query(Hold).where(Hold.patron == inactive_patron).all() == [] @@ -416,10 +417,10 @@ def test_hold_reaper( # verify expected circ event count for hold reaper run call_args_list = ( - services_fixture.analytics_fixture.analytics_mock.collect_event.call_args_list + services_fixture.analytics_fixture.analytics_mock.collect.call_args_list ) assert len(call_args_list) == 2 - event_types = [call_args.kwargs["event_type"] for call_args in call_args_list] + event_types = [call_args.kwargs["event"].type for call_args in call_args_list] assert event_types == [ CirculationEvent.CM_HOLD_EXPIRED, CirculationEvent.CM_HOLD_EXPIRED, From 0b40892272c1cfbeb91e0fd51d6fb4c4c3bea105 Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 6 Mar 2025 17:10:07 -0400 Subject: [PATCH 03/11] Add our reaper configuration --- src/palace/manager/service/celery/celery.py | 56 +++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/src/palace/manager/service/celery/celery.py b/src/palace/manager/service/celery/celery.py index 7bcce6a009..73b37ed6d1 100644 --- a/src/palace/manager/service/celery/celery.py +++ b/src/palace/manager/service/celery/celery.py @@ -92,6 +92,62 @@ def beat_schedule() -> dict[str, Any]: hour="4", ), # Once a day at 4:00 AM }, + "credential_reaper": { + "task": "reaper.credential_reaper", + "schedule": crontab( + minute="5", + hour="2", + ), # Once a day at 2:05 AM + }, + "patron_reaper": { + "task": "reaper.patron_reaper", + "schedule": crontab( + minute="10", + hour="2", + ), # Once a day at 2:10 AM + }, + "collection_reaper": { + "task": "reaper.collection_reaper", + "schedule": crontab( + minute="15", + hour="2", + ), # Once a day at 2:15 AM + }, + "work_reaper": { + "task": "reaper.work_reaper", + "schedule": crontab( + minute="20", + hour="2", + ), # Once a day at 2:20 AM + }, + "measurement_reaper": { + "task": "reaper.measurement_reaper", + "schedule": crontab( + minute="25", + hour="2", + ), # Once a day at 2:25 AM + }, + "annotation_reaper": { + "task": "reaper.annotation_reaper", + "schedule": crontab( + minute="30", + hour="2", + ), # Once a day at 2:30 AM + }, + "hold_reaper": { + "task": "reaper.hold_reaper", + "schedule": crontab( + minute="35", + hour="2", + ), # Once a day at 2:35 AM + }, + "loan_reaper": { + "task": "reaper.loan_reaper", + "schedule": crontab( + minute="40", + hour="2", + ), # Once a day at 2:40 AM + }, } From 149d892e6f66add799ae407727ca2ef9b3305b51 Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 6 Mar 2025 17:17:46 -0400 Subject: [PATCH 04/11] Remove old reaper code --- bin/database_reaper | 8 - docker/services/cron/cron.d/circulation | 3 - src/palace/manager/api/monitor.py | 170 ------------- src/palace/manager/core/monitor.py | 272 -------------------- src/palace/manager/scripts/monitor.py | 11 +- tests/manager/api/test_monitor.py | 260 ------------------- tests/manager/core/test_monitor.py | 316 +----------------------- tests/manager/scripts/test_monitor.py | 16 +- 8 files changed, 6 insertions(+), 1050 deletions(-) delete mode 100755 bin/database_reaper delete mode 100644 src/palace/manager/api/monitor.py delete mode 100644 tests/manager/api/test_monitor.py diff --git a/bin/database_reaper b/bin/database_reaper deleted file mode 100755 index a5e264bb4f..0000000000 --- a/bin/database_reaper +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env python -"""Remove miscellaneous expired things (Credentials, Loans, etc.) -from the database. -""" - -from palace.manager.scripts.monitor import RunReaperMonitorsScript - -RunReaperMonitorsScript().run() diff --git a/docker/services/cron/cron.d/circulation b/docker/services/cron/cron.d/circulation index fc0b775616..4ae7ef6bc1 100644 --- a/docker/services/cron/cron.d/circulation +++ b/docker/services/cron/cron.d/circulation @@ -30,9 +30,6 @@ HOME=/var/www/circulation # those works. 30 22 * * * root bin/run work_classify_unchecked_subjects >> /var/log/cron.log 2>&1 -# Remove miscellaneous expired things from the database -0 2 * * * root bin/run database_reaper >> /var/log/cron.log 2>&1 - # Sync a library's collection with NoveList 0 0 * * 0 root bin/run -d 60 novelist_update >> /var/log/cron.log 2>&1 diff --git a/src/palace/manager/api/monitor.py b/src/palace/manager/api/monitor.py deleted file mode 100644 index 8316be6c3a..0000000000 --- a/src/palace/manager/api/monitor.py +++ /dev/null @@ -1,170 +0,0 @@ -from sqlalchemy import and_, or_ - -from palace.manager.api.opds_for_distributors import OPDSForDistributorsAPI -from palace.manager.core.monitor import ReaperMonitor -from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent -from palace.manager.sqlalchemy.model.collection import Collection -from palace.manager.sqlalchemy.model.integration import IntegrationConfiguration -from palace.manager.sqlalchemy.model.licensing import LicensePool -from palace.manager.sqlalchemy.model.patron import Annotation, Hold, Loan -from palace.manager.util.datetime_helpers import utc_now - - -class LoanlikeReaperMonitor(ReaperMonitor): - SOURCE_OF_TRUTH_PROTOCOLS = [ - OPDSForDistributorsAPI.label(), - ] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._events_to_be_logged = [] - - @property - def where_clause(self): - """We never want to automatically reap loans or holds for situations - where the circulation manager is the source of truth. If we - delete something we shouldn't have, we won't be able to get - the 'real' information back. - - This means loans of open-access content and loans from - collections based on a protocol found in - SOURCE_OF_TRUTH_PROTOCOLS. - - Subclasses will append extra clauses to this filter. - """ - source_of_truth = or_( - LicensePool.open_access == True, - IntegrationConfiguration.protocol.in_(self.SOURCE_OF_TRUTH_PROTOCOLS), - ) - - source_of_truth_subquery = ( - self._db.query(self.MODEL_CLASS.id) - .join(self.MODEL_CLASS.license_pool) - .join(LicensePool.collection) - .join( - IntegrationConfiguration, - Collection.integration_configuration_id == IntegrationConfiguration.id, - ) - .filter(source_of_truth) - ) - return ~self.MODEL_CLASS.id.in_(source_of_truth_subquery) - - def after_commit(self) -> None: - super().after_commit() - copy_of_list = list(self._events_to_be_logged) - - for event in copy_of_list: - # start a separate transaction for each event in order to - # minimize database lock durations - with self._db.begin() as transaction: - self.services.analytics.collect_event(**event) - self._events_to_be_logged.remove(event) - - -class LoanReaper(LoanlikeReaperMonitor): - """Remove expired and abandoned loans from the database.""" - - MODEL_CLASS: type[Loan] = Loan - MAX_AGE = 90 - - @property - def where_clause(self): - """Find loans that have either expired, or that were created a long - time ago and have no definite end date. - """ - start_field = self.MODEL_CLASS.start - end_field = self.MODEL_CLASS.end - superclause = super().where_clause - now = utc_now() - expired = end_field < now - very_old_with_no_clear_end_date = and_( - start_field < self.cutoff, end_field == None - ) - not_unlimited_access = LicensePool.unlimited_access == False - return and_( - superclause, - not_unlimited_access, - or_(expired, very_old_with_no_clear_end_date), - ) - - def query(self): - query = super().query() - return query.join(self.MODEL_CLASS.license_pool) - - -ReaperMonitor.REGISTRY.append(LoanReaper) - - -class HoldReaper(LoanlikeReaperMonitor): - """Remove seemingly abandoned holds from the database.""" - - MODEL_CLASS = Hold - MAX_AGE = 365 - - @property - def where_clause(self): - """Find holds that were created a long time ago and either have - no end date or have an end date in the past. - - The 'end date' for a hold is just an estimate, but if the estimate - is in the future it's better to keep the hold around. - """ - start_field = self.MODEL_CLASS.start - end_field = self.MODEL_CLASS.end - superclause = super().where_clause - end_date_in_past = end_field < utc_now() - probably_abandoned = and_( - start_field < self.cutoff, or_(end_field == None, end_date_in_past) - ) - return and_(superclause, probably_abandoned) - - def delete(self, row) -> None: - event = dict( - library=row.library, - license_pool=row.license_pool, - event_type=CirculationEvent.CM_HOLD_EXPIRED, - patron=row.patron, - ) - super().delete(row) - self._events_to_be_logged.append(event) - - -ReaperMonitor.REGISTRY.append(HoldReaper) - - -class IdlingAnnotationReaper(ReaperMonitor): - """Remove idling annotations for inactive loans.""" - - MODEL_CLASS = Annotation - TIMESTAMP_FIELD = "timestamp" - MAX_AGE = 60 - - @property - def where_clause(self): - """The annotation must have motivation=IDLING, must be at least 60 - days old (meaning there has been no attempt to read the book - for 60 days), and must not be associated with one of the - patron's active loans or holds. - """ - superclause = super().where_clause - - restrictions = [] - for t in Loan, Hold: - active_subquery = ( - self._db.query(Annotation.id) - .join(t, t.patron_id == Annotation.patron_id) - .join( - LicensePool, - and_( - LicensePool.id == t.license_pool_id, - LicensePool.identifier_id == Annotation.identifier_id, - ), - ) - ) - restrictions.append(~Annotation.id.in_(active_subquery)) - return and_( - superclause, Annotation.motivation == Annotation.IDLING, *restrictions - ) - - -ReaperMonitor.REGISTRY.append(IdlingAnnotationReaper) diff --git a/src/palace/manager/core/monitor.py b/src/palace/manager/core/monitor.py index 67d4ac7b3c..5d4aafafe8 100644 --- a/src/palace/manager/core/monitor.py +++ b/src/palace/manager/core/monitor.py @@ -6,7 +6,6 @@ from typing import TYPE_CHECKING from sqlalchemy.exc import InvalidRequestError -from sqlalchemy.orm import defer from sqlalchemy.orm.exc import ObjectDeletedError, StaleDataError from sqlalchemy.sql.expression import and_, or_ from tenacity import ( @@ -20,16 +19,13 @@ from palace.manager.core.metadata_layer import TimestampData from palace.manager.service.container import container_instance from palace.manager.sqlalchemy.model.base import Base -from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent from palace.manager.sqlalchemy.model.classification import Subject from palace.manager.sqlalchemy.model.collection import Collection, CollectionMissing from palace.manager.sqlalchemy.model.coverage import CoverageRecord, Timestamp -from palace.manager.sqlalchemy.model.credential import Credential from palace.manager.sqlalchemy.model.customlist import CustomListEntry from palace.manager.sqlalchemy.model.edition import Edition from palace.manager.sqlalchemy.model.identifier import Identifier from palace.manager.sqlalchemy.model.licensing import LicensePool -from palace.manager.sqlalchemy.model.measurement import Measurement from palace.manager.sqlalchemy.model.patron import Patron from palace.manager.sqlalchemy.model.work import Work from palace.manager.sqlalchemy.presentation import PresentationCalculationPolicy @@ -814,271 +810,3 @@ class CustomListEntryWorkUpdateMonitor(CustomListEntrySweepMonitor): def process_item(self, item): item.set_work() - - -class ReaperMonitor(Monitor): - """A Monitor that deletes database rows that have expired but - have no other process to delete them. - - A subclass of ReaperMonitor MUST define values for the following - constants: - * MODEL_CLASS - The model class this monitor is reaping, e.g. Credential. - * TIMESTAMP_FIELD - Within the model class, the DateTime field to be - used when deciding which rows to deleting, - e.g. 'expires'. The reaper will be more efficient if there's - an index on this field. - * MAX_AGE - A datetime.timedelta or number of days representing - the time that must pass before an item can be safely deleted. - - A subclass of ReaperMonitor MAY define values for the following constants: - * BATCH_SIZE - The number of rows to fetch for deletion in a single - batch. The default is 1000. - - If your model class has fields that might contain a lot of data - and aren't important to the reaping process, put their field names - into a list called LARGE_FIELDS and the Reaper will avoid fetching - that information, improving performance. - """ - - MODEL_CLASS: type[Base] - TIMESTAMP_FIELD: str | None = None - MAX_AGE: datetime.timedelta | int | float - BATCH_SIZE: int = 1000 - - REGISTRY: list[type[Monitor]] = [] - - def __init__(self, *args, **kwargs): - self.SERVICE_NAME = "Reaper for %s" % self.MODEL_CLASS.__name__ - if self.TIMESTAMP_FIELD is not None: - self.SERVICE_NAME += ".%s" % self.TIMESTAMP_FIELD - - super().__init__(*args, **kwargs) - - @property - def cutoff(self): - """Items with a timestamp earlier than this time will be reaped.""" - if isinstance(self.MAX_AGE, datetime.timedelta): - max_age = self.MAX_AGE - else: - max_age = datetime.timedelta(days=self.MAX_AGE) - return utc_now() - max_age - - @property - def timestamp_field(self): - return getattr(self.MODEL_CLASS, self.TIMESTAMP_FIELD) - - @property - def where_clause(self): - """A SQLAlchemy clause that identifies the database rows to be reaped.""" - return self.timestamp_field < self.cutoff - - def run_once(self, *args, **kwargs): - rows_deleted = 0 - qu = self.query() - to_defer = getattr(self.MODEL_CLASS, "LARGE_FIELDS", []) - for x in to_defer: - qu = qu.options(defer(x)) - count = qu.count() - self.log.info("Deleting %d row(s)", count) - while count > 0: - for i in qu.limit(self.BATCH_SIZE): - self.log.info("Deleting %r", i) - self.delete(i) - rows_deleted += 1 - self._db.commit() - - self.after_commit() - - count = qu.count() - return TimestampData(achievements="Items deleted: %d" % rows_deleted) - - def after_commit(self) -> None: - return None - - def delete(self, row) -> None: - """Delete a row from the database. - - CAUTION: If you override this method such that it doesn't - actually delete the database row, then run_once() may enter an - infinite loop. - """ - self._db.delete(row) - - def query(self): - return self._db.query(self.MODEL_CLASS).filter(self.where_clause) - - -# ReaperMonitors that do something specific. - - -class CredentialReaper(ReaperMonitor): - """Remove Credentials that expired more than a day ago.""" - - MODEL_CLASS = Credential - TIMESTAMP_FIELD = "expires" - MAX_AGE = 1 - - -ReaperMonitor.REGISTRY.append(CredentialReaper) - - -class PatronRecordReaper(ReaperMonitor): - """Remove patron records that expired more than 60 days ago""" - - MODEL_CLASS = Patron - TIMESTAMP_FIELD = "authorization_expires" - MAX_AGE = 60 - - -ReaperMonitor.REGISTRY.append(PatronRecordReaper) - - -class WorkReaper(ReaperMonitor): - """Remove Works that have no associated LicensePools. - - Unlike other reapers, no timestamp is relevant. As soon as a Work - loses its last LicensePool it can be removed. - """ - - MODEL_CLASS = Work - - def __init__(self, *args, **kwargs): - search_index_client = kwargs.pop("search_index_client", None) - super().__init__(*args, **kwargs) - self.search_index_client = search_index_client or self.services.search.index() - - def query(self): - return ( - self._db.query(Work) - .outerjoin(Work.license_pools) - .filter(LicensePool.id == None) - ) - - def delete(self, work): - """Delete work from opensearch and database.""" - work.delete(search_index=self.search_index_client) - - -ReaperMonitor.REGISTRY.append(WorkReaper) - - -class CollectionReaper(ReaperMonitor): - """Remove collections that have been marked for deletion.""" - - MODEL_CLASS = Collection - - @property - def where_clause(self): - """A SQLAlchemy clause that identifies the database rows to be reaped.""" - return Collection.marked_for_deletion == True - - def delete(self, collection): - """Delete a Collection from the database. - - Database deletion of a Collection might take a really long - time, so we call a special method that will do the deletion - incrementally and can pick up where it left off if there's a - failure. - """ - collection.delete() - - -ReaperMonitor.REGISTRY.append(CollectionReaper) - - -class MeasurementReaper(ReaperMonitor): - """Remove measurements that are not the most recent""" - - MODEL_CLASS = Measurement - - @property - def where_clause(self): - return Measurement.is_most_recent == False - - def run_once(self, *args, **kwargs): - rows_deleted = self.query().delete() - self._db.commit() - return TimestampData(achievements="Items deleted: %d" % rows_deleted) - - -ReaperMonitor.REGISTRY.append(MeasurementReaper) - - -class ScrubberMonitor(ReaperMonitor): - """Scrub information from the database. - - Unlike the other ReaperMonitors, this class doesn't delete rows - from the database -- it only clears out specific data fields. - - In addition to the constants required for ReaperMonitor, a - subclass of ScrubberMonitor MUST define a value for the following - constant: - - * SCRUB_FIELD - The field whose value will be set to None when a row - is scrubbed. - """ - - def __init__(self, *args, **kwargs): - """Set the name of the Monitor based on which field is being - scrubbed. - """ - super().__init__(*args, **kwargs) - self.SERVICE_NAME = "Scrubber for {}.{}".format( - self.MODEL_CLASS.__name__, - self.SCRUB_FIELD, - ) - - def run_once(self, *args, **kwargs): - """Find all rows that need to be scrubbed, and scrub them.""" - rows_scrubbed = 0 - cls = self.MODEL_CLASS - update = ( - cls.__table__.update() - .where(self.where_clause) - .values({self.SCRUB_FIELD: None}) - .returning(cls.id) - ) - scrubbed = self._db.execute(update).fetchall() - self._db.commit() - return TimestampData(achievements="Items scrubbed: %d" % len(scrubbed)) - - @property - def where_clause(self): - """Find rows that are older than MAX_AGE _and_ which have a non-null - SCRUB_FIELD. If the field is already null, there's no need to - scrub it. - """ - return and_(super().where_clause, self.scrub_field != None) - - @property - def scrub_field(self): - """Find the SQLAlchemy representation of the model field to be - scrubbed. - """ - return getattr(self.MODEL_CLASS, self.SCRUB_FIELD) - - -class CirculationEventLocationScrubber(ScrubberMonitor): - """Scrub location information from old CirculationEvents.""" - - MODEL_CLASS = CirculationEvent - TIMESTAMP_FIELD = "start" - MAX_AGE = 365 - SCRUB_FIELD = "location" - - -ReaperMonitor.REGISTRY.append(CirculationEventLocationScrubber) - - -class PatronNeighborhoodScrubber(ScrubberMonitor): - """Scrub cached neighborhood information from patrons who haven't been - seen in a while. - """ - - MODEL_CLASS = Patron - TIMESTAMP_FIELD = "last_external_sync" - MAX_AGE = Patron.MAX_SYNC_TIME - SCRUB_FIELD = "cached_neighborhood" - - -ReaperMonitor.REGISTRY.append(PatronNeighborhoodScrubber) diff --git a/src/palace/manager/scripts/monitor.py b/src/palace/manager/scripts/monitor.py index 95666eec64..e04b021835 100644 --- a/src/palace/manager/scripts/monitor.py +++ b/src/palace/manager/scripts/monitor.py @@ -2,7 +2,7 @@ from sqlalchemy.orm import Session -from palace.manager.core.monitor import CollectionMonitor, ReaperMonitor +from palace.manager.core.monitor import CollectionMonitor from palace.manager.scripts.base import Script from palace.manager.scripts.input import CollectionArgumentsScript from palace.manager.sqlalchemy.session import production_session @@ -83,15 +83,6 @@ def do_run(self): ) -class RunReaperMonitorsScript(RunMultipleMonitorsScript): - """Run all the monitors found in ReaperMonitor.REGISTRY""" - - name = "Run all reaper monitors" - - def monitors(self, **kwargs): - return [cls(self._db, **kwargs) for cls in ReaperMonitor.REGISTRY] - - class RunCollectionMonitorScript(RunMultipleMonitorsScript, CollectionArgumentsScript): """Run a CollectionMonitor on every Collection that comes through a certain protocol. diff --git a/tests/manager/api/test_monitor.py b/tests/manager/api/test_monitor.py deleted file mode 100644 index 67830e7e44..0000000000 --- a/tests/manager/api/test_monitor.py +++ /dev/null @@ -1,260 +0,0 @@ -import datetime -import random - -from palace.manager.api.monitor import ( - HoldReaper, - IdlingAnnotationReaper, - LoanlikeReaperMonitor, - LoanReaper, -) -from palace.manager.api.opds_for_distributors import OPDSForDistributorsAPI -from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent -from palace.manager.sqlalchemy.model.datasource import DataSource -from palace.manager.sqlalchemy.model.patron import Annotation -from palace.manager.sqlalchemy.util import get_one_or_create -from palace.manager.util.datetime_helpers import utc_now -from tests.fixtures.database import DatabaseTransactionFixture -from tests.fixtures.services import ServicesFixture - - -class TestLoanlikeReaperMonitor: - """Tests the loan and hold reapers.""" - - def test_source_of_truth_protocols(self): - """Verify that well-known source of truth protocols - will be exempt from the reaper. - """ - assert LoanlikeReaperMonitor.SOURCE_OF_TRUTH_PROTOCOLS == [ - OPDSForDistributorsAPI.label() - ] - - def test_reaping( - self, db: DatabaseTransactionFixture, services_fixture: ServicesFixture - ): - # This patron stopped using the circulation manager a long time - # ago. - inactive_patron = db.patron() - - # This patron is still using the circulation manager. - current_patron = db.patron() - - # We're going to give these patrons some loans and holds. - edition, open_access = db.edition( - with_license_pool=True, with_open_access_download=True - ) - - not_open_access_1 = db.licensepool( - edition, open_access=False, data_source_name=DataSource.OVERDRIVE - ) - not_open_access_2 = db.licensepool( - edition, open_access=False, data_source_name=DataSource.BIBLIOTHECA - ) - not_open_access_3 = db.licensepool( - edition, open_access=False, data_source_name=DataSource.AXIS_360 - ) - not_open_access_4 = db.licensepool( - edition, open_access=False, data_source_name=DataSource.BIBBLIO - ) - unlimited_access = db.licensepool( - edition, unlimited_access=True, data_source_name=DataSource.AMAZON - ) - - # Here's a collection that is the source of truth for its - # loans and holds, rather than mirroring loan and hold information - # from some remote source. - sot_collection = db.collection( - "Source of Truth", - protocol=random.choice(LoanReaper.SOURCE_OF_TRUTH_PROTOCOLS), - ) - - edition2 = db.edition(with_license_pool=False) - - sot_lp1 = db.licensepool( - edition2, - open_access=False, - data_source_name=DataSource.OVERDRIVE, - collection=sot_collection, - ) - - sot_lp2 = db.licensepool( - edition2, - open_access=False, - data_source_name=DataSource.BIBLIOTHECA, - collection=sot_collection, - ) - - now = utc_now() - a_long_time_ago = now - datetime.timedelta(days=1000) - not_very_long_ago = now - datetime.timedelta(days=60) - even_longer = now - datetime.timedelta(days=2000) - the_future = now + datetime.timedelta(days=1) - - # This loan has expired. - not_open_access_1.loan_to( - inactive_patron, start=even_longer, end=a_long_time_ago - ) - - # This hold expired without ever becoming a loan (that we saw). - not_open_access_2.on_hold_to( - inactive_patron, start=even_longer, end=a_long_time_ago - ) - - # This hold has no end date and is older than a year. - not_open_access_3.on_hold_to( - inactive_patron, - start=a_long_time_ago, - end=None, - ) - - # This loan has no end date and is older than 90 days. - not_open_access_4.loan_to( - inactive_patron, - start=a_long_time_ago, - end=None, - ) - - # This loan has no end date, but it's for an open-access work. - open_access_loan, ignore = open_access.loan_to( - inactive_patron, - start=a_long_time_ago, - end=None, - ) - - # An unlimited loan should not get reaped regardless of age - unlimited_access_loan, ignore = unlimited_access.loan_to( - inactive_patron, start=a_long_time_ago, end=None - ) - - # This loan has not expired yet. - not_open_access_1.loan_to(current_patron, start=now, end=the_future) - - # This hold has not expired yet. - not_open_access_2.on_hold_to(current_patron, start=now, end=the_future) - - # This loan has no end date but is pretty recent. - not_open_access_3.loan_to(current_patron, start=not_very_long_ago, end=None) - - # This hold has no end date but is pretty recent. - not_open_access_4.on_hold_to(current_patron, start=not_very_long_ago, end=None) - - # Reapers will not touch loans or holds from the - # source-of-truth collection, even ones that have 'obviously' - # expired. - sot_loan, ignore = sot_lp1.loan_to( - inactive_patron, start=a_long_time_ago, end=a_long_time_ago - ) - - sot_hold, ignore = sot_lp2.on_hold_to( - inactive_patron, start=a_long_time_ago, end=a_long_time_ago - ) - - assert 5 == len(inactive_patron.loans) - assert 3 == len(inactive_patron.holds) - - assert 2 == len(current_patron.loans) - assert 2 == len(current_patron.holds) - - # Now we fire up the loan reaper. - monitor = LoanReaper(db.session) - monitor.services.analytics = services_fixture.analytics_fixture.analytics_mock - monitor.run() - - # All of the inactive patron's loans have been reaped, - # except for the loans for which the circulation manager is the - # source of truth (the SOT loan and the open-access loan), - # which will never be reaped. - # - # Holds are unaffected. - assert {open_access_loan, sot_loan, unlimited_access_loan} == set( - inactive_patron.loans - ) - assert len(inactive_patron.holds) == 3 - - # The active patron's loans and holds are unaffected, either - # because they have not expired or because they have no known - # expiration date and were created relatively recently. - assert len(current_patron.loans) == 2 - assert len(current_patron.holds) == 2 - - # Now fire up the hold reaper. - hold_monitor = HoldReaper(db.session) - hold_monitor.services.analytics = ( - services_fixture.analytics_fixture.analytics_mock - ) - hold_monitor.run() - - # All of the inactive patron's holds have been reaped, - # except for the one from the source-of-truth collection. - # The active patron is unaffected. - assert [sot_hold] == inactive_patron.holds - assert 2 == len(current_patron.holds) - - # verify expected circ event count for hold reaper run - call_args_list = ( - services_fixture.analytics_fixture.analytics_mock.collect_event.call_args_list - ) - assert len(call_args_list) == 2 - event_types = [call_args.kwargs["event_type"] for call_args in call_args_list] - assert event_types == [ - CirculationEvent.CM_HOLD_EXPIRED, - CirculationEvent.CM_HOLD_EXPIRED, - ] - - -class TestIdlingAnnotationReaper: - def test_where_clause(self, db: DatabaseTransactionFixture): - # Two books. - ignore, lp1 = db.edition(with_license_pool=True) - ignore, lp2 = db.edition(with_license_pool=True) - - # Two patrons who sync their annotations. - p1 = db.patron() - p2 = db.patron() - for p in [p1, p2]: - p.synchronize_annotations = True - now = utc_now() - not_that_old = now - datetime.timedelta(days=59) - very_old = now - datetime.timedelta(days=61) - - def _annotation( - patron, pool, content, motivation=Annotation.IDLING, timestamp=very_old - ): - annotation, _ = get_one_or_create( - db.session, - Annotation, - patron=patron, - identifier=pool.identifier, - motivation=motivation, - ) - annotation.timestamp = timestamp - annotation.content = content - return annotation - - # The first patron will not be affected by the - # reaper. Although their annotations are very old, they have - # an active loan for one book and a hold on the other. - loan = lp1.loan_to(p1) - old_loan = _annotation(p1, lp1, "old loan") - - hold = lp2.on_hold_to(p1) - old_hold = _annotation(p1, lp2, "old hold") - - # The second patron has a very old annotation for the first - # book. This is the only annotation that will be affected by - # the reaper. - reapable = _annotation(p2, lp1, "abandoned") - - # The second patron also has a very old non-idling annotation - # for the first book, which will not be reaped because only - # idling annotations are reaped. - not_idling = _annotation( - p2, lp1, "not idling", motivation="some other motivation" - ) - - # The second patron has a non-old idling annotation for the - # second book, which will not be reaped (even though there is - # no active loan or hold) because it's not old enough. - new_idling = _annotation(p2, lp2, "recent", timestamp=not_that_old) - reaper = IdlingAnnotationReaper(db.session) - qu = db.session.query(Annotation).filter(reaper.where_clause) - assert [reapable] == qu.all() diff --git a/tests/manager/core/test_monitor.py b/tests/manager/core/test_monitor.py index a2a73968bb..45ad76b4d0 100644 --- a/tests/manager/core/test_monitor.py +++ b/tests/manager/core/test_monitor.py @@ -9,44 +9,31 @@ from palace.manager.api.overdrive import OverdriveAPI from palace.manager.core.metadata_layer import TimestampData from palace.manager.core.monitor import ( - CirculationEventLocationScrubber, CollectionMonitor, - CollectionReaper, CoverageProvidersFailed, - CredentialReaper, CustomListEntrySweepMonitor, CustomListEntryWorkUpdateMonitor, EditionSweepMonitor, IdentifierSweepMonitor, MakePresentationReadyMonitor, - MeasurementReaper, Monitor, NotPresentationReadyWorkSweepMonitor, - PatronNeighborhoodScrubber, - PatronRecordReaper, PermanentWorkIDRefreshMonitor, PresentationReadyWorkSweepMonitor, - ReaperMonitor, SubjectSweepMonitor, SweepMonitor, TimelineMonitor, - WorkReaper, WorkSweepMonitor, ) from palace.manager.core.opds_import import OPDSAPI from palace.manager.service import container -from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent -from palace.manager.sqlalchemy.model.classification import Genre, Subject -from palace.manager.sqlalchemy.model.collection import Collection, CollectionMissing -from palace.manager.sqlalchemy.model.coverage import Timestamp, WorkCoverageRecord -from palace.manager.sqlalchemy.model.credential import Credential +from palace.manager.sqlalchemy.model.classification import Subject +from palace.manager.sqlalchemy.model.collection import CollectionMissing +from palace.manager.sqlalchemy.model.coverage import Timestamp from palace.manager.sqlalchemy.model.datasource import DataSource -from palace.manager.sqlalchemy.model.edition import Edition from palace.manager.sqlalchemy.model.identifier import Identifier -from palace.manager.sqlalchemy.model.measurement import Measurement -from palace.manager.sqlalchemy.model.patron import Patron from palace.manager.sqlalchemy.model.work import Work -from palace.manager.sqlalchemy.util import create, get_one, get_one_or_create +from palace.manager.sqlalchemy.util import get_one from palace.manager.util.datetime_helpers import datetime_utc, utc_now from tests.fixtures.database import DatabaseTransactionFixture from tests.fixtures.time import Time @@ -1031,298 +1018,3 @@ def test_set_item(self, db: DatabaseTransactionFixture): monitor = CustomListEntryWorkUpdateMonitor(db.session) monitor.process_item(entry) assert old_work == entry.work - - -class MockReaperMonitor(ReaperMonitor): - MODEL_CLASS = Timestamp - TIMESTAMP_FIELD = "timestamp" - - -class TestReaperMonitor: - def test_cutoff(self, db: DatabaseTransactionFixture): - """Test that cutoff behaves correctly when given different values for - ReaperMonitor.MAX_AGE. - """ - m = MockReaperMonitor(db.session) - - # A number here means a number of days. - for value in [1, 1.5, -1]: - m.MAX_AGE = value - expect = utc_now() - datetime.timedelta(days=value) - Time.time_eq(m.cutoff, expect) - - # But you can pass in a timedelta instead. - m.MAX_AGE = datetime.timedelta(seconds=99) - Time.time_eq(m.cutoff, utc_now() - m.MAX_AGE) - - def test_specific_reapers(self, db: DatabaseTransactionFixture): - assert Credential.expires == CredentialReaper(db.session).timestamp_field - assert 1 == CredentialReaper.MAX_AGE - assert ( - Patron.authorization_expires - == PatronRecordReaper(db.session).timestamp_field - ) - assert 60 == PatronRecordReaper.MAX_AGE - - def test_run_once(self, db: DatabaseTransactionFixture): - # Create four Credentials: two expired, two valid. - expired1 = db.credential() - expired2 = db.credential() - now = utc_now() - expiration_date = now - datetime.timedelta(days=CredentialReaper.MAX_AGE + 1) - for e in [expired1, expired2]: - e.expires = expiration_date - - active = db.credential() - active.expires = now - datetime.timedelta(days=CredentialReaper.MAX_AGE - 1) - - eternal = db.credential() - - m = CredentialReaper(db.session) - - # Set the batch size to 1 to make sure this works even - # when there are multiple batches. - m.BATCH_SIZE = 1 - - assert "Reaper for Credential.expires" == m.SERVICE_NAME - result = m.run_once() - assert "Items deleted: 2" == result.achievements - - # The expired credentials have been reaped; the others - # are still in the database. - remaining = set(db.session.query(Credential).all()) - assert {active, eternal} == remaining - - def test_reap_patrons(self, db: DatabaseTransactionFixture): - m = PatronRecordReaper(db.session) - expired = db.patron() - credential = db.credential(patron=expired) - now = utc_now() - expired.authorization_expires = now - datetime.timedelta( - days=PatronRecordReaper.MAX_AGE + 1 - ) - active = db.patron() - active.authorization_expires = now - datetime.timedelta( - days=PatronRecordReaper.MAX_AGE - 1 - ) - result = m.run_once() - assert "Items deleted: 1" == result.achievements - remaining = db.session.query(Patron).all() - assert [active] == remaining - - assert [] == db.session.query(Credential).all() - - -class TestWorkReaper: - def test_end_to_end(self, db: DatabaseTransactionFixture): - # Search mock - class MockSearchIndex: - removed = [] - - def remove_work(self, work): - self.removed.append(work) - - # First, create three works. - - # This work has a license pool. - has_license_pool = db.work(with_license_pool=True) - - # This work had a license pool and then lost it. - had_license_pool = db.work(with_license_pool=True) - db.session.delete(had_license_pool.license_pools[0]) - - # This work never had a license pool. - never_had_license_pool = db.work(with_license_pool=False) - - # Each work has a presentation edition -- keep track of these - # for later. - works = db.session.query(Work) - presentation_editions = [x.presentation_edition for x in works] - - # If and when Work gets database-level cascading deletes, this - # is where they will all be triggered, with no chance that an - # ORM-level delete is doing the work. So let's verify that all - # of the cascades work. - - # First, set up some related items for each Work. - - # Each work is assigned to a genre. - genre, ignore = Genre.lookup(db.session, "Science Fiction") - for work in works: - work.genres = [genre] - - # Each work is on the same CustomList. - l, ignore = db.customlist("a list", num_entries=0) - for work in works: - l.add_entry(work) - - # Each work has a WorkCoverageRecord. - for work in works: - WorkCoverageRecord.add_for(work, operation="some operation") - - # Run the reaper. - s = MockSearchIndex() - m = WorkReaper(db.session, search_index_client=s) - m.run_once() - - # Search index was updated - assert 2 == len(s.removed) - assert has_license_pool not in s.removed - assert had_license_pool in s.removed - assert never_had_license_pool in s.removed - - # Only the work with a license pool remains. - assert [has_license_pool] == [x for x in works] - - # The presentation editions are still around, since they might - # theoretically be used by other parts of the system. - all_editions = db.session.query(Edition).all() - for e in presentation_editions: - assert e in all_editions - - # The surviving work is still assigned to the Genre, and still - # has WorkCoverageRecords. - assert [has_license_pool] == genre.works - surviving_records = db.session.query(WorkCoverageRecord) - assert surviving_records.count() > 0 - assert all(x.work == has_license_pool for x in surviving_records) - - # The CustomListEntries still exist, but two of them have lost - # their work. - assert 2 == len([x for x in l.entries if not x.work]) - assert [has_license_pool] == [x.work for x in l.entries if x.work] - - -class TestCollectionReaper: - def test_query(self, db: DatabaseTransactionFixture): - # This reaper is looking for collections that are marked for - # deletion. - collection = db.default_collection() - reaper = CollectionReaper(db.session) - assert [] == reaper.query().all() - - collection.marked_for_deletion = True - assert [collection] == reaper.query().all() - - def test_reaper_delete_calls_collection_delete( - self, db: DatabaseTransactionFixture - ): - # Unlike most ReaperMonitors, CollectionReaper.delete() - # is overridden to call delete() on the object it was passed, - # rather than just doing a database delete. - class MockCollection: - def delete(self): - self.was_called = True - - collection = MockCollection() - reaper = CollectionReaper(db.session) - reaper.delete(collection) - assert True == collection.was_called - - @pytest.mark.parametrize( - "is_inactive", - ( - pytest.param(True, id="inactive"), - pytest.param(False, id="active"), - ), - ) - def test_run_once(self, db: DatabaseTransactionFixture, is_inactive: bool): - # End-to-end test - c1 = db.collection() - c2 = db.collection(inactive=is_inactive) - c2.marked_for_deletion = True - reaper = CollectionReaper(db.session) - result = reaper.run_once() - - # The Collection marked for deletion has been deleted; the other - # one is unaffected. - assert [c1] == db.session.query(Collection).all() - assert "Items deleted: 1" == result.achievements - - -class TestMeasurementReaper: - def test_query(self, db: DatabaseTransactionFixture): - # This reaper is looking for measurements that are not current. - measurement, created = get_one_or_create( - db.session, Measurement, is_most_recent=True - ) - reaper = MeasurementReaper(db.session) - assert [] == reaper.query().all() - measurement.is_most_recent = False - assert [measurement] == reaper.query().all() - - def test_run_once(self, db: DatabaseTransactionFixture): - # End-to-end test - measurement1, created = get_one_or_create( - db.session, - Measurement, - quantity_measured="answer", - value=12, - is_most_recent=True, - ) - measurement2, created = get_one_or_create( - db.session, - Measurement, - quantity_measured="answer", - value=42, - is_most_recent=False, - ) - reaper = MeasurementReaper(db.session) - result = reaper.run_once() - assert [measurement1] == db.session.query(Measurement).all() - assert "Items deleted: 1" == result.achievements - - -class TestScrubberMonitor: - def test_run_once(self, db: DatabaseTransactionFixture): - # ScrubberMonitor is basically an abstract class, with - # subclasses doing nothing but define missing constants. This - # is an end-to-end test using a specific subclass, - # CirculationEventLocationScrubber. - - m = CirculationEventLocationScrubber(db.session) - assert "Scrubber for CirculationEvent.location" == m.SERVICE_NAME - - # CirculationEvents are only scrubbed if they have a location - # *and* are older than MAX_AGE. - now = utc_now() - not_long_ago = m.cutoff + datetime.timedelta(days=1) - long_ago = m.cutoff - datetime.timedelta(days=1) - - new, ignore = create(db.session, CirculationEvent, start=now, location="loc") - recent, ignore = create( - db.session, CirculationEvent, start=not_long_ago, location="loc" - ) - old, ignore = create( - db.session, CirculationEvent, start=long_ago, location="loc" - ) - already_scrubbed, ignore = create( - db.session, CirculationEvent, start=long_ago, location=None - ) - - # Only the old unscrubbed CirculationEvent is eligible - # to be scrubbed. - assert [old] == m.query().all() - - # Other reapers say items were 'deleted'; we say they were - # 'scrubbed'. - timestamp = m.run_once() - assert "Items scrubbed: 1" == timestamp.achievements - - # Only the old unscrubbed CirculationEvent has been scrubbed. - assert None == old.location - for untouched in (new, recent): - assert "loc" == untouched.location - - def test_specific_scrubbers(self, db: DatabaseTransactionFixture): - # Check that all specific ScrubberMonitors are set up - # correctly. - circ = CirculationEventLocationScrubber(db.session) - assert CirculationEvent.start == circ.timestamp_field - assert CirculationEvent.location == circ.scrub_field - assert 365 == circ.MAX_AGE - - patron = PatronNeighborhoodScrubber(db.session) - assert Patron.last_external_sync == patron.timestamp_field - assert Patron.cached_neighborhood == patron.scrub_field - assert Patron.MAX_SYNC_TIME == patron.MAX_AGE diff --git a/tests/manager/scripts/test_monitor.py b/tests/manager/scripts/test_monitor.py index 5ecf7ea93d..3da39222ec 100644 --- a/tests/manager/scripts/test_monitor.py +++ b/tests/manager/scripts/test_monitor.py @@ -3,13 +3,12 @@ import pytest from palace.manager.api.bibliotheca import BibliothecaAPI -from palace.manager.core.monitor import CollectionMonitor, Monitor, ReaperMonitor +from palace.manager.core.monitor import CollectionMonitor, Monitor from palace.manager.core.opds_import import OPDSAPI from palace.manager.scripts.monitor import ( RunCollectionMonitorScript, RunMonitorScript, RunMultipleMonitorsScript, - RunReaperMonitorsScript, ) from tests.fixtures.database import DatabaseTransactionFixture @@ -82,19 +81,6 @@ def monitors(self, **kwargs): assert None == getattr(m1, "exception", None) -class TestRunReaperMonitorsScript: - def test_monitors(self, db: DatabaseTransactionFixture): - """This script instantiates a Monitor for every class in - ReaperMonitor.REGISTRY. - """ - old_registry = ReaperMonitor.REGISTRY - ReaperMonitor.REGISTRY = [SuccessMonitor] - script = RunReaperMonitorsScript(db.session) - [monitor] = script.monitors() - assert isinstance(monitor, SuccessMonitor) - ReaperMonitor.REGISTRY = old_registry - - class TestCollectionMonitorWithDifferentRunners: """CollectionMonitors are usually run by a RunCollectionMonitorScript. It's not ideal, but you can also run a CollectionMonitor script from a From 87aa4565b42ee35e14c10a9bca54f7783954b7b2 Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 6 Mar 2025 17:19:14 -0400 Subject: [PATCH 05/11] Fix mypy issue --- src/palace/manager/celery/tasks/reaper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/palace/manager/celery/tasks/reaper.py b/src/palace/manager/celery/tasks/reaper.py index bdec4513de..6e7fc0b2a7 100644 --- a/src/palace/manager/celery/tasks/reaper.py +++ b/src/palace/manager/celery/tasks/reaper.py @@ -131,7 +131,7 @@ def collection_reaper(task: Task) -> None: ) ).scalar() - if collections_awaiting_delete > 0: + if collections_awaiting_delete and collections_awaiting_delete > 0: task.log.info( f"{collections_awaiting_delete} {_pluralize(collections_awaiting_delete, 'collection')}" f" waiting for delete. Re-queueing the reaper." From 5bedb9224535cb59509d8da9cc6e3bcfff671afe Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 6 Mar 2025 20:39:29 -0400 Subject: [PATCH 06/11] Add database migration --- ...df6012a5e6_update_patron_id_foreign_key.py | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py diff --git a/alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py b/alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py new file mode 100644 index 0000000000..1507c69b3c --- /dev/null +++ b/alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py @@ -0,0 +1,53 @@ +"""Update patron_id foreign key + +Revision ID: 61df6012a5e6 +Revises: 63825d889633 +Create Date: 2025-03-07 00:38:25.610733+00:00 + +""" + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "61df6012a5e6" +down_revision = "63825d889633" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.drop_constraint("annotations_patron_id_fkey", "annotations", type_="foreignkey") + op.create_foreign_key( + None, "annotations", "patrons", ["patron_id"], ["id"], ondelete="CASCADE" + ) + op.drop_constraint("credentials_patron_id_fkey", "credentials", type_="foreignkey") + op.create_foreign_key( + None, "credentials", "patrons", ["patron_id"], ["id"], ondelete="CASCADE" + ) + op.drop_constraint("holds_patron_id_fkey", "holds", type_="foreignkey") + op.create_foreign_key( + None, "holds", "patrons", ["patron_id"], ["id"], ondelete="CASCADE" + ) + op.drop_constraint("loans_patron_id_fkey", "loans", type_="foreignkey") + op.create_foreign_key( + None, "loans", "patrons", ["patron_id"], ["id"], ondelete="CASCADE" + ) + + +def downgrade() -> None: + op.drop_constraint(None, "loans", type_="foreignkey") + op.create_foreign_key( + "loans_patron_id_fkey", "loans", "patrons", ["patron_id"], ["id"] + ) + op.drop_constraint(None, "holds", type_="foreignkey") + op.create_foreign_key( + "holds_patron_id_fkey", "holds", "patrons", ["patron_id"], ["id"] + ) + op.drop_constraint(None, "credentials", type_="foreignkey") + op.create_foreign_key( + "credentials_patron_id_fkey", "credentials", "patrons", ["patron_id"], ["id"] + ) + op.drop_constraint(None, "annotations", type_="foreignkey") + op.create_foreign_key( + "annotations_patron_id_fkey", "annotations", "patrons", ["patron_id"], ["id"] + ) From 24851d5a4113a1a397133ec67289c04c6831c994 Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 6 Mar 2025 20:42:51 -0400 Subject: [PATCH 07/11] Update test --- tests/manager/celery/tasks/test_reaper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/manager/celery/tasks/test_reaper.py b/tests/manager/celery/tasks/test_reaper.py index f4f53e28d5..da089ff8b8 100644 --- a/tests/manager/celery/tasks/test_reaper.py +++ b/tests/manager/celery/tasks/test_reaper.py @@ -343,12 +343,12 @@ def _annotation( assert "Deleted 1 outdated idling annotation." in caplog.messages # The annotation that should have been reaped is gone - assert db.session.query(Annotation).all() == [ + assert set(db.session.query(Annotation).all()) == { old_loan, old_hold, not_idling, new_idling, - ] + } def test_hold_reaper( From cba25172b665f0a745e216df893ae4a0e5259454 Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 6 Mar 2025 20:47:09 -0400 Subject: [PATCH 08/11] Add one more test --- tests/manager/celery/tasks/test_reaper.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/manager/celery/tasks/test_reaper.py b/tests/manager/celery/tasks/test_reaper.py index da089ff8b8..8f1b18968b 100644 --- a/tests/manager/celery/tasks/test_reaper.py +++ b/tests/manager/celery/tasks/test_reaper.py @@ -244,6 +244,20 @@ def test_reaper_delete_calls_collection_delete( # Make sure we called the delete method on the collection mock_delete.assert_called_once() + def test_reaper_no_collections( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + ): + # Some collections that don't need to be deleted + collections = {db.collection() for idx in range(3)} + + # Run reaper + collection_reaper.delay().wait() + + # Make sure no collections were deleted + assert set(db.session.query(Collection).all()) == collections + def test_measurement_reaper( db: DatabaseTransactionFixture, From a288bb09ce1bbb2548cd3dd05727d7775c0f5aef Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 6 Mar 2025 21:01:18 -0400 Subject: [PATCH 09/11] Move function, add test, add doc string --- src/palace/manager/celery/tasks/reaper.py | 31 +++++++---------------- src/palace/manager/util/log.py | 9 +++++++ tests/manager/util/test_log.py | 11 +++++++- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/palace/manager/celery/tasks/reaper.py b/src/palace/manager/celery/tasks/reaper.py index 6e7fc0b2a7..2b2296984b 100644 --- a/src/palace/manager/celery/tasks/reaper.py +++ b/src/palace/manager/celery/tasks/reaper.py @@ -17,6 +17,7 @@ from palace.manager.sqlalchemy.model.patron import Annotation, Hold, Loan, Patron from palace.manager.sqlalchemy.model.work import Work from palace.manager.util.datetime_helpers import utc_now +from palace.manager.util.log import pluralize def _execute_delete(session: Session, deletion_query: Delete) -> int: @@ -36,12 +37,6 @@ def _execute_delete(session: Session, deletion_query: Delete) -> int: return result.rowcount # type: ignore[attr-defined,no-any-return] -def _pluralize(count: int, singular: str, plural: str | None = None) -> str: - if plural is None: - plural = singular + "s" - return singular if count == 1 else plural - - @shared_task(queue=QueueNames.default, bind=True) def credential_reaper(task: Task) -> None: """ @@ -51,9 +46,7 @@ def credential_reaper(task: Task) -> None: deletion_query = delete(Credential).where(Credential.expires < cutoff) with task.transaction() as session: rows_removed = _execute_delete(session, deletion_query) - task.log.info( - f"Deleted {rows_removed} expired {_pluralize(rows_removed, 'credential')}." - ) + task.log.info(f"Deleted {pluralize(rows_removed, 'expired credential')}.") @shared_task(queue=QueueNames.default, bind=True) @@ -65,9 +58,7 @@ def patron_reaper(task: Task) -> None: deletion_query = delete(Patron).where(Patron.authorization_expires < cutoff) with task.transaction() as session: rows_removed = _execute_delete(session, deletion_query) - task.log.info( - f"Deleted {rows_removed} expired patron {_pluralize(rows_removed, 'record')}." - ) + task.log.info(f"Deleted {pluralize(rows_removed, 'expired patron record')}.") @shared_task(queue=QueueNames.default, bind=True) @@ -99,7 +90,7 @@ def work_reaper(task: Task, batch_size: int = 1000) -> None: removed = len(works) task.log.info( - f"Deleted {removed} {_pluralize(removed, 'Work')} with no associated LicensePools." + f"Deleted {pluralize(removed, 'Work')} with no associated LicensePools." ) if len(works) == batch_size: task.log.info("There may be more Works to delete. Re-queueing the reaper.") @@ -133,7 +124,7 @@ def collection_reaper(task: Task) -> None: if collections_awaiting_delete and collections_awaiting_delete > 0: task.log.info( - f"{collections_awaiting_delete} {_pluralize(collections_awaiting_delete, 'collection')}" + f"{pluralize(collections_awaiting_delete, 'collection')}" f" waiting for delete. Re-queueing the reaper." ) raise task.replace(collection_reaper.s()) @@ -147,9 +138,7 @@ def measurement_reaper(task: Task) -> None: deletion_query = delete(Measurement).where(Measurement.is_most_recent == False) with task.transaction() as session: rows_removed = _execute_delete(session, deletion_query) - task.log.info( - f"Deleted {rows_removed} outdated {_pluralize(rows_removed, 'measurement')}." - ) + task.log.info(f"Deleted {pluralize(rows_removed, 'outdated measurement')}.") @shared_task(queue=QueueNames.default, bind=True) @@ -185,9 +174,7 @@ def annotation_reaper(task: Task) -> None: with task.transaction() as session: rows_removed = _execute_delete(session, deletion_query) - task.log.info( - f"Deleted {rows_removed} outdated idling {_pluralize(rows_removed, 'annotation')}." - ) + task.log.info(f"Deleted {pluralize(rows_removed, 'outdated idling annotation')}.") @shared_task(queue=QueueNames.default, bind=True) @@ -218,7 +205,7 @@ def hold_reaper(task: Task, batch_size: int = 100) -> None: session.delete(hold) count = len(holds) - task.log.info(f"Deleted {count} expired {_pluralize(count, 'hold')}.") + task.log.info(f"Deleted {pluralize(count, 'expired hold')}.") with task.transaction() as session: for event in events_to_be_logged: @@ -249,4 +236,4 @@ def loan_reaper(task: Task) -> None: with task.transaction() as session: rows_removed = _execute_delete(session, deletion_query) - task.log.info(f"Deleted {rows_removed} expired {_pluralize(rows_removed, 'loan')}.") + task.log.info(f"Deleted {pluralize(rows_removed, 'expired loan')}.") diff --git a/src/palace/manager/util/log.py b/src/palace/manager/util/log.py index 6aea4aa212..e3f7aab95f 100644 --- a/src/palace/manager/util/log.py +++ b/src/palace/manager/util/log.py @@ -113,3 +113,12 @@ def log(self) -> logging.Logger: so it is easier to access the logger from an instance. """ return self.logger() + + +def pluralize(count: int, singular: str, plural: str | None = None) -> str: + """ + Return a string that pluralizes the given word based on the count. + """ + if plural is None: + plural = singular + "s" + return f"{count} {singular if count == 1 else plural}" diff --git a/tests/manager/util/test_log.py b/tests/manager/util/test_log.py index 56c1df1905..06be2362be 100644 --- a/tests/manager/util/test_log.py +++ b/tests/manager/util/test_log.py @@ -2,7 +2,7 @@ from pytest import LogCaptureFixture from palace.manager.service.logging.configuration import LogLevel -from palace.manager.util.log import LoggerMixin, log_elapsed_time +from palace.manager.util.log import LoggerMixin, log_elapsed_time, pluralize class MockClass(LoggerMixin): @@ -51,3 +51,12 @@ def test_log_elapsed_time_invalid(caplog: LogCaptureFixture): with pytest.raises(RuntimeError): log_elapsed_time(log_level=LogLevel.info, message_prefix="Test")(lambda: None)() assert len(caplog.records) == 0 + + +def test_pluralize(): + assert pluralize(1, "dingo") == "1 dingo" + assert pluralize(2, "dingo") == "2 dingos" + assert pluralize(0, "dingo") == "0 dingos" + + assert pluralize(1, "foo", "bar") == "1 foo" + assert pluralize(2, "foo", "bar") == "2 bar" From 94a9aa319e4e5479c2fcae40852c7e2a17873d2b Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 6 Mar 2025 21:07:08 -0400 Subject: [PATCH 10/11] Update alembic migration --- ...df6012a5e6_update_patron_id_foreign_key.py | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py b/alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py index 1507c69b3c..bee9e18fe1 100644 --- a/alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py +++ b/alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py @@ -18,36 +18,56 @@ def upgrade() -> None: op.drop_constraint("annotations_patron_id_fkey", "annotations", type_="foreignkey") op.create_foreign_key( - None, "annotations", "patrons", ["patron_id"], ["id"], ondelete="CASCADE" + "annotations_patron_id_fkey", + "annotations", + "patrons", + ["patron_id"], + ["id"], + ondelete="CASCADE", ) op.drop_constraint("credentials_patron_id_fkey", "credentials", type_="foreignkey") op.create_foreign_key( - None, "credentials", "patrons", ["patron_id"], ["id"], ondelete="CASCADE" + "credentials_patron_id_fkey", + "credentials", + "patrons", + ["patron_id"], + ["id"], + ondelete="CASCADE", ) op.drop_constraint("holds_patron_id_fkey", "holds", type_="foreignkey") op.create_foreign_key( - None, "holds", "patrons", ["patron_id"], ["id"], ondelete="CASCADE" + "holds_patron_id_fkey", + "holds", + "patrons", + ["patron_id"], + ["id"], + ondelete="CASCADE", ) op.drop_constraint("loans_patron_id_fkey", "loans", type_="foreignkey") op.create_foreign_key( - None, "loans", "patrons", ["patron_id"], ["id"], ondelete="CASCADE" + "loans_patron_id_fkey", + "loans", + "patrons", + ["patron_id"], + ["id"], + ondelete="CASCADE", ) def downgrade() -> None: - op.drop_constraint(None, "loans", type_="foreignkey") + op.drop_constraint("loans_patron_id_fkey", "loans", type_="foreignkey") op.create_foreign_key( "loans_patron_id_fkey", "loans", "patrons", ["patron_id"], ["id"] ) - op.drop_constraint(None, "holds", type_="foreignkey") + op.drop_constraint("holds_patron_id_fkey", "holds", type_="foreignkey") op.create_foreign_key( "holds_patron_id_fkey", "holds", "patrons", ["patron_id"], ["id"] ) - op.drop_constraint(None, "credentials", type_="foreignkey") + op.drop_constraint("credentials_patron_id_fkey", "credentials", type_="foreignkey") op.create_foreign_key( "credentials_patron_id_fkey", "credentials", "patrons", ["patron_id"], ["id"] ) - op.drop_constraint(None, "annotations", type_="foreignkey") + op.drop_constraint("annotations_patron_id_fkey", "annotations", type_="foreignkey") op.create_foreign_key( "annotations_patron_id_fkey", "annotations", "patrons", ["patron_id"], ["id"] ) From 602a726b2a78af8445682fbd85350c60f57382ac Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Thu, 6 Mar 2025 21:16:53 -0400 Subject: [PATCH 11/11] Clean up formatting a bit --- ...df6012a5e6_update_patron_id_foreign_key.py | 72 +++++++++++++++---- 1 file changed, 60 insertions(+), 12 deletions(-) diff --git a/alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py b/alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py index bee9e18fe1..186209b263 100644 --- a/alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py +++ b/alembic/versions/20250307_61df6012a5e6_update_patron_id_foreign_key.py @@ -16,7 +16,11 @@ def upgrade() -> None: - op.drop_constraint("annotations_patron_id_fkey", "annotations", type_="foreignkey") + op.drop_constraint( + "annotations_patron_id_fkey", + "annotations", + type_="foreignkey", + ) op.create_foreign_key( "annotations_patron_id_fkey", "annotations", @@ -25,7 +29,11 @@ def upgrade() -> None: ["id"], ondelete="CASCADE", ) - op.drop_constraint("credentials_patron_id_fkey", "credentials", type_="foreignkey") + op.drop_constraint( + "credentials_patron_id_fkey", + "credentials", + type_="foreignkey", + ) op.create_foreign_key( "credentials_patron_id_fkey", "credentials", @@ -34,7 +42,11 @@ def upgrade() -> None: ["id"], ondelete="CASCADE", ) - op.drop_constraint("holds_patron_id_fkey", "holds", type_="foreignkey") + op.drop_constraint( + "holds_patron_id_fkey", + "holds", + type_="foreignkey", + ) op.create_foreign_key( "holds_patron_id_fkey", "holds", @@ -43,7 +55,11 @@ def upgrade() -> None: ["id"], ondelete="CASCADE", ) - op.drop_constraint("loans_patron_id_fkey", "loans", type_="foreignkey") + op.drop_constraint( + "loans_patron_id_fkey", + "loans", + type_="foreignkey", + ) op.create_foreign_key( "loans_patron_id_fkey", "loans", @@ -55,19 +71,51 @@ def upgrade() -> None: def downgrade() -> None: - op.drop_constraint("loans_patron_id_fkey", "loans", type_="foreignkey") + op.drop_constraint( + "loans_patron_id_fkey", + "loans", + type_="foreignkey", + ) op.create_foreign_key( - "loans_patron_id_fkey", "loans", "patrons", ["patron_id"], ["id"] + "loans_patron_id_fkey", + "loans", + "patrons", + ["patron_id"], + ["id"], + ) + op.drop_constraint( + "holds_patron_id_fkey", + "holds", + type_="foreignkey", ) - op.drop_constraint("holds_patron_id_fkey", "holds", type_="foreignkey") op.create_foreign_key( - "holds_patron_id_fkey", "holds", "patrons", ["patron_id"], ["id"] + "holds_patron_id_fkey", + "holds", + "patrons", + ["patron_id"], + ["id"], + ) + op.drop_constraint( + "credentials_patron_id_fkey", + "credentials", + type_="foreignkey", ) - op.drop_constraint("credentials_patron_id_fkey", "credentials", type_="foreignkey") op.create_foreign_key( - "credentials_patron_id_fkey", "credentials", "patrons", ["patron_id"], ["id"] + "credentials_patron_id_fkey", + "credentials", + "patrons", + ["patron_id"], + ["id"], + ) + op.drop_constraint( + "annotations_patron_id_fkey", + "annotations", + type_="foreignkey", ) - op.drop_constraint("annotations_patron_id_fkey", "annotations", type_="foreignkey") op.create_foreign_key( - "annotations_patron_id_fkey", "annotations", "patrons", ["patron_id"], ["id"] + "annotations_patron_id_fkey", + "annotations", + "patrons", + ["patron_id"], + ["id"], )