diff --git a/README.md b/README.md index fe2d1bf..d974771 100644 --- a/README.md +++ b/README.md @@ -274,6 +274,7 @@ def lock( - `timed` - (optiona) `[Boolean]` - Limit the invocation time period of the passed block of code by the lock's TTL. - pre-configured in `config[:is_timed_by_default]`; + - `false` by default; - `retry_count` - (optional) `[Integer,NilClass]` - How many times we should try to acquire a lock. Nil means "infinite retries". - pre-configured in `config[:retry_count]`; diff --git a/lib/redis_queued_locks/acquier.rb b/lib/redis_queued_locks/acquier.rb index d9c7d4f..97de363 100644 --- a/lib/redis_queued_locks/acquier.rb +++ b/lib/redis_queued_locks/acquier.rb @@ -14,4 +14,5 @@ module RedisQueuedLocks::Acquier require_relative 'acquier/queues' require_relative 'acquier/keys' require_relative 'acquier/extend_lock_ttl' + require_relative 'acquier/clear_dead_requests' end diff --git a/lib/redis_queued_locks/acquier/clear_dead_requests.rb b/lib/redis_queued_locks/acquier/clear_dead_requests.rb index 90a5a85..6310f6b 100644 --- a/lib/redis_queued_locks/acquier/clear_dead_requests.rb +++ b/lib/redis_queued_locks/acquier/clear_dead_requests.rb @@ -15,9 +15,9 @@ class << self # @api private # @since 0.1.0 def clear_dead_requests(redis_client, scan_size, dead_ttl, logger, instrumenter, instrument) - dead_score = Time.now.to_f - dead_ttl + dead_score = RedisQueuedLocks::Resource.acquier_dead_score(dead_ttl / 1000.0) - result = Set.new do |processed_queues| + result = Set.new.tap do |processed_queues| redis_client.with do |rconn| each_lock_queue(rconn, scan_size) do |lock_queue| rconn.call('ZREMRANGEBYSCORE', lock_queue, '-inf', dead_score) diff --git a/lib/redis_queued_locks/client.rb b/lib/redis_queued_locks/client.rb index aec53d7..f8782dc 100644 --- a/lib/redis_queued_locks/client.rb +++ b/lib/redis_queued_locks/client.rb @@ -202,8 +202,7 @@ def lock!( ) end - # @param lock_name [String] - # The lock name that should be released. + # @param lock_name [String] The lock name that should be released. # @option logger [::Logger,#debug] # @option instrumenter [#notify] # @option instrument [NilClass,Any] @@ -213,7 +212,9 @@ def lock!( # result: { # rel_time: Integer, # # rel_key: String, # lock key - # rel_queue: String # lock queue + # rel_queue: String, # lock queue + # queue_res: Symbol, # :released or :nothing_to_release + # lock_res: Symbol # :released or :nothing_to_release # } # } # diff --git a/spec/redis_queued_locks_spec.rb b/spec/redis_queued_locks_spec.rb index 3aca7f2..48ae7ce 100644 --- a/spec/redis_queued_locks_spec.rb +++ b/spec/redis_queued_locks_spec.rb @@ -13,8 +13,118 @@ after { redis.call('FLUSHDB') } - specify 'clear_dead_queus' do - RedisQueuedLocks::Client.new(redis) + specify 'clear_dead_queues' do + client = RedisQueuedLocks::Client.new(redis) + client.lock('kek.dead.lock1', ttl: 30_000) + client.lock('kek.dead.lock2', ttl: 30_000) + + # seed requests - make them dead + lockers1 = Array.new(10) do + # seed dead short-living requests + Thread.new do + client.lock('kek.dead.lock1', ttl: 50_000, queue_ttl: 60, timeout: nil, retry_count: nil) + end + end + lockers2 = Array.new(6) do + # seed dead short-living requests + Thread.new do + client.lock('kek.dead.lock2', ttl: 50_000, queue_ttl: 60, timeout: nil, retry_count: nil) + end + end + sleep(4) + # seed super long-living request + locker3 = Thread.new do + client.lock('kek.dead.lock1', ttl: 50_000, queue_ttl: 60, timeout: nil, retry_count: nil) + end + # seed super long-living request + locker4 = Thread.new do + client.lock('kek.dead.lock2', ttl: 50_000, queue_ttl: 60, timeout: nil, retry_count: nil) + end + sleep(1) + # kill acquiers => requests will live in redis now (zombie requests! bu!) + lockers1.each(&:kill) + lockers2.each(&:kill) + locker3.kill + locker4.kill + + expect(client.queues).to contain_exactly( + 'rql:lock_queue:kek.dead.lock1', + 'rql:lock_queue:kek.dead.lock2' + ) + expect(client.queues_info.size).to eq(2) + + queue_info1 = client.queue_info('kek.dead.lock1') + expect(queue_info1['queue'].size).to eq(11) + queue_info2 = client.queue_info('kek.dead.lock2') + expect(queue_info2['queue'].size).to eq(7) + + expect(client.queue_info('kek.dead.lock1')).to match({ + 'lock_queue' => 'rql:lock_queue:kek.dead.lock1', + 'queue' => contain_exactly( + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) } + ) + }) + + expect(client.queue_info('kek.dead.lock2')).to match({ + 'lock_queue' => 'rql:lock_queue:kek.dead.lock2', + 'queue' => contain_exactly( + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }, + { 'acq_id' => be_a(String), 'score' => be_a(Numeric) } + ) + }) + + # drop short living requests + result = client.clear_dead_requests(dead_ttl: 3_500) + expect(result).to match({ + ok: true, + result: match({ + processed_queues: contain_exactly( + 'rql:lock_queue:kek.dead.lock1', + 'rql:lock_queue:kek.dead.lock2' + ) + }) + }) + + # long-living requests remain + expect(client.queues).to contain_exactly( + 'rql:lock_queue:kek.dead.lock1', + 'rql:lock_queue:kek.dead.lock2' + ) + expect(client.queues_info.size).to eq(2) + + queue_info1 = client.queue_info('kek.dead.lock1') + expect(queue_info1['queue'].size).to eq(1) # long-living requests + queue_info2 = client.queue_info('kek.dead.lock2') + expect(queue_info2['queue'].size).to eq(1) # long-living requests + + # drop long-living requests + result = client.clear_dead_requests(dead_ttl: 1_000) + expect(result).to match({ + ok: true, + result: match({ + processed_queues: contain_exactly( + 'rql:lock_queue:kek.dead.lock1', + 'rql:lock_queue:kek.dead.lock2' + ) + }) + }) + expect(client.queues).to be_empty + redis.call('FLUSHDB') end specify 'logger' do