Skip to content

Commit

Permalink
new-features (part 2)
Browse files Browse the repository at this point in the history
  • Loading branch information
0exp committed Dec 25, 2024
1 parent bb3b573 commit 9def8a1
Show file tree
Hide file tree
Showing 19 changed files with 577 additions and 38 deletions.
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
- Test coverage (via `simplecov` with `html` and `lcov` formats). `minimum_coverage` config is temporary disabled (and the CI step is not configured yet) cuz we need to refactor tests in first;
- CI: `rspec-retry` is temporary added until the tests are fully refactored;
- Support for `ActiveSupport::BroadcastLogger` logger instances;
- New method: `remove_from_queues`;
- New method: `release_locks_of`;
- New method: `release_all_of`
- New method: `remove_from_queues` - remove the concrete `acquirer_id` form each lock queue;
- New method: `release_locks_of` - remove all locks of the concrete `acquirer_id` or `host_id` (or in combination of both of them);
- New method: `release_all_of` - the combination of `remove_from_queues` and `release_locks_of` rmovement/releasing;

## [1.12.1]
### Changed
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2084,7 +2084,7 @@ Detalized event semantics and payload structure:
```
- an ability to release all locks and all requests of the concrete acquirer id or host id (or both in validation-orianted combination);
- **Minor**:
- Scan requests indexing;
- Scan requests indexing (with `Redis keyspace notifications` investigation);
- Support for detailed `OpenTelemetry` tracing;
- `light mode`: an ability to work without any debug and instrumentation logic and data (totally reduced debugging and instrumenting possibilities, but better performance);
- support for `Dragonfly` database backend (https://github.com/dragonflydb/dragonfly) (https://www.dragonflydb.io/);
Expand Down
2 changes: 1 addition & 1 deletion lib/redis_queued_locks/acquirer/locks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def scan_locks(redis_client, scan_size)
RedisQueuedLocks::Resource::LOCK_PATTERN,
count: scan_size
) do |lock_key|
# TODO: reduce unnecessary iterations
# TODO: reduce unnecessary iterations (with idnexing)
lock_keys.add(lock_key)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/redis_queued_locks/acquirer/queues.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def scan_queues(redis_client, scan_size)
RedisQueuedLocks::Resource::LOCK_QUEUE_PATTERN,
count: scan_size
) do |lock_queue|
# TODO: reduce unnecessary iterations
# TODO: reduce unnecessary iterations (with idnexing)
# @type var lock_queue: String
lock_queues.add(lock_queue)
end
Expand Down
1 change: 1 addition & 0 deletions lib/redis_queued_locks/acquirer/release_all_locks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def release_all_locks(
# @api private
# @since 1.0.0
def fully_release_all_locks(redis, batch_size)
# TODO: reduce memory allocations in result
result = redis.with do |rconn|
rconn.pipelined do |pipeline|
# Step A: release all queus and their related locks
Expand Down
95 changes: 75 additions & 20 deletions lib/redis_queued_locks/acquirer/release_locks_of.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# frozen_string_literal: true

# @api private
# @since 1.13.0
module RedisQueuedLocks::Acquirer::ReleaseLocksOf
class << self
def release_locks_of(
# @api private
# @since 1.13.0
def release(
redis,
acquirer_id,
host_id,
Expand All @@ -19,28 +23,79 @@ def release_locks_of(
instr_sampler,
instr_sample_this
)
rel_start_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond)

if acquirer_id != nil && host_id != nil
remove_locks_of_combination(redis, acquirer_id, host_id, scan_size)
elsif acuquirer_id != nil && host_id == nil
remove_locks_of_acquirer(redis, acquirer_id, scan_size)
# NOTE: release by acquirer_id
if acquirer_id != nil && host_id == nil
# @type var acquirer_id: String
# @type var host_id: nil
result = OfAcquirer.release(
redis,
acquirer_id,
scan_size,
logger,
instrumenter,
instrument,
log_sampling_enabled,
log_sampling_percent,
log_sampler,
log_sample_this,
instr_sampling_enabled,
instr_sampling_percent,
instr_sampler,
instr_sample_this
)
{ ok: true, result: }
# NOTE: release by host_id
elsif acquirer_id == nil && host_id != nil
remove_locks_of_host(redis, host_id, scan_size)
# @type var host_id: String
# @type var acquirer_id: nil
result = OfHost.release(
redis,
host_id,
scan_size,
logger,
instrumenter,
instrument,
log_sampling_enabled,
log_sampling_percent,
log_sampler,
log_sample_this,
instr_sampling_enabled,
instr_sampling_percent,
instr_sampler,
instr_sample_this
)
{ ok: true, result: }
# NOTE: release by combination of host_id and acquirer_id
elsif acquirer_id != nil && host_id != nil
# @type var host_id: String
# @type var acquirer_id: String
result = OfCombination.release(
redis,
acquirer_id,
host_id,
scan_size,
logger,
instrumenter,
instrument,
log_sampling_enabled,
log_sampling_percent,
log_sampler,
log_sample_this,
instr_sampling_enabled,
instr_sampling_percent,
instr_sampler,
instr_sample_this
)
{ ok: true, result: }
else
raise
# @type var host_id: nil
# @type var acquirer_id: nil
raise(
RedisQueuedLocks::NoLockObtainerForReleaseArgumentError,
"You must specify at least one lock obtainer: " \
"eaither acquirer_id or host_id (or both of them)."
)
end
end

private

def remove_locks_of_acquirer(redis, acquirer_id, scan_size)
end

def remove_locks_of_host(redis, host_id, scan_size)
end

def remove_locks_of_combination(redis, acquirer_id, host_id, scan_size)
end
end
end
111 changes: 111 additions & 0 deletions lib/redis_queued_locks/acquirer/release_locks_of/of_acquirer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# frozen_string_literal: true

# @api private
# @since 1.13.0
module RedisQueuedLocks::Acquirer::ReleaseLocksOf::OfAcquirer
# @since 1.13.0
extend RedisQueuedLocks::Utilities

class << self
# @param redis [RedisClient]
# @param acquirer_id [String]
# @param scan_size [Integer]
# @param logger [::Logger,#debug]
# @param instrumenter [#notify]
# @param instrument [NilClass,Any]
# @param log_sampling_enabled [Boolean]
# @param log_sampling_percent [Integer]
# @param log_sampler [#sampling_happened?,Module<RedisQueuedLocks::Logging::Sampler>]
# @param log_sample_this [Boolean]
# @param instr_sampling_enabled [Boolean]
# @param instr_sampling_percent [Integer]
# @param instr_sampler [#sampling_happened?,Module<RedisQueuedLocks::Instrument::Sampler>]
# @param instr_sample_this [Boolean]
# @return [Hash<Symbol,Float|Integer>]
# Format: { rel_time: Float, rel_key_cnt: Integer }
#
# @api private
# @since 1.13.0
def release(
redis,
acquirer_id,
scan_size,
logger,
instrumenter,
instrument,
log_sampling_enabled,
log_sampling_percent,
log_sampler,
log_sample_this,
instr_sampling_enabled,
instr_sampling_percent,
instr_sampler,
instr_sample_this
)
rel_start_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond)

rel_key_cnt = remove_locks(redis, acquirer_id, scan_size)

time_at = Time.now.to_f
rel_end_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond)
rel_time = ((rel_end_time - rel_start_time) / 1_000.0).ceil(2) #: Float

instr_sampled = RedisQueuedLocks::Instrument.should_instrument?(
instr_sampling_enabled,
instr_sample_this,
instr_sampling_percent,
instr_sampler
)

run_non_critical do
instrumenter.notify('redis_queued_locks.release_all_locks_of_acquirer', {
at: time_at,
acq_id: acquirer_id,
rel_time:,
rel_key_cnt:,
})
end if instr_sampled

{ rel_time:, rel_key_cnt: }
end

private

# @param redis [RedisClient]
# @param acquirer_id [String]
# @param scan_size [Integer]
# @return [Integer]
#
# @api private
# @since 1.13.0
def remove_locks(redis, acquirer_id, scan_size)
released_keys_count = 0

redis.with do |rconn|
locks_to_release = Set.new #: Set[String]

# TODO: reduce unnecessary iterations (with indexing)
rconn.scan(
"MATCH", RedisQueuedLocks::Resource::LOCK_PATTERN, count: scan_size
) do |lock_key|
lock_acquirer_id = rconn.call("HMGET", lock_key, "acq_id")

if acquirer_id == lock_acquirer_id
locks_to_release << lock_key
end

if locks_to_release.size >= scan_size
rconn.call("DEL", *locks_to_release) # steep:ignore
released_keys_count += locks_to_release.size
locks_to_release.clear
end
end

released_keys_count += locks_to_release.size
rconn.call("DEL", *locks_to_release) if locks_to_release.any? # steep:ignore
end

released_keys_count
end
end
end
117 changes: 117 additions & 0 deletions lib/redis_queued_locks/acquirer/release_locks_of/of_combination.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# frozen_string_literal: true

# @api private
# @since 1.13.0
module RedisQueuedLocks::Acquirer::ReleaseLocksOf::OfCombination
# @since 1.13.0
extend RedisQueuedLocks::Utilities

class << self
# @param redis [RedisClient]
# @param acquirer_id [String]
# @param host_id [String]
# @param scan_size [Integer]
# @param logger [::Logger,#debug]
# @param instrumenter [#notify]
# @param instrument [NilClass,Any]
# @param log_sampling_enabled [Boolean]
# @param log_sampling_percent [Integer]
# @param log_sampler [#sampling_happened?,Module<RedisQueuedLocks::Logging::Sampler>]
# @param log_sample_this [Boolean]
# @param instr_sampling_enabled [Boolean]
# @param instr_sampling_percent [Integer]
# @param instr_sampler [#sampling_happened?,Module<RedisQueuedLocks::Instrument::Sampler>]
# @param instr_sample_this [Boolean]
# @return [Hash<Symbol,Float|Integer>]
# Format: { rel_time: Float, rel_key_cnt: Integer }
#
# @api private
# @since 1.13.0
# @api private
# @since 1.13.0
def release(
redis,
acquirer_id,
host_id,
scan_size,
logger,
instrumenter,
instrument,
log_sampling_enabled,
log_sampling_percent,
log_sampler,
log_sample_this,
instr_sampling_enabled,
instr_sampling_percent,
instr_sampler,
instr_sample_this
)
rel_start_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond)

rel_key_cnt = remove_locks(redis, acquirer_id, host_id, scan_size)

time_at = Time.now.to_f
rel_end_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond)
rel_time = ((rel_end_time - rel_start_time) / 1_000.0).ceil(2) #: Float

instr_sampled = RedisQueuedLocks::Instrument.should_instrument?(
instr_sampling_enabled,
instr_sample_this,
instr_sampling_percent,
instr_sampler
)

run_non_critical do
instrumenter.notify('redis_queued_locks.release_all_locks_of_acquirer_host_combination', {
at: time_at,
acq_id: acquirer_id,
hst_id: host_id,
rel_time:,
rel_key_cnt:,
})
end if instr_sampled

{ rel_time:, rel_key_cnt: }
end

private

# @param redis [RedisClient]
# @param acquirer_id [String]
# @param host_id [String]
# @param scan_size [Integer]
# @return [Integer]
#
# @api private
# @since 1.13.0
def remove_locks(redis, acquirer_id, host_id, scan_size)
released_keys_count = 0

redis.with do |rconn|
locks_to_release = Set.new #: Set[String]

# TODO: reduce unnecessary iterations (with indexing)
rconn.scan(
"MATCH", RedisQueuedLocks::Resource::LOCK_PATTERN, count: scan_size
) do |lock_key|
lock_acquirer_id, lock_host_id = rconn.call("HMGET", lock_key, "acq_id", "hst_id")

if acquirer_id == lock_acquirer_id && host_id == lock_host_id
locks_to_release << lock_key
end

if locks_to_release.size >= scan_size
rconn.call("DEL", *locks_to_release) # steep:ignore
released_keys_count += locks_to_release.size
locks_to_release.clear
end
end

released_keys_count += locks_to_release.size
rconn.call("DEL", *locks_to_release) if locks_to_release.any? # steep:ignore
end

released_keys_count
end
end
end
Loading

0 comments on commit 9def8a1

Please sign in to comment.