Skip to content

Commit

Permalink
Simplify file structure and improve naming
Browse files Browse the repository at this point in the history
  • Loading branch information
mhenrixon committed Jul 30, 2015
1 parent feb9972 commit 779a2be
Show file tree
Hide file tree
Showing 23 changed files with 453 additions and 459 deletions.
39 changes: 36 additions & 3 deletions lib/sidekiq-unique-jobs.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
# rubocop:disable FileName
require 'yaml' if RUBY_VERSION.include?('2.0.0')
require 'yaml' if RUBY_VERSION.include?('2.0.0') # rubocop:disable FileName
require 'sidekiq_unique_jobs/middleware'
require 'sidekiq_unique_jobs/version'
require 'sidekiq_unique_jobs/config'
require 'sidekiq_unique_jobs/payload_helper'
require 'sidekiq_unique_jobs/sidekiq_unique_ext'

require 'ostruct'
Expand Down Expand Up @@ -37,4 +35,39 @@ def worker_class_constantize(worker_class)
rescue NameError
worker_class
end

def get_payload(klass, queue, *args)
unique_on_all_queues = false
if config.unique_args_enabled
worker_class = klass.constantize
args = yield_unique_args(worker_class, *args)
unique_on_all_queues =
worker_class.get_sidekiq_options['unique_on_all_queues']
end
md5_arguments = { class: klass, args: args }
md5_arguments[:queue] = queue unless unique_on_all_queues
"#{config.unique_prefix}:" \
"#{Digest::MD5.hexdigest(Sidekiq.dump_json(md5_arguments))}"
end

def yield_unique_args(worker_class, args)
unique_args = worker_class.get_sidekiq_options['unique_args']
filtered_args(worker_class, unique_args, args)
rescue NameError
# fallback to not filtering args when class can't be instantiated
args
end

def filtered_args(worker_class, unique_args, args)
case unique_args
when Proc
unique_args.call(args)
when Symbol
if worker_class.respond_to?(unique_args)
worker_class.send(unique_args, *args)
end
else
args
end
end
end
41 changes: 41 additions & 0 deletions lib/sidekiq_unique_jobs/client/middleware.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
require 'sidekiq_unique_jobs/client/strategies/unique'
require 'sidekiq_unique_jobs/client/strategies/testing_inline'

module SidekiqUniqueJobs
module Client
class Middleware
STRATEGIES = [
Strategies::TestingInline,
Strategies::Unique
]

attr_reader :item, :worker_class, :redis_pool

def call(worker_class, item, queue, redis_pool = nil)
@worker_class = SidekiqUniqueJobs.worker_class_constantize(worker_class)
@item = item
@redis_pool = redis_pool

if unique_enabled?
strategy.review(worker_class, item, queue, redis_pool, log_duplicate_payload?) { yield }
else
yield
end
end

private

def unique_enabled?
worker_class.get_sidekiq_options['unique'] || item['unique']
end

def log_duplicate_payload?
worker_class.get_sidekiq_options['log_duplicate_payload'] || item['log_duplicate_payload']
end

def strategy
STRATEGIES.detect(&:elegible?)
end
end
end
end
23 changes: 23 additions & 0 deletions lib/sidekiq_unique_jobs/client/strategies/testing_inline.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
require 'sidekiq_unique_jobs/server/middleware'

module SidekiqUniqueJobs
module Client
module Strategies
class TestingInline < Unique
def self.elegible?
SidekiqUniqueJobs.config.inline_testing_enabled?
end

def review
_middleware.call(worker_class.new, item, queue, redis_pool) do
super
end
end

def _middleware
SidekiqUniqueJobs::Server::Middleware.new
end
end
end
end
end
103 changes: 103 additions & 0 deletions lib/sidekiq_unique_jobs/client/strategies/unique.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
require 'digest'
require 'sidekiq_unique_jobs/connectors'

REQUIRE_FILES = lambda do
if SidekiqUniqueJobs.config.testing_enabled? && Sidekiq::Testing.fake?
require 'sidekiq_unique_jobs/sidekiq_test_overrides'
end
end

module SidekiqUniqueJobs
module Client
module Strategies
class Unique
def self.elegible?
true
end

def self.review(worker_class, item, queue, redis_pool = nil, log_duplicate_payload = false)
new(worker_class, item, queue, redis_pool, log_duplicate_payload).review do
yield
end
end

def initialize(worker_class, item, queue, redis_pool = nil, log_duplicate_payload = false)
@worker_class = SidekiqUniqueJobs.worker_class_constantize(worker_class)
@item = item
@queue = queue
@redis_pool = redis_pool
@log_duplicate_payload = log_duplicate_payload
REQUIRE_FILES.call
end

def review
item['unique_hash'] = payload_hash

unless unique_for_connection?
Sidekiq.logger.warn "payload is not unique #{item}" if @log_duplicate_payload
return
end

yield
end

private

attr_reader :item, :worker_class, :redis_pool, :queue, :log_duplicate_payload

def unique_for_connection?
send("#{storage_method}_unique_for?")
end

def storage_method
SidekiqUniqueJobs.config.unique_storage_method
end

def old_unique_for?
unique = nil
connection do |conn|
conn.watch(payload_hash)
pid = conn.get(payload_hash).to_i
if pid == 1 || (pid == 2 && item['at'])
# if the job is already queued, or is already scheduled and
# we're trying to schedule again, abort
conn.unwatch
else
unique = conn.multi do
# set value of 2 for scheduled jobs, 1 for queued jobs.
conn.setex(payload_hash, expires_at, item['jid'])
end
end
end
unique
end

def new_unique_for?
connection do |conn|
return conn.set(payload_hash, item['jid'], nx: true, ex: expires_at)
end
end

def expires_at
# if the job was previously scheduled and is now being queued,
# or we've never seen it before
ex = unique_job_expiration || SidekiqUniqueJobs.config.default_expiration
ex = ((Time.at(item['at']) - Time.now.utc) + ex).to_i if item['at']
ex
end

def connection(&block)
SidekiqUniqueJobs::Connectors.connection(redis_pool, &block)
end

def payload_hash
SidekiqUniqueJobs.get_payload(item['class'], item['queue'], item['args'])
end

def unique_job_expiration
worker_class.get_sidekiq_options['unique_job_expiration']
end
end
end
end
end
2 changes: 0 additions & 2 deletions lib/sidekiq_unique_jobs/inline_testing.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
require 'sidekiq_unique_jobs/lib'

begin
require 'mock_redis'
rescue LoadError
Expand Down
23 changes: 0 additions & 23 deletions lib/sidekiq_unique_jobs/lib.rb

This file was deleted.

12 changes: 6 additions & 6 deletions lib/sidekiq_unique_jobs/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

Sidekiq.configure_server do |config|
config.server_middleware do |chain|
require 'sidekiq_unique_jobs/middleware/server/unique_jobs'
chain.add SidekiqUniqueJobs::Middleware::Server::UniqueJobs
require 'sidekiq_unique_jobs/server/middleware'
chain.add SidekiqUniqueJobs::Server::Middleware
end
config.client_middleware do |chain|
require 'sidekiq_unique_jobs/middleware/client/unique_jobs'
chain.add SidekiqUniqueJobs::Middleware::Client::UniqueJobs
require 'sidekiq_unique_jobs/client/middleware'
chain.add SidekiqUniqueJobs::Client::Middleware
end
end

Sidekiq.configure_client do |config|
config.client_middleware do |chain|
require 'sidekiq_unique_jobs/middleware/client/unique_jobs'
chain.add SidekiqUniqueJobs::Middleware::Client::UniqueJobs
require 'sidekiq_unique_jobs/client/middleware'
chain.add SidekiqUniqueJobs::Client::Middleware
end
end

This file was deleted.

Loading

0 comments on commit 779a2be

Please sign in to comment.