Skip to content

Commit

Permalink
[prepare-to-1.0.0] added new log + removed debugger injections
Browse files Browse the repository at this point in the history
  • Loading branch information
0exp committed Mar 30, 2024
1 parent c3632e6 commit fd378aa
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 71 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## [Unreleased]

## [0.0.39] - 2024-03-31
### Added
- Logging: added new log `[redis_queued_locks.fail_fast_or_limits_reached__dequeue]`;
### Changed
- Removed `RadisQueuedLocks::Debugger.debug(...)` injections;

## [0.0.38] - 2024-03-28
### Changed
- Minor update (dropped useless constant);
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ clinet = RedisQueuedLocks::Client.new(redis_client) do |config|
# - "[redis_queued_locks.start_try_to_lock_cycle]" (logs "lock_key", "queue_ttl", "acq_id");
# - "[redis_queued_locks.dead_score_reached__reset_acquier_position]" (logs "lock_key", "queue_ttl", "acq_id");
# - "[redis_queued_locks.lock_obtained]" (logs "lockkey", "queue_ttl", "acq_id", "acq_time");
# - "[redis_queued_locks.fail_fast_or_limits_reached__dequeue] (logs "lock_key", "queue_ttl", "acq_id");
# - by default uses VoidLogger that does nothing;
config.logger = RedisQueuedLocks::Logging::VoidLogger

Expand All @@ -180,8 +181,8 @@ clinet = RedisQueuedLocks::Client.new(redis_client) do |config|
# - "[redis_queued_locks.try_lock.get_first_from_queue]" (logs "lock_key", "queue_ttl", "acq_id", "first_acq_id_in_queue");
# - "[redis_queued_locks.try_lock.exit__queue_ttl_reached]" (logs "lock_key", "queue_ttl", "acq_id");
# - "[redis_queued_locks.try_lock.exit__no_first]" (logs "lock_key", "queue_ttl", "acq_id", "first_acq_id_in_queue", "<current_lock_data>");
# - "[redis_queued_locks.try_lock.exit__still_obtained]" (logs "lock_key", "queue_ttl", "acq_id", "first_acq_id_in_queue", "locked_by_acq_id", "<current_lock_data>");
# - "[redis_queued_locks.try_lock.run__free_to_acquire]" (logs "lock_key", "queue_ttl", "acq_id");
# - "[redis_queued_locks.try_lock.exit__lock_still_obtained]" (logs "lock_key", "queue_ttl", "acq_id", "first_acq_id_in_queue", "locked_by_acq_id", "<current_lock_data>");
# - "[redis_queued_locks.try_lock.obtain_free_to_acquire]" (logs "lock_key", "queue_ttl", "acq_id");
config.log_lock_try = false
end
```
Expand Down
11 changes: 10 additions & 1 deletion lib/redis_queued_locks/acquier/acquire_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,16 @@ def acquire_lock(
hold_time: nil, # NOTE: in milliseconds
rel_time: nil # NOTE: in milliseconds
}
acq_dequeue = -> { dequeue_from_lock_queue(redis, lock_key_queue, acquier_id) }

acq_dequeue = proc do
dequeue_from_lock_queue(
redis, logger,
lock_key,
lock_key_queue,
queue_ttl,
acquier_id
)
end

run_non_critical do
logger.debug(
Expand Down
72 changes: 17 additions & 55 deletions lib/redis_queued_locks/acquier/acquire_lock/try_to_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def try_to_lock(
inter_result = :fail_fast_no_try
else
# Step 1: add an acquier to the lock acquirement queue
res = rconn.call('ZADD', lock_key_queue, 'NX', acquier_position, acquier_id)
rconn.call('ZADD', lock_key_queue, 'NX', acquier_position, acquier_id)

if log_lock_try
run_non_critical do
Expand All @@ -87,12 +87,8 @@ def try_to_lock(
end
end

RedisQueuedLocks.debug(
"Step №1: добавление в очередь (#{acquier_id}). [ZADD to the queue: #{res}]"
)

# Step 2.1: drop expired acquiers from the lock queue
res = rconn.call(
rconn.call(
'ZREMRANGEBYSCORE',
lock_key_queue,
'-inf',
Expand All @@ -110,10 +106,6 @@ def try_to_lock(
end
end

RedisQueuedLocks.debug(
"Step №2: дропаем из очереди просроченных ожидающих. [ZREMRANGE: #{res}]"
)

# Step 3: get the actual acquier waiting in the queue
waiting_acquier = Array(rconn.call('ZRANGE', lock_key_queue, '0', '0')).first

Expand All @@ -129,11 +121,6 @@ def try_to_lock(
end
end

RedisQueuedLocks.debug(
"Step №3: какой процесс в очереди сейчас ждет. " \
"[ZRANGE <следующий процесс>: #{waiting_acquier} :: <текущий процесс>: #{acquier_id}]"
)

# Step PRE-4.x: check if the request time limit is reached
# (when the current try self-removes itself from queue (queue ttl has come))
if waiting_acquier == nil
Expand All @@ -148,11 +135,6 @@ def try_to_lock(
end
end

RedisQueuedLocks.debug(
"Step PRE-ROLLBACK №0: достигли лимита времени эквайра лока (queue ttl). выходим. " \
"[Наша позиция: #{acquier_id}. queue_ttl: #{queue_ttl}]"
)

inter_result = :dead_score_reached
# Step 4: check the actual acquier: is it ours? are we aready to lock?
elsif waiting_acquier != acquier_id
Expand All @@ -171,33 +153,20 @@ def try_to_lock(
end
end

RedisQueuedLocks.debug(
"Step ROLLBACK №1: не одинаковые ключи. выходим. " \
"[Ждет: #{waiting_acquier}. А нужен: #{acquier_id}]"
)

inter_result = :acquier_is_not_first_in_queue
else
# NOTE: our time has come! let's try to acquire the lock!

# Step 5: check if the our lock is already acquired
# Step 5: find the lock -> check if the our lock is already acquired
locked_by_acquier = rconn.call('HGET', lock_key, 'acq_id')

# rubocop:disable Layout/LineLength
RedisQueuedLocks.debug(
"Ste №5: Ищем требуемый лок. " \
"[HGET<#{lock_key}>: " \
"#{(locked_by_acquier == nil) ? 'не занят' : "занят процессом <#{locked_by_acquier}>"}"
)
# rubocop:enable Layout/LineLength

if locked_by_acquier
# Step ROLLBACK 2: required lock is stil acquired. retry!

if log_lock_try
run_non_critical do
logger.debug(
"[redis_queued_locks.try_lock.exit__still_obtained] " \
"[redis_queued_locks.try_lock.exit__lock_still_obtained] " \
"lock_key => '#{lock_key}' " \
"queue_ttl => #{queue_ttl} " \
"acq_id => '#{acquier_id}' " \
Expand All @@ -208,28 +177,13 @@ def try_to_lock(
end
end

RedisQueuedLocks.debug(
"Step ROLLBACK №2: Ключ уже занят. Ничего не делаем. " \
"[Занят процессом: #{locked_by_acquier}]"
)

inter_result = :lock_is_still_acquired
else
# NOTE: required lock is free and ready to be acquired! acquire!

# Step 6.1: remove our acquier from waiting queue
transact.call('ZREM', lock_key_queue, acquier_id)

RedisQueuedLocks.debug(
'Step №4: Забираем наш текущий процесс из очереди. [ZREM]'
)

# rubocop:disable Layout/LineLength
RedisQueuedLocks.debug(
"===> <FINAL> Step №6: закрепляем лок за процессом [HSET<#{lock_key}>: #{acquier_id}]"
)
# rubocop:enable Layout/LineLength

# Step 6.2: acquire a lock and store an info about the acquier
transact.call(
'HSET',
Expand All @@ -246,7 +200,7 @@ def try_to_lock(
if log_lock_try
run_non_critical do
logger.debug(
"[redis_queued_locks.try_lock.run__free_to_acquire] " \
"[redis_queued_locks.try_lock.obtain_free_to_acquire] " \
"lock_key => '#{lock_key}' " \
"queue_ttl => #{queue_ttl} " \
"acq_id => '#{acquier_id}'"
Expand Down Expand Up @@ -297,18 +251,26 @@ def try_to_lock(
# rubocop:enable Metrics/MethodLength, Metrics/PerceivedComplexity

# @param redis [RedisClient]
# @param logger [::Logger,#debug]
# @param lock_key [String]
# @param lock_key_queue [String]
# @param queue_ttl [Integer]
# @param acquier_id [String]
# @return [Hash<Symbol,Any>] Format: { ok: true/false, result: Any }
#
# @api private
# @since 0.1.0
def dequeue_from_lock_queue(redis, lock_key_queue, acquier_id)
def dequeue_from_lock_queue(redis, logger, lock_key, lock_key_queue, queue_ttl, acquier_id)
result = redis.call('ZREM', lock_key_queue, acquier_id)

RedisQueuedLocks.debug(
"Step ~ [СМЕРТЬ ПРОЦЕССА]: [#{acquier_id} :: #{lock_key_queue}] РЕЗУЛЬТАТ: #{result}"
)
run_non_critical do
logger.debug(
"[redis_queued_locks.fail_fast_or_limits_reached__dequeue] " \
"lock_key => '#{lock_key}' " \
"queue_ttl => '#{queue_ttl}' " \
"acq_id => '#{acquier_id}'"
)
end

RedisQueuedLocks::Data[ok: true, result: result]
end
Expand Down
1 change: 0 additions & 1 deletion lib/redis_queued_locks/data.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# frozen_string_literal: true

# NOTE: wiill be rewritten with Ruby's 3.2 "Data" class;
class RedisQueuedLocks::Data < Hash
end
37 changes: 25 additions & 12 deletions spec/redis_queued_locks_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

# NOTE: these specs will be totally reworked
RSpec.describe RedisQueuedLocks do
let(:redis) { RedisClient.config(db: 0).new_pool(timeout: 5, size: 50) }

Expand Down Expand Up @@ -80,7 +81,7 @@ def debug(progname = nil, &block)
expect(test_logger.logs[6]).to include('first_acq_id_in_queue =>')

# NOTE: try to lock - fre to acquire
expect(test_logger.logs[7]).to include('[redis_queued_locks.try_lock.run__free_to_acquire]')
expect(test_logger.logs[7]).to include('[redis_queued_locks.try_lock.obtain_free_to_acquire]')
expect(test_logger.logs[7]).to include("lock_key => 'rql:lock:pek.kek.cheburek'")
expect(test_logger.logs[7]).to include("queue_ttl => #{queue_ttl}")
expect(test_logger.logs[7]).to include('acq_id =>')
Expand Down Expand Up @@ -245,8 +246,8 @@ def notify(event, payload = {})
expect(client.queued?(lock_name)).to eq(false)

# NOTE: two new requests
Thread.new { client.lock(lock_name, ttl: 10_000, timeout: nil, retry_count: nil) }
Thread.new { client.lock(lock_name, ttl: 10_000, timeout: nil, retry_count: nil) }
thread_a = Thread.new { client.lock(lock_name, ttl: 10_000, timeout: nil, retry_count: nil) }
thread_b = Thread.new { client.lock(lock_name, ttl: 10_000, timeout: nil, retry_count: nil) }
sleep(1)

expect(client.queued?(lock_name)).to eq(true)
Expand All @@ -257,6 +258,9 @@ def notify(event, payload = {})
match({ 'acq_id' => be_a(String), 'score' => be_a(Numeric) })
])
})

thread_a.join
thread_b.join
end

specify 'notifications' do
Expand All @@ -278,31 +282,33 @@ def notify(event, payload = {})
end

redis_for_info = RedisClient.config(db: 1).new_pool(timeout: 5, size: 50)
redis_for_info.call('FLUSHDB')

client_for_info = RedisQueuedLocks::Client.new(redis) do |config|
config.retry_count = 3
config.instrumenter = test_notifier
end

Array.new(4) do |kek|
inf_threads1 = Array.new(4) do |kek|
Thread.new do
client_for_info.lock(
'locklock-pekpek-123',
ttl: 30_000,
timeout: nil,
retry_count: nil,
meta: { 'kek' => 'pek', 'a' => 123 }
) { sleep(3) }
) { sleep(4) }
end
end
Array.new(4) do |kek|
inf_threads2 = Array.new(4) do |kek|
Thread.new do
client_for_info.lock(
'locklock-pekpek-567',
ttl: 30_000,
timeout: nil,
retry_count: nil,
meta: { 'pek' => 'mek', 'b' => 55.66 }
) { sleep(3) }
) { sleep(4) }
end
end

Expand All @@ -315,6 +321,8 @@ def notify(event, payload = {})
queue_info_a = client_for_info.queues_info
queue_info_b = client_for_info.queues(with_info: true)

redis_for_info.call('FLUSHDB')

# TODO: more time for work => better spec
expect(locks_info_a).to be_a(Set)
expect(locks_info_b).to be_a(Set)
Expand Down Expand Up @@ -368,21 +376,19 @@ def notify(event, payload = {})
)
)

redis_for_info.call('FLUSHDB')

Array.new(5) do |kek|
a_threads = Array.new(5) do |kek|
Thread.new do
client.lock!("locklock#{kek}", retry_count: nil, timeout: nil)
end
end

Array.new(5) do |kek|
b_threads = Array.new(5) do |kek|
Thread.new do
client.lock!("locklock#{kek}", retry_count: nil, timeout: nil) { 'some_logic' }
end
end.each(&:join)

Array.new(120) do |kek|
c_threads = Array.new(120) do |kek|
Thread.new do
client.lock!("locklock#{kek}", ttl: 10_000, retry_count: nil, timeout: nil)
end
Expand All @@ -398,6 +404,13 @@ def notify(event, payload = {})

puts test_notifier.notifications

a_threads.each(&:join)
b_threads.each(&:join)
c_threads.each(&:join)

inf_threads1.each(&:join)
inf_threads2.each(&:join)

redis.call('FLUSHDB')
end
end

0 comments on commit fd378aa

Please sign in to comment.