Skip to content

Commit

Permalink
15692: Introduce background jobs (#16927)
Browse files Browse the repository at this point in the history
* Introduce reusable BackgroundJob framework

A new abstract class can be used to implement job function classes. It
handles the necessary logic for starting and stopping jobs, including
exception handling and rescheduling of recurring jobs.

This commit also includes the migration of data source jobs to the new
framework.

* Restore using import_string for jobs

Using the 'import_string()' utility from Django allows the job script
class to be simplified, as module imports no longer need to avoid loops.
This should make it easier to queue and maintain jobs.

* Use SyncDataSourceJob for management command

Instead of maintaining two separate job execution logics, the same job
is now used for both background and interactive execution.

* Implement BackgroundJob for running scripts

The independent implementations of interactive and background script
execution have been merged into a single BackgroundJob implementation.

* Fix documentation of model features

* Ensure consitent code style

* Introduce reusable ScheduledJob

A new abstract class can be used to implement job function classes that
specialize in scheduling. These use the same logic as regular
BackgroundJobs, but ensure that they are only scheduled once at any given
time.

* Introduce reusable SystemJob

A new abstract class can be used to implement job function classes that
specialize in system background tasks (e.g. synchronization or
housekeeping). In addition to the features of the BackgroundJob and
ScheduledJob classes, these implement additional logic to not need to be
bound to an existing NetBox object and to setup job schedules on plugin
load instead of an interactive request.

* Add documentation for jobs framework

* Revert "Use SyncDataSourceJob for management"

This partially reverts commit db591d4. The 'run_now' parameter of
'enqueue()' remains, as its being used by following commits.

* Merge enqueued status into JobStatusChoices

* Fix logger for ScriptJob

* Remove job name for scripts

Because scripts are already linked through the Job Instance field, the
name is displayed twice. Removing this reduces redundancy and opens up
the possibility of simplifying the BackgroundJob framework in future
commits.

* Merge ScheduledJob into BackgroundJob

Instead of using separate classes, the logic of ScheduledJob is now
merged into the generic BackgroundJob class. This allows reusing the
same logic, but dynamically deciding whether to enqueue the same job
once or multiple times.

* Add name attribute for BackgroundJob

Instead of defining individual names on enqueue, BackgroundJob classes
can now set a job name in their meta class. This is equivalent to other
Django classes and NetBox scripts.

* Drop enqueue_sync_job() method from DataSource

* Import ScriptJob directly

* Relax requirement for Jobs to reference a specific object

* Rename 'run_now' arg on Job.enqueue() to 'immediate'

* Fix queue lookup in Job enqueue

* Collapse SystemJob into BackgroundJob

* Remove legacy JobResultStatusChoices

ChoiceSet was moved to core in 40572b5.

* Use queue 'low' for system jobs by default

System jobs usually perform low-priority background tasks and therefore
can use a different queue than 'default', which is used for regular jobs
related to specific objects.

* Add test cases for BackgroundJob handling

* Fix enqueue interval jobs

As the job's name is set by enqueue(), it must not be passed in handle()
to avoid duplicate kwargs with the same name.

* Honor schedule_at for job's enqueue_once

Not only can a job's interval change, but so can the time at which it is
scheduled to run. If a specific scheduled time is set, it will also be
checked against the current job schedule. If there are any changes, the
job is rescheduled with the new time.

* Switch BackgroundJob to regular methods

Instead of using a class method for run(), a regular method is used for
this purpose. This gives the possibility to add more convenience methods
in the future, e.g. for interacting with the job object or for logging,
as implemented for scripts.

* Fix background tasks documentation

* Test enqueue in combination with enqueue_once

* Rename background jobs to tasks (to differentiate from RQ)

* Touch up docs

* Revert "Use queue 'low' for system jobs by default"

This reverts commit b17b205.

* Remove system background job

This commit reverts commits 4880d81 and 0b15ecf. Using the database
'connection_created' signal for job registration feels a little wrong at
this point, as it would trigger registration very often. However, the
background job framework is prepared for this use case and can be used
by plugins once the auto-registration of jobs is solved.

* Fix runscript management command

Defining names for background jobs was disabled with fb75389. The
preceeding changes in 257976d did forget the management command.

* Use regular imports for ScriptJob

* Rename BackgroundJob to JobRunner

---------

Co-authored-by: Jeremy Stretch <[email protected]>
  • Loading branch information
alehaa and jeremystretch authored Jul 30, 2024
1 parent dde84b4 commit d3a3a6b
Show file tree
Hide file tree
Showing 23 changed files with 597 additions and 314 deletions.
2 changes: 1 addition & 1 deletion docs/development/models.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Depending on its classification, each NetBox model may support various features
| [Custom links](../customization/custom-links.md) | `CustomLinksMixin` | `custom_links` | These models support the assignment of custom links |
| [Custom validation](../customization/custom-validation.md) | `CustomValidationMixin` | - | Supports the enforcement of custom validation rules |
| [Export templates](../customization/export-templates.md) | `ExportTemplatesMixin` | `export_templates` | Users can create custom export templates for these models |
| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Users can create custom export templates for these models |
| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Background jobs can be scheduled for these models |
| [Journaling](../features/journaling.md) | `JournalingMixin` | `journaling` | These models support persistent historical commentary |
| [Synchronized data](../integrations/synchronized-data.md) | `SyncedDataMixin` | `synced_data` | Certain model data can be automatically synchronized from a remote data source |
| [Tagging](../models/extras/tag.md) | `TagsMixin` | `tags` | The models can be tagged with user-defined tags |
Expand Down
97 changes: 97 additions & 0 deletions docs/plugins/development/background-jobs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Background Jobs

NetBox plugins can defer certain operations by enqueuing [background jobs](../../features/background-jobs.md), which are executed asynchronously by background workers. This is helpful for decoupling long-running processes from the user-facing request-response cycle.

For example, your plugin might need to fetch data from a remote system. Depending on the amount of data and the responsiveness of the remote server, this could take a few minutes. Deferring this task to a queued job ensures that it can be completed in the background, without interrupting the user. The data it fetches can be made available once the job has completed.

## Job Runners

A background job implements a basic [Job](../../models/core/job.md) executor for all kinds of tasks. It has logic implemented to handle the management of the associated job object, rescheduling of periodic jobs in the given interval and error handling. Adding custom jobs is done by subclassing NetBox's `JobRunner` class.

::: utilities.jobs.JobRunner

#### Example

```python title="jobs.py"
from utilities.jobs import JobRunner


class MyTestJob(JobRunner):
class Meta:
name = "My Test Job"

def run(self, *args, **kwargs):
obj = self.job.object
# your logic goes here
```

You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes through all arguments to `Job.enqueue()`. However, no `name` argument must be passed, as the background job name will be used instead.

### Attributes

`JobRunner` attributes are defined under a class named `Meta` within the job. These are optional, but encouraged.

#### `name`

This is the human-friendly names of your background job. If omitted, the class name will be used.

### Scheduled Jobs

As described above, jobs can be scheduled for immediate execution or at any later time using the `enqueue()` method. However, for management purposes, the `enqueue_once()` method allows a job to be scheduled exactly once avoiding duplicates. If a job is already scheduled for a particular instance, a second one won't be scheduled, respecting thread safety. An example use case would be to schedule a periodic task that is bound to an instance in general, but not to any event of that instance (such as updates). The parameters of the `enqueue_once()` method are identical to those of `enqueue()`.

!!! tip
It is not forbidden to `enqueue()` additional jobs while an interval schedule is active. An example use of this would be to schedule a periodic daily synchronization, but also trigger additional synchronizations on demand when the user presses a button.

#### Example

```python title="jobs.py"
from utilities.jobs import JobRunner


class MyHousekeepingJob(JobRunner):
class Meta:
name = "Housekeeping"

def run(self, *args, **kwargs):
# your logic goes here
```

```python title="__init__.py"
from netbox.plugins import PluginConfig

class MyPluginConfig(PluginConfig):
def ready(self):
from .jobs import MyHousekeepingJob
MyHousekeepingJob.setup(interval=60)
```

## Task queues

Three task queues of differing priority are defined by default:

* High
* Default
* Low

Any tasks in the "high" queue are completed before the default queue is checked, and any tasks in the default queue are completed before those in the "low" queue.

Plugins can also add custom queues for their own needs by setting the `queues` attribute under the PluginConfig class. An example is included below:

```python
class MyPluginConfig(PluginConfig):
name = 'myplugin'
...
queues = [
'foo',
'bar',
]
```

The `PluginConfig` above creates two custom queues with the following names `my_plugin.foo` and `my_plugin.bar`. (The plugin's name is prepended to each queue to avoid conflicts between plugins.)

!!! warning "Configuring the RQ worker process"
By default, NetBox's RQ worker process only services the high, default, and low queues. Plugins which introduce custom queues should advise users to either reconfigure the default worker, or run a dedicated worker specifying the necessary queues. For example:

```
python manage.py rqworker my_plugin.foo my_plugin.bar
```
30 changes: 0 additions & 30 deletions docs/plugins/development/background-tasks.md

This file was deleted.

1 change: 1 addition & 0 deletions docs/plugins/development/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ project-name/
- __init__.py
- filtersets.py
- graphql.py
- jobs.py
- models.py
- middleware.py
- navigation.py
Expand Down
2 changes: 2 additions & 0 deletions docs/plugins/development/models.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ For more information about database migrations, see the [Django documentation](h

::: netbox.models.features.ExportTemplatesMixin

::: netbox.models.features.JobsMixin

::: netbox.models.features.JournalingMixin

::: netbox.models.features.TagsMixin
Expand Down
2 changes: 1 addition & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ nav:
- Data Backends: 'plugins/development/data-backends.md'
- REST API: 'plugins/development/rest-api.md'
- GraphQL API: 'plugins/development/graphql-api.md'
- Background Tasks: 'plugins/development/background-tasks.md'
- Background Jobs: 'plugins/development/background-jobs.md'
- Dashboard Widgets: 'plugins/development/dashboard-widgets.md'
- Staged Changes: 'plugins/development/staged-changes.md'
- Exceptions: 'plugins/development/exceptions.md'
Expand Down
8 changes: 7 additions & 1 deletion netbox/core/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from rest_framework.viewsets import ReadOnlyModelViewSet

from core import filtersets
from core.choices import DataSourceStatusChoices
from core.jobs import SyncDataSourceJob
from core.models import *
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet
Expand Down Expand Up @@ -36,7 +38,11 @@ def sync(self, request, pk):
if not request.user.has_perm('core.sync_datasource', obj=datasource):
raise PermissionDenied(_("This user does not have permission to synchronize this data source."))

datasource.enqueue_sync_job(request)
# Enqueue the sync job & update the DataSource's status
SyncDataSourceJob.enqueue(instance=datasource, user=request.user)
datasource.status = DataSourceStatusChoices.QUEUED
DataSource.objects.filter(pk=datasource.pk).update(status=datasource.status)

serializer = serializers.DataSourceSerializer(datasource, context={'request': request})

return Response(serializer.data)
Expand Down
6 changes: 6 additions & 0 deletions netbox/core/choices.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ class JobStatusChoices(ChoiceSet):
(STATUS_FAILED, _('Failed'), 'red'),
)

ENQUEUED_STATE_CHOICES = (
STATUS_PENDING,
STATUS_SCHEDULED,
STATUS_RUNNING,
)

TERMINAL_STATE_CHOICES = (
STATUS_COMPLETED,
STATUS_ERRORED,
Expand Down
32 changes: 16 additions & 16 deletions netbox/core/jobs.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
import logging

from netbox.search.backends import search_backend
from .choices import *
from utilities.jobs import JobRunner
from .choices import DataSourceStatusChoices
from .exceptions import SyncError
from .models import DataSource
from rq.timeouts import JobTimeoutException

logger = logging.getLogger(__name__)


def sync_datasource(job, *args, **kwargs):
class SyncDataSourceJob(JobRunner):
"""
Call sync() on a DataSource.
"""
datasource = DataSource.objects.get(pk=job.object_id)

try:
job.start()
datasource.sync()
class Meta:
name = 'Synchronization'

# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())
def run(self, *args, **kwargs):
datasource = DataSource.objects.get(pk=self.job.object_id)

job.terminate()
try:
datasource.sync()

except Exception as e:
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) in (SyncError, JobTimeoutException):
logging.error(e)
else:
# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())

except Exception as e:
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) is SyncError:
logging.error(e)
raise e
24 changes: 24 additions & 0 deletions netbox/core/migrations/0012_job_object_type_optional.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('contenttypes', '0002_remove_content_type_name'),
('core', '0011_move_objectchange'),
]

operations = [
migrations.AlterField(
model_name='job',
name='object_type',
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name='jobs',
to='contenttypes.contenttype'
),
),
]
19 changes: 1 addition & 18 deletions netbox/core/models/data.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import hashlib
import logging
import os
import yaml
from fnmatch import fnmatchcase
from urllib.parse import urlparse

import yaml
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _

from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
Expand All @@ -23,7 +22,6 @@
from ..choices import *
from ..exceptions import SyncError
from ..signals import post_sync, pre_sync
from .jobs import Job

__all__ = (
'AutoSyncRecord',
Expand Down Expand Up @@ -153,21 +151,6 @@ def to_objectchange(self, action):

return objectchange

def enqueue_sync_job(self, request):
"""
Enqueue a background job to synchronize the DataSource by calling sync().
"""
# Set the status to "syncing"
self.status = DataSourceStatusChoices.QUEUED
DataSource.objects.filter(pk=self.pk).update(status=self.status)

# Enqueue a sync job
return Job.enqueue(
import_string('core.jobs.sync_datasource'),
instance=self,
user=request.user
)

def get_backend(self):
backend_params = self.parameters or {}
return self.backend_class(self.source_url, **backend_params)
Expand Down
31 changes: 25 additions & 6 deletions netbox/core/models/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Job(models.Model):
to='contenttypes.ContentType',
related_name='jobs',
on_delete=models.CASCADE,
blank=True,
null=True
)
object_id = models.PositiveBigIntegerField(
blank=True,
Expand Down Expand Up @@ -197,25 +199,34 @@ def terminate(self, status=JobStatusChoices.STATUS_COMPLETED, error=None):
job_end.send(self)

@classmethod
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs):
def enqueue(cls, func, instance=None, name='', user=None, schedule_at=None, interval=None, immediate=False, **kwargs):
"""
Create a Job instance and enqueue a job using the given callable
Args:
func: The callable object to be enqueued for execution
instance: The NetBox object to which this job pertains
instance: The NetBox object to which this job pertains (optional)
name: Name for the job (optional)
user: The user responsible for running the job
schedule_at: Schedule the job to be executed at the passed date and time
interval: Recurrence interval (in minutes)
immediate: Run the job immediately without scheduling it in the background. Should be used for interactive
management commands only.
"""
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
rq_queue_name = get_queue_for_model(object_type.model)
if schedule_at and immediate:
raise ValueError("enqueue() cannot be called with values for both schedule_at and immediate.")

if instance:
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
object_id = instance.pk
else:
object_type = object_id = None
rq_queue_name = get_queue_for_model(object_type.model if object_type else None)
queue = django_rq.get_queue(rq_queue_name)
status = JobStatusChoices.STATUS_SCHEDULED if schedule_at else JobStatusChoices.STATUS_PENDING
job = Job.objects.create(
object_type=object_type,
object_id=instance.pk,
object_id=object_id,
name=name,
status=status,
scheduled=schedule_at,
Expand All @@ -224,8 +235,16 @@ def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=
job_id=uuid.uuid4()
)

if schedule_at:
# Run the job immediately, rather than enqueuing it as a background task. Note that this is a synchronous
# (blocking) operation, and execution will pause until the job completes.
if immediate:
func(job_id=str(job.job_id), job=job, **kwargs)

# Schedule the job to run at a specific date & time.
elif schedule_at:
queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs)

# Schedule the job to run asynchronously at this first available opportunity.
else:
queue.enqueue(func, job_id=str(job.job_id), job=job, **kwargs)

Expand Down
Loading

0 comments on commit d3a3a6b

Please sign in to comment.