Skip to content

Commit

Permalink
Merge pull request #589 from phstc/allcentury-async-delivery-adapter
Browse files Browse the repository at this point in the history
Allcentury async delivery adapter
  • Loading branch information
phstc authored Nov 30, 2019
2 parents 9d20e85 + 4b76c62 commit be59f61
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 67 deletions.
7 changes: 2 additions & 5 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Style/Alias:
Style/PerlBackrefs:
Enabled: false

Layout/TrailingBlankLines:
Layout/TrailingEmptyLines:
Enabled: false

# Override the HoundCI custom rules (they do not use Rubocop defaults)
Expand Down Expand Up @@ -76,7 +76,7 @@ Style/GuardClause:
Style/RegexpLiteral:
Enabled: false

Lint/HandleExceptions:
Lint/SuppressedException:
Enabled: false

Lint/AssignmentInCondition:
Expand Down Expand Up @@ -117,6 +117,3 @@ Security/YAMLLoad:

Naming/MemoizedInstanceVariableName:
Enabled: false

Performance/RedundantBlockCall:
Enabled: false
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## [v5.0.3] - 2019-11-30

- Add support for sending messages asynchronous with Active Job using `shoryuken_concurrent_send`
- [#589](https://github.com/phstc/shoryuken/pull/589)
- [#588](https://github.com/phstc/shoryuken/pull/588)

## [v5.0.2] - 2019-11-02

- Fix Queue order is reversed if passed through CLI
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ If you are using AWS SDK version 3, please also add this line:
gem 'aws-sdk-sqs'
```

The extra gem `aws-sdk-sqs` is required in order to keep Shoryuken compatible with AWS SDK version 2 and 3.
The extra gem `aws-sdk-sqs` is required in order to keep Shoryuken compatible with AWS SDK version 2 and 3.

And then execute:

Expand Down
5 changes: 4 additions & 1 deletion lib/shoryuken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,7 @@ def self.shoryuken_options
)
end

require 'shoryuken/extensions/active_job_adapter' if Shoryuken.active_job?
if Shoryuken.active_job?
require 'shoryuken/extensions/active_job_adapter'
require 'shoryuken/extensions/active_job_concurrent_send_adapter'
end
2 changes: 1 addition & 1 deletion lib/shoryuken/default_worker_registry.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Shoryuken
class DefaultWorkerRegistry < WorkerRegistry
def initialize
@workers = {}
@workers = Concurrent::Hash.new
end

def batch_receive_messages?(queue)
Expand Down
50 changes: 50 additions & 0 deletions lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# ActiveJob docs: http://edgeguides.rubyonrails.org/active_job_basics.html
# Example adapters ref: https://github.com/rails/rails/tree/master/activejob/lib/active_job/queue_adapters
module ActiveJob
module QueueAdapters
# == Shoryuken concurrent adapter for Active Job
#
# This adapter sends messages asynchronously (ie non-blocking) and allows
# the caller to set up handlers for both success and failure
#
# To use this adapter, set up as:
#
# success_handler = ->(response, job, options) { StatsD.increment("#{job.class.name}.success") }
# error_handler = ->(err, job, options) { StatsD.increment("#{job.class.name}.failure") }
#
# adapter = ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter.new(success_handler, error_handler)
#
# config.active_job.queue_adapter = adapter
class ShoryukenConcurrentSendAdapter < ShoryukenAdapter
def initialize(success_handler = nil, error_handler = nil)
@success_handler = success_handler
@error_handler = error_handler
end

def enqueue(job, options = {})
send_concurrently(job, options) { |f_job, f_options| super(f_job, f_options) }
end

def success_handler
@success_handler ||= ->(_send_message_response, _job, _options) { nil }
end

def error_handler
@error_handler ||= begin
lambda { |error, job, _options|
Shoryuken.logger.warn("Failed to enqueue job: #{job.inspect} due to error: #{error}")
}
end
end

private

def send_concurrently(job, options)
Concurrent::Promises
.future(job, options) { |f_job, f_options| [yield(f_job, f_options), f_job, f_options] }
.then { |send_message_response, f_job, f_options| success_handler.call(send_message_response, f_job, f_options) }
.rescue(job, options) { |err, f_job, f_options| error_handler.call(err, f_job, f_options) }
end
end
end
end
62 changes: 62 additions & 0 deletions spec/shared_examples_for_active_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# rubocop:disable Metrics/BlockLength
RSpec.shared_examples 'active_job_adapters' do
let(:job) { double 'Job', id: '123', queue_name: 'queue' }
let(:fifo) { false }
let(:queue) { double 'Queue', fifo?: fifo }

before do
allow(Shoryuken::Client).to receive(:queues).with(job.queue_name).and_return(queue)
allow(job).to receive(:serialize).and_return(
'job_class' => 'Worker',
'job_id' => job.id,
'queue_name' => job.queue_name,
'arguments' => nil,
'locale' => nil
)
end

describe '#enqueue' do
specify do
expect(queue).to receive(:send_message) do |hash|
expect(hash[:message_deduplication_id]).to_not be
end
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

subject.enqueue(job)
end

context 'when fifo' do
let(:fifo) { true }

it 'does not include job_id in the deduplication_id' do
expect(queue).to receive(:send_message) do |hash|
message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id')))

expect(hash[:message_deduplication_id]).to eq(message_deduplication_id)
end
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

subject.enqueue(job)
end
end
end

describe '#enqueue_at' do
specify do
delay = 1

expect(queue).to receive(:send_message) do |hash|
expect(hash[:message_deduplication_id]).to_not be
expect(hash[:delay_seconds]).to eq(delay)
end

expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

# need to figure out what to require Time.current and N.minutes to remove the stub
allow(subject).to receive(:calculate_delay).and_return(delay)

subject.enqueue_at(job, nil)
end
end
end
# rubocop:enable Metrics/BlockLength
61 changes: 2 additions & 59 deletions spec/shoryuken/extensions/active_job_adapter_spec.rb
Original file line number Diff line number Diff line change
@@ -1,64 +1,7 @@
require 'spec_helper'
require 'active_job'
require 'shoryuken/extensions/active_job_adapter'
require 'shared_examples_for_active_job'

RSpec.describe ActiveJob::QueueAdapters::ShoryukenAdapter do
let(:job) { double 'Job', id: '123', queue_name: 'queue' }
let(:fifo) { false }
let(:queue) { double 'Queue', fifo?: fifo }

before do
allow(Shoryuken::Client).to receive(:queues).with(job.queue_name).and_return(queue)
allow(job).to receive(:serialize).and_return(
'job_class' => 'Worker',
'job_id' => job.id,
'queue_name' => job.queue_name,
'arguments' => nil,
'locale' => nil
)
end

describe '#enqueue' do
specify do
expect(queue).to receive(:send_message) do |hash|
expect(hash[:message_deduplication_id]).to_not be
end
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

subject.enqueue(job)
end

context 'when fifo' do
let(:fifo) { true }

it 'does not include job_id in the deduplication_id' do
expect(queue).to receive(:send_message) do |hash|
message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id')))

expect(hash[:message_deduplication_id]).to eq(message_deduplication_id)
end
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

subject.enqueue(job)
end
end
end

describe '#enqueue_at' do
specify do
delay = 1

expect(queue).to receive(:send_message) do |hash|
expect(hash[:message_deduplication_id]).to_not be
expect(hash[:delay_seconds]).to eq(delay)
end

expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

# need to figure out what to require Time.current and N.minutes to remove the stub
allow(subject).to receive(:calculate_delay).and_return(delay)

subject.enqueue_at(job, nil)
end
end
include_examples 'active_job_adapters'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
require 'spec_helper'
require 'shared_examples_for_active_job'
require 'shoryuken/extensions/active_job_adapter'
require 'shoryuken/extensions/active_job_concurrent_send_adapter'

RSpec.describe ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter do
include_examples 'active_job_adapters'

let(:options) { {} }
let(:error_handler) { -> {} }
let(:success_handler) { -> {} }

subject { described_class.new(success_handler, error_handler) }

context 'when success' do
it 'calls success_handler' do
response = true
allow(queue).to receive(:send_message).and_return(response)
expect(success_handler).to receive(:call).with(response, job, options)

subject.enqueue(job, options)
end
end

context 'when failure' do
it 'calls error_handler' do
response = Aws::SQS::Errors::InternalError.new('error', 'error')

allow(queue).to receive(:send_message).and_raise(response)
expect(error_handler).to receive(:call).with(response, job, options).and_call_original

subject.enqueue(job, options)
end
end
end

0 comments on commit be59f61

Please sign in to comment.