diff --git a/CHANGELOG.md b/CHANGELOG.md index 549a0b27..ce474a92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support - [240](https://github.com/Shopify/job-iteration/pull/240) - Allow setting inheritable per-job `job_iteration_max_job_runtime` - [289](https://github.com/Shopify/job-iteration/pull/289) - Fix uninitialized constant error when raising `ConditionNotSupportedError` from `ActiveRecordBatchEnumerator` +- [338](https://github.com/Shopify/job-iteration/pull/338) - All logs are now `ActiveSupport::Notifications` events and logged using `ActiveSupport::LogSubscriber` to allow customization. Events now always include the `cursor_position` tag. - [341](https://github.com/Shopify/job-iteration/pull/341) - Add `JobIteration.default_retry_backoff`, which sets a default delay when jobs are re-enqueued after being interrupted. Defaults to `nil`, meaning no delay, which matches the current behaviour. ## v1.3.6 (Mar 9, 2022) diff --git a/guides/best-practices.md b/guides/best-practices.md index bb972c05..e7a2f146 100644 --- a/guides/best-practices.md +++ b/guides/best-practices.md @@ -44,19 +44,24 @@ end ## Instrumentation -Iteration leverages `ActiveSupport::Notifications` which lets you instrument all kind of events: +Iteration leverages [`ActiveSupport::Notifications`](https://guides.rubyonrails.org/active_support_instrumentation.html) +to notify you what it's doing. You can subscribe to the following events (listed in order of job lifecycle): + +- `build_enumerator.iteration` +- `throttled.iteration` (when using ThrottleEnumerator) +- `nil_enumerator.iteration` +- `resumed.iteration` +- `each_iteration.iteration` +- `not_found.iteration` +- `interrupted.iteration` +- `completed.iteration` + +All events have tags including the job class name and cursor position, some add the amount of times interrupted and/or +total time the job spent running across interruptions. ```ruby # config/initializers/instrumentation.rb -ActiveSupport::Notifications.subscribe('build_enumerator.iteration') do |_, started, finished, _, tags| - StatsD.distribution( - 'iteration.build_enumerator', - (finished - started), - tags: { job_class: tags[:job_class]&.underscore } - ) -end - -ActiveSupport::Notifications.subscribe('each_iteration.iteration') do |_, started, finished, _, tags| +ActiveSupport::Notifications.monotonic_subscribe("each_iteration.iteration") do |_, started, finished, _, tags| elapsed = finished - started StatsD.distribution( "iteration.each_iteration", @@ -69,28 +74,6 @@ ActiveSupport::Notifications.subscribe('each_iteration.iteration') do |_, starte "each_iteration runtime exceeded limit of #{BackgroundQueue.max_iteration_runtime}s" end end - -ActiveSupport::Notifications.subscribe('resumed.iteration') do |_, _, _, _, tags| - StatsD.increment( - "iteration.resumed", - tags: { job_class: tags[:job_class]&.underscore } - ) -end - -ActiveSupport::Notifications.subscribe('interrupted.iteration') do |_, _, _, _, tags| - StatsD.increment( - "iteration.interrupted", - tags: { job_class: tags[:job_class]&.underscore } - ) -end - -# If you're using ThrottleEnumerator -ActiveSupport::Notifications.subscribe('throttled.iteration') do |_, _, _, _, tags| - StatsD.increment( - "iteration.throttled", - tags: { job_class: tags[:job_class]&.underscore } - ) -end ``` ## Max iteration time diff --git a/lib/job-iteration.rb b/lib/job-iteration.rb index ec6175d4..42fd6a10 100644 --- a/lib/job-iteration.rb +++ b/lib/job-iteration.rb @@ -1,8 +1,10 @@ # frozen_string_literal: true +require "active_job" require_relative "./job-iteration/version" require_relative "./job-iteration/enumerator_builder" require_relative "./job-iteration/iteration" +require_relative "./job-iteration/log_subscriber" module JobIteration IntegrationLoadError = Class.new(StandardError) @@ -11,6 +13,10 @@ module JobIteration extend self + attr_accessor :logger + + self.logger = ActiveJob::Base.logger + # Use this to _always_ interrupt the job after it's been running for more than N seconds. # @example # diff --git a/lib/job-iteration/iteration.rb b/lib/job-iteration/iteration.rb index 5620dbf6..0881c2fc 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job-iteration/iteration.rb @@ -142,13 +142,12 @@ def interruptible_perform(*arguments) self.start_time = Time.now.utc enumerator = nil - ActiveSupport::Notifications.instrument("build_enumerator.iteration", iteration_instrumentation_tags) do + ActiveSupport::Notifications.instrument("build_enumerator.iteration", instrumentation_tags) do enumerator = build_enumerator(*arguments, cursor: cursor_position) end unless enumerator - logger.info("[JobIteration::Iteration] `build_enumerator` returned nil. " \ - "Skipping the job.") + ActiveSupport::Notifications.instrument("nil_enumerator.iteration", instrumentation_tags) return end @@ -157,7 +156,10 @@ def interruptible_perform(*arguments) if executions == 1 && times_interrupted == 0 run_callbacks(:start) else - ActiveSupport::Notifications.instrument("resumed.iteration", iteration_instrumentation_tags) + ActiveSupport::Notifications.instrument( + "resumed.iteration", + instrumentation_tags.merge(times_interrupted: times_interrupted, total_time: total_time), + ) end completed = catch(:abort) do @@ -171,7 +173,10 @@ def interruptible_perform(*arguments) reenqueue_iteration_job elsif completed run_callbacks(:complete) - output_interrupt_summary + ActiveSupport::Notifications.instrument( + "completed.iteration", + instrumentation_tags.merge(times_interrupted: times_interrupted, total_time: total_time), + ) end end @@ -184,10 +189,11 @@ def iterate_with_enumerator(enumerator, arguments) # Deferred until 2.0.0 # assert_valid_cursor!(index) - record_unit_of_work do + ActiveSupport::Notifications.instrument("each_iteration.iteration", {}) do |tags| found_record = true each_iteration(object_from_enumerator, *arguments) self.cursor_position = index + tags.replace(instrumentation_tags) end next unless job_should_exit? @@ -197,9 +203,9 @@ def iterate_with_enumerator(enumerator, arguments) return false end - logger.info( - "[JobIteration::Iteration] Enumerator found nothing to iterate! " \ - "times_interrupted=#{times_interrupted} cursor_position=#{cursor_position}", + ActiveSupport::Notifications.instrument( + "not_found.iteration", + instrumentation_tags.merge(times_interrupted: times_interrupted), ) unless found_record true @@ -207,13 +213,8 @@ def iterate_with_enumerator(enumerator, arguments) adjust_total_time end - def record_unit_of_work(&block) - ActiveSupport::Notifications.instrument("each_iteration.iteration", iteration_instrumentation_tags, &block) - end - def reenqueue_iteration_job - ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags) - logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}") + ActiveSupport::Notifications.instrument("interrupted.iteration", instrumentation_tags) self.times_interrupted += 1 @@ -283,13 +284,8 @@ def method_parameters(method_name) method.parameters end - def iteration_instrumentation_tags - { job_class: self.class.name } - end - - def output_interrupt_summary - message = "[JobIteration::Iteration] Completed iterating. times_interrupted=%d total_time=%.3f" - logger.info(Kernel.format(message, times_interrupted, total_time)) + def instrumentation_tags + { job_class: self.class.name, cursor_position: cursor_position } end def job_should_exit? diff --git a/lib/job-iteration/log_subscriber.rb b/lib/job-iteration/log_subscriber.rb new file mode 100644 index 00000000..56c97203 --- /dev/null +++ b/lib/job-iteration/log_subscriber.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +module JobIteration + class LogSubscriber < ActiveSupport::LogSubscriber + def logger + JobIteration.logger + end + + def nil_enumerator(event) + info do + "[JobIteration::Iteration] `build_enumerator` returned nil. Skipping the job." + end + end + + def not_found(event) + info do + "[JobIteration::Iteration] Enumerator found nothing to iterate! " \ + "times_interrupted=#{event.payload[:times_interrupted]} cursor_position=#{event.payload[:cursor_position]}" + end + end + + def interrupted(event) + info do + "[JobIteration::Iteration] Interrupting and re-enqueueing the job " \ + "cursor_position=#{event.payload[:cursor_position]}" + end + end + + def completed(event) + info do + message = "[JobIteration::Iteration] Completed iterating. times_interrupted=%d total_time=%.3f" + Kernel.format(message, event.payload[:times_interrupted], event.payload[:total_time]) + end + end + end +end + +JobIteration::LogSubscriber.attach_to(:iteration) diff --git a/test/test_helper.rb b/test/test_helper.rb index 6b6d6c59..f0133c8f 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -75,9 +75,9 @@ class Product < ActiveRecord::Base module LoggingHelpers def assert_logged(message) - old_logger = ActiveJob::Base.logger + old_logger = JobIteration.logger log = StringIO.new - ActiveJob::Base.logger = Logger.new(log) + JobIteration.logger = Logger.new(log) begin yield @@ -85,12 +85,12 @@ def assert_logged(message) log.rewind assert_match(message, log.read) ensure - ActiveJob::Base.logger = old_logger + JobIteration.logger = old_logger end end end -ActiveJob::Base.logger = Logger.new(IO::NULL) +JobIteration.logger = Logger.new(IO::NULL) module ActiveSupport class TestCase