Skip to content

Commit

Permalink
[swarm-mode] full rework with an ability to init/start/stop swarm ele…
Browse files Browse the repository at this point in the history
…ments
  • Loading branch information
0exp committed Jun 22, 2024
1 parent 137e08f commit 04f516c
Show file tree
Hide file tree
Showing 10 changed files with 583 additions and 164 deletions.
43 changes: 27 additions & 16 deletions lib/redis_queued_locks/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,22 @@ class RedisQueuedLocks::Client
end
setting :probe_itself do
setting :enabled_for_swarm, true
setting :redis_config, {}
setting :redis_config do
setting :sentinel, false
setting :pooled, false
setting :config, {}
setting :pool_config, {}
end
setting :probe_period, 2 # NOTE: in seconds
end
setting :flush_zombies do
setting :enabled_for_swarm, true
setting :redis_config, {}
setting :lock_flushing, false
setting :lock_flushing_ttl, 5_000 # NOTE: in milliseconds
setting :redis_config do
setting :sentinel, false
setting :pooled, false
setting :config, {}
setting :pool_config, {}
end
setting :zombie_ttl, 15_000 # NOTE: in milliseconds
setting :zombie_lock_scan_size, 500
setting :zombie_queue_scan_size, 500
Expand All @@ -56,13 +64,19 @@ class RedisQueuedLocks::Client

validate('swarm.auto_swarm', :boolean)
validate('swarm.visor.check_period', :integer)

validate('swarm.probe_itself.enabled_for_swarm', :boolean)
validate('swarm.probe_itself.redis_config', :hash)
validate('swarm.probe_itself.redis_config.sentinel', :boolean)
validate('swarm.probe_itself.redis_config.pooled', :boolean)
validate('swarm.probe_itself.redis_config.config', :hash)
validate('swarm.probe_itself.redis_config.pool_config', :hash)
validate('swarm.probe_itself.probe_period', :integer)

validate('swarm.flush_zombies.enabled_for_swarm', :boolean)
validate('swarm.flush_zombies.redis_config', :hash)
validate('swarm.flush_zombies.lock_flushing', :boolean)
validate('swarm.flush_zombies.lock_flushing_ttl', :integer)
validate('swarm.flush_zombies.redis_config.sentinel', :boolean)
validate('swarm.flush_zombies.redis_config.pooled', :boolean)
validate('swarm.flush_zombies.redis_config.config', :hash)
validate('swarm.flush_zombies.redis_config.pool_config', :hash)
validate('swarm.flush_zombies.zombie_ttl', :integer)
validate('swarm.flush_zombies.zombie_lock_scan_size', :integer)
validate('swarm.flush_zombies.zombie_queue_scan_size', :integer)
Expand Down Expand Up @@ -137,7 +151,9 @@ def initialize(redis_client, &configs)
configure(&configs)
@uniq_identity = config[:uniq_identifier].call
@redis_client = redis_client
@swarm = RedisQueuedLocks::Swarm.new(self).tap { |s| s.swarm! if config[:swarm][:auto_swarm] }
@swarm = RedisQueuedLocks::Swarm.new(self).tap do |swarm|
swarm.swarm!(silently: true) if config[:swarm][:auto_swarm]
end
end

# @return [Hash<Symbol<Hash<Symbol,Boolean>>>]
Expand Down Expand Up @@ -176,24 +192,19 @@ def probe_itself
# @option zombie_ttl [Integer]
# @option lock_scan_size [Integer]
# @option queue_scan_size [Integer]
# @option lock_flushing [Boolean]
# @return [Hash<Symbol,Boolean|Array<String>|Set<String>>]
#
# @api public
# @since 1.9.0
def flush_zombies(
zombie_ttl: config[:swarm][:flush_zombies][:zombie_ttl],
lock_scan_size: config[:swarm][:flush_zombies][:zombie_lock_scan_size],
queue_scan_size: config[:swarm][:flush_zombies][:zombie_queue_scan_size],
lock_flushing: config[:swarm][:flush_zombies][:lock_flushing],
lock_flushing_ttl: config[:swarm][:flush_zombies][:lock_flushing_ttl]
queue_scan_size: config[:swarm][:flush_zombies][:zombie_queue_scan_size]
)
swarm.flush_zombies(
zombie_ttl:,
lock_scan_size:,
queue_scan_size:,
lock_flushing:,
lock_flushing_ttl:
queue_scan_size:
)
end

Expand Down
107 changes: 53 additions & 54 deletions lib/redis_queued_locks/swarm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
# @api private
# @since 1.9.0
class RedisQueuedLocks::Swarm
require_relative 'swarm/swarm_acquirers'
require_relative 'swarm/redis_client_builder'
require_relative 'swarm/super_visor'
require_relative 'swarm/acquirers'
require_relative 'swarm/swarm_element'
require_relative 'swarm/probe_itself'
require_relative 'swarm/flush_zombies'
Expand All @@ -14,32 +16,39 @@ class RedisQueuedLocks::Swarm
# @since 1.9.0
attr_reader :rql_client

# @return [Thread]
# @return [RedisQueuedLocks::Swarm::SuperVisor]
#
# @api private
# @since 1.9.0
attr_reader :swarm_visor
attr_reader :super_visor

# @return [Ractor]
# @return [RedisQueuedLocks::Swarm::ProbeItself]
#
# @api private
# @since 1.9.0
attr_reader :probe_itself_element

# @return [Ractor]
# @return [RedisQueuedLocks::Swarm::FlushZombies]
#
# @api private
# @since 1.9.0
attr_reader :flush_zombies_element

# @return [RedisQueuedLocks::Utilities::Lock]
#
# @api private
# @since 1.9.0
attr_reader :sync

# @param rql_client [RedisQueuedLocks::Client]
# @return [void]
#
# @api private
# @since 1.9.0
def initialize(rql_client)
@rql_client = rql_client
@swarm_visor = nil
@sync = RedisQueuedLocks::Utilities::Lock.new
@super_visor = RedisQueuedLocks::Swarm::SuperVisor.new(rql_client)
@probe_itself_element = RedisQueuedLocks::Swarm::ProbeItself.new(rql_client)
@flush_zombies_element = RedisQueuedLocks::Swarm::FlushZombies.new(rql_client)
end
Expand All @@ -49,32 +58,14 @@ def initialize(rql_client)
# @api public
# @since 1.9.0
def swarm_status
auto_swarm = rql_client.config[:swarm][:auto_swarm]
visor_running = swarm_visor != nil
visor_alive = swarm_visor != nil && swarm_visor.alive?
probe_itself_enabled = probe_itself_element.enabled?
probe_itself_alive = probe_itself_element.alive?
probe_itself_status = probe_itself_element.swarm_status
flush_zombies_enabled = flush_zombies_element.enabled?
flush_zombies_alive = flush_zombies_element.alive?

{
auto_swarm: auto_swarm,
visor: {
running: visor_running,
alive: visor_alive
},
probe_itself: {
enabled: probe_itself_enabled,
alive: probe_itself_alive,
main_loop: probe_itself_alive && probe_itself_element.swarm_status[:main_loop]
},
flush_zombies: {
enabled: flush_zombies_enabled,
alive: flush_zombies_alive,
main_loop: flush_zombies_alive && flush_zombies_element.swarm_status[:main_loop]
sync.synchronize do
{
auto_swarm: rql_client.config[:swarm][:auto_swarm],
super_visor: super_visor.status,
probe_itself: probe_itself_element.status,
flush_zombies: flush_zombies_element.status
}
}
end
end

# @option zombie_ttl [Integer]
Expand All @@ -83,7 +74,7 @@ def swarm_status
# @api public
# @since 1.9.0
def swarm_info(zombie_ttl: rql_client.config[:swarm][:flush_zombies][:zombie_ttl])
RedisQueuedLocks::Swarm::SwarmAcquirers.swarm_acquirers(
RedisQueuedLocks::Swarm::Acquirers.acquirers(
rql_client.redis_client,
zombie_ttl
)
Expand All @@ -109,7 +100,6 @@ def probe_itself
# @option zombie_ttl [Integer]
# @option lock_scan_size [Integer]
# @option queue_scan_size [Integer]
# @option lock_flushing [Boolean]
# @return [
# RedisQueuedLocks::Data[
# ok: <Boolean>,
Expand All @@ -123,42 +113,51 @@ def probe_itself
def flush_zombies(
zombie_ttl: rql_client.config[:swarm][:flush_zombies][:zombie_ttl],
lock_scan_size: rql_client.config[:swarm][:flush_zombies][:zombie_lock_scan_size],
queue_scan_size: rql_client.config[:swarm][:flush_zombies][:zombie_queue_scan_size],
lock_flushing: rql_client.config[:swarm][:flush_zombies][:lock_flushing],
lock_flushing_ttl: rql_client.config[:swarm][:flush_zombies][:lock_flushing_ttl]
queue_scan_size: rql_client.config[:swarm][:flush_zombies][:zombie_queue_scan_size]
)
RedisQueuedLocks::Swarm::FlushZombies.flush_zombies(
rql_client.redis_client,
zombie_ttl,
lock_scan_size,
queue_scan_size,
lock_flushing,
lock_flushing_ttl
queue_scan_size
)
end

# @return [Hash<Symbol<Hash<Symbol,Boolean>>>]
#
# @see RedisQueuedLocks::Swarm#swarm_status
# @option silently [Boolean]
# @return [void]
#
# @api public
# @since 1.9.0
def swarm!
# Step 1: start swarm elements
probe_itself_element.try_swarm!
flush_zombies_element.try_swarm!

# Step 2: start swarm element visor that should keep up swarm elements
if @swarm_visor == nil || !@swarm_visor.alive?
@swarm_visor = Thread.new do
loop do
def swarm!(silently: false)
sync.synchronize do
# Step 0:
# - stop the supervisor (kill internal observer objects if supervisor is alredy running);
super_visor.stop!

# Step 1:
# - initialize swarm elements and start their main loop;
probe_itself_element.try_swarm!
flush_zombies_element.try_swarm!

# Step 2:
# - run supercisor that should keep running created swarm elements and their main loops;
unless super_visor.running?
super_visor.observe! do
probe_itself_element.reswarm_if_dead!
flush_zombies_element.reswarm_if_dead!
sleep(rql_client.config[:swarm][:visor][:check_period])
end
end
end

RedisQueuedLocks::Data[ok: true, result: swarm_status]
# NOTE: need to give a little timespot to initialize ractor objects and their main loops;
sleep(0.1)

# NOTE:
# silently is used to prevent ractor blocking (invoked under the swarm status method)
# in auto-swarm mode when ractor and thread objects are instantiated emmideatly and that
# can lead to situation when the current thread will lock the receive/take message queue of
# some internal ractor element by a asynchronous Ractor#take invocations (it will lead to
# the infinite Ractor#take current process waiting);
RedisQueuedLocks::Data[ok: true, result: swarm_status] unless silently
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# @api private
# @since 1.9.0
module RedisQueuedLocks::Swarm::SwarmAcquirers
module RedisQueuedLocks::Swarm::Acquirers
class << self
# Returns the list of swarm acquirers are stored as a hash represented in following format:
# {
Expand All @@ -22,10 +22,10 @@ class << self
#
# @api private
# @since 1.9.0
def swarm_acquirers(redis_client, zombie_ttl)
def acquirers(redis_client, zombie_ttl)
redis_client.with do |rconn|
rconn.call('HGETALL', RedisQueuedLocks::Resource::SWARM_KEY).tap do |acquirers|
acquirers.transform_values! do |last_probe|
rconn.call('HGETALL', RedisQueuedLocks::Resource::SWARM_KEY).tap do |swarm_acqs|
swarm_acqs.transform_values! do |last_probe|
last_probe_score = last_probe.to_f
last_probe_time = Time.at(last_probe_score)
zombie_score = RedisQueuedLocks::Resource.calc_zombie_score(zombie_ttl / 1_000)
Expand Down
Loading

0 comments on commit 04f516c

Please sign in to comment.