-
Notifications
You must be signed in to change notification settings - Fork 81
/
Copy pathclient.rb
322 lines (283 loc) · 10.8 KB
/
client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
require 'redis-client'
require 'securerandom'
module Redlock
include Scripts
class Client
# Connection defaults. Host and port can be overridden through the
# DEFAULT_REDIS_HOST / DEFAULT_REDIS_PORT environment variables.
DEFAULT_REDIS_HOST = ENV.fetch("DEFAULT_REDIS_HOST", "localhost")
DEFAULT_REDIS_PORT = ENV.fetch("DEFAULT_REDIS_PORT", "6379")
DEFAULT_REDIS_URLS = [
  "redis://#{DEFAULT_REDIS_HOST}:#{DEFAULT_REDIS_PORT}"
]

# Redis timeout in seconds.
DEFAULT_REDIS_TIMEOUT = 0.1

# Retry policy: number of retries, delay between attempts (ms) and the
# upper bound of the random jitter added to that delay (ms).
DEFAULT_RETRY_COUNT = 3
DEFAULT_RETRY_DELAY = 200
DEFAULT_RETRY_JITTER = 50

# Fraction of a TTL that is subtracted as clock drift allowance.
CLOCK_DRIFT_FACTOR = 0.01
##
# Builds the default time source: a callable returning the current time in
# whole milliseconds. The monotonic clock is used when the platform defines
# Process::CLOCK_MONOTONIC; otherwise wall-clock time is used.
#
def self.default_time_source
  unless defined?(Process::CLOCK_MONOTONIC)
    return proc { (Time.now.to_f * 1000).to_i }
  end

  proc do
    seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    (seconds * 1000).to_i
  end
end
# Builds a distributed lock manager implementing the redlock algorithm.
# Params:
# +servers+:: Array of redis connection URLs, Redis connection instances, or a mix of both.
# +options+::
#    * `retry_count`   how many times locking a resource is retried (default: 3)
#    * `retry_delay`   how many ms to sleep before trying to lock again (default: 200)
#    * `retry_jitter`  how many ms of jitter are added to the retry delay (default: 50)
#    * `redis_timeout` the Redis timeout in seconds (default: 0.1)
#    * `time_source`   a callable object returning a monotonic time in milliseconds
#                      (default: see #default_time_source)
def initialize(servers = DEFAULT_REDIS_URLS, options = {})
  redis_timeout = options[:redis_timeout] || DEFAULT_REDIS_TIMEOUT

  @servers = servers.map do |server|
    # Only URL strings receive the configured timeout; any other object is
    # handed to RedisInstance unchanged.
    connection = server.is_a?(String) ? { url: server, timeout: redis_timeout } : server
    RedisInstance.new(connection)
  end

  # Strict majority of the configured servers.
  @quorum = servers.length / 2 + 1

  @retry_count  = options[:retry_count]  || DEFAULT_RETRY_COUNT
  @retry_delay  = options[:retry_delay]  || DEFAULT_RETRY_DELAY
  @retry_jitter = options[:retry_jitter] || DEFAULT_RETRY_JITTER
  @time_source  = options[:time_source]  || self.class.default_time_source
end
# Locks a resource for a given time.
# Params:
# +resource+:: the resource (or key) string to be locked.
# +ttl+:: The time-to-live in ms for the lock.
# +options+:: Hash of optional parameters (never mutated by this method)
#  * +retry_count+: see +initialize+
#  * +retry_delay+: see +initialize+
#  * +retry_jitter+: see +initialize+
#  * +extend+: A lock ("lock_info") to extend.
#  * +extend_only_if_locked+: Boolean, if +extend+ is given, only acquire lock if currently held
#  * +extend_only_if_life+: Deprecated, same as +extend_only_if_locked+
#  * +extend_life+: Deprecated, same as +extend_only_if_locked+
# +block+:: an optional block to be executed; after its execution, the lock (if successfully
# acquired) is automatically unlocked.
#
# Returns the lock_info hash (or false) when called without a block. With a
# block, the block is always yielded the lock_info (false when the lock was
# not acquired) and the method returns true/false for acquired/not acquired.
def lock(resource, ttl, options = {}, &block)
  # Work on a copy so the caller's hash is left untouched.
  options = options.dup

  # Translate deprecated option names BEFORE attempting the lock; doing it
  # afterwards means the options never reach the lock attempt. The translation
  # is unconditional: Gem::Deprecate.skip only silences the warning.
  %i[extend_only_if_life extend_life].each do |deprecated|
    next unless options[deprecated]

    unless Gem::Deprecate.skip
      warn "DEPRECATION WARNING: The `#{deprecated}` option has been renamed `extend_only_if_locked`."
    end
    options[:extend_only_if_locked] = options[deprecated]
  end

  lock_info = try_lock_instances(resource, ttl, options)
  return lock_info unless block_given?

  begin
    yield lock_info
    !!lock_info
  ensure
    # Release the lock even when the block raises or returns early.
    unlock(lock_info) if lock_info
  end
end
# Releases a lock on every configured server.
# Params:
# +lock_info+:: the lock that was returned when the resource was locked.
def unlock(lock_info)
  @servers.each do |server|
    server.unlock(lock_info[:resource], lock_info[:value])
  end
end
# Locks a resource and runs the given block only once the lock has been
# acquired, returning the block's value. Raises LockError when the lock
# cannot be acquired, and a RuntimeError when no block is given.
# See Redlock::Client#lock for parameters.
def lock!(resource, *args)
  raise 'No block passed' unless block_given?

  lock(resource, *args) do |acquired|
    return yield if acquired

    raise LockError, resource
  end
end
# Gets the remaining ttl of a lock. The ttl is returned only when the value
# currently stored for the resource matches the value in +lock_info+;
# otherwise (including when no lock is authoritatively held) nil is returned.
# Params:
# +lock_info+:: the lock that has been acquired when you locked the resource
def get_remaining_ttl_for_lock(lock_info)
  current = try_get_remaining_ttl(lock_info[:resource])
  return nil if current.nil?
  # A different value means somebody else holds the lock now.
  return nil unless current[:value] == lock_info[:value]

  current[:ttl]
end
# Gets the remaining ttl of a resource, whoever holds it. Returns nil when
# no lock is authoritatively held for the resource.
# Params:
# +resource+:: the name of the resource (string) for which to check the ttl
def get_remaining_ttl_for_resource(resource)
  current = try_get_remaining_ttl(resource)
  current.nil? ? nil : current[:ttl]
end
# Checks if a resource is locked, whoever the holder is.
# Params:
# +resource+:: the name of the resource (string) to check
#
# Returns true only when a strictly positive ttl remains. The ttl reported
# for a resource already has elapsed time and clock drift subtracted, so it
# can be zero or negative for a lock that is about to expire; such a lock is
# reported as not locked.
def locked?(resource)
  ttl = get_remaining_ttl_for_resource(resource)
  !ttl.nil? && ttl > 0
end
# Checks if a lock is still valid, i.e. still held by the given holder.
# Params:
# +lock_info+:: the lock that has been acquired when you locked the resource
#
# Returns true only when a strictly positive ttl remains for this lock. A
# zero or negative remaining ttl (possible once elapsed time and clock drift
# have been subtracted) is treated as expired.
def valid_lock?(lock_info)
  ttl = get_remaining_ttl_for_lock(lock_info)
  !ttl.nil? && ttl > 0
end
private
class RedisInstance
# Mixin that gives a plain connection the `with` interface of a connection
# pool: the block simply receives the object itself.
module ConnectionPoolLike
  def with
    yield(self)
  end
end
# Wraps +connection+ so that it can always be used through `with`:
# * an object already responding to `with` is stored as-is;
# * an object responding to `client` is stored and extended with
#   ConnectionPoolLike;
# * anything else is passed to RedisClient.new, and the resulting client is
#   extended with ConnectionPoolLike.
def initialize(connection)
  if connection.respond_to?(:with)
    @redis = connection
    return
  end

  @redis = connection.respond_to?(:client) ? connection : RedisClient.new(connection)
  @redis.extend(ConnectionPoolLike)
end
# Runs the lock script (by SHA) on this instance with +resource+ as the
# single key and +val+, +ttl+ and +allow_new_lock+ as arguments. Returns
# whatever the EVALSHA call returns; errors other than a recoverable
# NOSCRIPT propagate to the caller.
def lock(resource, val, ttl, allow_new_lock)
  command = ['EVALSHA', Scripts::LOCK_SCRIPT_SHA, 1, resource, val, ttl, allow_new_lock]

  recover_from_script_flush do
    @redis.with do |conn|
      conn.call(*command)
    end
  end
end
# Runs the unlock script (by SHA) on this instance with +resource+ as the
# single key and +val+ as argument. Unlocking is a best-effort attempt: any
# StandardError is swallowed and nil is returned in that case.
def unlock(resource, val)
  command = ['EVALSHA', Scripts::UNLOCK_SCRIPT_SHA, 1, resource, val]

  recover_from_script_flush do
    @redis.with do |conn|
      conn.call(*command)
    end
  end
rescue StandardError
  # Nothing to do, unlocking is just a best-effort attempt.
  nil
end
# Runs the PTTL script (by SHA) on this instance with +resource+ as the
# single key and returns the script's reply. Returns nil when the call fails
# with RedisClient::ConnectionError; other errors propagate.
def get_remaining_ttl(resource)
  command = ['EVALSHA', Scripts::PTTL_SCRIPT_SHA, 1, resource]

  recover_from_script_flush do
    @redis.with do |conn|
      conn.call(*command)
    end
  end
rescue RedisClient::ConnectionError
  nil
end
private
# Loads the unlock, lock and PTTL scripts (in that order) into this
# instance's script cache via SCRIPT LOAD.
def load_scripts
  sources = [Scripts::UNLOCK_SCRIPT, Scripts::LOCK_SCRIPT, Scripts::PTTL_SCRIPT]

  @redis.with do |connection|
    sources.each { |source| connection.call('SCRIPT', 'LOAD', source) }
  end
end
# Runs the given block and returns its value. If it fails with a
# RedisClient::CommandError whose message contains 'NOSCRIPT' (the script
# cache of the instance was flushed), the scripts are reloaded and the block
# is retried. This happens at most once, to avoid an infinite loop; any
# other command error, or a second NOSCRIPT, is re-raised.
def recover_from_script_flush
  already_reloaded = false

  begin
    yield
  rescue RedisClient::CommandError => error
    raise if already_reloaded || !error.message.include?('NOSCRIPT')

    load_scripts
    already_reloaded = true
    retry
  end
end
end
# Attempts to acquire the lock, retrying on failure. Without the +extend+
# option up to retry_count + 1 attempts are made (retry_count taken from
# +options+ or the client default), sleeping before every attempt but the
# first. With +extend+ exactly one attempt is made. Returns the lock_info
# of the first successful attempt, or false when every attempt failed.
def try_lock_instances(resource, ttl, options)
  retries = options[:retry_count] || @retry_count
  max_attempts = options[:extend] ? 1 : retries + 1

  max_attempts.times do |attempt|
    # Wait a random delay before retrying.
    sleep(attempt_retry_delay(attempt, options)) unless attempt.zero?

    acquired = lock_instances(resource, ttl, options)
    return acquired if acquired
  end

  false
end
# Computes how long to sleep, in seconds, before the given attempt. The base
# delay and jitter (both in ms) come from +options+ or the client defaults;
# a callable base delay is invoked with the attempt number. A random amount
# drawn with rand(jitter) is added to the base delay.
def attempt_retry_delay(attempt_number, options)
  base = options[:retry_delay] || @retry_delay
  jitter = options[:retry_jitter] || @retry_jitter

  base = base.call(attempt_number) if base.respond_to?(:call)

  milliseconds = base + rand(jitter)
  milliseconds.to_f / 1000
end
# Makes a single attempt to acquire (or extend) the lock on all servers.
#
# The lock value is taken from the +extend+ option's lock_info when given,
# otherwise a fresh UUID is generated. The attempt succeeds when at least
# @quorum servers accepted the lock and the remaining validity (ttl minus
# elapsed time minus drift) is not negative.
#
# Returns a lock_info hash ({ validity:, resource:, value: }) on success and
# false otherwise. On failure the lock is released on every server. If a
# server raises while the lock is being acquired, the lock is likewise
# released on every server before the error is re-raised, so that a
# partially acquired lock is not left behind until its ttl expires.
def lock_instances(resource, ttl, options)
  value = (options[:extend] || { value: SecureRandom.uuid })[:value]
  allow_new_lock = options[:extend_only_if_locked] ? 'no' : 'yes'

  begin
    locked, time_elapsed = timed do
      @servers.count { |s| s.lock(resource, value, ttl, allow_new_lock) }
    end
  rescue StandardError
    # Earlier servers may already hold the lock; release it everywhere and
    # let the caller see the original error.
    @servers.each { |s| s.unlock(resource, value) }
    raise
  end

  validity = ttl - time_elapsed - drift(ttl)

  if locked >= @quorum && validity >= 0
    { validity: validity, resource: resource, value: value }
  else
    @servers.each { |s| s.unlock(resource, value) }
    false
  end
end
# Determines which lock value, if any, is authoritatively held for
# +resource+ and how long it remains valid.
#
# Returns { value:, ttl: } when one lock value is reported by at least
# @quorum servers, and nil otherwise. The ttl has the time spent querying
# and the clock drift allowance subtracted.
def try_get_remaining_ttl(resource)
  # Each server answers with a 2-tuple [lock_value, ttl]; lock_value is nil
  # when the lock does not exist (a server may also answer nil). Since
  # servers may disagree on the lock value, the answers are grouped by
  # lock_value and transformed into a hash:
  #   { lock_value1 => [ttl1, ttl2, ttl3], lock_value2 => [ttl4, ttl5] }
  ttls_by_value, time_elapsed = timed do
    responses = @servers.map { |server| server.get_remaining_ttl(resource) }
    held = responses.select { |response| response&.first }
    held.group_by(&:first).transform_values { |tuples| tuples.map(&:last) }
  end

  # The authoritative lock value is the one returned by most servers.
  authoritative_value, ttls =
    ttls_by_value.max_by { |_lock_value, value_ttls| value_ttls.length }

  # No lock_value is authoritatively held for the resource.
  return nil unless ttls && ttls.size >= @quorum

  # Take the smallest TTL among the N/2+1 largest ones: at least N/2+1
  # servers are then guaranteed to have a TTL of that value or longer.
  quorum_ttl = ttls.sort.last(@quorum).first
  remaining = quorum_ttl - time_elapsed - drift(quorum_ttl)

  { value: authoritative_value, ttl: remaining }
end
# Clock drift allowance, in ms, for a lock with the given ttl: the
# CLOCK_DRIFT_FACTOR share of the ttl (truncated) plus 2 ms. The extra
# 2 ms account for the 1 ms precision of Redis expires plus 1 ms of
# minimum drift for small TTLs.
def drift(ttl)
  proportional = (ttl * CLOCK_DRIFT_FACTOR).to_i
  proportional + 2
end
# Runs the given block and returns [block_result, elapsed], where elapsed is
# the difference between two readings of @time_source taken just before and
# just after the block.
def timed
  started_at = @time_source.call
  result = yield
  [result, @time_source.call - started_at]
end
end
end