diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fe625ef..17e280e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,6 +8,7 @@ on: pull_request: branches: - main + - version-1.0 # remove after release of 1.0 env: ruby_version: 3.3 diff --git a/.rubocop.yml b/.rubocop.yml index bc5b096..a6de086 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -22,3 +22,6 @@ Naming/FileName: Style/BlockComments: Exclude: - 'spec/spec_helper.rb' + +Metrics/ClassLength: + Max: 150 diff --git a/CHANGELOG.md b/CHANGELOG.md index db33538..b532d03 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ Unreleased Changes ------------------ +* Feature - Support per queue configuration. +* Feature - Support loading global and queue specific configuration from ENV. + 0.1.1 (2024-12-02) ------------------ diff --git a/README.md b/README.md index 997f178..0aca48b 100644 --- a/README.md +++ b/README.md @@ -62,20 +62,26 @@ end You also need to configure a mapping of ActiveJob queue names to SQS Queue URLs: ```yaml -# config/aws_sqs_active_job.yml +# config/aws_active_job_sqs.yml +backpressure: 5 # configure global options for poller +max_messages: 3 queues: - default: 'https://my-queue-url.amazon.aws' + default: + url: 'https://my-queue-url.amazon.aws' + max_messages: 2 # queue specific values override global values ``` For a complete list of configuration options see the [Aws::ActiveJob::SQS::Configuration](https://docs.aws.amazon.com/sdk-for-ruby/aws-activejob-sqs/api/Aws/ActiveJob/SQS/Configuration.html) documentation. -You can configure SQS Active Job either through the yaml file or +You can configure SQS Active Job either through the environment, yaml file or through code in your `config/.rb` or initializers. For file based configuration, you can use either -`config/aws_sqs_active_job/.yml` or `config/aws_sqs_active_job.yml`. +`config/aws_active_job_sqs/.yml` or `config/aws_active_job_sqs.yml`. +You may specify the file used through the `:config_file` option in code or the +`AWS_ACTIVE_JOB_SQS_CONFIG_FILE` environment variable. The yaml files support ERB. To configure in code: @@ -88,6 +94,21 @@ Aws::ActiveJob::SQS.configure do |config| end ``` +SQS Active Job loads global and queue specific values from your +environment. Global keys take the form of: +`AWS_ACTIVE_JOB_SQS_` and queue specific keys take the +form of: `AWS_ACTIVE_JOB_SQS__`. + is case-insensitive and is always down cased. Configuring +non-snake case queues (containing upper case) through ENV is +not supported. + +Example: + +```shell +export AWS_ACTIVE_JOB_SQS_MAX_MESSAGES = 5 +export AWS_ACTIVE_JOB_SQS_DEFAULT_URL = https://my-queue.aws +``` + ## Usage To queue a job, you can just use standard ActiveJob methods: diff --git a/VERSION b/VERSION index 3eefcb9..afaf360 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.0 +1.0.0 \ No newline at end of file diff --git a/lib/active_job/queue_adapters/sqs_adapter.rb b/lib/active_job/queue_adapters/sqs_adapter.rb index 1cda41d..a05f93a 100644 --- a/lib/active_job/queue_adapters/sqs_adapter.rb +++ b/lib/active_job/queue_adapters/sqs_adapter.rb @@ -27,7 +27,7 @@ def enqueue_at(job, timestamp) def enqueue_all(jobs) enqueued_count = 0 jobs.group_by(&:queue_name).each do |queue_name, same_queue_jobs| - queue_url = Aws::ActiveJob::SQS.config.queue_url_for(queue_name) + queue_url = Aws::ActiveJob::SQS.config.url_for(queue_name) base_send_message_opts = { queue_url: queue_url } same_queue_jobs.each_slice(10) do |chunk| diff --git a/lib/active_job/queue_adapters/sqs_adapter/params.rb b/lib/active_job/queue_adapters/sqs_adapter/params.rb index 190652c..eb8aac4 100644 --- a/lib/active_job/queue_adapters/sqs_adapter/params.rb +++ b/lib/active_job/queue_adapters/sqs_adapter/params.rb @@ -22,7 +22,7 @@ def initialize(job, body) end def queue_url - @queue_url ||= Aws::ActiveJob::SQS.config.queue_url_for(@job.queue_name) + @queue_url ||= Aws::ActiveJob::SQS.config.url_for(@job.queue_name) end def entry @@ -61,7 +61,7 @@ def options_for_fifo Digest::SHA256.hexdigest(ActiveSupport::JSON.dump(deduplication_body)) message_group_id = @job.message_group_id if @job.respond_to?(:message_group_id) - message_group_id ||= Aws::ActiveJob::SQS.config.message_group_id + message_group_id ||= Aws::ActiveJob::SQS.config.message_group_id_for(@job.queue_name) options[:message_group_id] = message_group_id options @@ -69,7 +69,7 @@ def options_for_fifo def deduplication_body ex_dedup_keys = @job.excluded_deduplication_keys if @job.respond_to?(:excluded_deduplication_keys) - ex_dedup_keys ||= Aws::ActiveJob::SQS.config.excluded_deduplication_keys + ex_dedup_keys ||= Aws::ActiveJob::SQS.config.excluded_deduplication_keys_for(@job.queue_name) @body.except(*ex_dedup_keys) end diff --git a/lib/active_job/queue_adapters/sqs_async_adapter.rb b/lib/active_job/queue_adapters/sqs_async_adapter.rb index 3971be9..16fc4b7 100644 --- a/lib/active_job/queue_adapters/sqs_async_adapter.rb +++ b/lib/active_job/queue_adapters/sqs_async_adapter.rb @@ -19,7 +19,7 @@ class SqsAsyncAdapter < SqsAdapter def _enqueue(job, body = nil, send_message_opts = {}) # FIFO jobs must be queued in order, so do not queue async - queue_url = Aws::ActiveJob::SQS.config.queue_url_for(job.queue_name) + queue_url = Aws::ActiveJob::SQS.config.url_for(job.queue_name) if Aws::ActiveJob::SQS.fifo?(queue_url) super else diff --git a/lib/aws-activejob-sqs.rb b/lib/aws-activejob-sqs.rb index b4d11ee..0f2b9e3 100644 --- a/lib/aws-activejob-sqs.rb +++ b/lib/aws-activejob-sqs.rb @@ -19,7 +19,7 @@ def self.config @config ||= Configuration.new end - # @yield Configuration + # @yield [Configuration] the (singleton) Configuration def self.configure yield(config) end diff --git a/lib/aws/active_job/sqs/configuration.rb b/lib/aws/active_job/sqs/configuration.rb index cbdc60e..c786e1a 100644 --- a/lib/aws/active_job/sqs/configuration.rb +++ b/lib/aws/active_job/sqs/configuration.rb @@ -3,11 +3,57 @@ module Aws module ActiveJob module SQS - # Use +Aws::ActiveJob::SQS.config+ to access the singleton config instance. + # This class provides a Configuration object for AWS ActiveJob + # by pulling configuration options from runtime code, the ENV, a YAML file, + # and default settings, in that order. Values set on queues are used + # preferentially to global values. + # + # Use {Aws::ActiveJob::SQS.config Aws::ActiveJob::SQS.config} + # to access the singleton config instance and use + # {Aws::ActiveJob::SQS.configure Aws::ActiveJob::SQS.configure} to + # configure in code: + # + # Aws::ActiveJob::SQS.configure do |config| + # config.logger = Rails.logger + # config.max_messages = 5 + # end + # + # # Configuation YAML File + # By default, this class will load configuration from the + # `config/aws_active_job_sqs/` and queue specific keys take the + # form of: `AWS_ACTIVE_JOB_SQS__`. + # is case-insensitive and is always down cased. Configuring + # non-snake case queues (containing upper case) through ENV is + # not supported. + # + # Example: + # + # export AWS_ACTIVE_JOB_SQS_MAX_MESSAGES = 5 + # export AWS_ACTIVE_JOB_SQS_DEFAULT_URL = https://my-queue.aws + # + # For supported global ENV configurations see + # {GLOBAL_ENV_CONFIGS}. For supported queue specific ENV configurations + # see: {QUEUE_ENV_CONFIGS}. + # class Configuration # Default configuration options # @api private DEFAULTS = { + threads: 2 * Concurrent.processor_count, + backpressure: 10, max_messages: 10, shutdown_timeout: 15, retry_standard_errors: true, # TODO: Remove in next MV @@ -17,20 +63,37 @@ class Configuration excluded_deduplication_keys: ['job_id'] }.freeze - # @api private - attr_accessor :queues, :max_messages, :visibility_timeout, - :shutdown_timeout, :client, :logger, - :async_queue_error_handler, :message_group_id + GLOBAL_ENV_CONFIGS = %i[ + config_file + threads + backpressure + max_messages + shutdown_timeout + visibility_timeout + message_group_id + ].freeze - attr_reader :excluded_deduplication_keys + QUEUE_ENV_CONFIGS = %i[ + url + max_messages + visibility_timeout + message_group_id + ].freeze - # Don't use this method directly: Configuration is a singleton class, use - # +Aws::ActiveJob::SQS.config+ to access the singleton config. + QUEUE_CONFIGS = QUEUE_ENV_CONFIGS + %i[excluded_deduplication_keys] + + # Don't use this method directly: Configuration is a singleton class, + # use {Aws::ActiveJob::SQS.config Aws::ActiveJob::SQS.config} + # to access the singleton config instance and use + # {Aws::ActiveJob::SQS.configure Aws::ActiveJob::SQS.configure} to + # configure in code: # # @param [Hash] options - # @option options [Hash[Symbol, String]] :queues A mapping between the - # active job queue name and the SQS Queue URL. Note: multiple active - # job queues can map to the same SQS Queue URL. + # @option options [Hash] :queues A mapping between the + # active job queue name and the queue properties. Values + # configured on the queue are used preferentially to the global + # values. See: {QUEUE_CONFIGS} for supported queue specific options. + # Note: multiple active job queues can map to the same SQS Queue URL. # # @option options [Integer] :max_messages # The max number of messages to poll for in a batch. @@ -50,7 +113,7 @@ class Configuration # will not be deleted from the SQS queue and will be retryable after # the visibility timeout. # - # @ option options [Boolean] :retry_standard_errors + # @option options [Boolean] :retry_standard_errors # If `true`, StandardErrors raised by ActiveJobs are left on the queue # and will be retried (pending the SQS Queue's redrive/DLQ/maximum receive settings). # This behavior overrides the standard Rails ActiveJob @@ -65,7 +128,7 @@ class Configuration # # @option options [String] :config_file # Override file to load configuration from. If not specified will - # attempt to load from config/aws_sqs_active_job.yml. + # attempt to load from config/aws_active_job_sqs.yml. # # @option options [String] :message_group_id (SqsActiveJobGroup) # The message_group_id to use for queueing messages on a fifo queues. @@ -73,7 +136,7 @@ class Configuration # See the (SQS FIFO Documentation)[https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html] # # @option options [Callable] :async_queue_error_handler An error handler - # to be called when the async active job adapter experiances an error + # to be called when the async active job adapter experiences an error # queueing a job. Only applies when # +active_job.queue_adapter = :sqs_async+. Called with: # [error, job, job_options] @@ -86,13 +149,22 @@ class Configuration # Using this option, job_id is implicitly added to the keys. def initialize(options = {}) - options[:config_file] ||= config_file if File.exist?(config_file) - options = DEFAULTS - .merge(file_options(options)) - .merge(options) - set_attributes(options) + opts = env_options.deep_merge(options) + opts = file_options(opts).deep_merge(opts) + opts = DEFAULTS.merge(opts) + + set_attributes(opts) end + # @api private + attr_accessor :queues, :threads, :backpressure, + :shutdown_timeout, :client, :logger, + :async_queue_error_handler, + :retry_standard_errors + + # @api private + attr_writer :max_messages, :message_group_id, :visibility_timeout + def excluded_deduplication_keys=(keys) @excluded_deduplication_keys = keys.map(&:to_s) | ['job_id'] end @@ -105,12 +177,10 @@ def client end end - # Return the queue_url for a given job_queue name - def queue_url_for(job_queue) - job_queue = job_queue.to_sym - raise ArgumentError, "No queue defined for #{job_queue}" unless queues.key? job_queue - - queues[job_queue] + QUEUE_CONFIGS.each do |key| + define_method(:"#{key}_for") do |job_queue| + queue_attribute_for(key, job_queue) + end end # @api private @@ -131,6 +201,13 @@ def to_h private + def queue_attribute_for(attribute, job_queue) + job_queue = job_queue.to_sym + raise ArgumentError, "No queue defined for #{job_queue}" unless queues.key? job_queue + + queues[job_queue][attribute] || instance_variable_get("@#{attribute}") + end + # Set accessible attributes after merged options. def set_attributes(options) options.each_key do |opt_name| @@ -139,18 +216,47 @@ def set_attributes(options) end end + # resolve ENV for global and queue specific options + def env_options + resolved = { queues: {} } + GLOBAL_ENV_CONFIGS.each do |cfg| + env_name = "AWS_ACTIVE_JOB_SQS_#{cfg.to_s.upcase}" + resolved[cfg] = parse_env_value(env_name) if ENV.key? env_name + end + + # check for queue specific values + queue_key_regex = + /AWS_ACTIVE_JOB_SQS_([\w]+)_(#{QUEUE_ENV_CONFIGS.map(&:upcase).join('|')})/ + ENV.each_key do |key| + next unless (match = queue_key_regex.match(key)) + + queue_name = match[1].downcase.to_sym + resolved[:queues][queue_name] ||= {} + resolved[:queues][queue_name][match[2].downcase.to_sym] = + parse_env_value(key) + end + resolved + end + + def parse_env_value(key) + val = ENV.fetch(key, nil) + Integer(val) + rescue ArgumentError, TypeError + %w[true false].include?(val) ? val == 'true' : val + end + def file_options(options = {}) - file_path = config_file_path(options) + file_path = options[:config_file] || default_config_file if file_path load_from_file(file_path) else - {} + options end end - def config_file - file = ::Rails.root.join("config/aws_sqs_active_job/#{::Rails.env}.yml") - file = ::Rails.root.join('config/aws_sqs_active_job.yml') unless File.exist?(file) + def default_config_file + file = ::Rails.root.join("config/aws_active_job_sqs/#{::Rails.env}.yml") + file = ::Rails.root.join('config/aws_active_job_sqs.yml') unless File.exist?(file) file end @@ -160,12 +266,9 @@ def load_from_file(file_path) opts.deep_symbolize_keys end - # @return [String] Configuration path found in environment or YAML file. - def config_file_path(options) - options[:config_file] || ENV.fetch('AWS_SQS_ACTIVE_JOB_CONFIG_FILE', nil) - end - def load_yaml(file_path) + return {} unless File.exist?(file_path) + require 'erb' source = ERB.new(File.read(file_path)).result diff --git a/lib/aws/active_job/sqs/poller.rb b/lib/aws/active_job/sqs/poller.rb index c23ba93..8db5071 100644 --- a/lib/aws/active_job/sqs/poller.rb +++ b/lib/aws/active_job/sqs/poller.rb @@ -12,14 +12,6 @@ module SQS class Poller class Interrupt < StandardError; end - DEFAULT_OPTS = { - threads: 2 * Concurrent.processor_count, - max_messages: 10, - shutdown_timeout: 15, - backpressure: 10, - retry_standard_errors: true - }.freeze - def initialize(args = ARGV) @options = parse_args(args) # Set_environment must be run before we boot_rails @@ -35,22 +27,28 @@ def run boot_rails # cannot load config (from file or initializers) until after - # rails has been booted. - @options = DEFAULT_OPTS - .merge(Aws::ActiveJob::SQS.config.to_h) - .merge(@options.to_h) + # rails has been booted.\ + Aws::ActiveJob::SQS.configure do |cfg| + @options.each_pair do |key, value| + cfg.send(:"#{key}=", value) if cfg.respond_to?(:"#{key}=") + end + end + validate_config + + config = Aws::ActiveJob::SQS.config + # ensure we have a logger configured - @logger = @options[:logger] || ActiveSupport::Logger.new($stdout) - @logger.info("Starting Poller with options=#{@options}") + @logger = config.logger || ActiveSupport::Logger.new($stdout) + @logger.info("Starting Poller with config=#{config.to_h}") Signal.trap('INT') { raise Interrupt } Signal.trap('TERM') { raise Interrupt } @executor = Executor.new( - max_threads: @options[:threads], + max_threads: config.threads, logger: @logger, - max_queue: @options[:backpressure], - retry_standard_errors: @options[:retry_standard_errors] + max_queue: config.backpressure, + retry_standard_errors: config.retry_standard_errors ) poll @@ -63,18 +61,18 @@ def run private def shutdown - @executor.shutdown(@options[:shutdown_timeout]) + @executor.shutdown(Aws::ActiveJob::SQS.config.shutdown_timeout) end def poll - queue_url = Aws::ActiveJob::SQS.config.queue_url_for(@options[:queue]) - @logger.info "Polling on: #{@options[:queue]} => #{queue_url}" - client = Aws::ActiveJob::SQS.config.client - @poller = Aws::SQS::QueuePoller.new(queue_url, client: client) + config = Aws::ActiveJob::SQS.config + queue = @options[:queue] + queue_url = config.url_for(queue) + @poller = Aws::SQS::QueuePoller.new(queue_url, client: config.client) poller_options = { skip_delete: true, - max_number_of_messages: @options[:max_messages], - visibility_timeout: @options[:visibility_timeout] + max_number_of_messages: config.max_messages_for(queue), + visibility_timeout: config.visibility_timeout_for(queue) } # Limit max_number_of_messages for FIFO queues to 1 # this ensures jobs with the same message_group_id are processed @@ -85,6 +83,12 @@ def poll single_message = poller_options[:max_number_of_messages] == 1 + @logger.info "Polling on: #{queue} => #{queue_url} with options=#{poller_options}" + + _poll(config.client, poller_options, queue_url, single_message) + end + + def _poll(client, poller_options, queue_url, single_message) @poller.poll(poller_options) do |msgs| msgs = [msgs] if single_message @logger.info "Processing batch of #{msgs.length} messages" diff --git a/spec/active_job/queue_adapters/sqs_adapter/params_spec.rb b/spec/active_job/queue_adapters/sqs_adapter/params_spec.rb index 78804ff..36349a4 100644 --- a/spec/active_job/queue_adapters/sqs_adapter/params_spec.rb +++ b/spec/active_job/queue_adapters/sqs_adapter/params_spec.rb @@ -49,7 +49,7 @@ class SqsAdapter describe 'fifo queue' do before do - allow(Aws::ActiveJob::SQS.config).to receive(:queue_url_for).and_return('https://queue-url.fifo') + allow(Aws::ActiveJob::SQS.config).to receive(:url_for).and_return('https://queue-url.fifo') end it 'includes message_group_id and message_deduplication_id' do diff --git a/spec/active_job/queue_adapters/sqs_adapter_spec.rb b/spec/active_job/queue_adapters/sqs_adapter_spec.rb index 4f116a6..a7b2f4f 100644 --- a/spec/active_job/queue_adapters/sqs_adapter_spec.rb +++ b/spec/active_job/queue_adapters/sqs_adapter_spec.rb @@ -23,7 +23,7 @@ module QueueAdapters describe 'fifo queues' do before do - allow(Aws::ActiveJob::SQS.config).to receive(:queue_url_for).and_return('https://queue-url.fifo') + allow(Aws::ActiveJob::SQS.config).to receive(:url_for).and_return('https://queue-url.fifo') end it 'adds message_deduplication_id and default message_group_id if job does not override it' do @@ -33,7 +33,7 @@ module QueueAdapters queue_url: 'https://queue-url.fifo', message_body: instance_of(String), message_attributes: instance_of(Hash), - message_group_id: Aws::ActiveJob::SQS.config.message_group_id, + message_group_id: Aws::ActiveJob::SQS.config.message_group_id_for(:default), message_deduplication_id: instance_of(String) } ) @@ -62,7 +62,7 @@ module QueueAdapters queue_url: 'https://queue-url.fifo', message_body: instance_of(String), message_attributes: instance_of(Hash), - message_group_id: Aws::ActiveJob::SQS.config.message_group_id, + message_group_id: Aws::ActiveJob::SQS.config.message_group_id_for(:default), message_deduplication_id: hashed_body } ) @@ -90,7 +90,7 @@ module QueueAdapters queue_url: 'https://queue-url.fifo', message_body: instance_of(String), message_attributes: instance_of(Hash), - message_group_id: Aws::ActiveJob::SQS.config.message_group_id, + message_group_id: Aws::ActiveJob::SQS.config.message_group_id_for(:default), message_deduplication_id: hashed_body } ) @@ -108,7 +108,7 @@ module QueueAdapters queue_url: 'https://queue-url.fifo', message_body: instance_of(String), message_attributes: instance_of(Hash), - message_group_id: Aws::ActiveJob::SQS.config.message_group_id, + message_group_id: Aws::ActiveJob::SQS.config.message_group_id_for(:default), message_deduplication_id: instance_of(String) } ) diff --git a/spec/active_job/queue_adapters/sqs_async_adapter_spec.rb b/spec/active_job/queue_adapters/sqs_async_adapter_spec.rb index e941f75..9ca5154 100644 --- a/spec/active_job/queue_adapters/sqs_async_adapter_spec.rb +++ b/spec/active_job/queue_adapters/sqs_async_adapter_spec.rb @@ -58,7 +58,7 @@ def mock_async end it 'queues jobs to fifo queues synchronously' do - allow(Aws::ActiveJob::SQS.config).to receive(:queue_url_for) + allow(Aws::ActiveJob::SQS.config).to receive(:url_for) .and_return('https://queue-url.fifo') expect(Concurrent::Promises).not_to receive(:future) expect(client).to receive(:send_message) diff --git a/spec/aws-activejob-sqs_spec.rb b/spec/aws-activejob-sqs_spec.rb index 153ebab..b731ebc 100644 --- a/spec/aws-activejob-sqs_spec.rb +++ b/spec/aws-activejob-sqs_spec.rb @@ -22,13 +22,13 @@ module ActiveJob describe '.configure' do it 'allows configuration through a block' do Aws::ActiveJob::SQS.configure do |config| - config.visibility_timeout = 360 - config.excluded_deduplication_keys = [:job_class] + config.threads = 3 + config.backpressure = 5 end expect(Aws::ActiveJob::SQS.config).to have_attributes( - visibility_timeout: 360, - excluded_deduplication_keys: contain_exactly('job_class', 'job_id') + threads: 3, + backpressure: 5 ) end end diff --git a/spec/aws/active_job/sqs/configuration_spec.rb b/spec/aws/active_job/sqs/configuration_spec.rb index 157369e..00212ef 100644 --- a/spec/aws/active_job/sqs/configuration_spec.rb +++ b/spec/aws/active_job/sqs/configuration_spec.rb @@ -7,7 +7,7 @@ module SQS let(:expected_file_opts) do { max_messages: 5, - queues: { default: 'https://queue-url' } + queues: { default: { url: 'https://queue-url', max_messages: 2 } } } end @@ -41,13 +41,91 @@ module SQS it 'accepts YAML config with alias' do allow_any_instance_of(ERB).to receive(:result).and_return(<<~YAML) common: &common - default: 'https://queue-url' + default: + url: 'https://queue-url' queues: <<: *common YAML expect { Aws::ActiveJob::SQS::Configuration.new }.to_not raise_error end + context 'ENV set' do + Configuration::GLOBAL_ENV_CONFIGS.each do |config_name| + next if config_name == :config_file + + describe "ENV #{config_name}" do + let(:env_name) { "AWS_ACTIVE_JOB_SQS_#{config_name.to_s.upcase}" } + + let(:cfg) do + options = {} + options[config_name] = 'file_value' + Tempfile.create('aws_active_job_sqs.yml') do |f| + f << options.transform_keys(&:to_s).to_yaml + f.rewind + Configuration.new( + config_file: f.path, + queues: { default: {} } + ) + end + end + + before(:each) do + ENV[env_name] = 'env_value' + end + + after(:each) do + ENV.delete(env_name) + end + + it 'uses values from ENV over default and file' do + if Configuration::QUEUE_CONFIGS.include?(config_name) + expect(cfg.send(:"#{config_name}_for", :default)).to eq('env_value') + else + expect(cfg.send(config_name)).to eq('env_value') + end + end + end + end + + Configuration::QUEUE_ENV_CONFIGS.each do |config_name| + describe "ENV queue #{config_name}" do + let(:env_name) { "AWS_ACTIVE_JOB_SQS_DEFAULT_#{config_name.to_s.upcase}" } + + let(:cfg) do + options = { queues: { default: {} } } + options[:queues][:default][config_name] = 'file_value' + Tempfile.create('aws_active_job_sqs.yml') do |f| + f << options.deep_transform_keys(&:to_s).to_yaml + f.rewind + Configuration.new( + config_file: f.path, + queues: { default: {} } + ) + end + end + + before(:each) do + ENV[env_name] = 'env_value' + end + + after(:each) do + ENV.delete(env_name) + end + + it 'uses values from ENV over default and file' do + expect(cfg.send(:"#{config_name}_for", :default)).to eq('env_value') + end + + it 'uses runtime configured values over ENV' do + options = { queues: { default: {} } } + options[:queues][:default][config_name] = 'runtime_value' + cfg = Configuration.new(options) + expect(cfg.send(:"#{config_name}_for", :default)).to eq('runtime_value') + end + end + end + end + describe '#client' do it 'does not create client on initialize' do expect(Aws::SQS::Client).not_to receive(:new) @@ -62,21 +140,30 @@ module SQS end end - describe '#queue_url_for' do - let(:queue_url) { 'https://queue_url' } + Configuration::QUEUE_CONFIGS.each do |config_name| + describe "##{config_name}_for" do + let(:cfg) do + queues = { + default: {}, + override: {} + } + queues[:override][config_name] = 'queue_value' + options = { queues: queues, config_file: 'nonexistant' } + options[config_name] = 'global_value' + Aws::ActiveJob::SQS::Configuration.new(**options) + end - let(:cfg) do - Aws::ActiveJob::SQS::Configuration.new( - queues: { default: queue_url } - ) - end + it 'returns the queue value when set' do + expect(cfg.send(:"#{config_name}_for", :override)).to eq('queue_value') + end - it 'returns the queue url' do - expect(cfg.queue_url_for(:default)).to eq queue_url - end + it 'returns the global value when unset' do + expect(cfg.send(:"#{config_name}_for", :default)).to eq('global_value') + end - it 'raises an ArgumentError when the queue is not mapped' do - expect { cfg.queue_url_for(:not_mapped) }.to raise_error(ArgumentError) + it 'raises an ArgumentError when the queue is not mapped' do + expect { cfg.send(:"#{config_name}_for", :not_mapped) }.to raise_error(ArgumentError) + end end end end diff --git a/spec/aws/active_job/sqs/poller_spec.rb b/spec/aws/active_job/sqs/poller_spec.rb index 1415eaa..b3025ef 100644 --- a/spec/aws/active_job/sqs/poller_spec.rb +++ b/spec/aws/active_job/sqs/poller_spec.rb @@ -47,10 +47,10 @@ module SQS allow(poller).to receive(:poll) # no-op the poll poller.run - options = poller.instance_variable_get(:@options) - expect(options[:max_messages]).to eq 5 # from test app config file - expect(options[:visibility_timeout]).to eq 360 # from argv - expect(options[:shutdown_timeout]).to eq 15 # from defaults + config = Aws::ActiveJob::SQS.config + expect(config.max_messages_for(:default)).to eq 2 # from test app config file + expect(config.visibility_timeout_for(:default)).to eq 360 # from argv + expect(config.shutdown_timeout).to eq 15 # from defaults end it 'polls the configured queue' do @@ -71,7 +71,7 @@ module SQS expect(queue_poller).to receive(:poll).with( { skip_delete: true, - max_number_of_messages: 5, + max_number_of_messages: 2, # from queue config in app config file visibility_timeout: 360 } ) @@ -82,7 +82,7 @@ module SQS it 'sets max_number_of_messages to 1 for fifo queues' do allow(poller).to receive(:boot_rails) # no-op the boot - allow(Aws::ActiveJob::SQS.config).to receive(:queue_url_for).and_return('https://queue-url.fifo') + allow(Aws::ActiveJob::SQS.config).to receive(:url_for).and_return('https://queue-url.fifo') expect(Aws::SQS::QueuePoller).to receive(:new).and_return(queue_poller) expect(queue_poller).to receive(:poll).with( diff --git a/spec/dummy/config/aws_active_job_sqs.yml b/spec/dummy/config/aws_active_job_sqs.yml new file mode 100644 index 0000000..dc8c630 --- /dev/null +++ b/spec/dummy/config/aws_active_job_sqs.yml @@ -0,0 +1,5 @@ +max_messages: 5 +queues: + default: + url: 'https://queue-url' + max_messages: 2 diff --git a/spec/dummy/config/aws_sqs_active_job.yml b/spec/dummy/config/aws_sqs_active_job.yml deleted file mode 100644 index 69f4154..0000000 --- a/spec/dummy/config/aws_sqs_active_job.yml +++ /dev/null @@ -1,3 +0,0 @@ -max_messages: 5 -queues: - default: 'https://queue-url'