Skip to content

Commit

Permalink
[swarm] (checking specs) + [host_ids] #possible_host_ids
Browse files Browse the repository at this point in the history
  • Loading branch information
0exp committed Jul 14, 2024
1 parent 264ffd2 commit 5de0952
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 13 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
- (changelog draft) (instrumentation: added `hst_id` field);
- (changelog draft) (added **hst_id** to `#lock_info` / `#lock_data` / `#locks_info` method results);
- (changelog draft) (`#current_host_id`);
- (changelog draft) (`#possible_host_ids`);
- (changelog draft) (added **hst_id** to `RedisQueuedLocks::TimedLockTimeoutError` error message);
- (changelog draft) (an ability to mark any loggable/instrumentable method as sampled for instrumentation/logging despite of the enabled instrumentation/log sampling;
- (changelog draft) (an ability to mark any loggable/instrumentable method as sampled for instrumentation/logging despite of the enabled instrumentation/log sampling);

```ruby
daiver => ~/Projects/redis_queued_locks  master [$]
Expand Down
13 changes: 12 additions & 1 deletion lib/redis_queued_locks/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class RedisQueuedLocks::Client
setting :swarm do
setting :auto_swarm, false
setting :supervisor do
setting :liveness_probing_period, 2 # in seconds
setting :liveness_probing_period, 2 # NOTE: in seconds
end
setting :probe_hosts do
setting :enabled_for_swarm, true
Expand Down Expand Up @@ -697,6 +697,17 @@ def current_host_id(
)
end

# Return the list of possible host identifiers that can be reached from the current ractor.
#
# @param identity [String] Unique identity (RedisQueuedLocks::Client#uniq_identity by default)
# @return [Array<String>]
#
# @api public
# @since 1.9.0
def possible_host_ids(identity = uniq_identity)
RedisQueuedLocks::Resource.possible_host_identifiers(identity)
end

# This method is non-atomic cuz redis does not provide an atomic function for TTL/PTTL extension.
# So the methid is spliited into the two commands:
# (1) read current pttl
Expand Down
18 changes: 9 additions & 9 deletions lib/redis_queued_locks/resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,17 @@ def possible_host_identifiers(identity)
# object space API (or super memory-expensive) so host identification works without fibers;
# NOTE №3: we still can extract thread objects via Thread.list API;
current_process_id = get_process_id
current_threads = ::Thread.list
current_ractor = ::Ractor.current

[].tap do |acquiers|
::Thread.list.each do |thread|
::ObjectSpace.each_object(::Ractor) do |ractor|
acquiers << host_identifier(
current_process_id,
thread.object_id,
ractor.object_id,
identity
)
end
current_threads.each do |thread|
acquiers << host_identifier(
current_process_id,
thread.object_id,
current_ractor.object_id,
identity
)
end
end
end
Expand Down
128 changes: 126 additions & 2 deletions spec/redis_queued_locks_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
end

describe 'swarm' do
specify 'supervisor keeps the swarm elements up and working' do

end

specify 'swarm_status / swarm_info' do
aggregate_failures 'non-auto-swarmed => swarm is not initialized' do
client = RedisQueuedLocks::Client.new(redis) do |conf|
Expand Down Expand Up @@ -96,9 +100,72 @@
})
})
end

aggregate_failures 'swarmed => swarm info => probes and zombie status' do
client = RedisQueuedLocks::Client.new(redis) do |config|
config.swarm.auto_swarm = true
config.swarm.probe_hosts.probe_period = 2
config.swarm.probe_hosts.enabled_for_swarm = true
config.swarm.flush_zombies.enabled_for_swarm = true
end

expect(client.swarm_status).to match({
auto_swarm: true,
supervisor: match({
running: true,
state: eq('sleep').or(eq('run')),
observable: 'initialized'
}),
probe_hosts: match({
enabled: true,
thread: match({ running: true, state: eq('sleep').or(eq('run')) }),
main_loop: match({ running: true, state: eq('sleep').or(eq('run')) })
}),
flush_zombies: match({
enabled: true,
ractor: match({ running: true, state: 'running' }),
main_loop: match({ running: true, state: eq('sleep').or(eq('run')) })
})
})

sleep(3) # NOTE: wait for host probing

swarm_info = client.possible_host_ids.each_with_object({}) do |host_id, memo|
memo[host_id] = match({
zombie: false,
last_probe_time: be_a(Time),
last_probe_score: be_a(Numeric)
})
end
expect(client.swarm_info).to match(swarm_info)

# try to kill the swarm and get the corresponding swarm state
result = client.deswarmize!
expect(result).to eq({ ok: true, result: :terminating })
sleep(1) # give a time to terminate async objects

expect(client.swarm_status).to match({
auto_swarm: true,
supervisor: match({
running: false,
state: 'non_initialized',
observable: 'non_initialized'
}),
probe_hosts: match({
enabled: true,
thread: match({ running: false, state: 'dead' }),
main_loop: match({ running: false, state: 'non_initialized' })
}),
flush_zombies: match({
enabled: true,
ractor: match({ running: false, state: 'terminated' }),
main_loop: match({ running: false, state: 'non_initialized' })
})
})
end
end

specify 'manual host probing' do
specify 'manual host probing (with statuses)' do
client = RedisQueuedLocks::Client.new(redis) do |conf|
conf.swarm.auto_swarm = false
conf.swarm.flush_zombies.zombie_ttl = 4_000
Expand Down Expand Up @@ -152,7 +219,64 @@
end)
end

specify 'zombie locks (with hosts and acquiers)' do
specify 'manual zombie flushing (with statuses)' do
client = RedisQueuedLocks::Client.new(redis) do |conf|
conf.swarm.auto_swarm = false
# NOTE: we will manually probe hosts and flush zombies
conf.swarm.probe_hosts.enabled_for_swarm = false
conf.swarm.flush_zombies.enabled_for_swarm = false
conf.swarm.flush_zombies.zombie_ttl = 4_000
end

# create a zombie lock
client.lock('super-mega-long-lock', ttl: 500_000)
# probe hosts => made some info about the swarm
client.probe_hosts
expect(client.swarm_info).to match(hash_including({
client.current_host_id => match({
zombie: false,
last_probe_time: be_a(Time),
last_probe_score: be_a(Numeric)
})
}))
# sleep the zombie ttl a stop host probing
sleep(4)

# now we have one guaranteed zombie
expect(client.swarm_info).to match(hash_including({
client.current_host_id => match({
zombie: true,
last_probe_time: be_a(Time),
last_probe_score: be_a(Numeric)
})
}))

zombie_host = client.current_host_id
zombie_acquier = client.current_acquier_id
zombie_lock = 'rql:lock:super-mega-long-lock'

expect(client.locked?('super-mega-long-lock')).to eq(true)
expect(client.zombie_locks).to include(zombie_lock)
expect(client.zombie_acquiers).to include(zombie_acquier)
expect(client.zombie_hosts).to include(zombie_host)

# try to flush them all
aggregate_failures 'flush zombies' do
result = client.flush_zombies
expect(result).to match({
ok: true,
deleted_zombie_hosts: include(zombie_host),
deleted_zombie_acquiers: include(zombie_acquier),
deleted_zombie_locks: include(zombie_lock)
})
expect(client.locked?('super-mega-long-lock')).to eq(false)
expect(client.zombie_locks).to eq(Set.new)
expect(client.zombie_acquiers).not_to include(zombie_acquier)
expect(client.zombie_hosts).not_to include(zombie_host)
end
end

specify '(auto-swarming!): zombie locks (with hosts and acquiers)' do
main_client = RedisQueuedLocks::Client.new(redis) do |conf|
conf.swarm.auto_swarm = true
conf.swarm.probe_hosts.enabled_for_swarm = true
Expand Down

0 comments on commit 5de0952

Please sign in to comment.