Skip to content

Commit

Permalink
Add ability to pause jobs by queue, job class, or label (experimental) (
Browse files Browse the repository at this point in the history
#1575)

* Add ability to pause jobs by queue or job class (experimental)

* Move behavior into pauses controller

* Put all the values under a single setting key

* Add label as pausable

* Only show Dashboard UI when configuration is enabled

* Add a number badge to navbar

* Remove extraneous changes

* Clean up the controller
  • Loading branch information
bensheldon authored Jan 22, 2025
1 parent 81fdbe2 commit 19234ee
Show file tree
Hide file tree
Showing 35 changed files with 1,111 additions and 78 deletions.
31 changes: 30 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla
- [Write tests](#write-tests)
- [PgBouncer compatibility](#pgbouncer-compatibility)
- [CLI HTTP health check probes](#cli-http-health-check-probes)
- [Pausing Jobs](#pausing-jobs)
- [Doing your best job with GoodJob](#doing-your-best-job-with-goodjob)
- [Sizing jobs: mice and elephants](#sizing-jobs-mice-and-elephants)
- [Isolating by total latency](#isolating-by-total-latency)
Expand Down Expand Up @@ -329,6 +330,8 @@ Available configuration options are:
config.good_job.probe_handler = 'webrick'
```
- `enable_pauses` (boolean) whether job processing can be paused. Defaults to `false`. You can also set this with the environment variable `GOOD_JOB_ENABLE_PAUSES`.
By default, GoodJob configures the following execution modes per environment:
```ruby
Expand Down Expand Up @@ -1559,9 +1562,35 @@ gem 'webrick'
If WEBrick is configured to be used, but the dependency is not found, GoodJob will log a warning and fallback to the default probe server.
### Pausing Jobs
GoodJob allows for pausing jobs by queue or job class. This feature is currently opt-in because the performance impact of loading and filtering these attributes is not yet known. To enable this feature, add the following to your configuration:
> ```ruby
> config.good_job.enable_pauses = true
> ```
Pausing can be done via the Dashboard's Performance page, or in Ruby
```ruby
# To pause:
GoodJob.pause(queue: "default")
GoodJob.pause(job_class: "MyJob")
# To check status
GoodJob.paused # => { queues: ["default"], job_classes: ["MyJob"] }
GoodJob.paused?(queue: "default") # => true
GoodJob.paused?(job_class: "MyJob") # => true
GoodJob.paused? # => true
# To unpause
GoodJob.unpause(queue: "default")
GoodJob.unpause(job_class: "MyJob")
````
## Doing your best job with GoodJob
_This section explains how to use GoodJob the most efficiently and performantly, according to its maintainers. GoodJob is very flexible and you dont necessarily have to use it this way, but the concepts explained here are part of GoodJobs design intent._
_This section explains how to use GoodJob the most efficiently and performantly, according to its maintainers. GoodJob is very flexible and you don't necessarily have to use it this way, but the concepts explained here are part of GoodJob's design intent._
Background jobs are hard. There are two extremes:
Expand Down
2 changes: 2 additions & 0 deletions app/controllers/good_job/metrics_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ def primary_nav
jobs_count = GoodJob::Job.count
batches_count = GoodJob::BatchRecord.all.size
cron_entries_count = GoodJob::CronEntry.all.size
pauses_count = GoodJob::Setting.paused.values.sum(&:count)
processes_count = GoodJob::Process.active.count
discarded_count = GoodJob::Job.discarded.count

render json: {
jobs_count: helpers.number_to_human(jobs_count),
batches_count: helpers.number_to_human(batches_count),
cron_entries_count: helpers.number_to_human(cron_entries_count),
pauses_count: helpers.number_to_human(pauses_count),
processes_count: helpers.number_to_human(processes_count),
discarded_count: helpers.number_to_human(discarded_count),
}
Expand Down
34 changes: 34 additions & 0 deletions app/controllers/good_job/pauses_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# frozen_string_literal: true

module GoodJob
class PausesController < ApplicationController
before_action :validate_type, only: [:create, :destroy]
def index
@paused = GoodJob::Setting.paused
end

def create
pause_type = params[:type].to_sym
pause_value = params[:value].to_s

GoodJob::Setting.pause(pause_type => pause_value)
redirect_to({ action: :index }, notice: "Successfully paused #{params[:type]} '#{params[:value]}'")
end

def destroy
pause_type = params[:type].to_sym
pause_value = params[:value].to_s

GoodJob::Setting.unpause(pause_type => pause_value)
redirect_to({ action: :index }, notice: "Successfully unpaused #{params[:type]} '#{params[:value]}'")
end

private

def validate_type
return if params[:type].in?(%w[queue job_class label]) && params[:value].to_s.present?

raise ActionController::BadRequest, "Invalid type"
end
end
end
26 changes: 15 additions & 11 deletions app/controllers/good_job/performance_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@
module GoodJob
class PerformanceController < ApplicationController
def index
@performances = GoodJob::Execution
.where.not(job_class: nil)
.group(:job_class)
.select("
job_class,
COUNT(*) AS executions_count,
AVG(duration) AS avg_duration,
MIN(duration) AS min_duration,
MAX(duration) AS max_duration
")
.order("job_class")
@performances = GoodJob::Execution.group(:job_class).select("
job_class,
COUNT(*) AS executions_count,
AVG(duration) AS avg_duration,
MIN(duration) AS min_duration,
MAX(duration) AS max_duration
").order(:job_class)

@queue_performances = GoodJob::Execution.group(:queue_name).select("
queue_name,
COUNT(*) AS executions_count,
AVG(duration) AS avg_duration,
MIN(duration) AS min_duration,
MAX(duration) AS max_duration
").order(:queue_name)
end

def show
Expand Down
29 changes: 28 additions & 1 deletion app/models/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,33 @@ class Job < BaseRecord
# @return [ActiveRecord::Relation]
scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))).or(where(scheduled_at: nil)) }

# Exclude jobs that are paused via queue_name or job_class.
# Only applies when enable_pauses configuration is true.
# @!method exclude_paused
# @!scope class
# @return [ActiveRecord::Relation]
scope :exclude_paused, lambda {
return all unless GoodJob.configuration.enable_pauses

paused_query = GoodJob::Setting.where(key: GoodJob::Setting::PAUSES)
paused_queues_query = paused_query.select("jsonb_array_elements_text(value->'queues')")
paused_job_classes_query = paused_query.select("jsonb_array_elements_text(value->'job_classes')")
paused_labels_query = paused_query.select("jsonb_array_elements_text(value->'labels')")

where.not(queue_name: paused_queues_query)
.where.not(job_class: paused_job_classes_query)
.where(
Arel::Nodes::Not.new(
Arel::Nodes::NamedFunction.new(
"COALESCE", [
Arel::Nodes::InfixOperation.new('&&', arel_table['labels'], Arel::Nodes::NamedFunction.new('ARRAY', [paused_labels_query.arel])),
Arel::Nodes::SqlLiteral.new('FALSE'),
]
)
)
)
}

# Order jobs by priority (highest priority first).
# @!method priority_ordered
# @!scope class
Expand Down Expand Up @@ -290,7 +317,7 @@ def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_l
job = nil
result = nil

unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |jobs|
unfinished.dequeueing_ordered(parsed_queues).only_scheduled.exclude_paused.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |jobs|
job = jobs.first

if job&.executable?
Expand Down
78 changes: 78 additions & 0 deletions app/models/good_job/setting.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module GoodJob
class Setting < BaseRecord
CRON_KEYS_ENABLED = "cron_keys_enabled"
CRON_KEYS_DISABLED = "cron_keys_disabled"
PAUSES = "pauses"

self.table_name = 'good_job_settings'
self.implicit_order_column = 'created_at'
Expand Down Expand Up @@ -47,5 +48,82 @@ def self.cron_key_disable(key)
disabled_setting.value << key unless disabled_setting.value.include?(key_string)
disabled_setting.save!
end

def self.pause(queue: nil, job_class: nil, label: nil)
raise ArgumentError, "Must provide exactly one of queue, job_class, or label" unless [queue, job_class, label].count(&:present?) == 1

setting = find_or_initialize_by(key: PAUSES) do |record|
record.value = { "queues" => [], "job_classes" => [], "labels" => [] }
end

if queue
setting.value["queues"] ||= []
setting.value["queues"] << queue.to_s unless setting.value["queues"].include?(queue.to_s)
elsif job_class
setting.value["job_classes"] ||= []
setting.value["job_classes"] << job_class.to_s unless setting.value["job_classes"].include?(job_class.to_s)
else
setting.value["labels"] ||= []
setting.value["labels"] << label.to_s unless setting.value["labels"].include?(label.to_s)
end
setting.save!
end

def self.unpause(queue: nil, job_class: nil, label: nil)
raise ArgumentError, "Must provide exactly one of queue, job_class, or label" unless [queue, job_class, label].count(&:present?) == 1

setting = find_by(key: PAUSES)
return unless setting

if queue
return unless setting.value["queues"]&.include?(queue.to_s)

setting.value["queues"].delete(queue.to_s)
elsif job_class
return unless setting.value["job_classes"]&.include?(job_class.to_s)

setting.value["job_classes"].delete(job_class.to_s)
else
return unless setting.value["labels"]&.include?(label.to_s)

setting.value["labels"].delete(label.to_s)
end
setting.save!
end

def self.paused?(queue: nil, job_class: nil, label: nil)
raise ArgumentError, "Must provide at most one of queue, job_class, or label" if [queue, job_class, label].count(&:present?) > 1

if queue
queue.in? paused(:queues)
elsif job_class
job_class.in? paused(:job_classes)
elsif label
label.in? paused(:labels)
else
paused.values.any?(&:any?)
end
end

def self.paused(type = nil)
setting = find_by(key: PAUSES)
pauses = setting&.value&.deep_dup || { "queues" => [], "job_classes" => [], "labels" => [] }
pauses = pauses.with_indifferent_access

case type
when :queues
pauses["queues"]
when :job_classes
pauses["job_classes"]
when :labels
pauses["labels"]
else
{
queues: pauses["queues"] || [],
job_classes: pauses["job_classes"] || [],
labels: pauses["labels"] || [],
}
end
end
end
end
28 changes: 28 additions & 0 deletions app/views/good_job/pauses/_group.html.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<% group = type.to_s.pluralize.to_sym %>
<% items = paused.fetch(group) { [] } %>
<% if items&.any? %>
<div class="my-3 card">
<div class="list-group list-group-flush text-nowrap" role="table">
<header class="list-group-item bg-body-tertiary">
<div class="row small text-muted text-uppercase align-items-center">
<div class="col-12"><%= t("good_job.pauses.index.#{type}") %></div>
</div>
</header>

<% items.each do |value| %>
<li class="list-group-item d-flex justify-content-between align-items-center">
<%= value %>
<%= button_to(
{ action: :destroy, type: type, value: value },
method: :delete,
class: 'btn btn-sm btn-outline-primary',
data: { confirm: t('good_job.pauses.index.confirm_unpause', value: value) }
) do %>
<%= render_icon "play" %>
<%= t("good_job.pauses.index.unpause") %>
<% end %>
</li>
<% end %>
</div>
</div>
<% end %>
8 changes: 8 additions & 0 deletions app/views/good_job/pauses/_pause.html.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<li class="list-group-item d-flex justify-content-between align-items-center">
<%= value %>
<%= button_to t('good_job.pauses.index.unpause'),
{ action: :destroy, type: type, value: value },
method: :delete,
class: 'btn btn-sm btn-outline-primary',
data: { confirm: t('good_job.pauses.index.confirm_unpause', value: value) } %>
</li>
46 changes: 46 additions & 0 deletions app/views/good_job/pauses/index.html.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<%= content_for :page_title do %>
<%= t('good_job.pauses.index.title') %>
<% end %>

<div class="border-bottom">
<h1 class="h2 pt-3 pb-2"><%= t('good_job.pauses.index.title') %></h1>
</div>

<% if GoodJob.configuration.enable_pauses %>
<div class="card mb-4">
<div class="card-body">
<%= form_tag({ action: :create }, method: :post, class: 'row g-3 align-items-end') do %>
<div class="col-3">
<%= label_tag :type, t('good_job.pauses.index.type'), class: 'form-label' %>
<%= select_tag :type,
options_for_select([
[t('good_job.pauses.index.queue'), 'queue'],
[t('good_job.pauses.index.job_class'), 'job_class'],
[t('good_job.pauses.index.label'), 'label'],
]),
class: 'form-select',
required: true %>
</div>
<div class="col-6">
<%= label_tag :value, t('good_job.pauses.index.value'), class: 'form-label' %>
<%= text_field_tag :value, nil, class: 'form-control', autocomplete: "off", required: true %>
</div>
<div class="col-3">
<%= button_tag type: "submit", class: 'btn btn-primary' do %>
<%= render_icon "pause" %>
<%= t('good_job.pauses.index.pause', value: '') %>
<% end %>
</div>
<% end %>
</div>
</div>

<%= render 'group', paused: @paused, type: :queue %>
<%= render 'group', paused: @paused, type: :job_class %>
<%= render 'group', paused: @paused, type: :label %>
<% else %>
<div class="alert alert-warning" role="alert">
<p><%= t('good_job.pauses.index.disabled') %>:</p>
<pre class="mb-0">config.good_job.enable_pauses = true</pre>
</div>
<% end %>
Loading

0 comments on commit 19234ee

Please sign in to comment.