diff --git a/reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java b/reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java index dfa05b6da2..5b07f8c86b 100644 --- a/reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java +++ b/reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java @@ -250,14 +250,26 @@ public Mono disposeGracefully() { public Disposable schedule(Runnable task) { //tasks running once will call dispose on the BoundedState, decreasing its usage by one BoundedState picked = state.currentResource.pick(); - return Schedulers.directSchedule(picked.executor, task, picked, 0L, TimeUnit.MILLISECONDS); + try { + return Schedulers.directSchedule(picked.executor, task, picked, 0L, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ex) { + // ensure to free the BoundedState so it can be reused + picked.dispose(); + throw ex; + } } @Override public Disposable schedule(Runnable task, long delay, TimeUnit unit) { //tasks running once will call dispose on the BoundedState, decreasing its usage by one final BoundedState picked = state.currentResource.pick(); - return Schedulers.directSchedule(picked.executor, task, picked, delay, unit); + try { + return Schedulers.directSchedule(picked.executor, task, picked, delay, unit); + } catch (RejectedExecutionException ex) { + // ensure to free the BoundedState so it can be reused + picked.dispose(); + throw ex; + } } @Override @@ -266,14 +278,20 @@ public Disposable schedulePeriodically(Runnable task, long period, TimeUnit unit) { final BoundedState picked = state.currentResource.pick(); - Disposable scheduledTask = Schedulers.directSchedulePeriodically(picked.executor, - task, - initialDelay, - period, - unit); - //a composite with picked ensures the cancellation of the task releases the BoundedState - // (ie decreases its usage by one) - return Disposables.composite(scheduledTask, picked); + try { + Disposable scheduledTask = Schedulers.directSchedulePeriodically(picked.executor, + task, + initialDelay, + period, + unit); + //a composite with picked ensures the cancellation of the task releases the BoundedState + // (ie decreases its usage by one) + return Disposables.composite(scheduledTask, picked); + } catch (RejectedExecutionException ex) { + // ensure to free the BoundedState so it can be reused + picked.dispose(); + throw ex; + } } @Override diff --git a/reactor-core/src/test/java/reactor/core/scheduler/BoundedElasticSchedulerTest.java b/reactor-core/src/test/java/reactor/core/scheduler/BoundedElasticSchedulerTest.java index debe29b14e..984adac6db 100644 --- a/reactor-core/src/test/java/reactor/core/scheduler/BoundedElasticSchedulerTest.java +++ b/reactor-core/src/test/java/reactor/core/scheduler/BoundedElasticSchedulerTest.java @@ -1551,6 +1551,89 @@ void toStringOfExecutorReflectsIdleVsActive() throws InterruptedException { } } + @Test + public void transitionsToIdleAfterRejectionAndFollowingCompletion() throws InterruptedException { + int taskRejected = 0; + + int maxThreads = 2; + int maxTaskQueuedPerThread = 10; + + BoundedElasticScheduler scheduler = afterTest.autoDispose( + new BoundedElasticScheduler( + maxThreads, + maxTaskQueuedPerThread, + new ReactorThreadFactory( + "moveToIdleAfterHaveRejectTasksAndCompleteAllTasks", + new AtomicLong(), + false, + false, + null), + 60)); + scheduler.start(); + + CountDownLatch releaseAllTasksLatch = new CountDownLatch(1); + CountDownLatch tasksStartedLatch = new CountDownLatch(maxThreads); + + Runnable startedAndWaitLatchRunnable = () -> { + try { + tasksStartedLatch.countDown(); + releaseAllTasksLatch.await(30, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + }; + + Runnable waitLatchRunnable = () -> { + try { + releaseAllTasksLatch.await(30, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + }; + + // initial tasks that block the threads, causing other tasks to enter the pending queue + for (int i = 0; i < maxThreads; i++) { + scheduler.schedule(startedAndWaitLatchRunnable); + } + + // small window to start the first task + assertThat(tasksStartedLatch.await(1, TimeUnit.SECONDS)).as("task picked").isTrue(); + + // fill up pending queue + for (int i = 0; i < maxThreads * maxTaskQueuedPerThread; i++) { + scheduler.schedule(waitLatchRunnable); + } + + // check new tasks are rejected + assertThatExceptionOfType(RejectedExecutionException.class) + .as("must reject for method schedule without delay") + .isThrownBy(() -> scheduler.schedule(waitLatchRunnable)); + taskRejected++; + + assertThatExceptionOfType(RejectedExecutionException.class) + .as("must reject for method schedule with delay") + .isThrownBy(() -> scheduler.schedule(waitLatchRunnable, 100, TimeUnit.MILLISECONDS)); + taskRejected++; + + assertThatExceptionOfType(RejectedExecutionException.class) + .as("must reject for method schedulePeriodically") + .isThrownBy(() -> scheduler.schedulePeriodically(waitLatchRunnable, 100, 100, TimeUnit.MILLISECONDS)); + taskRejected++; + + assertThat(taskRejected).as("task rejected").isEqualTo(3); + + releaseAllTasksLatch.countDown(); + + Awaitility.with().pollInterval(50, TimeUnit.MILLISECONDS).pollDelay(50, TimeUnit.MILLISECONDS) + .await().atMost(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> assertThat(scheduler.estimateIdle()) + .as("all BoundedStates are idle after all pending tasks finish") + .isEqualTo(maxThreads) + ); + } + private static boolean canSubmitTask(Scheduler scheduler) { CountDownLatch latch = new CountDownLatch(1); scheduler.schedule(latch::countDown);