diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java index 14d1ad5b64638..ac917e72b2d2f 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java @@ -148,11 +148,17 @@ protected void afterExecute(Runnable r, Throwable t) { assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue"; final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r); final long taskNanos = timedRunnable.getTotalNanos(); + final boolean failedOrRejected = timedRunnable.getFailedOrRejected(); final long totalNanos = totalTaskNanos.addAndGet(taskNanos); final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos(); - assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos; - executionEWMA.addValue(taskExecutionNanos); + assert taskExecutionNanos >= 0 || (failedOrRejected && taskExecutionNanos == -1) : + "expected task to always take longer than 0 nanoseconds or have '-1' failure code, got: " + taskExecutionNanos + + ", failedOrRejected: " + failedOrRejected; + if (taskExecutionNanos != -1) { + // taskExecutionNanos may be -1 if the task threw an exception + executionEWMA.addValue(taskExecutionNanos); + } if (taskCount.incrementAndGet() == this.tasksPerFrame) { final long endTimeNs = System.nanoTime(); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java index b6b9ef1ad05bf..f2de68453a6c2 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java @@ -30,6 +30,7 @@ class TimedRunnable extends AbstractRunnable implements WrappedRunnable { private final long creationTimeNanos; private long startTimeNanos; private long finishTimeNanos = -1; + private boolean failedOrRejected = false; TimedRunnable(final Runnable original) { this.original = original; @@ -48,6 +49,7 @@ public void doRun() { @Override public void onRejection(final Exception e) { + this.failedOrRejected = true; if (original instanceof AbstractRunnable) { ((AbstractRunnable) original).onRejection(e); } else { @@ -64,6 +66,7 @@ public void onAfter() { @Override public void onFailure(final Exception e) { + this.failedOrRejected = true; if (original instanceof AbstractRunnable) { ((AbstractRunnable) original).onFailure(e); } else { @@ -100,6 +103,14 @@ long getTotalExecutionNanos() { return Math.max(finishTimeNanos - startTimeNanos, 1); } + /** + * If the task was failed or rejected, return true. + * Otherwise, false. + */ + boolean getFailedOrRejected() { + return this.failedOrRejected; + } + @Override public Runnable unwrap() { return original; diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java index 8e4c729ee9cef..6b5f7d95700d1 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java @@ -226,16 +226,43 @@ public void testExecutionEWMACalculation() throws Exception { context.close(); } + /** Use a runnable wrapper that simulates a task with unknown failures. */ + public void testExceptionThrowingTask() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = + new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), + 100); + + QueueResizingEsThreadPoolExecutor executor = + new QueueResizingEsThreadPoolExecutor( + "test-threadpool", 1, 1, 1000, + TimeUnit.MILLISECONDS, queue, 10, 200, exceptionalWrapper(), 10, TimeValue.timeValueMillis(1), + EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(0L)); + executeTask(executor, 1); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + context.close(); + } + private Function fastWrapper() { - return (runnable) -> { - return new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100)); - }; + return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100), false); } private Function slowWrapper() { - return (runnable) -> { - return new SettableTimedRunnable(TimeUnit.MINUTES.toNanos(2)); - }; + return (runnable) -> new SettableTimedRunnable(TimeUnit.MINUTES.toNanos(2), false); + } + + /** + * The returned function outputs a WrappedRunnabled that simulates the case + * where {@link TimedRunnable#getTotalExecutionNanos()} returns -1 because + * the job failed or was rejected before it finished. + */ + private Function exceptionalWrapper() { + return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(-1), true); } /** Execute a blank task {@code times} times for the executor */ @@ -248,10 +275,12 @@ private void executeTask(QueueResizingEsThreadPoolExecutor executor, int times) public class SettableTimedRunnable extends TimedRunnable { private final long timeTaken; + private final boolean testFailedOrRejected; - public SettableTimedRunnable(long timeTaken) { + public SettableTimedRunnable(long timeTaken, boolean failedOrRejected) { super(() -> {}); this.timeTaken = timeTaken; + this.testFailedOrRejected = failedOrRejected; } @Override @@ -263,5 +292,10 @@ public long getTotalNanos() { public long getTotalExecutionNanos() { return timeTaken; } + + @Override + public boolean getFailedOrRejected() { + return testFailedOrRejected; + } } } diff --git a/server/src/test/java/org/elasticsearch/node/NodeTests.java b/server/src/test/java/org/elasticsearch/node/NodeTests.java index c083928436446..6f0419421b868 100644 --- a/server/src/test/java/org/elasticsearch/node/NodeTests.java +++ b/server/src/test/java/org/elasticsearch/node/NodeTests.java @@ -174,7 +174,6 @@ public void testAwaitCloseTimeoutsOnNonInterruptibleTask() throws Exception { shouldRun.set(false); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41448") public void testCloseOnInterruptibleTask() throws Exception { Node node = new MockNode(baseSettings().build(), basePlugins()); node.start();