[clear_dead_requests] specs + updates AND [docs] updates
0exp committed Mar 31, 2024
1 parent d6c16f2 commit 3e1ee6e
Showing 5 changed files with 120 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -274,6 +274,7 @@ def lock(
- `timed` - (optional) `[Boolean]`
  - Limit the invocation time period of the passed block of code by the lock's TTL.
  - pre-configured in `config[:is_timed_by_default]`;
  - `false` by default;
- `retry_count` - (optional) `[Integer,NilClass]`
  - How many times we should try to acquire a lock. Nil means "infinite retries".
  - pre-configured in `config[:retry_count]`;
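A minimal usage sketch of the two options documented above, assuming the client is constructed the way the spec below does it; the require path, lock name, and TTL value are illustrative only:

```ruby
require 'redis_queued_locks' # assumed require path for the gem
require 'redis_client'

client = RedisQueuedLocks::Client.new(RedisClient.new)

# timed: true aborts the block if it outlives the lock's TTL;
# retry_count: nil retries acquisition indefinitely.
client.lock('orders:recalc', ttl: 5_000, timed: true, retry_count: nil) do
  # critical section, limited to ~5 seconds by timed: true
end
```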
1 change: 1 addition & 0 deletions lib/redis_queued_locks/acquier.rb
@@ -14,4 +14,5 @@ module RedisQueuedLocks::Acquier
  require_relative 'acquier/queues'
  require_relative 'acquier/keys'
  require_relative 'acquier/extend_lock_ttl'
  require_relative 'acquier/clear_dead_requests'
end
4 changes: 2 additions & 2 deletions lib/redis_queued_locks/acquier/clear_dead_requests.rb
@@ -15,9 +15,9 @@ class << self
    # @api private
    # @since 0.1.0
    def clear_dead_requests(redis_client, scan_size, dead_ttl, logger, instrumenter, instrument)
-     dead_score = Time.now.to_f - dead_ttl
      dead_score = RedisQueuedLocks::Resource.acquier_dead_score(dead_ttl / 1000.0)

-     result = Set.new do |processed_queues|
      result = Set.new.tap do |processed_queues|
        redis_client.with do |rconn|
          each_lock_queue(rconn, scan_size) do |lock_queue|
            rconn.call('ZREMRANGEBYSCORE', lock_queue, '-inf', dead_score)
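For context, a rough standalone sketch of what the call above does at the Redis level. The queue key format and the dead-score arithmetic (current epoch time minus the TTL converted to seconds) are inferred from this diff and the spec below, so treat the exact names and values as illustrative:

```ruby
require 'redis_client'

redis = RedisClient.new

# Each lock queue is a sorted set of acquirer ids scored by request time,
# so removing the range (-inf .. dead_score] drops requests older than dead_ttl.
dead_ttl_ms = 3_500
dead_score  = Time.now.to_f - (dead_ttl_ms / 1000.0)
redis.call('ZREMRANGEBYSCORE', 'rql:lock_queue:kek.dead.lock1', '-inf', dead_score)
```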
7 changes: 4 additions & 3 deletions lib/redis_queued_locks/client.rb
@@ -202,8 +202,7 @@ def lock!(
    )
  end

-  # @param lock_name [String]
-  #   The lock name that should be released.
  # @param lock_name [String] The lock name that should be released.
  # @option logger [::Logger,#debug]
  # @option instrumenter [#notify]
  # @option instrument [NilClass,Any]
@@ -213,7 +212,9 @@ def lock!(
  #   result: {
  #     rel_time: Integer, # <milliseconds>
  #     rel_key: String, # lock key
-  #     rel_queue: String # lock queue
  #     rel_queue: String, # lock queue
  #     queue_res: Symbol, # :released or :nothing_to_release
  #     lock_res: Symbol # :released or :nothing_to_release
  #   }
  # }
  #
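A hedged sketch of consuming the release result documented above, reusing the `client` from the earlier sketch. The releasing method body falls outside this hunk, so the method name `unlock` and the symbol-keyed shape of the outer hash are assumptions based on the doc comment and on the result format used elsewhere in this commit:

```ruby
result = client.unlock('orders:recalc') # `unlock` is an assumed method name

if result[:ok]
  rel = result[:result]
  puts "released #{rel[:rel_key]} in #{rel[:rel_time]}ms " \
       "(queue: #{rel[:queue_res]}, lock: #{rel[:lock_res]})"
end
```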
114 changes: 112 additions & 2 deletions spec/redis_queued_locks_spec.rb
@@ -13,8 +13,118 @@

  after { redis.call('FLUSHDB') }

-  specify 'clear_dead_queus' do
-    RedisQueuedLocks::Client.new(redis)
  specify 'clear_dead_queues' do
    client = RedisQueuedLocks::Client.new(redis)
    client.lock('kek.dead.lock1', ttl: 30_000)
    client.lock('kek.dead.lock2', ttl: 30_000)

    # seed requests - make them dead
    lockers1 = Array.new(10) do
      # seed dead short-living requests
      Thread.new do
        client.lock('kek.dead.lock1', ttl: 50_000, queue_ttl: 60, timeout: nil, retry_count: nil)
      end
    end
    lockers2 = Array.new(6) do
      # seed dead short-living requests
      Thread.new do
        client.lock('kek.dead.lock2', ttl: 50_000, queue_ttl: 60, timeout: nil, retry_count: nil)
      end
    end
    sleep(4)
    # seed super long-living request
    locker3 = Thread.new do
      client.lock('kek.dead.lock1', ttl: 50_000, queue_ttl: 60, timeout: nil, retry_count: nil)
    end
    # seed super long-living request
    locker4 = Thread.new do
      client.lock('kek.dead.lock2', ttl: 50_000, queue_ttl: 60, timeout: nil, retry_count: nil)
    end
    sleep(1)
    # kill acquiers => requests will live in redis now (zombie requests! bu!)
    lockers1.each(&:kill)
    lockers2.each(&:kill)
    locker3.kill
    locker4.kill

    expect(client.queues).to contain_exactly(
      'rql:lock_queue:kek.dead.lock1',
      'rql:lock_queue:kek.dead.lock2'
    )
    expect(client.queues_info.size).to eq(2)

    queue_info1 = client.queue_info('kek.dead.lock1')
    expect(queue_info1['queue'].size).to eq(11)
    queue_info2 = client.queue_info('kek.dead.lock2')
    expect(queue_info2['queue'].size).to eq(7)

    expect(client.queue_info('kek.dead.lock1')).to match({
      'lock_queue' => 'rql:lock_queue:kek.dead.lock1',
      'queue' => contain_exactly(
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }
      )
    })

    expect(client.queue_info('kek.dead.lock2')).to match({
      'lock_queue' => 'rql:lock_queue:kek.dead.lock2',
      'queue' => contain_exactly(
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) },
        { 'acq_id' => be_a(String), 'score' => be_a(Numeric) }
      )
    })

    # drop short living requests
    result = client.clear_dead_requests(dead_ttl: 3_500)
    expect(result).to match({
      ok: true,
      result: match({
        processed_queues: contain_exactly(
          'rql:lock_queue:kek.dead.lock1',
          'rql:lock_queue:kek.dead.lock2'
        )
      })
    })

    # long-living requests remain
    expect(client.queues).to contain_exactly(
      'rql:lock_queue:kek.dead.lock1',
      'rql:lock_queue:kek.dead.lock2'
    )
    expect(client.queues_info.size).to eq(2)

    queue_info1 = client.queue_info('kek.dead.lock1')
    expect(queue_info1['queue'].size).to eq(1) # long-living requests
    queue_info2 = client.queue_info('kek.dead.lock2')
    expect(queue_info2['queue'].size).to eq(1) # long-living requests

    # drop long-living requests
    result = client.clear_dead_requests(dead_ttl: 1_000)
    expect(result).to match({
      ok: true,
      result: match({
        processed_queues: contain_exactly(
          'rql:lock_queue:kek.dead.lock1',
          'rql:lock_queue:kek.dead.lock2'
        )
      })
    })
    expect(client.queues).to be_empty
    redis.call('FLUSHDB')
  end

specify 'logger' do
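Condensed from the `clear_dead_queues` spec above, a sketch of the public API it exercises; `dead_ttl` is given in milliseconds, matching the `dead_ttl / 1000.0` conversion in the acquirer, and the exact return shape is taken from the matchers in the spec:

```ruby
client = RedisQueuedLocks::Client.new(RedisClient.new)

result = client.clear_dead_requests(dead_ttl: 3_500) # milliseconds
result[:ok]                        # => true
result[:result][:processed_queues] # => collection of swept "rql:lock_queue:*" keys
```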
