Skip to content

Commit

Permalink
Exclude job_id from message deduplication when ActiveJob (#462)
Browse files Browse the repository at this point in the history
Exclude job_id from message deduplication when ActiveJob

Fix #457
  • Loading branch information
phstc authored Jan 27, 2018
1 parent b7e4c42 commit 6eb64a8
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 10 deletions.
30 changes: 20 additions & 10 deletions lib/shoryuken/extensions/active_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,40 @@ def enqueue_at(job, timestamp)
end
end

def enqueue(job) #:nodoc:
def enqueue(job, options = {}) #:nodoc:
register_worker!(job)

queue = Shoryuken::Client.queues(job.queue_name)
queue.send_message(message(job))
queue.send_message(message(queue, job, options))
end

def enqueue_at(job, timestamp) #:nodoc:
register_worker!(job)
enqueue(job, delay_seconds: calculate_delay(timestamp))
end

private

def calculate_delay(timestamp)
delay = (timestamp - Time.current.to_f).round
raise 'The maximum allowed delay is 15 minutes' if delay > 15.minutes

queue = Shoryuken::Client.queues(job.queue_name)
queue.send_message(message(job, delay_seconds: delay))
delay
end

private

def message(job, options = {})
def message(queue, job, options = {})
body = job.serialize

{ message_body: body,
message_attributes: message_attributes }.merge(options)
msg = {}

if queue.fifo?
# See https://github.com/phstc/shoryuken/issues/457
msg[:message_deduplication_id] = Digest::SHA256.hexdigest(JSON.dump(body.except('job_id')))
end

msg[:message_body] = body
msg[:message_attributes] = message_attributes

msg.merge(options)
end

def register_worker!(job)
Expand Down
64 changes: 64 additions & 0 deletions spec/shoryuken/extensions/active_job_adapter_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
require 'spec_helper'
require 'active_job'
require 'shoryuken/extensions/active_job_adapter'

RSpec.describe ActiveJob::QueueAdapters::ShoryukenAdapter do
let(:job) { double 'Job', id: '123', queue_name: 'queue' }
let(:fifo) { false }
let(:queue) { double 'Queue', fifo?: fifo }

before do
allow(Shoryuken::Client).to receive(:queues).with(job.queue_name).and_return(queue)
allow(job).to receive(:serialize).and_return({
'job_class' => 'Worker',
'job_id' => job.id,
'queue_name' => job.queue_name,
'arguments' => nil,
'locale' => nil
})
end

describe '#enqueue' do
specify do
expect(queue).to receive(:send_message) do |hash|
expect(hash[:message_deduplication_id]).to_not be
end
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

subject.enqueue(job)
end

context 'when fifo' do
let(:fifo) { true }

it 'does not include job_id in the deduplication_id' do
expect(queue).to receive(:send_message) do |hash|
message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id')))

expect(hash[:message_deduplication_id]).to eq(message_deduplication_id)
end
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

subject.enqueue(job)
end
end
end

describe '#enqueue_at' do
specify do
delay = 1

expect(queue).to receive(:send_message) do |hash|
expect(hash[:message_deduplication_id]).to_not be
expect(hash[:delay_seconds]).to eq(delay)
end

expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

# need to figure out what to require Time.current and N.minutes to remove the stub
allow(subject).to receive(:calculate_delay).and_return(delay)

subject.enqueue_at(job, nil)
end
end
end

0 comments on commit 6eb64a8

Please sign in to comment.