Refactor way of working with RQ meta #9082

Merged 39 commits on Feb 25, 2025

Changes from 1 commit

Commits (39):
2a2dc86
Add new API && remove deprecated && refactor working with rq meta
Marishka17 Feb 7, 2025
e3d6da9
Remove outdated tests
Marishka17 Feb 7, 2025
a01a8a8
Resolve conflicts
Marishka17 Feb 7, 2025
0f71c08
Rename method
Marishka17 Feb 7, 2025
a22e10f
Split RQMeta into several classes && small fixes
Marishka17 Feb 9, 2025
19a322a
Black code
Marishka17 Feb 9, 2025
f535d32
f
Marishka17 Feb 9, 2025
3c9895b
Fix typo
Marishka17 Feb 9, 2025
1762178
Revert some changes
Marishka17 Feb 10, 2025
472763c
Sort imports
Marishka17 Feb 10, 2025
d9eeecb
Remove commented code
Marishka17 Feb 10, 2025
b8f8a88
Fix var usage
Marishka17 Feb 10, 2025
6208de4
Small fixes
Marishka17 Feb 10, 2025
c9bbe48
Fix types
Marishka17 Feb 10, 2025
3b9aefc
Fix meta update
Marishka17 Feb 12, 2025
7ea0523
Resolve conflicts
Marishka17 Feb 14, 2025
0253b4e
apply comments
Marishka17 Feb 17, 2025
d15ae47
rename module && move define_dependent_job into rq.py
Marishka17 Feb 17, 2025
a16f179
Fix imports sorting
Marishka17 Feb 17, 2025
2d8fd91
Fix exception class used
Marishka17 Feb 17, 2025
f8908cb
Merge branch 'develop' into mk/refactor_working_with_rq_meta
Marishka17 Feb 17, 2025
583c09a
Resolve conflicts
Marishka17 Feb 20, 2025
78cf972
Use descriptors
Marishka17 Feb 20, 2025
0ac2010
Merge branch 'develop' into mk/refactor_working_with_rq_meta
Marishka17 Feb 20, 2025
eedf1b5
black
Marishka17 Feb 20, 2025
8ca66ff
Fix merge
Marishka17 Feb 21, 2025
9ce85ac
Small improvments
Marishka17 Feb 21, 2025
06e81a3
refactor a bit
Marishka17 Feb 21, 2025
a0b7a74
fix typos
Marishka17 Feb 21, 2025
80a9705
Apply some comments
Marishka17 Feb 24, 2025
aa58f13
Merge develop
Marishka17 Feb 24, 2025
97368ef
Update consensus app
Marishka17 Feb 24, 2025
6edd66c
sort imports
Marishka17 Feb 24, 2025
dc6780d
Fix merge
Marishka17 Feb 24, 2025
3663015
Simplify the code
Marishka17 Feb 24, 2025
976871c
Raise AttributeError when trying to set immutable attribute
Marishka17 Feb 24, 2025
227c877
Make lambda attr optional
Marishka17 Feb 24, 2025
83fc87a
Merge branch 'develop' into mk/refactor_working_with_rq_meta
Marishka17 Feb 24, 2025
9b83548
Fix exception catching
Marishka17 Feb 24, 2025
This page shows the changes from commit 583c09a025f8023766b28fda20d04dc276ab90aa ("Resolve conflicts"), committed by Marishka17 on Feb 20, 2025.
20 changes: 17 additions & 3 deletions cvat/apps/engine/rq.py
@@ -18,6 +18,7 @@
from django_rq.queues import DjangoRQ
from rq.job import Dependency as RQDependency
from rq.job import Job as RQJob
from rq.registry import BaseRegistry as RQBaseRegistry

from cvat.apps.engine.types import ExtendedRequest

@@ -407,13 +408,17 @@ def define_dependent_job(
if not should_be_dependent:
return None

queues = [queue.deferred_job_registry, queue, queue.started_job_registry]
queues: list[RQBaseRegistry | DjangoRQ] = [
queue.deferred_job_registry,
queue,
queue.started_job_registry,
]
# Since there is no cleanup implementation in DeferredJobRegistry,
# this registry can contain "outdated" jobs that weren't deleted from it
# but were added to another registry. Probably such situations can occur
# if there are active or deferred jobs when restarting the worker container.
filters = [lambda job: job.is_deferred, lambda _: True, lambda _: True]
all_user_jobs = []
all_user_jobs: list[RQJob] = []
for q, f in zip(queues, filters):
job_ids = q.get_job_ids()
jobs = q.job_class.fetch_many(job_ids, q.connection)
@@ -422,8 +427,17 @@
)
all_user_jobs.extend(jobs)

# prevent possible cyclic dependencies
if rq_id:
# Prevent cases where an RQ job depends on itself.
# It isn't possible to have multiple RQ jobs with the same ID in Redis.
# However, a race condition in request processing can lead to self-dependencies
# when 2 parallel requests attempt to enqueue RQ jobs with the same ID.
# This happens if an rq_job is fetched without a lock,
# but a lock is used when defining the dependent job and enqueuing a new one.
if any(rq_id == job.id for job in all_user_jobs):
return None

# prevent possible cyclic dependencies
all_job_dependency_ids = {
dep_id.decode() for job in all_user_jobs for dep_id in job.dependency_ids or ()
}
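
The hunk above adds two guards before a dependency is chosen: a self-dependency check covering the race where two parallel requests enqueue a job with the same rq_id, and the pre-existing cyclic-dependency check over dependency_ids. The standalone sketch below reproduces just that selection logic with plain objects instead of RQ registries; FakeJob and pick_dependency are illustrative stand-ins, and the real function works on RQJob instances and returns an RQ Dependency rather than a bare id, so the final selection step here is a simplification.

# Simplified sketch of the dependency-selection guards shown in the diff above.
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeJob:
    id: str
    dependency_ids: list = field(default_factory=list)  # raw ids as bytes, like RQ stores them


def pick_dependency(user_jobs: list, rq_id: Optional[str]) -> Optional[str]:
    # Guard against a job depending on itself: if two parallel requests race on the
    # same rq_id, a job with that id may already sit in one of the registries.
    if rq_id and any(rq_id == job.id for job in user_jobs):
        return None

    # Prevent possible cyclic dependencies: never pick a job that some other
    # user job already depends on.
    all_dependency_ids = {
        dep_id.decode() for job in user_jobs for dep_id in job.dependency_ids or ()
    }
    return next((job.id for job in user_jobs if job.id not in all_dependency_ids), None)


# "a" depends on "b", so a new job chains onto "a"; depending on your own id is refused.
jobs = [FakeJob("a", [b"b"]), FakeJob("b")]
assert pick_dependency(jobs, rq_id="c") == "a"
assert pick_dependency(jobs, rq_id="a") is None
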
3 changes: 2 additions & 1 deletion cvat/apps/engine/utils.py
@@ -34,7 +34,8 @@
from PIL import Image
from redis.lock import Lock
from rest_framework.reverse import reverse as _reverse
from rq.job import Job
from rq.job import Job as RQJob


from cvat.apps.engine.types import ExtendedRequest

31 changes: 12 additions & 19 deletions cvat/apps/engine/views.py
@@ -3481,24 +3481,17 @@ def _import_annotations(
av_scan_paths(filename)
user_id = request.user.id

if location == Location.CLOUD_STORAGE:
func_args = (db_storage, key, func) + func_args
func = import_resource_from_cloud_storage

av_scan_paths(filename)
user_id = request.user.id

with get_rq_lock_by_user(queue, user_id):
meta = ImportRQMeta.build_for(request=request, db_obj=db_obj, tmp_file=filename)
rq_job = queue.enqueue_call(
func=func,
args=func_args,
job_id=rq_id,
depends_on=define_dependent_job(queue, user_id, rq_id=rq_id),
meta=meta,
result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(),
failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds()
)
with get_rq_lock_by_user(queue, user_id):
meta = ImportRQMeta.build_for(request=request, db_obj=db_obj, tmp_file=filename)
queue.enqueue_call(
func=func,
args=func_args,
job_id=rq_id,
depends_on=define_dependent_job(queue, user_id, rq_id=rq_id),
meta=meta,
result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(),
failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds()
)

# log events after releasing Redis lock
if not rq_job:
@@ -3615,7 +3608,7 @@ def _import_project_dataset(

with get_rq_lock_by_user(queue, user_id):
meta = ImportRQMeta.build_for(request=request, db_obj=db_obj, tmp_file=filename)
rq_job = queue.enqueue_call(
queue.enqueue_call(
func=func,
args=func_args,
job_id=rq_id,
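
Both _import_annotations and _import_project_dataset now follow the same pattern: build the typed meta via ImportRQMeta.build_for and enqueue while holding the per-user Redis lock. The sketch below pulls that pattern out of the diff into one helper; the import paths and the enqueue_import wrapper are assumptions for illustration, while the individual calls mirror the diff.

# Sketch of the enqueue pattern from the updated _import_* views. The import paths for
# ImportRQMeta, define_dependent_job and get_rq_lock_by_user are assumptions; the queue,
# request, db_obj, func/func_args and settings objects come from the surrounding view code.
from cvat.apps.engine.rq import ImportRQMeta, define_dependent_job  # assumed location
from cvat.apps.engine.utils import get_rq_lock_by_user              # assumed location


def enqueue_import(queue, request, db_obj, func, func_args, rq_id, filename, settings):
    user_id = request.user.id

    # The per-user lock keeps define_dependent_job and enqueue_call consistent:
    # no other request of the same user can enqueue between the two calls.
    with get_rq_lock_by_user(queue, user_id):
        meta = ImportRQMeta.build_for(request=request, db_obj=db_obj, tmp_file=filename)
        # Returning the job is a convenience of this sketch; the diff above does not
        # assign the result of enqueue_call.
        return queue.enqueue_call(
            func=func,
            args=func_args,
            job_id=rq_id,
            depends_on=define_dependent_job(queue, user_id, rq_id=rq_id),
            meta=meta,
            result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(),
            failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds(),
        )
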
You are viewing a condensed version of this merge commit.