
Celery Task with start_time Set in the Future Causes Blocking of Other Tasks #843

Open
JinRiYao2001 opened this issue Feb 9, 2025 · 1 comment


JinRiYao2001 commented Feb 9, 2025

Description:

I encountered an issue when setting a specific start_time for a scheduled task in django-celery-beat. If start_time is in the future, Celery ends up using that time in the task's event_t tuple, specifically the event_t.time field, and keeps checking whether the task has reached its execution condition.

However, if start_time is still in the future and the next crontab run time has not been reached either, the task is not due but remains on the heap with an unchanged event_t.time. Because tick inspects only the entry at the top of the heap (H[0]) on each call, once this task's event_t.time is the smallest, tick processes the same task on every iteration of the loop and the tasks behind it are blocked from executing.

This problem is unlikely to occur with Celery alone, but with django-celery-beat it is easy to reproduce, because users can set a start_time on any periodic task.

Code with explanation (Celery's Scheduler.tick in celery/beat.py, annotated):

    def tick(self, event_t=event_t, min=min, heappop=heapq.heappop,
             heappush=heapq.heappush):
        """Run a tick - one iteration of the scheduler.

        Executes one due task per call.

        Returns:
            float: preferred delay in seconds for next call.
        """
        adjust = self.adjust
        max_interval = self.max_interval

        # If the heap has not been built yet, or the schedule has changed, rebuild the heap.
        if (self._heap is None or
                not self.schedules_equal(self.old_schedulers, self.schedule)):
            self.old_schedulers = copy.copy(self.schedule)
            self.populate_heap()

        H = self._heap

        # If the heap is empty, return the maximum interval.
        if not H:
            return max_interval

        event = H[0]
        entry = event[2]
        is_due, next_time_to_run = self.is_due(entry)

        # If the task is due, process the task.
        if is_due:
            verify = heappop(H)
            if verify is event:
                next_entry = self.reserve(entry)
                self.apply_entry(entry, producer=self.producer)

                # Update the task's next scheduled time and push it back into the heap
                heappush(H, event_t(self._when(next_entry, next_time_to_run),
                                    event[1], next_entry))

                return 0
            else:
                # The popped event is not the one inspected above (the heap changed): push it back and return
                heappush(H, verify)
                return min(verify[0], max_interval)

        # **Explanation of the Core Issue**:
        # When `is_due` is False, the task is not executed and its event is left
        # on the heap unchanged. If its `event.time` is the smallest in the heap,
        # it stays at the top (H[0]). Because nothing below updates this task's
        # `event.time`, every subsequent tick inspects the same task again, so the
        # other tasks are never checked and are blocked from executing.

        adjusted_next_time_to_run = adjust(next_time_to_run)

        # Return the adjusted delay until the next check if it is numeric, capped at max_interval.
        return min(adjusted_next_time_to_run if is_numeric_value(adjusted_next_time_to_run) else max_interval,
                   max_interval)
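The blocking effect can be reproduced without Celery or Django. Below is a minimal, hypothetical simulation: FakeEntry, run_beat, and the one-call-per-second clock are assumptions made for illustration, not Celery APIs. Like Scheduler.tick, it inspects only H[0] on each tick and leaves a not-due event on the heap unchanged. The crontab entry carries a stale heap time of 5 but is not actually due until t=100, so the 10-second entry cannot run before then.

```python
import heapq
from collections import namedtuple

# Same field layout as celery.beat.event_t: (time, priority, entry).
event_t = namedtuple("event_t", ("time", "priority", "entry"))


class FakeEntry:
    """Hypothetical stand-in for a beat entry that becomes due at `next_due`."""

    def __init__(self, name, every, next_due):
        self.name = name
        self.every = every        # seconds between runs once it has started
        self.next_due = next_due  # simulated time of the next real run

    def is_due(self, now):
        # Returns (is_due, seconds until it should be checked again),
        # the same shape as celery.schedules.schedstate.
        if now >= self.next_due:
            return True, self.every
        return False, self.next_due - now


def run_beat(events, seconds):
    """Tick once per simulated second, inspecting only H[0] as Scheduler.tick does.

    `events` is a list of (heap_time, entry) pairs; returns [(time, name), ...] of runs.
    """
    # A unique priority per event keeps tuple comparison from reaching the entries.
    heap = [event_t(t, prio, e) for prio, (t, e) in enumerate(events)]
    heapq.heapify(heap)
    ran = []
    for now in range(1, seconds + 1):
        event = heap[0]
        due, delay = event.entry.is_due(now)
        if due:
            heapq.heappop(heap)
            event.entry.next_due = now + delay
            ran.append((now, event.entry.name))
            heapq.heappush(heap, event_t(now + delay, event.priority, event.entry))
        # Not due: the event keeps its stale time and stays at H[0] for the next tick.
    return ran


cron = FakeEntry("crontab", every=3600, next_due=100)  # first real run at t=100
ten = FakeEntry("every-10s", every=10, next_due=10)
# The crontab event carries a stale heap time of 5, smaller than every other event.
ran = run_beat([(5, cron), (10, ten)], seconds=120)
# ran[:2] == [(100, "crontab"), (101, "every-10s")]
```

For comparison, giving the crontab event its real due time (100) in the heap lets the 10-second entry run at t=10, 20, 30, and so on.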

Steps to Reproduce:

  1. Set a start_time in the future for a periodic task.
  2. Set up the task with a crontab schedule.
  3. Set another periodic task with a 10-second interval.
  4. Observe that when the current time is earlier than the start_time and the next crontab execution time has not yet arrived, the crontab task is not due but stays on the heap with an unchanged event.time.
  5. When the event.time of the crontab task is the smallest in the heap, the task remains at the top and blocks the execution of other tasks (such as the 10-second interval task).

Expected Behavior:

A task should not stay at the top of the heap and be re-checked on every tick before its execution time, regardless of its schedule. Other tasks (such as the 10-second interval task) should keep running on their own schedules instead of being blocked.

Possible Solution:

The issue can be addressed by modifying the is_due method in django_celery_beat/schedulers.py. While start_time is in the future, the method should calculate the next valid run time from both the start_time and the task's crontab schedule, and report the delay until that time, so the task is only checked for execution once that time is reached.
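A minimal sketch of the calculation this is_due change would need, assuming a crontab restricted only by minute and hour (for example minute=0, hour='*/6') and UTC datetimes. first_run_at_or_after and start_time_delay are hypothetical helpers, not django-celery-beat or Celery APIs; in a real patch the resulting delay would be the second field of celery.schedules.schedstate(False, delay).

```python
from datetime import datetime, timedelta, timezone


def first_run_at_or_after(start, minutes, hours):
    """Hypothetical helper: earliest minute boundary >= start matching the sets.

    Assumes a crontab restricted only by minute and hour, so matching times
    repeat every 24 hours and scanning one day of minutes is enough.
    """
    # Round up to the next whole minute; crontab schedules fire on minute boundaries.
    t = start.replace(second=0, microsecond=0)
    if t < start:
        t += timedelta(minutes=1)
    for _ in range(24 * 60):
        if t.minute in minutes and t.hour in hours:
            return t
        t += timedelta(minutes=1)
    raise ValueError("minute/hour sets never match")


def start_time_delay(now, start_time, minutes, hours):
    """Hypothetical: seconds from `now` to the first run at or after `start_time`.

    Intended for the case now < start_time, where is_due reports "not due".
    """
    first = first_run_at_or_after(start_time, minutes, hours)
    return (first - now).total_seconds()


utc = timezone.utc
every_six_hours = ({0}, {0, 6, 12, 18})  # like crontab(minute=0, hour='*/6')
now = datetime(2025, 2, 9, 8, 0, tzinfo=utc)
start_time = datetime(2025, 2, 9, 9, 30, 15, tzinfo=utc)
delay = start_time_delay(now, start_time, *every_six_hours)
# First run is 12:00 UTC, so delay == 14400.0 (4 hours), not the 5415 s until start_time.
```

The one-day scan keeps the sketch simple; a crontab that also restricts day of week or day of month would need a longer search window or Celery's own crontab logic.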

link

Since I don't know how to set up a unit-testing environment for django-celery-beat, I tested this only by manual simulation. I'm unsure whether my change is correct and whether it is suitable to contribute to the project.

Alternatively, the tick method in Celery could be updated to handle an entry that is not yet due even though its event_t.time has passed, for example by updating its event_t.time, so that it no longer blocks other tasks until it is ready to execute.
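On the Celery side, the not-due path of tick shown above returns without touching the heap. A minimal sketch of the alternative, assuming plain float timestamps and string entries instead of Celery's ScheduleEntry objects and its _when() conversion; requeue_not_due is a hypothetical helper, not a Celery API. It replaces the stale top event with one scheduled at now + next_time_to_run, so the next task that is actually due rises to the top.

```python
import heapq
from collections import namedtuple

# Same field layout as celery.beat.event_t: (time, priority, entry).
event_t = namedtuple("event_t", ("time", "priority", "entry"))


def requeue_not_due(heap, now, next_time_to_run):
    """Hypothetical tick() branch for a not-due H[0]: move it to its real check time.

    Returns the event that is now at the top of the heap.
    """
    stale = heap[0]
    # heapreplace pops H[0] and pushes the replacement in a single O(log n) step.
    heapq.heapreplace(
        heap, event_t(now + next_time_to_run, stale.priority, stale.entry))
    return heap[0]


# The crontab entry sits at H[0] with a stale time (5.0) but is not due until t=100.
heap = [event_t(5.0, 5, "crontab"), event_t(20.0, 5, "every-10s")]
heapq.heapify(heap)
top = requeue_not_due(heap, now=12.0, next_time_to_run=88.0)
# top.entry == "every-10s"; the crontab event now has time 100.0
```

In Celery itself next_time_to_run is not guaranteed to be numeric (tick guards it with is_numeric_value), so a real patch would need the same guard before re-pushing.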

Either solution would stop a task that cannot run yet from sitting at the top of the heap with a stale event_t.time, being re-checked on every tick, and blocking other tasks.

Environment:

celery==5.4.0
Django==5.1.6
django-celery-beat==2.7.0
python==3.12

JinRiYao2001 (Author) commented:

I found that self.schedule.remaining_estimate cannot calculate the next execution time relative to start_time, and that method is implemented in Celery rather than in django-celery-beat. Therefore, I should modify Celery and add a method there that calculates the next execution time for each task.

Once that method is implemented, I will call it from django-celery-beat.
