Skip to content

Commit

Permalink
Merge pull request #16 from lsimoneau/sidekiq-2.12.1-compat
Browse files Browse the repository at this point in the history
Compatibility with Sidekiq 2.12.1 Scheduled Jobs
  • Loading branch information
mhenrixon committed Jun 17, 2013
2 parents f39819d + c10a44a commit 0d19454
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
10 changes: 8 additions & 2 deletions lib/sidekiq-unique-jobs/middleware/client/unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@ def call(worker_class, item, queue)

conn.watch(payload_hash)

if conn.get(payload_hash)
if conn.get(payload_hash).to_i == 1 ||
(conn.get(payload_hash).to_i == 2 && item['at'])
# if the job is already queued, or is already scheduled and
# we're trying to schedule again, abort
conn.unwatch
else
# if the job was previously scheduled and is now being queued,
# or we've never seen it before
expires_at = unique_job_expiration || SidekiqUniqueJobs::Config.default_expiration
expires_at = ((Time.at(item['at']) - Time.now.utc) + expires_at).to_i if item['at']

unique = conn.multi do
conn.setex(payload_hash, expires_at, 1)
# set value of 2 for scheduled jobs, 1 for queued jobs.
conn.setex(payload_hash, expires_at, item['at'] ? 2 : 1)
end
end
end
Expand Down
17 changes: 17 additions & 0 deletions test/lib/sidekiq/test_client.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'helper'
require 'celluloid'
require 'sidekiq/worker'
require "sidekiq-unique-jobs"
require 'sidekiq/scheduled'
Expand Down Expand Up @@ -35,6 +36,22 @@ def run(x)
assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") }
end

it 'does not schedule duplicates when calling perform_in' do
QueueWorker.sidekiq_options :unique => true
10.times { QueueWorker.perform_in(60, [1, 2]) }
assert_equal 1, Sidekiq.redis { |c| c.zcount("schedule", -1, Time.now.to_f + 2 * 60) }
end

it 'enqueues previously scheduled job' do
QueueWorker.sidekiq_options :unique => true
QueueWorker.perform_in(60 * 60, 1, 2)

# time passes and the job is pulled off the schedule:
Sidekiq::Client.push('class' => TestClient::QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2])

assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") }
end

it 'sets an expiration when provided by sidekiq options' do
one_hour_expiration = 60 * 60
QueueWorker.sidekiq_options :unique => true, :unique_job_expiration => one_hour_expiration
Expand Down

0 comments on commit 0d19454

Please sign in to comment.