Fix StackOverflowException in PriorityWorkerPool.
Enqueues directly into the ForkJoinPool instead of executing inline when the
queue is full. Inline execution may recursively enqueue more tasks, eventually
leading to stack overflow.

Deletes TieredPriorityExecutor.tryDoQueuedWork. This has never been shown to
improve performance.

PiperOrigin-RevId: 552579621
Change-Id: I2f5c6154b26b785ebc31aba44283abeeec6a6a99
aoeui authored and copybara-github committed Jul 31, 2023
1 parent 1997f09 commit cb0cf7f
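
To make the hazard concrete: when the bounded task queue was full, execute() ran a queued task inline on the submitting thread. If that task itself submitted more work into the still-full queue, execute() re-entered itself, one stack frame per task, until the stack overflowed. Below is a minimal sketch of the failure mode, not code from this commit (the class, queue bound, and submit method are illustrative stand-ins):

    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;

    final class InlineExecutionHazard {
      private final Queue<Runnable> queue = new ArrayBlockingQueue<>(4); // tiny bound, for illustration

      void submit(Runnable task) {
        while (!queue.offer(task)) { // the queue is full
          Runnable queued = queue.poll();
          if (queued != null) {
            // Inline execution: if `queued` calls submit() while the queue is still
            // full, this frame never unwinds; each nesting level adds a stack frame
            // and a sufficiently deep fan-out ends in StackOverflowError.
            queued.run();
          }
        }
      }
    }

A fan-out workload (every task submitting several more) keeps the queue saturated and drives the recursion arbitrarily deep; handing the overflow task to the ForkJoinPool instead, as this commit does, caps the cost at one extra pool submission.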
Showing 3 changed files with 45 additions and 182 deletions.
src/main/java/com/google/devtools/build/lib/concurrent/PriorityWorkerPool.java
@@ -21,9 +21,7 @@
import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.DO_CPU_HEAVY_TASK;
import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.DO_TASK;
import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.IDLE;
import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.QUIESCENT;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -131,15 +129,18 @@ final class PriorityWorkerPool {
this.pool = newForkJoinPool();
this.errorClassifier = errorClassifier;

long baseAddress = createPaddedBaseAddress(4);
long baseAddress = createPaddedBaseAddress(5);
cleaner.register(this, new AddressFreer(baseAddress));
this.countersAddress = getAlignedAddress(baseAddress, /* offset= */ 0);

this.queue =
new TaskFifo(
/* sizeAddress= */ getAlignedAddress(baseAddress, /* offset= */ 1),
/* appendIndexAddress= */ getAlignedAddress(baseAddress, /* offset= */ 2),
/* takeIndexAddress= */ getAlignedAddress(baseAddress, /* offset= */ 3));

this.activeWorkerCountAddress = getAlignedAddress(baseAddress, /* offset= */ 4);

resetExecutionCounters();
}

@@ -157,29 +158,34 @@ void execute(Runnable rawTask) {
if (task.isCpuHeavy()) {
cpuHeavyQueue.add(task);
if (acquireThreadAndCpuPermitElseReleaseCpuHeavyTask()) {
UNSAFE.getAndAddInt(null, activeWorkerCountAddress, 1);
pool.execute(RUN_CPU_HEAVY_TASK);
}
return;
}
}

while (!queue.tryAppend(rawTask)) {
// If the queue is full, this thread donates some work to reduce the queue.
if (!tryAcquireTask()) {
// This should be very hard to reach if the queue is full except under cancellation. It's
// possible to perform the cancellation check in advance, but we can save doing the check in
// most cases by deferring it to this branch.
if (isCancelled()) {
return;
}
logger.atWarning().atMostEvery(5, SECONDS).log(
"Queue is full but no tasks could be acquired: %s", this);
continue;
if (!queue.tryAppend(rawTask)) {
if (!isCancelled()) {
// The task queue is full (and the pool is not cancelled). Enqueues the task directly in the
// ForkJoinPool. This should be rare in practice.
UNSAFE.getAndAddInt(null, activeWorkerCountAddress, 1);
pool.execute(
() -> {
try {
rawTask.run();
} catch (Throwable uncaught) {
handleUncaughtError(uncaught);
} finally {
workerBecomingIdle();
}
});
}
dequeueTaskAndRun();
return;
}

if (acquireThreadElseReleaseTask()) {
UNSAFE.getAndAddInt(null, activeWorkerCountAddress, 1);
pool.execute(RUN_TASK);
}
}
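
The ordering in the new overflow path matters: the task is counted at activeWorkerCountAddress before it is handed to the pool, so awaitQuiescence can never observe a zero count while an overflow task is still pending, and the decrement happens in a finally block so a failing task cannot leak a permanently nonzero count. A sketch of the same bookkeeping, assuming AtomicInteger in place of the Unsafe counter and stub methods for the error and notification hooks:

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.atomic.AtomicInteger;

    final class OverflowBookkeepingSketch {
      private final AtomicInteger activeWorkers = new AtomicInteger();

      void executeWhenQueueFull(Runnable rawTask, ForkJoinPool pool) {
        activeWorkers.incrementAndGet(); // count the task before publishing it to the pool
        pool.execute(
            () -> {
              try {
                rawTask.run();
              } catch (Throwable uncaught) {
                handleUncaughtError(uncaught);
              } finally {
                // Decrement last, so a zero count is only observable once the task is done.
                if (activeWorkers.decrementAndGet() == 0) {
                  notifyQuiescenceWaiters();
                }
              }
            });
      }

      private void handleUncaughtError(Throwable t) {} // stand-in for the pool's error classifier
      private void notifyQuiescenceWaiters() {} // stand-in for workerBecomingIdle()'s notifyAll
    }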
@@ -328,12 +334,9 @@ private ForkJoinPool newForkJoinPool() {
*
* <p>After completing a task, the worker checks if there are any available tasks that it may
* execute, subject to CPU permit constraints. On finding and reserving an appropriate task, the
* worker returns its next planned activity, {@link #IDLE} or {@link #QUIESCENT} if it finds
* nothing to do.
* worker returns its next planned activity, {@link #IDLE} if it finds nothing to do.
*/
enum NextWorkerActivity {
/** The worker will stop and is the last worker working. */
QUIESCENT,
/** The worker will stop. */
IDLE,
/** The worker will perform a non-CPU heavy task. */
@@ -386,12 +389,8 @@ private WorkerThread(ForkJoinPool pool, String name) {
private void runLoop(NextWorkerActivity nextActivity) {
while (true) {
switch (nextActivity) {
case QUIESCENT:
synchronized (quiescenceMonitor) {
quiescenceMonitor.notifyAll();
}
return;
case IDLE:
workerBecomingIdle();
return;
case DO_TASK:
dequeueTaskAndRun();
@@ -404,14 +403,6 @@ private void runLoop(NextWorkerActivity nextActivity) {
}
}
}

boolean tryDoQueuedWork() {
if (!tryAcquireTask()) {
return false;
}
dequeueTaskAndRun();
return true;
}
}

private void dequeueTaskAndRun() {
@@ -431,6 +422,14 @@ private void dequeueCpuHeavyTaskAndRun() {
}
}

private void workerBecomingIdle() {
if (UNSAFE.getAndAddInt(null, activeWorkerCountAddress, -1) == 1) {
synchronized (quiescenceMonitor) {
quiescenceMonitor.notifyAll();
}
}
}

// The constants below apply to the 64-bit execution counters value.

private static final long CANCEL_BIT = 0x8000_0000_0000_0000L;
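
These constants pack cancellation, idle-thread, CPU-permit, and queued-task counts into one 64-bit word so that related checks and updates happen in a single compare-and-swap. For orientation, a re-rendering of tryAcquireTask (the donor-path helper this commit deletes, shown further down this file) on top of AtomicLong; the 32-bit task field is an assumed width for illustration:

    import java.util.concurrent.atomic.AtomicLong;

    final class PackedCountersSketch {
      static final long CANCEL_BIT = 0x8000_0000_0000_0000L; // the sign bit doubles as the cancel flag
      static final long TASKS_MASK = 0x0000_0000_FFFF_FFFFL; // assumed 32-bit task count field
      static final long ONE_TASK = 1L;

      private final AtomicLong counters = new AtomicLong();

      // Claims one queued task, or reports failure if none remain or the pool is
      // cancelled; the single CAS keeps all fields of the word mutually consistent.
      boolean tryAcquireTask() {
        long snapshot;
        do {
          snapshot = counters.get();
          if ((snapshot & TASKS_MASK) == 0 || snapshot < 0) { // snapshot < 0 means CANCEL_BIT is set
            return false;
          }
        } while (!counters.compareAndSet(snapshot, snapshot - ONE_TASK));
        return true;
      }
    }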
@@ -479,10 +478,10 @@ private void dequeueCpuHeavyTaskAndRun() {
*/
private final long countersAddress;

private final long activeWorkerCountAddress;

boolean isQuiescent() {
long snapshot = getExecutionCounters();
int threadsSnapshot = (int) ((snapshot & THREADS_MASK) >> THREADS_BIT_OFFSET);
return threadsSnapshot == poolSize;
return UNSAFE.getInt(null, activeWorkerCountAddress) == 0;
}

boolean isCancelled() {
@@ -505,6 +504,7 @@ private void resetExecutionCounters() {
countersAddress,
(((long) poolSize) << THREADS_BIT_OFFSET)
| (((long) cpuPermits) << CPU_PERMITS_BIT_OFFSET));
UNSAFE.putInt(null, activeWorkerCountAddress, 0);
}

private boolean acquireThreadElseReleaseTask() {
@@ -532,17 +532,6 @@ private boolean acquireThreadAndCpuPermitElseReleaseCpuHeavyTask() {
} while (true);
}

private boolean tryAcquireTask() {
long snapshot;
do {
snapshot = getExecutionCounters();
if ((snapshot & TASKS_MASK) == 0 || snapshot < 0) {
return false;
}
} while (!tryUpdateExecutionCounters(snapshot, snapshot - ONE_TASK));
return true;
}

/**
* Worker threads determine their next action after completing a task using this method.
*
@@ -564,7 +553,7 @@ private NextWorkerActivity getActivityFollowingTask() {
} else {
long target = snapshot + ONE_THREAD;
if (UNSAFE.compareAndSwapLong(null, countersAddress, snapshot, target)) {
return quiescentOrIdle(target);
return IDLE;
}
}
snapshot = UNSAFE.getLong(null, countersAddress);
@@ -574,8 +563,8 @@ private NextWorkerActivity getActivityFollowingCpuHeavyTask() {
/**
* Worker threads call this to determine their next action after completing a CPU heavy task.
*
* <p>This releases a CPU permit when returning {@link NextWorkerActivity#QUIESCENT}, {@link
* NextWorkerActivity#IDLE} or {@link NextWorkerActivity#DO_TASK}.
* <p>This releases a CPU permit when returning {@link NextWorkerActivity#IDLE} or {@link
* NextWorkerActivity#DO_TASK}.
*/
private NextWorkerActivity getActivityFollowingCpuHeavyTask() {
long snapshot = UNSAFE.getLongVolatile(null, countersAddress);
@@ -593,18 +582,13 @@ private NextWorkerActivity getActivityFollowingCpuHeavyTask() {
} else {
long target = snapshot + CPU_HEAVY_RESOURCES;
if (UNSAFE.compareAndSwapLong(null, countersAddress, snapshot, target)) {
return quiescentOrIdle(target);
return IDLE;
}
}
snapshot = UNSAFE.getLong(null, countersAddress);
} while (true);
}

private NextWorkerActivity quiescentOrIdle(long snapshot) {
int snapshotThreads = (int) ((snapshot & THREADS_MASK) >> THREADS_BIT_OFFSET);
return snapshotThreads == poolSize ? QUIESCENT : IDLE;
}

// Throughout this class, the following wrappers are used where possible, but they are often not
// inlined by the JVM even though they show up on profiles, so they are inlined explicitly in
// numerous cases.
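
The isQuiescent hunk above is the heart of the accounting change: quiescence used to mean that every pool thread had checked back in, a definition blind to overflow tasks running directly on the ForkJoinPool, whereas the new definition counts active workers of either kind. A condensed paraphrase with atomic stand-ins for the Unsafe addresses (the offset, mask, and pool size are illustrative):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    final class QuiescenceCheckSketch {
      static final int THREADS_BIT_OFFSET = 48; // assumed offset
      static final long THREADS_MASK = 0x7FFFL << THREADS_BIT_OFFSET; // assumed mask

      private final int poolSize = 16; // illustrative
      private final AtomicLong counters = new AtomicLong(); // stands in for countersAddress
      private final AtomicInteger activeWorkers = new AtomicInteger(); // stands in for activeWorkerCountAddress

      // Before: quiescent when every pool thread is idle; cannot see tasks that
      // bypassed the bounded queue.
      boolean isQuiescentBefore() {
        long snapshot = counters.get();
        int idleThreads = (int) ((snapshot & THREADS_MASK) >> THREADS_BIT_OFFSET);
        return idleThreads == poolSize;
      }

      // After: quiescent when no worker of any kind is active, including overflow
      // tasks enqueued straight into the ForkJoinPool.
      boolean isQuiescentAfter() {
        return activeWorkers.get() == 0;
      }
    }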
src/main/java/com/google/devtools/build/lib/concurrent/TieredPriorityExecutor.java
@@ -16,7 +16,6 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.lang.Thread.currentThread;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
@@ -41,11 +40,6 @@
*
* <p>The queue for non-CPUHeavy tasks has a fixed capacity. When full, callers of execute assist
* with enqueued work.
*
* <p>Threads may voluntarily assist with queued work by calling {@link
* TieredPriorityExecutor#tryDoQueuedWork} when a thread is about to block. If tasks are available,
* the current thread may perform a task inside {@link TieredPriorityExecutor#tryDoQueuedWork}
* before it returns. This may add latency to the donating thread but can reduce overhead.
*/
public final class TieredPriorityExecutor implements QuiescingExecutor {
/** A common cleaner shared by all executors. */
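
For context on the contract these changes preserve: tasks may transitively submit more tasks, and awaitQuiescence returns only after all of them have finished. A hypothetical usage fragment (construction is elided since the constructor is not part of this diff; execute and awaitQuiescence are the real API):

    void runAndDrain(TieredPriorityExecutor executor) throws InterruptedException {
      executor.execute(
          () -> {
            // Tasks may fan out; quiescence accounting covers the nested task too.
            executor.execute(() -> {});
          });
      executor.awaitQuiescence(/* interruptWorkers= */ true); // returns once all tasks finish
    }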
@@ -163,23 +157,6 @@ public CountDownLatch getInterruptionLatchForTestingOnly() {
throw new UnsupportedOperationException();
}

/**
* Attempts to donate work on the current thread.
*
* <p>Calling this method may be useful if the current thread is about to block. Subject to
* scheduling constraints, attempts to poll work from the queue and execute it on the current
* thread.
*
* @return true if work was donated, false otherwise.
*/
public static boolean tryDoQueuedWork() {
var thread = currentThread();
if (!(thread instanceof PriorityWorkerPool.WorkerThread)) {
return false;
}
return ((PriorityWorkerPool.WorkerThread) thread).tryDoQueuedWork();
}

/**
* The parallelism target of the underlying thread pool.
*
src/test/java/com/google/devtools/build/lib/concurrent/TieredPriorityExecutorTest.java
@@ -36,7 +36,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -493,94 +492,6 @@ public void criticalError_disposesPool() throws InterruptedException {
} while (true);
}

@Test
public void workDonation_processesAllTasks() throws InterruptedException {
var holdAllThreads = new CountDownLatch(1);
for (int i = 0; i < POOL_SIZE - 1; ++i) {
executor.execute(() -> awaitUninterruptibly(holdAllThreads));
}

var donor = new AtomicReference<Thread>();
var donorSet = new CountDownLatch(1);
var gate = new CountDownLatch(1);
var donationDone = new CountDownLatch(1);
executor.execute(
() -> {
donor.set(Thread.currentThread());
donorSet.countDown();
awaitUninterruptibly(gate);
for (int i = 0; i < 100; ++i) {
assertThat(TieredPriorityExecutor.tryDoQueuedWork()).isTrue();
}
donationDone.countDown();
awaitUninterruptibly(holdAllThreads);
});

var receiver = new AtomicInteger();
for (int i = 0; i < 100; ++i) {
executor.execute(
() -> {
// This task is running from the donor's thread.
assertThat(Thread.currentThread()).isEqualTo(donor.get());
receiver.getAndIncrement();
});
}

donorSet.await();
gate.countDown();
donationDone.await();
assertThat(receiver.get()).isEqualTo(100);
holdAllThreads.countDown();
executor.awaitQuiescence(/* interruptWorkers= */ true);
}

@Test
public void workDonation_handlesErrorsInDonatedWork() throws InterruptedException {
var interruptedCount = new AtomicInteger();
var holdAllThreads = new CountDownLatch(1);
for (int i = 0; i < POOL_SIZE - 1; ++i) {
executor.execute(
() -> {
try {
while (!holdAllThreads.await(INTERRUPT_POLL_MS, MILLISECONDS)) {}
} catch (InterruptedException e) {
interruptedCount.getAndIncrement();
}
});
}

var donor = new AtomicReference<Thread>();
var donorSet = new CountDownLatch(1);
var gate = new CountDownLatch(1);
executor.execute(
() -> {
donor.set(Thread.currentThread());
donorSet.countDown();
awaitUninterruptibly(gate);
assertThat(TieredPriorityExecutor.tryDoQueuedWork()).isTrue();
try {
while (!holdAllThreads.await(INTERRUPT_POLL_MS, MILLISECONDS)) {}
} catch (InterruptedException e) {
interruptedCount.getAndIncrement();
}
});

executor.execute(
() -> {
assertThat(Thread.currentThread()).isEqualTo(donor.get());
throw new AssertionError("critical error");
});

donorSet.await();
gate.countDown();

var error =
assertThrows(
AssertionError.class, () -> executor.awaitQuiescence(/* interruptWorkers= */ true));
assertThat(error).hasMessageThat().contains("critical error");
assertThat(interruptedCount.get()).isEqualTo(POOL_SIZE);
}

@Test
public void settableFuture_respondsToInterrupt() throws InterruptedException {
var interruptedCount = new AtomicInteger();
@@ -682,31 +593,22 @@ public void taskQueueOverflow_executesTasks() throws InterruptedException {
// Waits for holders to start, otherwise they might race against the filling of the queue below.
allHoldersStarted.await();

// Fills up the queue.
var executed = new ArrayList<Integer>();
// Over-fills the queue.
var executed = Sets.<Integer>newConcurrentHashSet();
var expected = new ArrayList<Integer>();
for (int i = 0; i < PriorityWorkerPool.TASKS_MAX_VALUE; ++i) {
for (int i = 0; i < 2 * PriorityWorkerPool.TASKS_MAX_VALUE; ++i) {
expected.add(i);

final int index = i;
executor.execute(() -> executed.add(index));
}

// Adds tasks that would overflow the queue. Since overflows consume tasks from the queue, this
// causes all the tasks above to be executed.
var donorValues = Sets.<Integer>newConcurrentHashSet();
for (int i = 0; i < PriorityWorkerPool.TASKS_MAX_VALUE; ++i) {
final int index = i;
executor.execute(() -> donorValues.add(index));
}

assertThat(executed).isEqualTo(expected);
assertThat(donorValues).isEmpty();
assertThat(executed).isEmpty();

holdAllThreads.countDown();
executor.awaitQuiescence(/* interruptWorkers= */ true);

assertThat(donorValues).containsExactlyElementsIn(expected);
assertThat(executed).containsExactlyElementsIn(expected);
}

@Test