diff --git a/lib/sidekiq_unique_jobs/batch_delete.rb b/lib/sidekiq_unique_jobs/batch_delete.rb index fdbe429c..f4796e59 100644 --- a/lib/sidekiq_unique_jobs/batch_delete.rb +++ b/lib/sidekiq_unique_jobs/batch_delete.rb @@ -91,6 +91,7 @@ def batch_delete(conn) chunk.each do |digest| del_digest(pipeline, digest) pipeline.zrem(SidekiqUniqueJobs::DIGESTS, digest) + pipeline.zrem(SidekiqUniqueJobs::EXPIRING_DIGESTS, digest) @count += 1 end end diff --git a/lib/sidekiq_unique_jobs/lock.rb b/lib/sidekiq_unique_jobs/lock.rb index ffcf8cf1..e6cb8985 100644 --- a/lib/sidekiq_unique_jobs/lock.rb +++ b/lib/sidekiq_unique_jobs/lock.rb @@ -66,7 +66,7 @@ def lock(job_id, lock_info = {}) pipeline.set(key.digest, job_id) pipeline.hset(key.locked, job_id, now_f) info.set(lock_info, pipeline) - pipeline.zadd(key.digests, now_f, key.digest) + add_digest_to_set(pipeline, lock_info) pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "queue.lua", "Queued")) pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "lock.lua", "Locked")) end @@ -321,5 +321,22 @@ def changelog_json(job_id, script, message) time: now_f, ) end + + # + # Add the digest to the correct sorted set + # + # @param [Object] pipeline a redis pipeline object for issue commands + # @param [Hash] lock_info the lock info relevant to the digest + # + # @return [nil] + # + def add_digest_to_set(pipeline, lock_info) + digest_string = key.digest + if lock_info["lock"] == :until_expired + pipeline.zadd(key.expiring_digests, now_f + lock_info["ttl"], digest_string) + else + pipeline.zadd(key.digests, now_f, digest_string) + end + end end end diff --git a/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb b/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb index 909b18d4..9d45914d 100644 --- a/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb +++ b/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb @@ -135,5 +135,28 @@ expect(service).not_to have_received(:orphans) end end + + context "when a lock is until_expired" do + let(:lock_info) do + { + "job_id" => job_id, + "limit" => 1, + "lock" => :until_expired, + "time" => now_f, + "timeout" => nil, + "ttl" => 1, + "lock_args" => [], + "worker" => "MyUniqueJob", + } + end + + it "clears the lock" do + expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 1 + sleep 2 + service.call + + expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 0 + end + end end end