Skip to content

Commit 1078464

Browse files
refactor, completed docs
1 parent e4b796d commit 1078464

File tree

5 files changed

+65
-47
lines changed

5 files changed

+65
-47
lines changed

core/src/main/scala/ox/resilience/DurationRateLimiterAlgorithm.scala

+19-9
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,18 @@ import java.util.concurrent.atomic.AtomicReference
77
import scala.annotation.tailrec
88
import scala.collection.immutable.Queue
99
import scala.concurrent.duration.FiniteDuration
10-
10+
import ox.discard
11+
12+
/** DurationRateLimiterAlgorithm: decides whether permit for operation can be acquired. Unlike RateLimiterAlgorithm it considers whole
13+
* execution time for an operation.
14+
*
15+
* There is no leakyBucket algorithm implemented because effectively it would result in "max number of operations currently running", which
16+
* can be achieved with single semaphore.
17+
*/
1118
object DurationRateLimiterAlgorithm:
12-
/** Fixed window algorithm: allows to run at most `rate` operations in consecutively segments of duration `per`. */
19+
/** Fixed window algorithm: allows running at most `rate` operations in consecutively segments of duration `per`. Considers whole
20+
* execution time of an operation. Operation spanning more than one window blocks permits in all windows that it spans.
21+
*/
1322
case class FixedWindow(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
1423
private val lastUpdate = new AtomicLong(System.nanoTime())
1524
private val semaphore = new Semaphore(rate)
@@ -28,18 +37,20 @@ object DurationRateLimiterAlgorithm:
2837
def update(): Unit =
2938
val now = System.nanoTime()
3039
lastUpdate.set(now)
40+
// We treat running operation in new window the same as a new operation that started in this window, so we replenish permits to: rate - operationsRunning
3141
semaphore.release(rate - semaphore.availablePermits() - runningOperations.get())
3242
end update
3343

3444
def runOperation[T](operation: => T, permits: Int): T =
3545
runningOperations.updateAndGet(_ + permits)
36-
val result = operation
37-
runningOperations.updateAndGet(current => (current - permits).max(0))
38-
result
46+
try operation
47+
finally runningOperations.updateAndGet(_ - permits).discard
3948

4049
end FixedWindow
4150

42-
/** Sliding window algorithm: allows to run at most `rate` operations in the lapse of `per` before current time. */
51+
/** Sliding window algorithm: allows to run at most `rate` operations in the lapse of `per` before current time. Considers whole execution
52+
* time of an operation. Operation release permit after `per` passed since operation ended.
53+
*/
4354
case class SlidingWindow(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
4455
// stores the timestamp and the number of permits acquired after finishing running operation
4556
private val log = new AtomicReference[Queue[(Long, Int)]](Queue[(Long, Int)]())
@@ -70,10 +81,9 @@ object DurationRateLimiterAlgorithm:
7081
end getNextUpdate
7182

7283
def runOperation[T](operation: => T, permits: Int): T =
73-
val result = operation
84+
try operation
7485
// Consider end of operation as a point to release permit after `per` passes
75-
addTimestampToLog(permits)
76-
result
86+
finally addTimestampToLog(permits)
7787

7888
def update(): Unit =
7989
val now = System.nanoTime()

core/src/main/scala/ox/resilience/RateLimiter.scala

+29-21
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,8 @@ object RateLimiter:
5252
* Interval of time between replenishing the rate limiter. The rate limiter is replenished to allow up to [[maxOperations]] in the next
5353
* time window.
5454
*/
55-
def fixedWindow(maxOperations: Int, window: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)(using
56-
Ox
57-
): RateLimiter =
58-
operationMode match
59-
case RateLimiterMode.OperationStart => apply(RateLimiterAlgorithm.FixedWindow(maxOperations, window))
60-
case RateLimiterMode.OperationDuration => apply(DurationRateLimiterAlgorithm.FixedWindow(maxOperations, window))
55+
def fixedWindow(maxOperations: Int, window: FiniteDuration)(using Ox): RateLimiter =
56+
apply(RateLimiterAlgorithm.FixedWindow(maxOperations, window))
6157

6258
/** Creates a rate limiter using a sliding window algorithm.
6359
*
@@ -68,12 +64,8 @@ object RateLimiter:
6864
* @param window
6965
* Length of the window.
7066
*/
71-
def slidingWindow(maxOperations: Int, window: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)(using
72-
Ox
73-
): RateLimiter =
74-
operationMode match
75-
case RateLimiterMode.OperationStart => apply(RateLimiterAlgorithm.SlidingWindow(maxOperations, window))
76-
case RateLimiterMode.OperationDuration => apply(DurationRateLimiterAlgorithm.SlidingWindow(maxOperations, window))
67+
def slidingWindow(maxOperations: Int, window: FiniteDuration)(using Ox): RateLimiter =
68+
apply(RateLimiterAlgorithm.SlidingWindow(maxOperations, window))
7769

7870
/** Creates a rate limiter with token/leaky bucket algorithm.
7971
*
@@ -84,15 +76,31 @@ object RateLimiter:
8476
* @param refillInterval
8577
* Interval of time between adding a single token to the bucket.
8678
*/
87-
def leakyBucket(maxTokens: Int, refillInterval: FiniteDuration)(using
88-
Ox
89-
): RateLimiter =
79+
def leakyBucket(maxTokens: Int, refillInterval: FiniteDuration)(using Ox): RateLimiter =
9080
apply(RateLimiterAlgorithm.LeakyBucket(maxTokens, refillInterval))
9181

92-
end RateLimiter
82+
/** Creates a rate limiter with duration fixed window algorithm.
83+
*
84+
* Must be run within an [[Ox]] concurrency scope, as a background fork is created, to replenish the rate limiter.
85+
*
86+
* @param maxOperations
87+
* Maximum number of operations that are allowed to **run** (finishing from previous windows or start new) within a time [[window]].
88+
* @param window
89+
* Length of the window.
90+
*/
91+
def durationFixedWindow(maxOperations: Int, window: FiniteDuration)(using Ox): RateLimiter =
92+
apply(DurationRateLimiterAlgorithm.FixedWindow(maxOperations, window))
9393

94-
/** Decides if RateLimiter should consider only start of an operation or whole time of execution.
95-
*/
96-
enum RateLimiterMode:
97-
case OperationStart
98-
case OperationDuration
94+
/** Creates a rate limiter using a duration sliding window algorithm.
95+
*
96+
* Must be run within an [[Ox]] concurrency scope, as a background fork is created, to replenish the rate limiter.
97+
*
98+
* @param maxOperations
99+
* Maximum number of operations that are allowed to **run** (start or finishing) within any [[window]] of time.
100+
* @param window
101+
* Length of the window.
102+
*/
103+
def durationSlidingWindow(maxOperations: Int, window: FiniteDuration)(using Ox): RateLimiter =
104+
apply(DurationRateLimiterAlgorithm.SlidingWindow(maxOperations, window))
105+
106+
end RateLimiter

core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ trait RateLimiterAlgorithm:
3838

3939
end RateLimiterAlgorithm
4040

41+
/** RateLimiterAlgorithm: decides whether permit for operation can be acquired. Considers only start of an operation.
42+
*/
4143
object RateLimiterAlgorithm:
4244
/** Fixed window algorithm: allows starting at most `rate` operations in consecutively segments of duration `per`. */
4345
case class FixedWindow(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:

core/src/test/scala/ox/resilience/RateLimiterTest.scala

+3-6
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,8 @@ import ox.*
44
import ox.util.ElapsedTime
55
import org.scalatest.flatspec.AnyFlatSpec
66
import org.scalatest.matchers.should.Matchers
7-
87
import java.util.concurrent.atomic.AtomicLong
98
import org.scalatest.{EitherValues, TryValues}
10-
import ox.resilience.RateLimiterMode.OperationDuration
11-
129
import scala.concurrent.duration.*
1310
import java.util.concurrent.atomic.AtomicReference
1411

@@ -222,7 +219,7 @@ class RateLimiterTest extends AnyFlatSpec with Matchers with EitherValues with T
222219

223220
it should "not allow to run more long running operations concurrently than max rate when considering operation time" in {
224221
supervised:
225-
val rateLimiter = RateLimiter.fixedWindow(2, FiniteDuration(1, "second"), OperationDuration)
222+
val rateLimiter = RateLimiter.durationFixedWindow(2, FiniteDuration(1, "second"))
226223

227224
def operation =
228225
sleep(3.seconds)
@@ -392,7 +389,7 @@ class RateLimiterTest extends AnyFlatSpec with Matchers with EitherValues with T
392389

393390
it should "not allow to run more operations when operations are still running when considering operation time" in {
394391
supervised:
395-
val rateLimiter = RateLimiter.slidingWindow(2, FiniteDuration(1, "second"), OperationDuration)
392+
val rateLimiter = RateLimiter.durationSlidingWindow(2, FiniteDuration(1, "second"))
396393

397394
def operation =
398395
sleep(3.seconds)
@@ -429,7 +426,7 @@ class RateLimiterTest extends AnyFlatSpec with Matchers with EitherValues with T
429426

430427
it should "not allow to run more operations when operations are still running in window span when considering operation time" in {
431428
supervised:
432-
val rateLimiter = RateLimiter.slidingWindow(3, FiniteDuration(1, "second"), OperationDuration)
429+
val rateLimiter = RateLimiter.durationSlidingWindow(3, FiniteDuration(1, "second"))
433430

434431
def longOperation =
435432
sleep(3.seconds)

doc/utils/rate-limiter.md

+12-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Rate limiter
22

3-
The rate limiter mechanism allows controlling the rate at which operations are executed. It ensures that at most a certain number of operations are run concurrently within a specified time frame, preventing system overload and ensuring fair resource usage. Note that you can choose if algorithm takes into account only the start of execution or the whole execution of an operation.
3+
The rate limiter mechanism allows controlling the rate at which operations are executed. It ensures that at most a certain number of operations are run concurrently within a specified time frame, preventing system overload and ensuring fair resource usage. Note that you can choose algorithm that takes into account only the start of execution or the whole execution time of an operation.
44

55
## API
66

@@ -34,26 +34,27 @@ The `operation` can be provided directly using a by-name parameter, i.e. `f: =>
3434
## Configuration
3535

3636
The configuration of a `RateLimiter` depends on an underlying algorithm that controls whether an operation can be executed or not. The following algorithms are available:
37-
- `RateLimiterAlgorithm.FixedWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in fixed windows of `dur` duration.
38-
- `RateLimiterAlgorithm.SlidingWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in any window of time of duration `dur`.
39-
- `RateLimiterAlgorithm.Bucket(maximum: Int, dur: FiniteDuration)` - where `maximum` is the maximum capacity of tokens available in the token bucket algorithm and one token is added each `dur`. It can represent both the leaky bucket algorithm or the token bucket algorithm.
40-
- `DurationRateLimiterAlgorithm.FixedWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations which execution spans fixed windows of `dur` duration.
41-
- `DurationRateLimiterAlgorithm.SlidingWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations which execution spans any window of time of duration `dur`.
37+
- `RateLimiterAlgorithm.FixedWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in fixed windows of `per` duration.
38+
- `RateLimiterAlgorithm.SlidingWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in any window of time of duration `per`.
39+
- `RateLimiterAlgorithm.Bucket(maximum: Int, per: FiniteDuration)` - where `rate` is the maximum capacity of tokens available in the token bucket algorithm and one token is added each `per`. It can represent both the leaky bucket algorithm or the token bucket algorithm.
40+
- `DurationRateLimiterAlgorithm.FixedWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations which execution spans fixed windows of `per` duration. Considers whole execution time of an operation. Operation spanning more than one window blocks permits in all windows that it spans.
41+
- `DurationRateLimiterAlgorithm.SlidingWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations which execution spans any window of time of duration `per`. Considers whole execution time of an operation. Operation release permit after `per` passed since operation ended.
4242

4343
### API shorthands
4444

4545
You can use one of the following shorthands to define a Rate Limiter with the corresponding algorithm:
4646

47-
- `RateLimiter.fixedWindow(rate: Int, dur: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)`,
48-
- `RateLimiter.slidingWindow(rate: Int, dur: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)`,
49-
- `RateLimiter.leakyBucket(maximum: Int, dur: FiniteDuration)`
47+
- `RateLimiter.fixedWindow(maxOperations: Int, window: FiniteDuration)`
48+
- `RateLimiter.slidingWindow(maxOperations: Int, window: FiniteDuration)`
49+
- `RateLimiter.leakyBucket(maxTokens: Int, refillInterval: FiniteDuration)`
50+
- `RateLimiter.durationFixedWindow(maxOperations: Int, window: FiniteDuration)`
51+
- `RateLimiter.durationSlidingWindow(maxOperations: Int, window: FiniteDuration)`
5052

51-
These shorthands also allow to define if the whole execution time of an operation should be considered.
5253
See the tests in `ox.resilience.*` for more.
5354

5455
## Custom rate limiter algorithms
5556

56-
The `RateLimiterAlgorithm` employed by `RateLimiter` can be extended to implement new algorithms or modify existing ones. Its interface is modelled like that of a `Semaphore` although the underlying implementation could be different. For best compatibility with the existing interface of `RateLimiter`, methods `acquire` and `tryAcquire` should offer the same guaranties as Java's `Semaphores`. There is also method `def runOperation[T](operation: => T, permits: Int): T` for cases where considering span of execution may be necessary.
57+
The `RateLimiterAlgorithm` employed by `RateLimiter` can be extended to implement new algorithms or modify existing ones. Its interface is modelled like that of a `Semaphore` although the underlying implementation could be different. For best compatibility with the existing interface of `RateLimiter`, methods `acquire` and `tryAcquire` should offer the same guaranties as Java's `Semaphores`. There is also method `def runOperation[T](operation: => T, permits: Int): T` for cases where considering span of execution may be necessary(see implementations in `DurationRateLimiterAlgorithm`).
5758

5859
Additionally, there are two methods employed by the `GenericRateLimiter` for updating its internal state automatically:
5960
- `def update(): Unit`: Updates the internal state of the rate limiter to reflect its current situation. Invoked in a background fork repeatedly, when a rate limiter is created.

0 commit comments

Comments
 (0)