Skip to content

Commit

Permalink
📦️❌ Fix cancel of stored job (#37)
Browse files Browse the repository at this point in the history
* Allow reading possibly missing job queue name

* Allow cancel of stored jobs

* Don't expect job queue to exist in error reporting
  • Loading branch information
ruksi authored Mar 8, 2024
1 parent 2defd2a commit cd353e5
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 8 deletions.
5 changes: 4 additions & 1 deletion minique/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ def cancel_job(
if not (job.has_finished or job.has_started):
with redis.pipeline() as p:
p.hset(job.redis_key, "status", JobStatus.CANCELLED.value)
p.lrem(job.get_queue().redis_key, 0, job.id)
queue_name = job.get_queue_name()
if queue_name:
queue = Queue(redis, name=queue_name)
p.lrem(queue.redis_key, 0, job.id)
if expire_time:
p.expire(job.redis_key, expire_time)
p.execute()
Expand Down
5 changes: 5 additions & 0 deletions minique/models/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,13 @@ def duration(self) -> float:

@property
def queue_name(self) -> str:
return self.get_queue_name(missing_ok=False) # type:ignore[return-value]

def get_queue_name(self, *, missing_ok=True) -> Optional[str]:
queue = self.redis.hget(self.redis_key, "queue")
if not queue:
if missing_ok:
return None
raise MissingJobData(f"Job {self.id} has no queue")
return queue.decode()

Expand Down
12 changes: 5 additions & 7 deletions minique/work/job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,10 @@ def run(self) -> None:

def process_exception(self, excinfo: ExcInfo) -> None:
try:
self.worker.process_exception(
excinfo,
context={
"id": str(self.job.id),
"queue": str(self.job.queue_name),
},
)
context = {"id": str(self.job.id)}
queue_name = self.job.get_queue_name()
if queue_name:
context["queue"] = queue_name
self.worker.process_exception(excinfo, context=context)
except Exception:
self.log.warning("error running process_exception()", exc_info=True)
7 changes: 7 additions & 0 deletions minique_tests/test_minique.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,10 @@ def test_stored_jobs(redis: Redis, random_queue_name: str) -> None:
_ = job.queue_name
r_job = get_job(redis, job.id)
assert r_job == job


def test_stored_job_cancel(redis: Redis, random_queue_name: str) -> None:
job = store(redis, reverse_job_id)
assert get_job(redis, job.id).status == JobStatus.NONE
assert cancel_job(redis, job.id)
assert get_job(redis, job.id).status == JobStatus.CANCELLED

0 comments on commit cd353e5

Please sign in to comment.