diff --git a/lib/redis_queued_locks/client.rb b/lib/redis_queued_locks/client.rb index 63ba8ac..e52e727 100644 --- a/lib/redis_queued_locks/client.rb +++ b/lib/redis_queued_locks/client.rb @@ -39,14 +39,22 @@ class RedisQueuedLocks::Client end setting :probe_itself do setting :enabled_for_swarm, true - setting :redis_config, {} + setting :redis_config do + setting :sentinel, false + setting :pooled, false + setting :config, {} + setting :pool_config, {} + end setting :probe_period, 2 # NOTE: in seconds end setting :flush_zombies do setting :enabled_for_swarm, true - setting :redis_config, {} - setting :lock_flushing, false - setting :lock_flushing_ttl, 5_000 # NOTE: in milliseconds + setting :redis_config do + setting :sentinel, false + setting :pooled, false + setting :config, {} + setting :pool_config, {} + end setting :zombie_ttl, 15_000 # NOTE: in milliseconds setting :zombie_lock_scan_size, 500 setting :zombie_queue_scan_size, 500 @@ -56,13 +64,19 @@ class RedisQueuedLocks::Client validate('swarm.auto_swarm', :boolean) validate('swarm.visor.check_period', :integer) + validate('swarm.probe_itself.enabled_for_swarm', :boolean) - validate('swarm.probe_itself.redis_config', :hash) + validate('swarm.probe_itself.redis_config.sentinel', :boolean) + validate('swarm.probe_itself.redis_config.pooled', :boolean) + validate('swarm.probe_itself.redis_config.config', :hash) + validate('swarm.probe_itself.redis_config.pool_config', :hash) validate('swarm.probe_itself.probe_period', :integer) + validate('swarm.flush_zombies.enabled_for_swarm', :boolean) - validate('swarm.flush_zombies.redis_config', :hash) - validate('swarm.flush_zombies.lock_flushing', :boolean) - validate('swarm.flush_zombies.lock_flushing_ttl', :integer) + validate('swarm.flush_zombies.redis_config.sentinel', :boolean) + validate('swarm.flush_zombies.redis_config.pooled', :boolean) + validate('swarm.flush_zombies.redis_config.config', :hash) + validate('swarm.flush_zombies.redis_config.pool_config', :hash) validate('swarm.flush_zombies.zombie_ttl', :integer) validate('swarm.flush_zombies.zombie_lock_scan_size', :integer) validate('swarm.flush_zombies.zombie_queue_scan_size', :integer) @@ -137,7 +151,9 @@ def initialize(redis_client, &configs) configure(&configs) @uniq_identity = config[:uniq_identifier].call @redis_client = redis_client - @swarm = RedisQueuedLocks::Swarm.new(self).tap { |s| s.swarm! if config[:swarm][:auto_swarm] } + @swarm = RedisQueuedLocks::Swarm.new(self).tap do |swarm| + swarm.swarm!(silently: true) if config[:swarm][:auto_swarm] + end end # @return [Hash>>] @@ -176,7 +192,6 @@ def probe_itself # @option zombie_ttl [Integer] # @option lock_scan_size [Integer] # @option queue_scan_size [Integer] - # @option lock_flushing [Boolean] # @return [Hash|Set>] # # @api public @@ -184,16 +199,12 @@ def probe_itself def flush_zombies( zombie_ttl: config[:swarm][:flush_zombies][:zombie_ttl], lock_scan_size: config[:swarm][:flush_zombies][:zombie_lock_scan_size], - queue_scan_size: config[:swarm][:flush_zombies][:zombie_queue_scan_size], - lock_flushing: config[:swarm][:flush_zombies][:lock_flushing], - lock_flushing_ttl: config[:swarm][:flush_zombies][:lock_flushing_ttl] + queue_scan_size: config[:swarm][:flush_zombies][:zombie_queue_scan_size] ) swarm.flush_zombies( zombie_ttl:, lock_scan_size:, - queue_scan_size:, - lock_flushing:, - lock_flushing_ttl: + queue_scan_size: ) end diff --git a/lib/redis_queued_locks/swarm.rb b/lib/redis_queued_locks/swarm.rb index a125304..1aea559 100644 --- a/lib/redis_queued_locks/swarm.rb +++ b/lib/redis_queued_locks/swarm.rb @@ -3,7 +3,9 @@ # @api private # @since 1.9.0 class RedisQueuedLocks::Swarm - require_relative 'swarm/swarm_acquirers' + require_relative 'swarm/redis_client_builder' + require_relative 'swarm/super_visor' + require_relative 'swarm/acquirers' require_relative 'swarm/swarm_element' require_relative 'swarm/probe_itself' require_relative 'swarm/flush_zombies' @@ -14,24 +16,30 @@ class RedisQueuedLocks::Swarm # @since 1.9.0 attr_reader :rql_client - # @return [Thread] + # @return [RedisQueuedLocks::Swarm::SuperVisor] # # @api private # @since 1.9.0 - attr_reader :swarm_visor + attr_reader :super_visor - # @return [Ractor] + # @return [RedisQueuedLocks::Swarm::ProbeItself] # # @api private # @since 1.9.0 attr_reader :probe_itself_element - # @return [Ractor] + # @return [RedisQueuedLocks::Swarm::FlushZombies] # # @api private # @since 1.9.0 attr_reader :flush_zombies_element + # @return [RedisQueuedLocks::Utilities::Lock] + # + # @api private + # @since 1.9.0 + attr_reader :sync + # @param rql_client [RedisQueuedLocks::Client] # @return [void] # @@ -39,7 +47,8 @@ class RedisQueuedLocks::Swarm # @since 1.9.0 def initialize(rql_client) @rql_client = rql_client - @swarm_visor = nil + @sync = RedisQueuedLocks::Utilities::Lock.new + @super_visor = RedisQueuedLocks::Swarm::SuperVisor.new(rql_client) @probe_itself_element = RedisQueuedLocks::Swarm::ProbeItself.new(rql_client) @flush_zombies_element = RedisQueuedLocks::Swarm::FlushZombies.new(rql_client) end @@ -49,32 +58,14 @@ def initialize(rql_client) # @api public # @since 1.9.0 def swarm_status - auto_swarm = rql_client.config[:swarm][:auto_swarm] - visor_running = swarm_visor != nil - visor_alive = swarm_visor != nil && swarm_visor.alive? - probe_itself_enabled = probe_itself_element.enabled? - probe_itself_alive = probe_itself_element.alive? - probe_itself_status = probe_itself_element.swarm_status - flush_zombies_enabled = flush_zombies_element.enabled? - flush_zombies_alive = flush_zombies_element.alive? - - { - auto_swarm: auto_swarm, - visor: { - running: visor_running, - alive: visor_alive - }, - probe_itself: { - enabled: probe_itself_enabled, - alive: probe_itself_alive, - main_loop: probe_itself_alive && probe_itself_element.swarm_status[:main_loop] - }, - flush_zombies: { - enabled: flush_zombies_enabled, - alive: flush_zombies_alive, - main_loop: flush_zombies_alive && flush_zombies_element.swarm_status[:main_loop] + sync.synchronize do + { + auto_swarm: rql_client.config[:swarm][:auto_swarm], + super_visor: super_visor.status, + probe_itself: probe_itself_element.status, + flush_zombies: flush_zombies_element.status } - } + end end # @option zombie_ttl [Integer] @@ -83,7 +74,7 @@ def swarm_status # @api public # @since 1.9.0 def swarm_info(zombie_ttl: rql_client.config[:swarm][:flush_zombies][:zombie_ttl]) - RedisQueuedLocks::Swarm::SwarmAcquirers.swarm_acquirers( + RedisQueuedLocks::Swarm::Acquirers.acquirers( rql_client.redis_client, zombie_ttl ) @@ -109,7 +100,6 @@ def probe_itself # @option zombie_ttl [Integer] # @option lock_scan_size [Integer] # @option queue_scan_size [Integer] - # @option lock_flushing [Boolean] # @return [ # RedisQueuedLocks::Data[ # ok: , @@ -123,42 +113,51 @@ def probe_itself def flush_zombies( zombie_ttl: rql_client.config[:swarm][:flush_zombies][:zombie_ttl], lock_scan_size: rql_client.config[:swarm][:flush_zombies][:zombie_lock_scan_size], - queue_scan_size: rql_client.config[:swarm][:flush_zombies][:zombie_queue_scan_size], - lock_flushing: rql_client.config[:swarm][:flush_zombies][:lock_flushing], - lock_flushing_ttl: rql_client.config[:swarm][:flush_zombies][:lock_flushing_ttl] + queue_scan_size: rql_client.config[:swarm][:flush_zombies][:zombie_queue_scan_size] ) RedisQueuedLocks::Swarm::FlushZombies.flush_zombies( rql_client.redis_client, zombie_ttl, lock_scan_size, - queue_scan_size, - lock_flushing, - lock_flushing_ttl + queue_scan_size ) end - # @return [Hash>>] - # - # @see RedisQueuedLocks::Swarm#swarm_status + # @option silently [Boolean] + # @return [void] # # @api public # @since 1.9.0 - def swarm! - # Step 1: start swarm elements - probe_itself_element.try_swarm! - flush_zombies_element.try_swarm! - - # Step 2: start swarm element visor that should keep up swarm elements - if @swarm_visor == nil || !@swarm_visor.alive? - @swarm_visor = Thread.new do - loop do + def swarm!(silently: false) + sync.synchronize do + # Step 0: + # - stop the supervisor (kill internal observer objects if supervisor is alredy running); + super_visor.stop! + + # Step 1: + # - initialize swarm elements and start their main loop; + probe_itself_element.try_swarm! + flush_zombies_element.try_swarm! + + # Step 2: + # - run supercisor that should keep running created swarm elements and their main loops; + unless super_visor.running? + super_visor.observe! do probe_itself_element.reswarm_if_dead! flush_zombies_element.reswarm_if_dead! - sleep(rql_client.config[:swarm][:visor][:check_period]) end end - end - RedisQueuedLocks::Data[ok: true, result: swarm_status] + # NOTE: need to give a little timespot to initialize ractor objects and their main loops; + sleep(0.1) + + # NOTE: + # silently is used to prevent ractor blocking (invoked under the swarm status method) + # in auto-swarm mode when ractor and thread objects are instantiated emmideatly and that + # can lead to situation when the current thread will lock the receive/take message queue of + # some internal ractor element by a asynchronous Ractor#take invocations (it will lead to + # the infinite Ractor#take current process waiting); + RedisQueuedLocks::Data[ok: true, result: swarm_status] unless silently + end end end diff --git a/lib/redis_queued_locks/swarm/swarm_acquirers.rb b/lib/redis_queued_locks/swarm/acquirers.rb similarity index 87% rename from lib/redis_queued_locks/swarm/swarm_acquirers.rb rename to lib/redis_queued_locks/swarm/acquirers.rb index f2f0512..032cd4c 100644 --- a/lib/redis_queued_locks/swarm/swarm_acquirers.rb +++ b/lib/redis_queued_locks/swarm/acquirers.rb @@ -2,7 +2,7 @@ # @api private # @since 1.9.0 -module RedisQueuedLocks::Swarm::SwarmAcquirers +module RedisQueuedLocks::Swarm::Acquirers class << self # Returns the list of swarm acquirers are stored as a hash represented in following format: # { @@ -22,10 +22,10 @@ class << self # # @api private # @since 1.9.0 - def swarm_acquirers(redis_client, zombie_ttl) + def acquirers(redis_client, zombie_ttl) redis_client.with do |rconn| - rconn.call('HGETALL', RedisQueuedLocks::Resource::SWARM_KEY).tap do |acquirers| - acquirers.transform_values! do |last_probe| + rconn.call('HGETALL', RedisQueuedLocks::Resource::SWARM_KEY).tap do |swarm_acqs| + swarm_acqs.transform_values! do |last_probe| last_probe_score = last_probe.to_f last_probe_time = Time.at(last_probe_score) zombie_score = RedisQueuedLocks::Resource.calc_zombie_score(zombie_ttl / 1_000) diff --git a/lib/redis_queued_locks/swarm/flush_zombies.rb b/lib/redis_queued_locks/swarm/flush_zombies.rb index 3ff0f08..8b5bb07 100644 --- a/lib/redis_queued_locks/swarm/flush_zombies.rb +++ b/lib/redis_queued_locks/swarm/flush_zombies.rb @@ -8,12 +8,10 @@ class << self # @parma zombie_ttl [Numeric] # @param lock_scan_size [Integer] # @param queue_scan_size [Integer] - # @param lock_flushing [Boolean] - # @param lock_flushing_ttl [Integer] # @return [ # RedisQueuedLocks::Data[ # ok: , - # delete_zombies: >, + # deleted_zombies: >, # deleted_zombie_locks: > # ] # ] @@ -25,9 +23,7 @@ def flush_zombies( redis_client, zombie_ttl, lock_scan_size, - queue_scan_size, - lock_flushing, - lock_flushing_ttl + queue_scan_size ) # Step 1: # calculate zombie score (the time marker that shows acquirers that @@ -43,7 +39,7 @@ def flush_zombies( # Step X: exit if we have no any zombie acquirer return RedisQueuedLocks::Data[ ok: true, - delete_zombies: [], + deleted_zombies: [], deleted_zombie_locks: [], ] if zombie_acquirers.empty? @@ -78,11 +74,47 @@ def flush_zombies( # Step 6: inform about deleted zombies RedisQueuedLocks::Data[ ok: true, - delete_zombies: zombie_acquirers, + deleted_zombies: zombie_acquirers, deleted_zombie_locks: zombie_locks ] end # rubocop:enable Metrics/MethodLength + + # @param redis_config [Hash] + # @param zombie_ttl [Integer] + # @param zombie_lock_scan_size [Integer] + # @param zombie_queue_scan_size [Integer] + # @param zombie_flush_period [Numeric] + # @return [Thread] + # + # @api private + # @since 1.9.0 + def spawn_main_loop( + redis_config, + zombie_ttl, + zombie_lock_scan_size, + zombie_queue_scan_size, + zombie_flush_period + ) + Thread.new do + redis_client = RedisQueuedLocks::Swarm::RedisClientBuilder.build( + pooled: redis_config['pooled'], + sentinel: redis_config['sentinel'], + config: redis_config['config'], + pool_config: redis_config['pool_config'] + ) + + loop do + RedisQueuedLocks::Swarm::FlushZombies.flush_zombies( + redis_client, + zombie_ttl, + zombie_lock_scan_size, + zombie_queue_scan_size + ) + sleep(zombie_flush_period) + end + end + end end # @return [Boolean] @@ -93,40 +125,28 @@ def enabled? rql_client.config[:swarm][:flush_zombies][:enabled_for_swarm] end + # Swarm element lifecycle: + # => 1) init (swarm!): create a ractor, main loop is not started; + # => 2) start (start!): run main lopp inside the ractor; + # => 3) stop (stop!): stop the main loop inside a ractor; + # => 4) kill (kill!): kill the main loop inside teh ractor and kill a ractor; + # # @return [void] # # @api private # @since 1.9.0 def swarm! @swarm_element = Ractor.new( - rql_client.config[:swarm][:flush_zombies][:redis_config], + rql_client.config.slice_value('swarm.flush_zombies.redis_config'), rql_client.config[:swarm][:flush_zombies][:zombie_ttl], rql_client.config[:swarm][:flush_zombies][:zombie_lock_scan_size], rql_client.config[:swarm][:flush_zombies][:zombie_queue_scan_size], - rql_client.config[:swarm][:flush_zombies][:zombie_flush_period], - rql_client.config[:swarm][:flush_zombies][:lock_flushing], - rql_client.config[:swarm][:flush_zombies][:lock_flushing_ttl] - ) do |rc, z_ttl, lss, qss, fl_prd, l_fl, l_fl_ttl| - thrd = nil - - loop do - command = Ractor.receive - case command - when :status - Ractor.yield({ main_loop: { alive: thrd.alive? } }) - when :run - thrd.kill if thrd != nil - thrd = Thread.new do - rcl = RedisClient.config(**rc).new_client - loop do - RedisQueuedLocks::Swarm::FlushZombies.flush_zombies(rcl, z_ttl, lss, qss, l_fl, l_fl_ttl) - sleep(fl_prd) - end - end - when :stop - thrd.kill - exit - end + rql_client.config[:swarm][:flush_zombies][:zombie_flush_period] + ) do |rc, z_ttl, z_lss, z_qss, z_fl_prd| + RedisQueuedLocks::Swarm::FlushZombies.swarm_loop do + RedisQueuedLocks::Swarm::FlushZombies.spawn_main_loop( + rc, z_ttl, z_lss, z_qss, z_fl_prd + ) end end end diff --git a/lib/redis_queued_locks/swarm/probe_itself.rb b/lib/redis_queued_locks/swarm/probe_itself.rb index efcb912..302687f 100644 --- a/lib/redis_queued_locks/swarm/probe_itself.rb +++ b/lib/redis_queued_locks/swarm/probe_itself.rb @@ -17,14 +17,39 @@ class << self # @api private # @since 1.9.0 def probe_itself(redis_client, acquirer_id) - redis_client.call( - 'HSET', - RedisQueuedLocks::Resource::SWARM_KEY, - acquirer_id, - probe_score = Time.now.to_f - ) + redis_client.with do |rconn| + rconn.call( + 'HSET', + RedisQueuedLocks::Resource::SWARM_KEY, + acquirer_id, + probe_score = Time.now.to_f + ) - RedisQueuedLocks::Data[ok: true, acq_id: acquirer_id, probe_score:] + RedisQueuedLocks::Data[ok: true, acq_id: acquirer_id, probe_score:] + end + end + + # @param redis_config [Hash] + # @param acquirer_id [String] + # @param probe_period [Integer] + # @return [Thread] + # + # @api private + # @since 1.9.0 + def spawn_main_loop(redis_config, acquirer_id, probe_period) + Thread.new do + redis_client = RedisQueuedLocks::Swarm::RedisClientBuilder.build( + pooled: redis_config['pooled'], + sentinel: redis_config['sentinel'], + config: redis_config['config'], + pool_config: redis_config['pool_config'] + ) + + loop do + RedisQueuedLocks::Swarm::ProbeItself.probe_itself(redis_client, acquirer_id) + sleep(probe_period) + end + end end end @@ -42,38 +67,12 @@ def enabled? # @since 1.9.0 def swarm! @swarm_element = Ractor.new( - rql_client.config[:swarm][:probe_itself][:redis_config], + rql_client.config.slice_value('swarm.probe_itself.redis_config'), rql_client.current_acquier_id, - rql_client.config['swarm.probe_itself.probe_period'] + rql_client.config[:swarm][:probe_itself][:probe_period] ) do |rc, acq_id, prb_prd| - thrd = Thread.new do - rcl = RedisClient.config(**rc).new_client - - loop do - RedisQueuedLocks::Swarm::ProbeItself.probe_itself(rcl, acq_id) - sleep(prb_prd) - end - end - - loop do - command = Ractor.receive - case command - when :status - Ractor.yield({ main_loop: { alive: thrd.alive? } }) - when :restart - thrd.kill - thrd = Thread.new do - rcl = RedisClient.config(**rc).new_client - - loop do - RedisQueuedLocks::Swarm::ProbeItself.probe_itself(rcl, acq_id) - sleep(prb_prd) - end - end - when :stop - thrd.kill - exit - end + RedisQueuedLocks::Swarm::ProbeItself.swarm_loop do + RedisQueuedLocks::Swarm::ProbeItself.spawn_main_loop(rc, acq_id, prb_prd) end end end diff --git a/lib/redis_queued_locks/swarm/redis_client_builder.rb b/lib/redis_queued_locks/swarm/redis_client_builder.rb new file mode 100644 index 0000000..5d965f1 --- /dev/null +++ b/lib/redis_queued_locks/swarm/redis_client_builder.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +# @api private +# @since 1.9.0 +module RedisQueuedLocks::Swarm::RedisClientBuilder + class << self + # @option pooled [Boolean] + # @option sentinel [Boolean] + # @option config [Hash] + # @return [RedisClient] + # + # @api private + # @since 1.9.0 + # rubocop:disable Style/RedundantAssignment + def build(pooled: false, sentinel: false, config: {}, pool_config: {}) + config.transform_keys!(&:to_sym) + pool_config.transform_keys!(&:to_sym) + + redis_config = + sentinel ? sentinel_config(config) : non_sentinel_config(config) + redis_client = + pooled ? pooled_client(redis_config, pool_config) : non_pooled_client(redis_config) + + redis_client + end + # rubocop:enable Style/RedundantAssignment + + private + + # @param config [Hash] + # @return [RedisClient::Config] + # + # @api private + # @since 1.9.0 + def sentinel_config(config) + RedisClient.sentinel(**config) + end + + # @param config [Hash] + # @return [RedisClient::Config] + # + # @api private + # @since 1.9.0 + def non_sentinel_config(config) + RedisClient.config(**config) + end + + # @param redis_config [ReidsClient::Config] + # @param pool_config [Hash] + # @return [RedisClient] + # + # @api private + # @since 1.9.0 + def pooled_client(redis_config, pool_config) + redis_config.new_pool(**pool_config) + end + + # @param redis_config [ReidsClient::Config] + # @return [RedisClient] + # + # @api private + # @since 1.9.0 + def non_pooled_client(redis_config) + redis_config.new_client + end + end +end diff --git a/lib/redis_queued_locks/swarm/super_visor.rb b/lib/redis_queued_locks/swarm/super_visor.rb new file mode 100644 index 0000000..b104673 --- /dev/null +++ b/lib/redis_queued_locks/swarm/super_visor.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +# @api private +# @since 1.9.0 +class RedisQueuedLocks::Swarm::SuperVisor + # @since 1.9.0 + include RedisQueuedLocks::Utilities + + # @return [RedisQueuedLocks::Client] + # + # @api private + # @since 1.9.0 + attr_reader :rql_client + + # @return [Thread,NilClass] + # + # @api private + # @since 1.9.0 + attr_reader :visor + + # @return [Proc,NilClass] + # + # @api private + # @since 1.9.0 + attr_reader :observable + + # @return [void] + # + # @api private + # @since 1.9.0 + def initialize(rql_client) + @rql_client = rql_client + @visor = nil + @observable = nil + end + + # @param observable [Block] + # @return [void] + # + # @api private + # @since 1.9.0 + def observe!(&observable) + @observable = observable + @visor = Thread.new do + loop do + yield # TODO: error handling + sleep(rql_client.config[:swarm][:visor][:check_period]) + end + end + + # NOTE: need to give a timespot to initialize ractor objects and their main loops; + sleep(0.1) + end + + # @return [Boolean] + # + # @api private + # @since 1.9.0 + def running? + visor != nil && visor.alive? + end + + # @return [void] + # + # @api private + # @since 1.9.0 + def stop! + visor.kill if running? + @visor = nil + @observable = nil + end + + # @return [Hash>] + # + # @api private + # @since 1.9.0 + def status + { + running: running?, + state: (visor == nil) ? 'non_initialized' : thread_state(visor), + observable: (observable == nil) ? 'non_initialized' : 'initialized' + } + end +end diff --git a/lib/redis_queued_locks/swarm/swarm_element.rb b/lib/redis_queued_locks/swarm/swarm_element.rb index 46cd2c8..da56884 100644 --- a/lib/redis_queued_locks/swarm/swarm_element.rb +++ b/lib/redis_queued_locks/swarm/swarm_element.rb @@ -2,20 +2,29 @@ # @api private # @since 1.9.0 +# rubocop:disable Metrics/ClassLength class RedisQueuedLocks::Swarm::SwarmElement + # @since 1.9.0 + include RedisQueuedLocks::Utilities + # @return [RedisQueuedLocks::Client] # # @api private # @since 1.9.0 attr_reader :rql_client - # @return [NilClass,Ractor] + # @return [Ractor,NilClass] # # @api private # @since 1.9.0 attr_reader :swarm_element - # @param rql_client [RedisQueuedLocks::Client] + # @return [RedisQueuedLocks::Utilities::Lock] + # + # @api private + # @since 1.9.0 + attr_reader :sync + # @return [void] # # @api private @@ -23,6 +32,7 @@ class RedisQueuedLocks::Swarm::SwarmElement def initialize(rql_client) @rql_client = rql_client @swarm_element = nil + @sync = RedisQueuedLocks::Utilities::Lock.new end # @return [void] @@ -30,8 +40,13 @@ def initialize(rql_client) # @api private # @since 1.9.0 def try_swarm! - return if alive? - swarm! && start! if enabled? + return unless enabled? + + sync.synchronize do + swarm_loop__kill + swarm! + swarm_loop__start + end end # @return [void] @@ -39,7 +54,16 @@ def try_swarm! # @api private # @since 1.9.0 def reswarm_if_dead! - try_swarm! unless alive? + return unless enabled? + + sync.synchronize do + if swarmed__stopped? + swarm_loop__start + elsif swarmed__dead? || idle? + swarm! + swarm_loop__start + end + end end # @return [Boolean] @@ -47,48 +71,194 @@ def reswarm_if_dead! # @api private # @since 1.9.0 def enabled? - # NOTE: check configs for the correspondng swarm element + # NOTE: provde an logic here by analyzing the redis queued locks config. end - # @return [Boolean] + # @return [Hash>] # # @api private # @since 1.9.0 - def alive? - swarm_element != nil && RedisQueuedLocks::Utilities.ractor_alive?(swarm_element) + def status + sync.synchronize do + ractor_running = swarmed__alive? + ractor_state = begin + # TODO: error handling; + swarmed? ? ractor_status(swarm_element) : 'non_initialized' + end + + main_loop_running = swarmed__running? + main_loop_state = begin + # TODO: error handling; + main_loop_running ? swarm_loop__status[:main_loop][:state] : 'non_initialized' + end + + { + enabled: enabled?, + ractor: { + running: ractor_running, + state: ractor_state + }, + main_loop: { + running: main_loop_running, + state: main_loop_state + } + } + end end - # @return [Hash>] + # Swarm element lifecycle should have the following scheme: + # => 1) init (swarm!): create a ractor, main loop is not started; + # => 2) start (swarm_loop__start!): run main lopp inside the ractor; + # => 3) stop (swarm_loop__stop!): stop the main loop inside a ractor; + # => 4) kill (swarm_loop__kill!): kill the main loop inside teh ractor and kill a ractor; + # + # @return [void] # # @api private # @since 1.9.0 - def swarm_status - swarm_element.send(:status).take + def swarm! + # IMPORTANT №1: initialize @swarm_element here with Ractor; + # IMPORTANT №2: your Ractor should invoke .swarm_loop inside (see below); + # IMPORTANT №3: you should provde the main_loop_logic to .swarm_loop as a block; end + # NOTE: + # This self-related part of code is placed here in order to provide better code readability + # (it is placed next to the method inside wich it should be called (see #swarm!)). + # + # @param main_loop_spawner [Block] # @return [void] # # @api private # @since 1.9.0 - def run! - swarm_element.send(:start) + # rubocop:disable Layout/ClassStructure, Metrics/MethodLength + def self.swarm_loop(&main_loop_spawner) + main_loop = nil + + loop do + command = Ractor.receive + + case command + when :status + main_loop_alive = main_loop != nil && main_loop.alive? + main_loop_state = + if main_loop == nil + 'non_initialized' + else + RedisQueuedLocks::Utilities.thread_state(main_loop) + end + Ractor.yield({ + main_loop: { + alive: main_loop_alive, + state: main_loop_state + } + }) + when :is_active + Ractor.yield(main_loop != nil && main_loop.alive?) + when :start + main_loop.kill unless main_loop == nil + main_loop = yield # NOTE: => main_loop_spawner.call + when :stop + main_loop.kill unless main_loop == nil + when :kill + main_loop.kill unless main_loop == nil + exit + end + end + end + # rubocop:enable Layout/ClassStructure, Metrics/MethodLength + + # @return [Boolean] + # + # @api private + # @since 1.9.0 + def idle? + swarm_element == nil + end + + # @return [Boolean] + # + # @api private + # @since 1.9.0 + def swarmed? + swarm_element != nil + end + + # @return [Boolean] + # + # @api private + # @since 1.9.0 + def swarmed__alive? + swarm_element != nil && ractor_alive?(swarm_element) + end + + # @return [Boolean] + # + # @api private + # @since 1.9.0 + def swarmed__dead? + swarm_element != nil && !ractor_alive?(swarm_element) + end + + # @return [Boolean] + # + # @api private + # @since 1.9.0 + def swarmed__running? + swarm_element != nil && ractor_alive?(swarm_element) && swarm_loop__is_active + end + + # @return [Boolean] + # + # @api private + # @since 1.9.0 + def swarmed__stopped? + swarm_element != nil && ractor_alive?(swarm_element) && !swarm_loop__is_active + end + + # @return [Boolean] + # + # @api private + # @since 1.9.0 + def swarm_loop__is_active + return if idle? || swarmed__dead? + sync.synchronize { swarm_element.send(:is_active).take } + end + + # @return [Hash] + # + # @api private + # @since 1.9.0 + def swarm_loop__status + return if idle? || swarmed__dead? + sync.synchronize { swarm_element.send(:status).take } end # @return [void] # # @api private # @since 1.9.0 - def stop! - swarm_element.send(:stop) + def swarm_loop__start + return if idle? || swarmed__dead? + sync.synchronize { swarm_element.send(:start) } end - private + # @return [void] + # + # @api private + # @since 1.9.0 + def swarm_loop__pause + return if idle? || swarmed__dead? + sync.synchronize { swarm_element.send(:stop) } + end # @return [void] # # @api private # @since 1.9.0 - def swarm! - # NOTE: Initialize @swarm_element here with a Ractor object + def swarm_loop__kill + return if idle? || swarmed__dead? + sync.synchronize { swarm_element.send(:kill) } end end +# rubocop:enable Metrics/ClassLength diff --git a/lib/redis_queued_locks/utilities.rb b/lib/redis_queued_locks/utilities.rb index f726d51..8c75ddc 100644 --- a/lib/redis_queued_locks/utilities.rb +++ b/lib/redis_queued_locks/utilities.rb @@ -3,6 +3,8 @@ # @api private # @since 1.0.0 module RedisQueuedLocks::Utilities + require_relative 'utilities/lock' + module_function # Ractor class has no methods for Ractor object status identification. @@ -17,7 +19,16 @@ module RedisQueuedLocks::Utilities # # @api private # @since 1.9.0 - RACTOR_LIVENESS_PATTERN = /\A.*?(created|running|blocking).*?\z/ + RACTOR_LIVENESS_PATTERN = /\A.*?(created|running|blocking).*?\z/i + + # Ractor status as a string extracted from the object string representation. + # This way is used cuz the ractor class has no any detailed status extraction API. + # + # @return [Regexp] + # + # @api private + # @since 1.9.0 + RACTOR_STATUS_PATTERN = /\A.*?\s(?\w+)>\z/i # @param block [Block] # @return [Any] @@ -36,4 +47,40 @@ def run_non_critical(&block) def ractor_alive?(ractor) ractor.to_s.match?(RACTOR_LIVENESS_PATTERN) end + + # @param ractor [Ractor] + # @return [String] + # + # @api private + # @since 1.9.0 + def ractor_status(ractor) + ractor.to_s.match(RACTOR_STATUS_PATTERN)[:status] + end + + # Returns the status of the passed thread object. + # Possible thread statuses: + # - "run" (thread is executing); + # - "sleep" (thread is sleeping or waiting on I/O); + # - "aborting" (thread is aborting) + # - "dead" (thread is terminated normally); + # - "failed" (thread is terminated with an exception); + # See Thread#status official documentation. + # + # @param [Thread] + # @return [String] + # + # @api private + # @since 1.9.0 + def thread_state(thread) + status = thread.status + + case + when status == false + 'dead' + when status == nil + 'failed' + else + status + end + end end diff --git a/lib/redis_queued_locks/utilities/lock.rb b/lib/redis_queued_locks/utilities/lock.rb new file mode 100644 index 0000000..68d8a27 --- /dev/null +++ b/lib/redis_queued_locks/utilities/lock.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +# @api private +# @since 1.9.0 +class RedisQueuedLocks::Utilities::Lock + # @return [void] + # + # @api private + # @since 1.9.0 + def initialize + @lock = ::Mutex.new + end + + # @param block [Block] + # @return [Any] + # + # @api private + # @since 1.9.0 + def synchronize(&block) + @lock.owned? ? yield : @lock.synchronize(&block) + end +end