Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per queue config + ENV config #10

Merged
merged 15 commits into from
Dec 2, 2024
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
pull_request:
branches:
- main
- version-1.0 # remove after release of 1.0

env:
ruby_version: 3.3
Expand Down
4 changes: 4 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ Naming/FileName:
Style/BlockComments:
Exclude:
- 'spec/spec_helper.rb'

Metrics/ClassLength:
alextwoods marked this conversation as resolved.
Show resolved Hide resolved
Exclude:
- 'lib/aws/active_job/sqs/configuration.rb'
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
Unreleased Changes
------------------

* Feature - Support per queue configuration.
* Feature - Support loading global and queue specific configuration from ENV.

0.1.1 (2024-12-02)
------------------

Expand Down
29 changes: 25 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,26 @@ end
You also need to configure a mapping of ActiveJob queue names to SQS Queue URLs:

```yaml
# config/aws_sqs_active_job.yml
# config/aws_active_job_sqs.yml
backpressure: 5 # configure global options for poller
max_messages: 3
queues:
default: 'https://my-queue-url.amazon.aws'
default:
url: 'https://my-queue-url.amazon.aws'
max_messages: 2 # queue specific values override global values
```

For a complete list of configuration options see the
[Aws::ActiveJob::SQS::Configuration](https://docs.aws.amazon.com/sdk-for-ruby/aws-activejob-sqs/api/Aws/ActiveJob/SQS/Configuration.html)
documentation.

You can configure SQS Active Job either through the yaml file or
You can configure SQS Active Job either through the environment, yaml file or
through code in your `config/<env>.rb` or initializers.

For file based configuration, you can use either
`config/aws_sqs_active_job/<Rails.env>.yml` or `config/aws_sqs_active_job.yml`.
`config/aws_active_job_sqs/<Rails.env>.yml` or `config/aws_active_job_sqs.yml`.
You may specify the file used through the `:config_file` option in code or the
`AWS_ACTIVE_JOB_SQS_CONFIG_FILE` environment variable.
The yaml files support ERB.

To configure in code:
Expand All @@ -88,6 +94,21 @@ Aws::ActiveJob::SQS.configure do |config|
end
```

SQS Active Job loads global and queue specific values from your
environment. Global keys take the form of:
`AWS_ACTIVE_JOB_SQS_<KEY_NAME>` and queue specific keys take the
form of: `AWS_ACTIVE_JOB_SQS_<QUEUE_NAME>_<KEY_NAME>`.
<QUEUE_NAME> is case-insensitive and is always down cased. Configuring
non-snake case queues (containing upper case) through ENV is
not supported.

Example:

```shell
export AWS_ACTIVE_JOB_SQS_MAX_MESSAGES = 5
export AWS_ACTIVE_JOB_SQS_DEFAULT_URL = https://my-queue.aws
```

## Usage

To queue a job, you can just use standard ActiveJob methods:
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.0
1.0.0
2 changes: 1 addition & 1 deletion lib/active_job/queue_adapters/sqs_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def enqueue_at(job, timestamp)
def enqueue_all(jobs)
enqueued_count = 0
jobs.group_by(&:queue_name).each do |queue_name, same_queue_jobs|
queue_url = Aws::ActiveJob::SQS.config.queue_url_for(queue_name)
queue_url = Aws::ActiveJob::SQS.config.url_for(queue_name)
base_send_message_opts = { queue_url: queue_url }

same_queue_jobs.each_slice(10) do |chunk|
Expand Down
6 changes: 3 additions & 3 deletions lib/active_job/queue_adapters/sqs_adapter/params.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def initialize(job, body)
end

def queue_url
@queue_url ||= Aws::ActiveJob::SQS.config.queue_url_for(@job.queue_name)
@queue_url ||= Aws::ActiveJob::SQS.config.url_for(@job.queue_name)
end

def entry
Expand Down Expand Up @@ -61,15 +61,15 @@ def options_for_fifo
Digest::SHA256.hexdigest(ActiveSupport::JSON.dump(deduplication_body))

message_group_id = @job.message_group_id if @job.respond_to?(:message_group_id)
message_group_id ||= Aws::ActiveJob::SQS.config.message_group_id
message_group_id ||= Aws::ActiveJob::SQS.config.message_group_id_for(@job.queue_name)

options[:message_group_id] = message_group_id
options
end

def deduplication_body
ex_dedup_keys = @job.excluded_deduplication_keys if @job.respond_to?(:excluded_deduplication_keys)
ex_dedup_keys ||= Aws::ActiveJob::SQS.config.excluded_deduplication_keys
ex_dedup_keys ||= Aws::ActiveJob::SQS.config.excluded_deduplication_keys_for(@job.queue_name)

@body.except(*ex_dedup_keys)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/active_job/queue_adapters/sqs_async_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class SqsAsyncAdapter < SqsAdapter

def _enqueue(job, body = nil, send_message_opts = {})
# FIFO jobs must be queued in order, so do not queue async
queue_url = Aws::ActiveJob::SQS.config.queue_url_for(job.queue_name)
queue_url = Aws::ActiveJob::SQS.config.url_for(job.queue_name)
if Aws::ActiveJob::SQS.fifo?(queue_url)
super
else
Expand Down
168 changes: 134 additions & 34 deletions lib/aws/active_job/sqs/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,57 @@
module Aws
module ActiveJob
module SQS
# Use +Aws::ActiveJob::SQS.config+ to access the singleton config instance.
# This class provides a Configuration object for AWS ActiveJob
# by pulling configuration options from runtime code, the ENV, a YAML file,
# and default settings, in that order. Values set on queues are used
# preferentially to global values.
#
# Use {Aws::ActiveJob::SQS.config Aws::ActiveJob::SQS.config}
alextwoods marked this conversation as resolved.
Show resolved Hide resolved
# to access the singleton config instance and use
# {Aws::ActiveJob::SQS.configure Aws::ActiveJob::SQS.configure} to
# configure in code:
#
# Aws::ActiveJob::SQS.configure do |config|
# config.logger = Rails.logger
# config.max_messages = 5
# end
#
# # Configuation YAML File
# By default, this class will load configuration from the
# `config/aws_active_job_sqs/<RAILS_ENV}.yml` or
# `config/aws_active_job_sqs.yml` files. You may specify the file used
# through the `:config_file` option in code or the
# `AWS_ACTIVE_JOB_SQS_CONFIG_FILE` environment variable.
#
# # Global and queue specific options
# Values configured for specific queues are used preferentially to
# global values. See: {QUEUE_CONFIGS} for supported queue specific
alextwoods marked this conversation as resolved.
Show resolved Hide resolved
# options.
#
# # Environment Variables
# The Configuration loads global and qubeue specific values from your
# environment. Global keys take the form of:
# `AWS_ACTIVE_JOB_SQS_<KEY_NAME>` and queue specific keys take the
# form of: `AWS_ACTIVE_JOB_SQS_<QUEUE_NAME>_<KEY_NAME>`.
# <QUEUE_NAME> is case-insensitive and is always down cased. Configuring
# non-snake case queues (containing upper case) through ENV is
# not supported.
#
# Example:
#
# export AWS_ACTIVE_JOB_SQS_MAX_MESSAGES = 5
# export AWS_ACTIVE_JOB_SQS_DEFAULT_URL = https://my-queue.aws
#
# For supported global ENV configurations see
# {GLOBAL_ENV_CONFIGS}. For supported queue specific ENV configurations
# see: {QUEUE_ENV_CONFIGS}.
#
class Configuration
# Default configuration options
# @api private
DEFAULTS = {
threads: 2 * Concurrent.processor_count,
backpressure: 10,
max_messages: 10,
shutdown_timeout: 15,
retry_standard_errors: true, # TODO: Remove in next MV
Expand All @@ -17,20 +63,34 @@ class Configuration
excluded_deduplication_keys: ['job_id']
}.freeze

# @api private
attr_accessor :queues, :max_messages, :visibility_timeout,
:shutdown_timeout, :client, :logger,
:async_queue_error_handler, :message_group_id
GLOBAL_ENV_CONFIGS = %i[
config_file
threads
backpressure
max_messages
shutdown_timeout
visibility_timeout
message_group_id
].freeze

attr_reader :excluded_deduplication_keys
QUEUE_ENV_CONFIGS = %i[
url
max_messages
visibility_timeout
message_group_id
].freeze

QUEUE_CONFIGS = QUEUE_ENV_CONFIGS + %i[excluded_deduplication_keys]

# Don't use this method directly: Configuration is a singleton class, use
# +Aws::ActiveJob::SQS.config+ to access the singleton config.
# {}Aws::ActiveJob::SQS.config} to access the singleton config.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to be . for class methods

#
# @param [Hash] options
# @option options [Hash[Symbol, String]] :queues A mapping between the
# active job queue name and the SQS Queue URL. Note: multiple active
# job queues can map to the same SQS Queue URL.
# @option options [Hash<Symbol, Hash>] :queues A mapping between the
# active job queue name and the queue properties. Values
# configured on the queue are used preferentially to the global
# values. See: {QUEUE_CONFIGS} for supported queue specific options.
# Note: multiple active job queues can map to the same SQS Queue URL.
#
# @option options [Integer] :max_messages
# The max number of messages to poll for in a batch.
Expand All @@ -50,7 +110,7 @@ class Configuration
# will not be deleted from the SQS queue and will be retryable after
# the visibility timeout.
#
# @ option options [Boolean] :retry_standard_errors
# @option options [Boolean] :retry_standard_errors
# If `true`, StandardErrors raised by ActiveJobs are left on the queue
# and will be retried (pending the SQS Queue's redrive/DLQ/maximum receive settings).
# This behavior overrides the standard Rails ActiveJob
Expand All @@ -65,15 +125,15 @@ class Configuration
#
# @option options [String] :config_file
# Override file to load configuration from. If not specified will
# attempt to load from config/aws_sqs_active_job.yml.
# attempt to load from config/aws_active_job_sqs.yml.
#
# @option options [String] :message_group_id (SqsActiveJobGroup)
# The message_group_id to use for queueing messages on a fifo queues.
# Applies only to jobs queued on FIFO queues.
# See the (SQS FIFO Documentation)[https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html]
#
# @option options [Callable] :async_queue_error_handler An error handler
# to be called when the async active job adapter experiances an error
# to be called when the async active job adapter experiences an error
# queueing a job. Only applies when
# +active_job.queue_adapter = :sqs_async+. Called with:
# [error, job, job_options]
Expand All @@ -86,13 +146,22 @@ class Configuration
# Using this option, job_id is implicitly added to the keys.

def initialize(options = {})
mullermp marked this conversation as resolved.
Show resolved Hide resolved
options[:config_file] ||= config_file if File.exist?(config_file)
options = DEFAULTS
.merge(file_options(options))
.merge(options)
set_attributes(options)
opts = env_options.deep_merge(options)
alextwoods marked this conversation as resolved.
Show resolved Hide resolved
opts = file_options(opts).deep_merge(opts)
opts = DEFAULTS.merge(opts)

set_attributes(opts)
end

# @api private
attr_accessor :queues, :threads, :backpressure,
alextwoods marked this conversation as resolved.
Show resolved Hide resolved
:shutdown_timeout, :client, :logger,
:async_queue_error_handler,
:retry_standard_errors

# @api private
attr_writer :max_messages, :message_group_id, :visibility_timeout

def excluded_deduplication_keys=(keys)
@excluded_deduplication_keys = keys.map(&:to_s) | ['job_id']
end
Expand All @@ -105,12 +174,10 @@ def client
end
end

# Return the queue_url for a given job_queue name
def queue_url_for(job_queue)
job_queue = job_queue.to_sym
raise ArgumentError, "No queue defined for #{job_queue}" unless queues.key? job_queue

queues[job_queue]
QUEUE_CONFIGS.each do |key|
define_method(:"#{key}_for") do |job_queue|
queue_attribute_for(key, job_queue)
end
end

# @api private
Expand All @@ -131,6 +198,13 @@ def to_h

private

def queue_attribute_for(attribute, job_queue)
job_queue = job_queue.to_sym
raise ArgumentError, "No queue defined for #{job_queue}" unless queues.key? job_queue

queues[job_queue][attribute] || instance_variable_get("@#{attribute}")
end

# Set accessible attributes after merged options.
def set_attributes(options)
options.each_key do |opt_name|
Expand All @@ -139,18 +213,47 @@ def set_attributes(options)
end
end

# resolve ENV for global and queue specific options
def env_options
resolved = { queues: {} }
GLOBAL_ENV_CONFIGS.each do |cfg|
env_name = "AWS_ACTIVE_JOB_SQS_#{cfg.to_s.upcase}"
resolved[cfg] = parse_env_value(env_name) if ENV.key? env_name
end

# check for queue specific values
queue_key_regex =
/AWS_ACTIVE_JOB_SQS_([\w]+)_(#{QUEUE_ENV_CONFIGS.map(&:upcase).join('|')})/
ENV.each_key do |key|
next unless (match = queue_key_regex.match(key))

queue_name = match[1].downcase.to_sym
resolved[:queues][queue_name] ||= {}
resolved[:queues][queue_name][match[2].downcase.to_sym] =
parse_env_value(key)
end
resolved
end

def parse_env_value(key)
val = ENV.fetch(key, nil)
Integer(val)
rescue ArgumentError, TypeError
%w[true false].include?(val) ? val == 'true' : val
end

def file_options(options = {})
file_path = config_file_path(options)
file_path = options[:config_file] || default_config_file
if file_path
load_from_file(file_path)
else
{}
options
end
end

def config_file
file = ::Rails.root.join("config/aws_sqs_active_job/#{::Rails.env}.yml")
file = ::Rails.root.join('config/aws_sqs_active_job.yml') unless File.exist?(file)
def default_config_file
alextwoods marked this conversation as resolved.
Show resolved Hide resolved
file = ::Rails.root.join("config/aws_active_job_sqs/#{::Rails.env}.yml")
file = ::Rails.root.join('config/aws_active_job_sqs.yml') unless File.exist?(file)
file
end

Expand All @@ -160,12 +263,9 @@ def load_from_file(file_path)
opts.deep_symbolize_keys
end

# @return [String] Configuration path found in environment or YAML file.
def config_file_path(options)
options[:config_file] || ENV.fetch('AWS_SQS_ACTIVE_JOB_CONFIG_FILE', nil)
end

def load_yaml(file_path)
return {} unless File.exist?(file_path)

require 'erb'
source = ERB.new(File.read(file_path)).result

Expand Down
Loading
Loading