RQ: implement reliable timeout #4305

Closed · wants to merge 9 commits
Changes from 2 commits
3 changes: 2 additions & 1 deletion redash/cli/rq.py
@@ -5,10 +5,11 @@
 
 from click import argument
 from flask.cli import AppGroup
-from rq import Connection, Worker
+from rq import Connection
 from sqlalchemy.orm import configure_mappers
 
 from redash import rq_redis_connection
+from redash.worker import HardTimeLimitingWorker as Worker
 from redash.schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions
 
 manager = AppGroup(help="RQ management commands.")
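For context: after this change the CLI's worker command constructs the Redash-specific subclass instead of the stock RQ Worker, since the aliased import above resolves Worker to HardTimeLimitingWorker. The body of the worker command is not part of this hunk, so the following is only a rough sketch of how it could be wired up; the queue handling and command signature are assumptions, not code from the PR.

@manager.command()
@argument('queues', nargs=-1)
def worker(queues):
    # Hypothetical sketch: run the hard-time-limiting worker against the
    # requested queues (falling back to "default") on the shared RQ connection.
    with Connection(rq_redis_connection):
        w = Worker(queues or ['default'])  # HardTimeLimitingWorker, via the aliased import
        w.work()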
73 changes: 71 additions & 2 deletions redash/worker.py
@@ -1,4 +1,5 @@
+import errno
 import os
 from datetime import timedelta
 from functools import partial
 
@@ -9,8 +10,11 @@
 from celery.signals import worker_process_init
 from celery.utils.log import get_logger
 
-from rq import get_current_job
+from rq import Worker, get_current_job
+from rq.utils import utcnow
 from rq.decorators import job as rq_job
+from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException
+from rq.job import JobStatus
 
 from redash import create_app, extensions, settings, redis_connection, rq_redis_connection
 from redash.metrics import celery as celery_metrics # noqa
@@ -93,3 +97,68 @@ def add_periodic_tasks(sender, **kwargs):
     for params in extensions.periodic_tasks.values():
         # Add it to Celery's periodic task registry, too.
         sender.add_periodic_task(**params)
+
+
+class HardTimeLimitingWorker(Worker):
Review comment (Member):

How about we move this into its own module in the redash.tasks package? The redash.schedule module belongs there as well.

I would move all the rq stuff from redash.worker into redash.tasks and eventually remove this module when we say goodbye to Celery.

Review comment (Member):

Also worth adding some documentation on why we added this class.
+    grace_period = 15
+
+    def soft_limit_exceeded(self, job):
+        seconds_under_monitor = (utcnow() - self.monitor_started).seconds
+        return seconds_under_monitor > job.timeout + self.grace_period
+
+    def enforce_hard_limit(self, job):
+        self.log.warning('Job %s exceeded timeout of %ds (+%ds grace period) but work horse did not terminate it. '
+                         'Killing the work horse.', job.id, job.timeout, self.grace_period)
+        self.kill_horse()
+
+    def monitor_work_horse(self, job):
+        """The worker will monitor the work horse and make sure that it
+        either executes successfully or the status of the job is set to
+        failed
+        """
+        self.monitor_started = utcnow()
+        while True:
+            try:
+                with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
+                    retpid, ret_val = os.waitpid(self._horse_pid, 0)
+                break
+            except HorseMonitorTimeoutException:
+                # Horse has not exited yet and is still running.
+                # Send a heartbeat to keep the worker alive.
+                self.heartbeat(self.job_monitoring_interval + 5)
+
+                if self.soft_limit_exceeded(job):
+                    self.enforce_hard_limit(job)
+            except OSError as e:
+                # In case we encountered an OSError due to EINTR (which is
+                # caused by a SIGINT or SIGTERM signal during
+                # os.waitpid()), we simply ignore it and enter the next
+                # iteration of the loop, waiting for the child to end. In
+                # any other case, this is some other unexpected OS error,
+                # which we don't want to catch, so we re-raise those ones.
+                if e.errno != errno.EINTR:
+                    raise
+                # Send a heartbeat to keep the worker alive.
+                self.heartbeat()
+
+        if ret_val == os.EX_OK:  # The process exited normally.
+            return
+        job_status = job.get_status()
+        if job_status is None:  # Job completed and its ttl has expired
+            return
+        if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
+
+            if not job.ended_at:
+                job.ended_at = utcnow()
+
+            # Unhandled failure: move the job to the failed queue
+            self.log.warning((
+                'Moving job to FailedJobRegistry '
+                '(work-horse terminated unexpectedly; waitpid returned {})'
+            ).format(ret_val))
+
+            self.handle_job_failure(
+                job,
+                exc_string="Work-horse process was terminated unexpectedly "
+                           "(waitpid returned %s)" % ret_val
+            )
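To make the enforcement window concrete: the monitor loop wakes up every job_monitoring_interval seconds to send a heartbeat, and once the time spent monitoring the work horse exceeds job.timeout plus the 15-second grace period it kills the horse itself instead of relying on the horse's own signal-based timeout. A standalone sketch of that check, using illustrative numbers that are not taken from the PR:

from datetime import datetime, timedelta

GRACE_PERIOD = 15   # mirrors HardTimeLimitingWorker.grace_period (seconds)
JOB_TIMEOUT = 300   # illustrative job timeout (seconds)

def soft_limit_exceeded(monitor_started, now, job_timeout):
    # Same comparison the worker makes: elapsed monitoring time versus
    # the job's timeout plus the grace period.
    seconds_under_monitor = (now - monitor_started).seconds
    return seconds_under_monitor > job_timeout + GRACE_PERIOD

start = datetime.utcnow()
# 300 s in: the job has hit its timeout but is still within the grace period.
print(soft_limit_exceeded(start, start + timedelta(seconds=300), JOB_TIMEOUT))  # False
# 330 s in (one more 30 s monitoring interval): past the 315 s hard limit,
# so the worker would log the warning and call kill_horse().
print(soft_limit_exceeded(start, start + timedelta(seconds=330), JOB_TIMEOUT))  # True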