Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upgrade sqlalchemy to version 2.x #48

Merged
merged 1 commit into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/lint-and-tests.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Execute lint and tests

on:
workflow_call:
push:
branches:
- "**"
Expand All @@ -14,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:14-alpine
image: postgres:15-alpine
env:
POSTGRES_DB: fastqueue-test
POSTGRES_USER: fastqueue
Expand Down
39 changes: 1 addition & 38 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,44 +10,7 @@ env:

jobs:
build:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:14-alpine
env:
POSTGRES_DB: fastqueue-test
POSTGRES_USER: fastqueue
POSTGRES_PASSWORD: fastqueue
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Install system dependencies
run: sudo apt update && sudo apt install --no-install-recommends -y make git
- uses: actions/checkout@v3
- uses: actions/cache@v3
with:
path: ~/.cache
key: self-runner-${{ runner.os }}-python-3.11-poetry-${{ hashFiles('poetry.lock') }}-precommit-${{ hashFiles('.pre-commit-config.yaml') }}
- name: Set up Python 3.11
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
cp env.sample .env
python -m pip install --upgrade pip
pip install poetry
poetry config virtualenvs.create false
poetry install
- name: pre-commit lint
run: make lint
- name: pytest
run: make test
uses: ./.github/workflows/lint-and-tests.yml
release-please:
needs: build
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ run-db:
-e POSTGRES_PASSWORD=fastqueue \
-e POSTGRES_DB=fastqueue \
-p 5432:5432 \
-d postgres:14-alpine
-d postgres:15-alpine

rm-db:
docker kill $$(docker ps -aqf name=postgres-fastqueue)
Expand All @@ -24,7 +24,7 @@ run-test-db:
-e POSTGRES_PASSWORD=fastqueue \
-e POSTGRES_DB=fastqueue-test \
-p 5432:5432 \
-d postgres:14-alpine
-d postgres:15-alpine

rm-test-db:
docker kill $$(docker ps -aqf name=postgres-fastqueue-test)
Expand Down
36 changes: 36 additions & 0 deletions alembic/versions/003_use_on_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Auto generated

Revision ID: 53a93f17a53d
Revises: 4c4cb56442a9
Create Date: 2023-04-25 18:46:59.742398

"""
from alembic import op

# revision identifiers, used by Alembic.
revision = "53a93f17a53d"
down_revision = "4c4cb56442a9"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint("messages_queue_id_fkey", "messages", type_="foreignkey")
op.create_foreign_key(None, "messages", "queues", ["queue_id"], ["id"], ondelete="CASCADE")
op.drop_constraint("queues_topic_id_fkey", "queues", type_="foreignkey")
op.drop_constraint("queues_dead_queue_id_fkey", "queues", type_="foreignkey")
op.create_foreign_key(None, "queues", "topics", ["topic_id"], ["id"], ondelete="SET NULL")
op.create_foreign_key(None, "queues", "queues", ["dead_queue_id"], ["id"], ondelete="SET NULL")
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(None, "queues", type_="foreignkey")
op.drop_constraint(None, "queues", type_="foreignkey")
op.create_foreign_key("queues_dead_queue_id_fkey", "queues", "queues", ["dead_queue_id"], ["id"])
op.create_foreign_key("queues_topic_id_fkey", "queues", "topics", ["topic_id"], ["id"])
op.drop_constraint(None, "messages", type_="foreignkey")
op.create_foreign_key("messages_queue_id_fkey", "messages", "queues", ["queue_id"], ["id"])
# ### end Alembic commands ###
15 changes: 9 additions & 6 deletions fastqueue/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@ class Topic(Base):
created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False)

def __repr__(self):
return f"User(id={self.id!r})"
return f"User(id={self.id})"


class Queue(Base):
__tablename__ = "queues"

id = sqlalchemy.Column(sqlalchemy.String(length=128), primary_key=True, nullable=False)
topic_id = sqlalchemy.Column(
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("topics.id"), index=True, nullable=True
sqlalchemy.String(length=128),
sqlalchemy.ForeignKey("topics.id", ondelete="SET NULL"),
index=True,
nullable=True,
)
dead_queue_id = sqlalchemy.Column(
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("queues.id"), nullable=True
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("queues.id", ondelete="SET NULL"), nullable=True
)
ack_deadline_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
message_retention_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
Expand All @@ -33,7 +36,7 @@ class Queue(Base):
updated_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False)

def __repr__(self):
return f"Queue(id={self.id!r}, topic_id={self.topic_id!r})"
return f"Queue(id={self.id}, topic_id={self.topic_id})"


class Message(Base):
Expand All @@ -48,7 +51,7 @@ class Message(Base):

id = sqlalchemy.Column(postgresql.UUID, primary_key=True, nullable=False)
queue_id = sqlalchemy.Column(
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("queues.id"), nullable=False
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("queues.id", ondelete="CASCADE"), nullable=False
)
data = sqlalchemy.Column(postgresql.JSONB, nullable=False)
attributes = sqlalchemy.Column(postgresql.JSONB, nullable=True)
Expand All @@ -59,4 +62,4 @@ class Message(Base):
updated_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False)

def __repr__(self):
return f"Message(id={self.id!r}, queue_id={self.queue_id!r})"
return f"Message(id={self.id}, queue_id={self.queue_id})"
11 changes: 5 additions & 6 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def get_filters_for_consume(queue: Any, now: datetime) -> list:


class Service:
def __init__(self, session):
def __init__(self, session: Session):
self.session = session


Expand All @@ -93,7 +93,6 @@ def list(self, filters: dict | None, offset: int | None, limit: int | None) -> L

def delete(self, id: str) -> None:
topic = get_model(model=Topic, filters={"id": id}, session=self.session)
self.session.query(Queue).filter_by(topic_id=topic.id).update({"topic_id": None})
self.session.query(Topic).filter_by(id=topic.id).delete()
self.session.commit()

Expand Down Expand Up @@ -159,8 +158,6 @@ def list(self, filters: dict | None, offset: int | None, limit: int | None) -> L

def delete(self, id: str) -> None:
queue = get_model(model=Queue, filters={"id": id}, session=self.session)
self.session.query(Message).filter_by(queue_id=queue.id).delete()
self.session.query(Queue).filter_by(dead_queue_id=queue.id).update({"dead_queue_id": None})
self.session.query(Queue).filter_by(id=queue.id).delete()
self.session.commit()

Expand Down Expand Up @@ -214,7 +211,9 @@ def _cleanup_move_messages_to_dead_queue(self, queue: QueueSchema) -> None:
"scheduled_at": scheduled_at,
"updated_at": now,
}
self.session.query(Message).filter(*delivery_attempts_filter).update(update_data)
self.session.query(Message).filter(*delivery_attempts_filter).update(
update_data, synchronize_session=False
)

def cleanup(self, id: str) -> None:
queue = get_model(model=Queue, filters={"id": id}, session=self.session)
Expand All @@ -241,7 +240,7 @@ def redrive(self, id: str, data: RedriveQueueSchema) -> None:
"scheduled_at": scheduled_at,
"updated_at": now,
}
self.session.query(Message).filter(*filters).update(update_data)
self.session.query(Message).filter(*filters).update(update_data, synchronize_session=False)
self.session.commit()


Expand Down
Loading