Skip to content

Commit

Permalink
Use ActiveSupport::Notifications and LogSubscriber for logs
Browse files Browse the repository at this point in the history
This is the same mechanism Rails uses to output logs from events. This allows for more instrumentation and easier customization of log output. The default log output is completely unchanged.

Since a bunch of events need cursor_position to keep the existing logging, I made it a default tag for all events.

allow to setup logger for JobIteration
  • Loading branch information
bdewater committed Mar 22, 2023
1 parent dd8e19e commit 8da4126
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support
- [240](https://github.com/Shopify/job-iteration/pull/240) - Allow setting inheritable per-job `job_iteration_max_job_runtime`
- [289](https://github.com/Shopify/job-iteration/pull/289) - Fix uninitialized constant error when raising `ConditionNotSupportedError` from `ActiveRecordBatchEnumerator`
- [338](https://github.com/Shopify/job-iteration/pull/338) - All logs are now `ActiveSupport::Notifications` events and logged using `ActiveSupport::LogSubscriber` to allow customization. Events now always include the `cursor_position` tag.
- [341](https://github.com/Shopify/job-iteration/pull/341) - Add `JobIteration.default_retry_backoff`, which sets a default delay when jobs are re-enqueued after being interrupted. Defaults to `nil`, meaning no delay, which matches the current behaviour.

## v1.3.6 (Mar 9, 2022)
Expand Down
47 changes: 15 additions & 32 deletions guides/best-practices.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,24 @@ end

## Instrumentation

Iteration leverages `ActiveSupport::Notifications` which lets you instrument all kind of events:
Iteration leverages [`ActiveSupport::Notifications`](https://guides.rubyonrails.org/active_support_instrumentation.html)
to notify you what it's doing. You can subscribe to the following events (listed in order of job lifecycle):

- `build_enumerator.iteration`
- `throttled.iteration` (when using ThrottleEnumerator)
- `nil_enumerator.iteration`
- `resumed.iteration`
- `each_iteration.iteration`
- `not_found.iteration`
- `interrupted.iteration`
- `completed.iteration`

All events have tags including the job class name and cursor position, some add the amount of times interrupted and/or
total time the job spent running across interruptions.

```ruby
# config/initializers/instrumentation.rb
ActiveSupport::Notifications.subscribe('build_enumerator.iteration') do |_, started, finished, _, tags|
StatsD.distribution(
'iteration.build_enumerator',
(finished - started),
tags: { job_class: tags[:job_class]&.underscore }
)
end

ActiveSupport::Notifications.subscribe('each_iteration.iteration') do |_, started, finished, _, tags|
ActiveSupport::Notifications.monotonic_subscribe("each_iteration.iteration") do |_, started, finished, _, tags|
elapsed = finished - started
StatsD.distribution(
"iteration.each_iteration",
Expand All @@ -69,28 +74,6 @@ ActiveSupport::Notifications.subscribe('each_iteration.iteration') do |_, starte
"each_iteration runtime exceeded limit of #{BackgroundQueue.max_iteration_runtime}s"
end
end

ActiveSupport::Notifications.subscribe('resumed.iteration') do |_, _, _, _, tags|
StatsD.increment(
"iteration.resumed",
tags: { job_class: tags[:job_class]&.underscore }
)
end

ActiveSupport::Notifications.subscribe('interrupted.iteration') do |_, _, _, _, tags|
StatsD.increment(
"iteration.interrupted",
tags: { job_class: tags[:job_class]&.underscore }
)
end

# If you're using ThrottleEnumerator
ActiveSupport::Notifications.subscribe('throttled.iteration') do |_, _, _, _, tags|
StatsD.increment(
"iteration.throttled",
tags: { job_class: tags[:job_class]&.underscore }
)
end
```

## Max iteration time
Expand Down
6 changes: 6 additions & 0 deletions lib/job-iteration.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# frozen_string_literal: true

require "active_job"
require_relative "./job-iteration/version"
require_relative "./job-iteration/enumerator_builder"
require_relative "./job-iteration/iteration"
require_relative "./job-iteration/log_subscriber"

module JobIteration
IntegrationLoadError = Class.new(StandardError)
Expand All @@ -11,6 +13,10 @@ module JobIteration

extend self

attr_accessor :logger

self.logger = ActiveJob::Base.logger

# Use this to _always_ interrupt the job after it's been running for more than N seconds.
# @example
#
Expand Down
40 changes: 18 additions & 22 deletions lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,12 @@ def interruptible_perform(*arguments)
self.start_time = Time.now.utc

enumerator = nil
ActiveSupport::Notifications.instrument("build_enumerator.iteration", iteration_instrumentation_tags) do
ActiveSupport::Notifications.instrument("build_enumerator.iteration", instrumentation_tags) do
enumerator = build_enumerator(*arguments, cursor: cursor_position)
end

unless enumerator
logger.info("[JobIteration::Iteration] `build_enumerator` returned nil. " \
"Skipping the job.")
ActiveSupport::Notifications.instrument("nil_enumerator.iteration", instrumentation_tags)
return
end

Expand All @@ -157,7 +156,10 @@ def interruptible_perform(*arguments)
if executions == 1 && times_interrupted == 0
run_callbacks(:start)
else
ActiveSupport::Notifications.instrument("resumed.iteration", iteration_instrumentation_tags)
ActiveSupport::Notifications.instrument(
"resumed.iteration",
instrumentation_tags.merge(times_interrupted: times_interrupted, total_time: total_time),
)
end

completed = catch(:abort) do
Expand All @@ -171,7 +173,10 @@ def interruptible_perform(*arguments)
reenqueue_iteration_job
elsif completed
run_callbacks(:complete)
output_interrupt_summary
ActiveSupport::Notifications.instrument(
"completed.iteration",
instrumentation_tags.merge(times_interrupted: times_interrupted, total_time: total_time),
)
end
end

Expand All @@ -184,10 +189,11 @@ def iterate_with_enumerator(enumerator, arguments)
# Deferred until 2.0.0
# assert_valid_cursor!(index)

record_unit_of_work do
ActiveSupport::Notifications.instrument("each_iteration.iteration", {}) do |tags|
found_record = true
each_iteration(object_from_enumerator, *arguments)
self.cursor_position = index
tags.replace(instrumentation_tags)
end

next unless job_should_exit?
Expand All @@ -197,23 +203,18 @@ def iterate_with_enumerator(enumerator, arguments)
return false
end

logger.info(
"[JobIteration::Iteration] Enumerator found nothing to iterate! " \
"times_interrupted=#{times_interrupted} cursor_position=#{cursor_position}",
ActiveSupport::Notifications.instrument(
"not_found.iteration",
instrumentation_tags.merge(times_interrupted: times_interrupted),
) unless found_record

true
ensure
adjust_total_time
end

def record_unit_of_work(&block)
ActiveSupport::Notifications.instrument("each_iteration.iteration", iteration_instrumentation_tags, &block)
end

def reenqueue_iteration_job
ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags)
logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}")
ActiveSupport::Notifications.instrument("interrupted.iteration", instrumentation_tags)

self.times_interrupted += 1

Expand Down Expand Up @@ -283,13 +284,8 @@ def method_parameters(method_name)
method.parameters
end

def iteration_instrumentation_tags
{ job_class: self.class.name }
end

def output_interrupt_summary
message = "[JobIteration::Iteration] Completed iterating. times_interrupted=%d total_time=%.3f"
logger.info(Kernel.format(message, times_interrupted, total_time))
def instrumentation_tags
{ job_class: self.class.name, cursor_position: cursor_position }
end

def job_should_exit?
Expand Down
38 changes: 38 additions & 0 deletions lib/job-iteration/log_subscriber.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

module JobIteration
class LogSubscriber < ActiveSupport::LogSubscriber
def logger
JobIteration.logger
end

def nil_enumerator(event)
info do
"[JobIteration::Iteration] `build_enumerator` returned nil. Skipping the job."
end
end

def not_found(event)
info do
"[JobIteration::Iteration] Enumerator found nothing to iterate! " \
"times_interrupted=#{event.payload[:times_interrupted]} cursor_position=#{event.payload[:cursor_position]}"
end
end

def interrupted(event)
info do
"[JobIteration::Iteration] Interrupting and re-enqueueing the job " \
"cursor_position=#{event.payload[:cursor_position]}"
end
end

def completed(event)
info do
message = "[JobIteration::Iteration] Completed iterating. times_interrupted=%d total_time=%.3f"
Kernel.format(message, event.payload[:times_interrupted], event.payload[:total_time])
end
end
end
end

JobIteration::LogSubscriber.attach_to(:iteration)
8 changes: 4 additions & 4 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,22 @@ class Product < ActiveRecord::Base

module LoggingHelpers
def assert_logged(message)
old_logger = ActiveJob::Base.logger
old_logger = JobIteration.logger
log = StringIO.new
ActiveJob::Base.logger = Logger.new(log)
JobIteration.logger = Logger.new(log)

begin
yield

log.rewind
assert_match(message, log.read)
ensure
ActiveJob::Base.logger = old_logger
JobIteration.logger = old_logger
end
end
end

ActiveJob::Base.logger = Logger.new(IO::NULL)
JobIteration.logger = Logger.new(IO::NULL)

module ActiveSupport
class TestCase
Expand Down

0 comments on commit 8da4126

Please sign in to comment.