Skip to content

Commit

Permalink
Issue Fix wait time calculation in AtomicRateLimiter (ReactiveX#824)
Browse files Browse the repository at this point in the history
  • Loading branch information
storozhukBM authored Jan 29, 2020
1 parent e9a6a81 commit 2508cd0
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {
}

public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig,
Map<String, String> tags) {
Map<String, String> tags) {
this.name = name;
this.tags = tags;

Expand Down Expand Up @@ -197,7 +197,7 @@ private boolean compareAndSet(final State current, final State next) {
* @return next {@link State}
*/
private State calculateNextState(final int permits, final long timeoutInNanos,
final State activeState) {
final State activeState) {
long cyclePeriodInNanos = activeState.config.getLimitRefreshPeriod().toNanos();
int permissionsPerCycle = activeState.config.getLimitForPeriod();

Expand Down Expand Up @@ -235,18 +235,28 @@ private State calculateNextState(final int permits, final long timeoutInNanos,
* wait for the next permission
*/
private long nanosToWaitForPermission(final int permits, final long cyclePeriodInNanos,
final int permissionsPerCycle,
final int availablePermissions, final long currentNanos, final long currentCycle) {
final int permissionsPerCycle,
final int availablePermissions, final long currentNanos, final long currentCycle) {
if (availablePermissions >= permits) {
return 0L;
}
long nextCycleTimeInNanos = (currentCycle + 1) * cyclePeriodInNanos;
long nanosToNextCycle = nextCycleTimeInNanos - currentNanos;
int permissionsAtTheStartOfNextCycle = availablePermissions + permissionsPerCycle;
int fullCyclesToWait = -(permissionsAtTheStartOfNextCycle - permits) / permissionsPerCycle;
int fullCyclesToWait = divCeil(-(permissionsAtTheStartOfNextCycle - permits), permissionsPerCycle);
return (fullCyclesToWait * cyclePeriodInNanos) + nanosToNextCycle;
}

/**
* Divide two integers and round result to the bigger near mathematical integer.
*
* @param x - should be > 0
* @param y - should be > 0
*/
private static int divCeil(int x, int y) {
return (x + y - 1) / y;
}

/**
* Determines whether caller can acquire permission before timeout or not and then creates
* corresponding {@link State}. Reserves permissions only if caller can successfully wait for
Expand All @@ -261,8 +271,8 @@ private long nanosToWaitForPermission(final int permits, final long cyclePeriodI
* @return new {@link State} with possibly reserved permissions and time to wait
*/
private State reservePermissions(final RateLimiterConfig config, final int permits,
final long timeoutInNanos,
final long cycle, final int permissions, final long nanosToWait) {
final long timeoutInNanos,
final long cycle, final int permissions, final long nanosToWait) {
boolean canAcquireInTime = timeoutInNanos >= nanosToWait;
int permissionsWithReservation = permissions;
if (canAcquireInTime) {
Expand All @@ -281,7 +291,7 @@ private State reservePermissions(final RateLimiterConfig config, final int permi
* not exceed timeout
*/
private boolean waitForPermissionIfNecessary(final long timeoutInNanos,
final long nanosToWait) {
final long nanosToWait) {
boolean canAcquireImmediately = nanosToWait <= 0;
boolean canAcquireInTime = timeoutInNanos >= nanosToWait;

Expand Down Expand Up @@ -409,7 +419,7 @@ private static class State {
private final long nanosToWait;

private State(RateLimiterConfig config,
final long activeCycle, final int activePermissions, final long nanosToWait) {
final long activeCycle, final int activePermissions, final long nanosToWait) {
this.config = config;
this.activeCycle = activeCycle;
this.activePermissions = activePermissions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@

import java.time.Duration;
import java.time.Instant;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static java.lang.Math.ceil;
import static java.util.Collections.synchronizedList;
import static org.assertj.core.api.BDDAssertions.then;

/**
Expand Down Expand Up @@ -66,8 +74,61 @@ public void tryAquiringBigNumberOfPermitsAtEndOfCycleTest() {
then(retryInSecondCyclePermission).isTrue();
}

@Test
public void reservePermissionsUpfront() throws InterruptedException {
final int limitForPeriod = 3;
final int tasksNum = 9;
Duration limitRefreshPeriod = Duration.ofMillis(1000);
Duration timeoutDuration = Duration.ofMillis(1200);

Duration durationToWait = limitRefreshPeriod.multipliedBy((long) ceil(((double) tasksNum) / limitForPeriod));

RateLimiterConfig config = RateLimiterConfig.custom()
.limitForPeriod(limitForPeriod)
.limitRefreshPeriod(limitRefreshPeriod)
.timeoutDuration(timeoutDuration)
.build();

ExecutorService executorService = Executors.newFixedThreadPool(tasksNum);
List<Duration> times = synchronizedList(new ArrayList<>(9));

RateLimiter limiter = buildRateLimiter(config);
RateLimiter.Metrics metrics = limiter.getMetrics();
waitForRefresh(metrics, config, '$');

LocalDateTime testStart = LocalDateTime.now();
Runnable runnable = RateLimiter.decorateRunnable(limiter, () -> {
times.add(Duration.between(testStart, LocalDateTime.now()));
});
for (int i = 0; i < tasksNum; i++) {
executorService.submit(runnable);
}

executorService.shutdown();
boolean terminated = executorService.awaitTermination(durationToWait.toMillis(), TimeUnit.MILLISECONDS);
then(terminated).isTrue();


ArrayList<Long> runningDeltas = new ArrayList<>();
long previousDuration = times.get(0).toMillis();
for (Duration time : times) {
long current = time.toMillis();
long delta = Math.abs(previousDuration - current);
runningDeltas.add(delta);
previousDuration = current;
}

then(runningDeltas.get(0)).isZero();
then(runningDeltas.get(1)).isLessThan(20);
then(runningDeltas.get(2)).isLessThan(20);
then(runningDeltas.get(3)).isBetween(200L, 1050L);
then(runningDeltas.get(4)).isLessThan(20);
then(runningDeltas.get(5)).isLessThan(20);
then(times).hasSize(6);
}

protected void waitForRefresh(RateLimiter.Metrics metrics, RateLimiterConfig config,
char printedWhileWaiting) {
char printedWhileWaiting) {
Instant start = Instant.now();
while (Instant.now().isBefore(start.plus(config.getLimitRefreshPeriod()))) {
try {
Expand Down

0 comments on commit 2508cd0

Please sign in to comment.