Use ActiveSupport::Notifications and LogSubscriber for customizing log output #338

Merged
merged 1 commit into from
Mar 22, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@
- [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support
- [240](https://github.com/Shopify/job-iteration/pull/240) - Allow setting inheritable per-job `job_iteration_max_job_runtime`
- [289](https://github.com/Shopify/job-iteration/pull/289) - Fix uninitialized constant error when raising `ConditionNotSupportedError` from `ActiveRecordBatchEnumerator`
- [338](https://github.com/Shopify/job-iteration/pull/338) - All logs are now `ActiveSupport::Notifications` events and logged using `ActiveSupport::LogSubscriber` to allow customization. Events now always include the `cursor_position` tag.
- [341](https://github.com/Shopify/job-iteration/pull/341) - Add `JobIteration.default_retry_backoff`, which sets a default delay when jobs are re-enqueued after being interrupted. Defaults to `nil`, meaning no delay, which matches the current behaviour.

## v1.3.6 (Mar 9, 2022)
47 changes: 15 additions & 32 deletions guides/best-practices.md
@@ -44,19 +44,24 @@ end

## Instrumentation

Iteration leverages `ActiveSupport::Notifications` which lets you instrument all kind of events:
Iteration leverages [`ActiveSupport::Notifications`](https://guides.rubyonrails.org/active_support_instrumentation.html)
to notify you what it's doing. You can subscribe to the following events (listed in order of job lifecycle):

- `build_enumerator.iteration`
- `throttled.iteration` (when using ThrottleEnumerator)
- `nil_enumerator.iteration`
- `resumed.iteration`
- `each_iteration.iteration`
- `not_found.iteration`
- `interrupted.iteration`
- `completed.iteration`

All events have tags including the job class name and cursor position; some also add the number of times interrupted
and/or the total time the job spent running across interruptions.

```ruby
# config/initializers/instrumentation.rb
ActiveSupport::Notifications.subscribe('build_enumerator.iteration') do |_, started, finished, _, tags|
StatsD.distribution(
'iteration.build_enumerator',
(finished - started),
tags: { job_class: tags[:job_class]&.underscore }
)
end

ActiveSupport::Notifications.subscribe('each_iteration.iteration') do |_, started, finished, _, tags|
ActiveSupport::Notifications.monotonic_subscribe("each_iteration.iteration") do |_, started, finished, _, tags|
elapsed = finished - started
StatsD.distribution(
"iteration.each_iteration",
@@ -69,28 +74,6 @@ ActiveSupport::Notifications.subscribe('each_iteration.iteration') do |_, starte
"each_iteration runtime exceeded limit of #{BackgroundQueue.max_iteration_runtime}s"
end
end

ActiveSupport::Notifications.subscribe('resumed.iteration') do |_, _, _, _, tags|
StatsD.increment(
"iteration.resumed",
tags: { job_class: tags[:job_class]&.underscore }
)
end

ActiveSupport::Notifications.subscribe('interrupted.iteration') do |_, _, _, _, tags|
StatsD.increment(
"iteration.interrupted",
tags: { job_class: tags[:job_class]&.underscore }
)
end

# If you're using ThrottleEnumerator
ActiveSupport::Notifications.subscribe('throttled.iteration') do |_, _, _, _, tags|
StatsD.increment(
"iteration.throttled",
tags: { job_class: tags[:job_class]&.underscore }
)
end
```

## Max iteration time
6 changes: 6 additions & 0 deletions lib/job-iteration.rb
@@ -1,8 +1,10 @@
# frozen_string_literal: true

require "active_job"
require_relative "./job-iteration/version"
require_relative "./job-iteration/enumerator_builder"
require_relative "./job-iteration/iteration"
require_relative "./job-iteration/log_subscriber"

module JobIteration
IntegrationLoadError = Class.new(StandardError)
@@ -11,6 +13,10 @@ module JobIteration

extend self

attr_accessor :logger

self.logger = ActiveJob::Base.logger
Review comment from a Member:

This does change something. Previously we ended up logging to Rails.logger, because when ActiveJob initializes itself in the context of a Rails application, it sets its logger to the Rails one:
https://github.com/rails/rails/blob/v7.0.7.2/activejob/lib/active_job/railtie.rb#L13-L15

Whereas now it will use the default Active Job logger which logs to stdout: https://github.com/rails/rails/blob/v7.0.7.2/activejob/lib/active_job/logging.rb#L11

Reply from the Contributor Author:

TIL. Thanks for also providing a fix - linking #427 here in case others run into this problem.
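
The logger concern raised in this thread comes down to a default being captured once, when the library file is loaded. Below is a minimal sketch of that effect using only the standard library. `FrameworkBase` and `IterationLib` are hypothetical stand-ins for `ActiveJob::Base` and `JobIteration`, and the sketch assumes the library is loaded before the application assigns its own logger. It does not show the fix from #427.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for ActiveJob::Base: has a default logger
# that the application may replace later during boot.
module FrameworkBase
  class << self
    attr_accessor :logger
  end
  self.logger = Logger.new(StringIO.new) # library default
end

# Hypothetical stand-in for JobIteration: copies the framework logger
# once, at load time, like `self.logger = ActiveJob::Base.logger`.
module IterationLib
  class << self
    attr_accessor :logger
  end
  self.logger = FrameworkBase.logger
end

# Later, an application initializer swaps the framework logger.
app_log = StringIO.new
FrameworkBase.logger = Logger.new(app_log)

# The library still holds the old default, so this line never reaches app_log.
IterationLib.logger.info("lost")
stale = app_log.string.empty?

# Assigning the library's own accessor points it at the intended destination.
IterationLib.logger = FrameworkBase.logger
IterationLib.logger.info("kept")
```

In an application, the equivalent of the last step would presumably be an explicit `JobIteration.logger = ...` assignment through the accessor this PR adds.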


# Use this to _always_ interrupt the job after it's been running for more than N seconds.
# @example
#
40 changes: 18 additions & 22 deletions lib/job-iteration/iteration.rb
@@ -142,13 +142,12 @@ def interruptible_perform(*arguments)
self.start_time = Time.now.utc

enumerator = nil
ActiveSupport::Notifications.instrument("build_enumerator.iteration", iteration_instrumentation_tags) do
ActiveSupport::Notifications.instrument("build_enumerator.iteration", instrumentation_tags) do
enumerator = build_enumerator(*arguments, cursor: cursor_position)
end

unless enumerator
logger.info("[JobIteration::Iteration] `build_enumerator` returned nil. " \
"Skipping the job.")
ActiveSupport::Notifications.instrument("nil_enumerator.iteration", instrumentation_tags)
return
end

@@ -157,7 +156,10 @@ def interruptible_perform(*arguments)
if executions == 1 && times_interrupted == 0
run_callbacks(:start)
else
ActiveSupport::Notifications.instrument("resumed.iteration", iteration_instrumentation_tags)
ActiveSupport::Notifications.instrument(
"resumed.iteration",
instrumentation_tags.merge(times_interrupted: times_interrupted, total_time: total_time),
)
end

completed = catch(:abort) do
@@ -171,7 +173,10 @@ def interruptible_perform(*arguments)
reenqueue_iteration_job
elsif completed
run_callbacks(:complete)
output_interrupt_summary
ActiveSupport::Notifications.instrument(
"completed.iteration",
instrumentation_tags.merge(times_interrupted: times_interrupted, total_time: total_time),
)
end
end

@@ -184,10 +189,11 @@ def iterate_with_enumerator(enumerator, arguments)
# Deferred until 2.0.0
# assert_valid_cursor!(index)

record_unit_of_work do
ActiveSupport::Notifications.instrument("each_iteration.iteration", {}) do |tags|
found_record = true
each_iteration(object_from_enumerator, *arguments)
self.cursor_position = index
tags.replace(instrumentation_tags)
Review comment from a Contributor:

Do we really want the updated cursor_position here? The notification describes the iteration that just happened, so it makes most sense to me to tag it with the cursor position at the start of that iteration. If we do want to include both, we can give the one after a different name?

Reply from the Contributor Author (@bdewater, Mar 22, 2023):

I believe so, yes. I would like to know how long the thing (object_from_enumerator) being iterated on here takes. Wrote a quick test:

class SlowJob < ActiveJob::Base
  include JobIteration::Iteration

  def build_enumerator(cursor:)
    enumerator_builder.active_record_on_records(
      Product.where("id <= 5"),
      cursor: cursor,
    )
  end

  def each_iteration(product)
    sleep 0.1 * product.id
  end
end

def test_notification_cursor_position
  collector = []
  ActiveSupport::Notifications.monotonic_subscribe("each_iteration.iteration") do |_, started, finished, _, tags|
    collector << { cursor_position: tags[:cursor_position], elapsed: finished - started }
  end

  push(SlowJob)
  work_one_job
  pp collector
end

output:

[{:cursor_position=>1, :elapsed=>0.1011069999949541},
 {:cursor_position=>2, :elapsed=>0.20117599997320212},
 {:cursor_position=>3, :elapsed=>0.30098699999507517},
 {:cursor_position=>4, :elapsed=>0.40121400001225993},
 {:cursor_position=>5, :elapsed=>0.5012379999971017}]

This looks useful to me, I know exactly the ID of the thing that is slow.

If I'm understanding your suggestion correctly, it would be this implementation:

ActiveSupport::Notifications.instrument("each_iteration.iteration", instrumentation_tags) do
  found_record = true
  each_iteration(object_from_enumerator, *arguments)
  self.cursor_position = index
end

which outputs:

[{:cursor_position=>nil, :elapsed=>0.10110400000121444},
 {:cursor_position=>1, :elapsed=>0.2011929999862332},
 {:cursor_position=>2, :elapsed=>0.3052169999864418},
 {:cursor_position=>3, :elapsed=>0.4006680000165943},
 {:cursor_position=>4, :elapsed=>0.5009349999891128}]

That doesn't seem as useful. In this example it is still manageable to work out which item was slow, but that quickly becomes impossible to do in your head, e.g. when using ULIDs as database IDs.

Reply from the Contributor:

Okay, that makes sense. I think my confusion comes from the fact that when you enqueue the job with cursor_position: 1, it doesn't process id 1, but only 2 and up, so the cursor position and item processed don't quite line up. But this is indeed the cursor_position of the thing we just processed, so it makes sense to use this as the tag 👍
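
The off-by-one discussed in this thread can be reproduced without Rails. The sketch below is a simplified model, not the gem's code: `instrument_sketch` is a hypothetical stand-in for `ActiveSupport::Notifications.instrument` that yields the payload hash and records it only after the block finishes, which is what makes the `tags.replace(...)` approach work.

```ruby
# Hypothetical stand-in for ActiveSupport::Notifications.instrument:
# yields the payload, then records it once the block has finished.
def instrument_sketch(events, payload)
  yield payload
  events << payload.dup
end

ids = [1, 2, 3]

# Variant A: tags are built before the block runs, so each event carries
# the cursor left behind by the previous item.
cursor = nil
before = []
ids.each do |id|
  instrument_sketch(before, { cursor_position: cursor }) do
    cursor = id # each_iteration would run here, then the cursor advances
  end
end

# Variant B: tags are filled in after the work, mirroring
# `tags.replace(instrumentation_tags)` in the diff.
cursor = nil
after = []
ids.each do |id|
  instrument_sketch(after, {}) do |tags|
    cursor = id
    tags.replace({ cursor_position: cursor })
  end
end

p before.map { |e| e[:cursor_position] } # prints [nil, 1, 2]
p after.map { |e| e[:cursor_position] }  # prints [1, 2, 3]
```

Variant B matches the first output listed in the thread: each event is tagged with the cursor of the item that was just processed.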

end

next unless job_should_exit?
@@ -197,23 +203,18 @@ def iterate_with_enumerator(enumerator, arguments)
return false
end

logger.info(
"[JobIteration::Iteration] Enumerator found nothing to iterate! " \
"times_interrupted=#{times_interrupted} cursor_position=#{cursor_position}",
ActiveSupport::Notifications.instrument(
"not_found.iteration",
instrumentation_tags.merge(times_interrupted: times_interrupted),
) unless found_record

true
ensure
adjust_total_time
end

def record_unit_of_work(&block)
ActiveSupport::Notifications.instrument("each_iteration.iteration", iteration_instrumentation_tags, &block)
end

def reenqueue_iteration_job
ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags)
logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}")
ActiveSupport::Notifications.instrument("interrupted.iteration", instrumentation_tags)

self.times_interrupted += 1

@@ -283,13 +284,8 @@ def method_parameters(method_name)
method.parameters
end

def iteration_instrumentation_tags
{ job_class: self.class.name }
end

def output_interrupt_summary
message = "[JobIteration::Iteration] Completed iterating. times_interrupted=%d total_time=%.3f"
logger.info(Kernel.format(message, times_interrupted, total_time))
def instrumentation_tags
{ job_class: self.class.name, cursor_position: cursor_position }
end

def job_should_exit?
38 changes: 38 additions & 0 deletions lib/job-iteration/log_subscriber.rb
@@ -0,0 +1,38 @@
# frozen_string_literal: true

module JobIteration
class LogSubscriber < ActiveSupport::LogSubscriber
def logger
JobIteration.logger
end

def nil_enumerator(event)
info do
"[JobIteration::Iteration] `build_enumerator` returned nil. Skipping the job."
end
end

def not_found(event)
info do
"[JobIteration::Iteration] Enumerator found nothing to iterate! " \
"times_interrupted=#{event.payload[:times_interrupted]} cursor_position=#{event.payload[:cursor_position]}"
end
end

def interrupted(event)
info do
"[JobIteration::Iteration] Interrupting and re-enqueueing the job " \
"cursor_position=#{event.payload[:cursor_position]}"
end
end

def completed(event)
info do
message = "[JobIteration::Iteration] Completed iterating. times_interrupted=%d total_time=%.3f"
Kernel.format(message, event.payload[:times_interrupted], event.payload[:total_time])
end
end
end
end

JobIteration::LogSubscriber.attach_to(:iteration)
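
Two details of the subscriber above can be checked with the standard library alone: the block form of `info` builds its message only when the logger's level allows it, and the `completed` message formats `total_time` to three decimals. This is a small sketch using a plain `Logger`, not the subscriber itself, and the payload values are made up for illustration.

```ruby
require "logger"
require "stringio"

io = StringIO.new
logger = Logger.new(io)
logger.level = Logger::WARN # info messages are filtered out

# With the block form, the message is never built when the level is too high.
built = false
logger.info do
  built = true
  "never built"
end

# Sample payload values (made up) run through the same format string
# used by the `completed` handler.
payload = { times_interrupted: 2, total_time: 12.5 }
message = "[JobIteration::Iteration] Completed iterating. times_interrupted=%d total_time=%.3f"
line = Kernel.format(message, payload[:times_interrupted], payload[:total_time])

puts line
# prints: [JobIteration::Iteration] Completed iterating. times_interrupted=2 total_time=12.500
```

The block form means the string interpolation in each handler costs nothing when info-level logging is disabled.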
8 changes: 4 additions & 4 deletions test/test_helper.rb
@@ -75,22 +75,22 @@ class Product < ActiveRecord::Base

module LoggingHelpers
def assert_logged(message)
old_logger = ActiveJob::Base.logger
old_logger = JobIteration.logger
log = StringIO.new
ActiveJob::Base.logger = Logger.new(log)
JobIteration.logger = Logger.new(log)

begin
yield

log.rewind
assert_match(message, log.read)
ensure
ActiveJob::Base.logger = old_logger
JobIteration.logger = old_logger
end
end
end

ActiveJob::Base.logger = Logger.new(IO::NULL)
JobIteration.logger = Logger.new(IO::NULL)

module ActiveSupport
class TestCase