Skip to content

Commit

Permalink
Dispose picked worker when BoundedElasticScheduler rejects task (#3183)
Browse files Browse the repository at this point in the history
This commit makes sure that the picked worker is released (calling
dispose() method) if a RejectedExecutionException is raised when the
BoundedElasticScheduler tries to submit a task to its picked worker.

Fixes #3182.

Co-authored-by: Stanislav Kotik <[email protected]>
Co-authored-by: Kuznetsov Stanislav <[email protected]>
  • Loading branch information
3 people authored Sep 20, 2022
1 parent 91c4bdf commit f2ffd70
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,26 @@ public Mono<Void> disposeGracefully() {
public Disposable schedule(Runnable task) {
//tasks running once will call dispose on the BoundedState, decreasing its usage by one
BoundedState picked = state.currentResource.pick();
return Schedulers.directSchedule(picked.executor, task, picked, 0L, TimeUnit.MILLISECONDS);
try {
return Schedulers.directSchedule(picked.executor, task, picked, 0L, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException ex) {
// ensure to free the BoundedState so it can be reused
picked.dispose();
throw ex;
}
}

@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
//tasks running once will call dispose on the BoundedState, decreasing its usage by one
final BoundedState picked = state.currentResource.pick();
return Schedulers.directSchedule(picked.executor, task, picked, delay, unit);
try {
return Schedulers.directSchedule(picked.executor, task, picked, delay, unit);
} catch (RejectedExecutionException ex) {
// ensure to free the BoundedState so it can be reused
picked.dispose();
throw ex;
}
}

@Override
Expand All @@ -266,14 +278,20 @@ public Disposable schedulePeriodically(Runnable task,
long period,
TimeUnit unit) {
final BoundedState picked = state.currentResource.pick();
Disposable scheduledTask = Schedulers.directSchedulePeriodically(picked.executor,
task,
initialDelay,
period,
unit);
//a composite with picked ensures the cancellation of the task releases the BoundedState
// (ie decreases its usage by one)
return Disposables.composite(scheduledTask, picked);
try {
Disposable scheduledTask = Schedulers.directSchedulePeriodically(picked.executor,
task,
initialDelay,
period,
unit);
//a composite with picked ensures the cancellation of the task releases the BoundedState
// (ie decreases its usage by one)
return Disposables.composite(scheduledTask, picked);
} catch (RejectedExecutionException ex) {
// ensure to free the BoundedState so it can be reused
picked.dispose();
throw ex;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,89 @@ void toStringOfExecutorReflectsIdleVsActive() throws InterruptedException {
}
}

@Test
public void transitionsToIdleAfterRejectionAndFollowingCompletion() throws InterruptedException {
int taskRejected = 0;

int maxThreads = 2;
int maxTaskQueuedPerThread = 10;

BoundedElasticScheduler scheduler = afterTest.autoDispose(
new BoundedElasticScheduler(
maxThreads,
maxTaskQueuedPerThread,
new ReactorThreadFactory(
"moveToIdleAfterHaveRejectTasksAndCompleteAllTasks",
new AtomicLong(),
false,
false,
null),
60));
scheduler.start();

CountDownLatch releaseAllTasksLatch = new CountDownLatch(1);
CountDownLatch tasksStartedLatch = new CountDownLatch(maxThreads);

Runnable startedAndWaitLatchRunnable = () -> {
try {
tasksStartedLatch.countDown();
releaseAllTasksLatch.await(30, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
e.printStackTrace();
}
};

Runnable waitLatchRunnable = () -> {
try {
releaseAllTasksLatch.await(30, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
e.printStackTrace();
}
};

// initial tasks that block the threads, causing other tasks to enter the pending queue
for (int i = 0; i < maxThreads; i++) {
scheduler.schedule(startedAndWaitLatchRunnable);
}

// small window to start the first task
assertThat(tasksStartedLatch.await(1, TimeUnit.SECONDS)).as("task picked").isTrue();

// fill up pending queue
for (int i = 0; i < maxThreads * maxTaskQueuedPerThread; i++) {
scheduler.schedule(waitLatchRunnable);
}

// check new tasks are rejected
assertThatExceptionOfType(RejectedExecutionException.class)
.as("must reject for method schedule without delay")
.isThrownBy(() -> scheduler.schedule(waitLatchRunnable));
taskRejected++;

assertThatExceptionOfType(RejectedExecutionException.class)
.as("must reject for method schedule with delay")
.isThrownBy(() -> scheduler.schedule(waitLatchRunnable, 100, TimeUnit.MILLISECONDS));
taskRejected++;

assertThatExceptionOfType(RejectedExecutionException.class)
.as("must reject for method schedulePeriodically")
.isThrownBy(() -> scheduler.schedulePeriodically(waitLatchRunnable, 100, 100, TimeUnit.MILLISECONDS));
taskRejected++;

assertThat(taskRejected).as("task rejected").isEqualTo(3);

releaseAllTasksLatch.countDown();

Awaitility.with().pollInterval(50, TimeUnit.MILLISECONDS).pollDelay(50, TimeUnit.MILLISECONDS)
.await().atMost(500, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertThat(scheduler.estimateIdle())
.as("all BoundedStates are idle after all pending tasks finish")
.isEqualTo(maxThreads)
);
}

private static boolean canSubmitTask(Scheduler scheduler) {
CountDownLatch latch = new CountDownLatch(1);
scheduler.schedule(latch::countDown);
Expand Down

0 comments on commit f2ffd70

Please sign in to comment.