From 8da4126c1c95b8c50fd57b453b43312ccd937db7 Mon Sep 17 00:00:00 2001 From: Bart de Water <496367+bdewater@users.noreply.github.com> Date: Mon, 13 Feb 2023 22:26:56 -0500 Subject: [PATCH] Use ActiveSupport::Notifications and LogSubscriber for logs This is the same mechanism Rails uses to output logs from events. This allows for more instrumentation and easier customization of log output. The default log output is completely unchanged. Since a bunch of events need cursor_position to keep the existing logging, I made it a default tag for all events. allow to setup logger for JobIteration --- CHANGELOG.md | 1 + guides/best-practices.md | 47 +++++++++-------------------- lib/job-iteration.rb | 6 ++++ lib/job-iteration/iteration.rb | 40 +++++++++++------------- lib/job-iteration/log_subscriber.rb | 38 +++++++++++++++++++++++ test/test_helper.rb | 8 ++--- 6 files changed, 82 insertions(+), 58 deletions(-) create mode 100644 lib/job-iteration/log_subscriber.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 549a0b27..ce474a92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support - [240](https://github.com/Shopify/job-iteration/pull/240) - Allow setting inheritable per-job `job_iteration_max_job_runtime` - [289](https://github.com/Shopify/job-iteration/pull/289) - Fix uninitialized constant error when raising `ConditionNotSupportedError` from `ActiveRecordBatchEnumerator` +- [338](https://github.com/Shopify/job-iteration/pull/338) - All logs are now `ActiveSupport::Notifications` events and logged using `ActiveSupport::LogSubscriber` to allow customization. Events now always include the `cursor_position` tag. - [341](https://github.com/Shopify/job-iteration/pull/341) - Add `JobIteration.default_retry_backoff`, which sets a default delay when jobs are re-enqueued after being interrupted. Defaults to `nil`, meaning no delay, which matches the current behaviour. 
## v1.3.6 (Mar 9, 2022) diff --git a/guides/best-practices.md b/guides/best-practices.md index bb972c05..e7a2f146 100644 --- a/guides/best-practices.md +++ b/guides/best-practices.md @@ -44,19 +44,24 @@ end ## Instrumentation -Iteration leverages `ActiveSupport::Notifications` which lets you instrument all kind of events: +Iteration leverages [`ActiveSupport::Notifications`](https://guides.rubyonrails.org/active_support_instrumentation.html) +to notify you of what it's doing. You can subscribe to the following events (listed in order of job lifecycle): + +- `build_enumerator.iteration` +- `throttled.iteration` (when using ThrottleEnumerator) +- `nil_enumerator.iteration` +- `resumed.iteration` +- `each_iteration.iteration` +- `not_found.iteration` +- `interrupted.iteration` +- `completed.iteration` + +All events have tags including the job class name and cursor position; some also add the number of times interrupted and/or +total time the job spent running across interruptions. ```ruby # config/initializers/instrumentation.rb -ActiveSupport::Notifications.subscribe('build_enumerator.iteration') do |_, started, finished, _, tags| - StatsD.distribution( - 'iteration.build_enumerator', - (finished - started), - tags: { job_class: tags[:job_class]&.underscore } - ) -end - -ActiveSupport::Notifications.subscribe('each_iteration.iteration') do |_, started, finished, _, tags| +ActiveSupport::Notifications.monotonic_subscribe("each_iteration.iteration") do |_, started, finished, _, tags| elapsed = finished - started StatsD.distribution( "iteration.each_iteration", @@ -69,28 +74,6 @@ ActiveSupport::Notifications.subscribe('each_iteration.iteration') do |_, starte "each_iteration runtime exceeded limit of #{BackgroundQueue.max_iteration_runtime}s" end end - -ActiveSupport::Notifications.subscribe('resumed.iteration') do |_, _, _, _, tags| - StatsD.increment( - "iteration.resumed", - tags: { job_class: tags[:job_class]&.underscore } - ) -end - 
-ActiveSupport::Notifications.subscribe('interrupted.iteration') do |_, _, _, _, tags| - StatsD.increment( - "iteration.interrupted", - tags: { job_class: tags[:job_class]&.underscore } - ) -end - -# If you're using ThrottleEnumerator -ActiveSupport::Notifications.subscribe('throttled.iteration') do |_, _, _, _, tags| - StatsD.increment( - "iteration.throttled", - tags: { job_class: tags[:job_class]&.underscore } - ) -end ``` ## Max iteration time diff --git a/lib/job-iteration.rb b/lib/job-iteration.rb index ec6175d4..42fd6a10 100644 --- a/lib/job-iteration.rb +++ b/lib/job-iteration.rb @@ -1,8 +1,10 @@ # frozen_string_literal: true +require "active_job" require_relative "./job-iteration/version" require_relative "./job-iteration/enumerator_builder" require_relative "./job-iteration/iteration" +require_relative "./job-iteration/log_subscriber" module JobIteration IntegrationLoadError = Class.new(StandardError) @@ -11,6 +13,10 @@ module JobIteration extend self + attr_accessor :logger + + self.logger = ActiveJob::Base.logger + # Use this to _always_ interrupt the job after it's been running for more than N seconds. # @example # diff --git a/lib/job-iteration/iteration.rb b/lib/job-iteration/iteration.rb index 5620dbf6..0881c2fc 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job-iteration/iteration.rb @@ -142,13 +142,12 @@ def interruptible_perform(*arguments) self.start_time = Time.now.utc enumerator = nil - ActiveSupport::Notifications.instrument("build_enumerator.iteration", iteration_instrumentation_tags) do + ActiveSupport::Notifications.instrument("build_enumerator.iteration", instrumentation_tags) do enumerator = build_enumerator(*arguments, cursor: cursor_position) end unless enumerator - logger.info("[JobIteration::Iteration] `build_enumerator` returned nil. 
" \ - "Skipping the job.") + ActiveSupport::Notifications.instrument("nil_enumerator.iteration", instrumentation_tags) return end @@ -157,7 +156,10 @@ def interruptible_perform(*arguments) if executions == 1 && times_interrupted == 0 run_callbacks(:start) else - ActiveSupport::Notifications.instrument("resumed.iteration", iteration_instrumentation_tags) + ActiveSupport::Notifications.instrument( + "resumed.iteration", + instrumentation_tags.merge(times_interrupted: times_interrupted, total_time: total_time), + ) end completed = catch(:abort) do @@ -171,7 +173,10 @@ def interruptible_perform(*arguments) reenqueue_iteration_job elsif completed run_callbacks(:complete) - output_interrupt_summary + ActiveSupport::Notifications.instrument( + "completed.iteration", + instrumentation_tags.merge(times_interrupted: times_interrupted, total_time: total_time), + ) end end @@ -184,10 +189,11 @@ def iterate_with_enumerator(enumerator, arguments) # Deferred until 2.0.0 # assert_valid_cursor!(index) - record_unit_of_work do + ActiveSupport::Notifications.instrument("each_iteration.iteration", {}) do |tags| found_record = true each_iteration(object_from_enumerator, *arguments) self.cursor_position = index + tags.replace(instrumentation_tags) end next unless job_should_exit? @@ -197,9 +203,9 @@ def iterate_with_enumerator(enumerator, arguments) return false end - logger.info( - "[JobIteration::Iteration] Enumerator found nothing to iterate! 
" \ - "times_interrupted=#{times_interrupted} cursor_position=#{cursor_position}", + ActiveSupport::Notifications.instrument( + "not_found.iteration", + instrumentation_tags.merge(times_interrupted: times_interrupted), ) unless found_record true @@ -207,13 +213,8 @@ def iterate_with_enumerator(enumerator, arguments) adjust_total_time end - def record_unit_of_work(&block) - ActiveSupport::Notifications.instrument("each_iteration.iteration", iteration_instrumentation_tags, &block) - end - def reenqueue_iteration_job - ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags) - logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}") + ActiveSupport::Notifications.instrument("interrupted.iteration", instrumentation_tags) self.times_interrupted += 1 @@ -283,13 +284,8 @@ def method_parameters(method_name) method.parameters end - def iteration_instrumentation_tags - { job_class: self.class.name } - end - - def output_interrupt_summary - message = "[JobIteration::Iteration] Completed iterating. times_interrupted=%d total_time=%.3f" - logger.info(Kernel.format(message, times_interrupted, total_time)) + def instrumentation_tags + { job_class: self.class.name, cursor_position: cursor_position } end def job_should_exit? diff --git a/lib/job-iteration/log_subscriber.rb b/lib/job-iteration/log_subscriber.rb new file mode 100644 index 00000000..56c97203 --- /dev/null +++ b/lib/job-iteration/log_subscriber.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +module JobIteration + class LogSubscriber < ActiveSupport::LogSubscriber + def logger + JobIteration.logger + end + + def nil_enumerator(event) + info do + "[JobIteration::Iteration] `build_enumerator` returned nil. Skipping the job." + end + end + + def not_found(event) + info do + "[JobIteration::Iteration] Enumerator found nothing to iterate! 
" \ + "times_interrupted=#{event.payload[:times_interrupted]} cursor_position=#{event.payload[:cursor_position]}" + end + end + + def interrupted(event) + info do + "[JobIteration::Iteration] Interrupting and re-enqueueing the job " \ + "cursor_position=#{event.payload[:cursor_position]}" + end + end + + def completed(event) + info do + message = "[JobIteration::Iteration] Completed iterating. times_interrupted=%d total_time=%.3f" + Kernel.format(message, event.payload[:times_interrupted], event.payload[:total_time]) + end + end + end +end + +JobIteration::LogSubscriber.attach_to(:iteration) diff --git a/test/test_helper.rb b/test/test_helper.rb index 6b6d6c59..f0133c8f 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -75,9 +75,9 @@ class Product < ActiveRecord::Base module LoggingHelpers def assert_logged(message) - old_logger = ActiveJob::Base.logger + old_logger = JobIteration.logger log = StringIO.new - ActiveJob::Base.logger = Logger.new(log) + JobIteration.logger = Logger.new(log) begin yield @@ -85,12 +85,12 @@ def assert_logged(message) log.rewind assert_match(message, log.read) ensure - ActiveJob::Base.logger = old_logger + JobIteration.logger = old_logger end end end -ActiveJob::Base.logger = Logger.new(IO::NULL) +JobIteration.logger = Logger.new(IO::NULL) module ActiveSupport class TestCase