Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to pause jobs by queue, job class, or label (experimental) #1575

Merged
merged 8 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla
- [Write tests](#write-tests)
- [PgBouncer compatibility](#pgbouncer-compatibility)
- [CLI HTTP health check probes](#cli-http-health-check-probes)
- [Pausing Jobs](#pausing-jobs)
- [Doing your best job with GoodJob](#doing-your-best-job-with-goodjob)
- [Sizing jobs: mice and elephants](#sizing-jobs-mice-and-elephants)
- [Isolating by total latency](#isolating-by-total-latency)
Expand Down Expand Up @@ -329,6 +330,8 @@ Available configuration options are:
config.good_job.probe_handler = 'webrick'
```

- `enable_pauses` (boolean) whether job processing can be paused. Defaults to `false`. You can also set this with the environment variable `GOOD_JOB_ENABLE_PAUSES`.

By default, GoodJob configures the following execution modes per environment:

```ruby
Expand Down Expand Up @@ -1559,9 +1562,35 @@ gem 'webrick'

If WEBrick is configured to be used, but the dependency is not found, GoodJob will log a warning and fallback to the default probe server.

### Pausing Jobs

GoodJob allows for pausing jobs by queue or job class. This feature is currently opt-in because the performance impact of loading and filtering these attributes is not yet known. To enable this feature, add the following to your configuration:

> ```ruby
> config.good_job.enable_pauses = true
> ```

Pausing can be done via the Dashboard's Performance page, or in Ruby

```ruby
# To pause:
GoodJob.pause(queue: "default")
GoodJob.pause(job_class: "MyJob")

# To check status
GoodJob.paused # => { queues: ["default"], job_classes: ["MyJob"] }
GoodJob.paused?(queue: "default") # => true
GoodJob.paused?(job_class: "MyJob") # => true
GoodJob.paused? # => true

# To unpause
GoodJob.unpause(queue: "default")
GoodJob.unpause(job_class: "MyJob")
````

## Doing your best job with GoodJob

_This section explains how to use GoodJob the most efficiently and performantly, according to its maintainers. GoodJob is very flexible and you dont necessarily have to use it this way, but the concepts explained here are part of GoodJobs design intent._
_This section explains how to use GoodJob the most efficiently and performantly, according to its maintainers. GoodJob is very flexible and you don't necessarily have to use it this way, but the concepts explained here are part of GoodJob's design intent._

Background jobs are hard. There are two extremes:

Expand Down
2 changes: 2 additions & 0 deletions app/controllers/good_job/metrics_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ def primary_nav
jobs_count = GoodJob::Job.count
batches_count = GoodJob::BatchRecord.all.size
cron_entries_count = GoodJob::CronEntry.all.size
pauses_count = GoodJob::Setting.paused.values.sum(&:count)
processes_count = GoodJob::Process.active.count
discarded_count = GoodJob::Job.discarded.count

render json: {
jobs_count: helpers.number_to_human(jobs_count),
batches_count: helpers.number_to_human(batches_count),
cron_entries_count: helpers.number_to_human(cron_entries_count),
pauses_count: helpers.number_to_human(pauses_count),
processes_count: helpers.number_to_human(processes_count),
discarded_count: helpers.number_to_human(discarded_count),
}
Expand Down
34 changes: 34 additions & 0 deletions app/controllers/good_job/pauses_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# frozen_string_literal: true

module GoodJob
class PausesController < ApplicationController
before_action :validate_type, only: [:create, :destroy]
def index
@paused = GoodJob::Setting.paused
end

def create
pause_type = params[:type].to_sym
pause_value = params[:value].to_s

GoodJob::Setting.pause(pause_type => pause_value)
redirect_to({ action: :index }, notice: "Successfully paused #{params[:type]} '#{params[:value]}'")
end

def destroy
pause_type = params[:type].to_sym
pause_value = params[:value].to_s

GoodJob::Setting.unpause(pause_type => pause_value)
redirect_to({ action: :index }, notice: "Successfully unpaused #{params[:type]} '#{params[:value]}'")
end

private

def validate_type
return if params[:type].in?(%w[queue job_class label]) && params[:value].to_s.present?

raise ActionController::BadRequest, "Invalid type"
end
end
end
26 changes: 15 additions & 11 deletions app/controllers/good_job/performance_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@
module GoodJob
class PerformanceController < ApplicationController
def index
@performances = GoodJob::Execution
.where.not(job_class: nil)
.group(:job_class)
.select("
job_class,
COUNT(*) AS executions_count,
AVG(duration) AS avg_duration,
MIN(duration) AS min_duration,
MAX(duration) AS max_duration
")
.order("job_class")
@performances = GoodJob::Execution.group(:job_class).select("
job_class,
COUNT(*) AS executions_count,
AVG(duration) AS avg_duration,
MIN(duration) AS min_duration,
MAX(duration) AS max_duration
").order(:job_class)

@queue_performances = GoodJob::Execution.group(:queue_name).select("
queue_name,
COUNT(*) AS executions_count,
AVG(duration) AS avg_duration,
MIN(duration) AS min_duration,
MAX(duration) AS max_duration
").order(:queue_name)
end

def show
Expand Down
29 changes: 28 additions & 1 deletion app/models/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,33 @@ class Job < BaseRecord
# @return [ActiveRecord::Relation]
scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))).or(where(scheduled_at: nil)) }

# Exclude jobs that are paused via queue_name or job_class.
# Only applies when enable_pauses configuration is true.
# @!method exclude_paused
# @!scope class
# @return [ActiveRecord::Relation]
scope :exclude_paused, lambda {
return all unless GoodJob.configuration.enable_pauses

paused_query = GoodJob::Setting.where(key: GoodJob::Setting::PAUSES)
paused_queues_query = paused_query.select("jsonb_array_elements_text(value->'queues')")
paused_job_classes_query = paused_query.select("jsonb_array_elements_text(value->'job_classes')")
paused_labels_query = paused_query.select("jsonb_array_elements_text(value->'labels')")

where.not(queue_name: paused_queues_query)
.where.not(job_class: paused_job_classes_query)
.where(
Arel::Nodes::Not.new(
Arel::Nodes::NamedFunction.new(
"COALESCE", [
Arel::Nodes::InfixOperation.new('&&', arel_table['labels'], Arel::Nodes::NamedFunction.new('ARRAY', [paused_labels_query.arel])),
Arel::Nodes::SqlLiteral.new('FALSE'),
]
)
)
)
}

# Order jobs by priority (highest priority first).
# @!method priority_ordered
# @!scope class
Expand Down Expand Up @@ -290,7 +317,7 @@ def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_l
job = nil
result = nil

unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |jobs|
unfinished.dequeueing_ordered(parsed_queues).only_scheduled.exclude_paused.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |jobs|
job = jobs.first

if job&.executable?
Expand Down
78 changes: 78 additions & 0 deletions app/models/good_job/setting.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module GoodJob
class Setting < BaseRecord
CRON_KEYS_ENABLED = "cron_keys_enabled"
CRON_KEYS_DISABLED = "cron_keys_disabled"
PAUSES = "pauses"

self.table_name = 'good_job_settings'
self.implicit_order_column = 'created_at'
Expand Down Expand Up @@ -47,5 +48,82 @@ def self.cron_key_disable(key)
disabled_setting.value << key unless disabled_setting.value.include?(key_string)
disabled_setting.save!
end

def self.pause(queue: nil, job_class: nil, label: nil)
raise ArgumentError, "Must provide exactly one of queue, job_class, or label" unless [queue, job_class, label].count(&:present?) == 1

setting = find_or_initialize_by(key: PAUSES) do |record|
record.value = { "queues" => [], "job_classes" => [], "labels" => [] }
end

if queue
setting.value["queues"] ||= []
setting.value["queues"] << queue.to_s unless setting.value["queues"].include?(queue.to_s)
elsif job_class
setting.value["job_classes"] ||= []
setting.value["job_classes"] << job_class.to_s unless setting.value["job_classes"].include?(job_class.to_s)
else
setting.value["labels"] ||= []
setting.value["labels"] << label.to_s unless setting.value["labels"].include?(label.to_s)
end
setting.save!
end

def self.unpause(queue: nil, job_class: nil, label: nil)
raise ArgumentError, "Must provide exactly one of queue, job_class, or label" unless [queue, job_class, label].count(&:present?) == 1

setting = find_by(key: PAUSES)
return unless setting

if queue
return unless setting.value["queues"]&.include?(queue.to_s)

setting.value["queues"].delete(queue.to_s)
elsif job_class
return unless setting.value["job_classes"]&.include?(job_class.to_s)

setting.value["job_classes"].delete(job_class.to_s)
else
return unless setting.value["labels"]&.include?(label.to_s)

setting.value["labels"].delete(label.to_s)
end
setting.save!
end

def self.paused?(queue: nil, job_class: nil, label: nil)
raise ArgumentError, "Must provide at most one of queue, job_class, or label" if [queue, job_class, label].count(&:present?) > 1

if queue
queue.in? paused(:queues)
elsif job_class
job_class.in? paused(:job_classes)
elsif label
label.in? paused(:labels)
else
paused.values.any?(&:any?)
end
end

def self.paused(type = nil)
setting = find_by(key: PAUSES)
pauses = setting&.value&.deep_dup || { "queues" => [], "job_classes" => [], "labels" => [] }
pauses = pauses.with_indifferent_access

case type
when :queues
pauses["queues"]
when :job_classes
pauses["job_classes"]
when :labels
pauses["labels"]
else
{
queues: pauses["queues"] || [],
job_classes: pauses["job_classes"] || [],
labels: pauses["labels"] || [],
}
end
end
end
end
28 changes: 28 additions & 0 deletions app/views/good_job/pauses/_group.html.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<% group = type.to_s.pluralize.to_sym %>
<% items = paused.fetch(group) { [] } %>
<% if items&.any? %>
<div class="my-3 card">
<div class="list-group list-group-flush text-nowrap" role="table">
<header class="list-group-item bg-body-tertiary">
<div class="row small text-muted text-uppercase align-items-center">
<div class="col-12"><%= t("good_job.pauses.index.#{type}") %></div>
</div>
</header>

<% items.each do |value| %>
<li class="list-group-item d-flex justify-content-between align-items-center">
<%= value %>
<%= button_to(
{ action: :destroy, type: type, value: value },
method: :delete,
class: 'btn btn-sm btn-outline-primary',
data: { confirm: t('good_job.pauses.index.confirm_unpause', value: value) }
) do %>
<%= render_icon "play" %>
<%= t("good_job.pauses.index.unpause") %>
<% end %>
</li>
<% end %>
</div>
</div>
<% end %>
8 changes: 8 additions & 0 deletions app/views/good_job/pauses/_pause.html.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<li class="list-group-item d-flex justify-content-between align-items-center">
<%= value %>
<%= button_to t('good_job.pauses.index.unpause'),
{ action: :destroy, type: type, value: value },
method: :delete,
class: 'btn btn-sm btn-outline-primary',
data: { confirm: t('good_job.pauses.index.confirm_unpause', value: value) } %>
</li>
46 changes: 46 additions & 0 deletions app/views/good_job/pauses/index.html.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<%= content_for :page_title do %>
<%= t('good_job.pauses.index.title') %>
<% end %>

<div class="border-bottom">
<h1 class="h2 pt-3 pb-2"><%= t('good_job.pauses.index.title') %></h1>
</div>

<% if GoodJob.configuration.enable_pauses %>
<div class="card mb-4">
<div class="card-body">
<%= form_tag({ action: :create }, method: :post, class: 'row g-3 align-items-end') do %>
<div class="col-3">
<%= label_tag :type, t('good_job.pauses.index.type'), class: 'form-label' %>
<%= select_tag :type,
options_for_select([
[t('good_job.pauses.index.queue'), 'queue'],
[t('good_job.pauses.index.job_class'), 'job_class'],
[t('good_job.pauses.index.label'), 'label'],
]),
class: 'form-select',
required: true %>
</div>
<div class="col-6">
<%= label_tag :value, t('good_job.pauses.index.value'), class: 'form-label' %>
<%= text_field_tag :value, nil, class: 'form-control', autocomplete: "off", required: true %>
</div>
<div class="col-3">
<%= button_tag type: "submit", class: 'btn btn-primary' do %>
<%= render_icon "pause" %>
<%= t('good_job.pauses.index.pause', value: '') %>
<% end %>
</div>
<% end %>
</div>
</div>

<%= render 'group', paused: @paused, type: :queue %>
<%= render 'group', paused: @paused, type: :job_class %>
<%= render 'group', paused: @paused, type: :label %>
<% else %>
<div class="alert alert-warning" role="alert">
<p><%= t('good_job.pauses.index.disabled') %>:</p>
<pre class="mb-0">config.good_job.enable_pauses = true</pre>
</div>
<% end %>
Loading