Skip to content

Commit

Permalink
promisefactory: Use a dedicated ScheduledThreadPoolExecutor
Browse files Browse the repository at this point in the history
The Processor's ScheduledThreadPoolExecutor is used by other tasks in
Bnd and may become jammed up. Since promises uses its
ScheduledThreadPoolExecutor for quick important actions like timeout
and delay, we need to make sure those actions proceed and are not
caught behind other scheduled tasks.

Since the static initializer in Processor is getting complicated, we
refactor the executor creation into a separate class.

Fixes #5112

Signed-off-by: BJ Hargrave <[email protected]>
  • Loading branch information
bjhargrave committed Feb 22, 2022
1 parent 986d53a commit 1df9181
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 94 deletions.
139 changes: 139 additions & 0 deletions biz.aQute.bndlib/src/aQute/bnd/osgi/ExecutorGroup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package aQute.bnd.osgi;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import org.osgi.util.promise.PromiseFactory;

import aQute.bnd.unmodifiable.Lists;

public class ExecutorGroup {

static final class ExecutorThreadFactory implements ThreadFactory {
private final ThreadFactory delegate;
private final String prefix;

ExecutorThreadFactory(ThreadFactory delegate, String prefix) {
this.delegate = delegate;
this.prefix = prefix;
}

@Override
public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(prefix.concat(t.getName()));
t.setDaemon(true);
return t;
}
}

static final class RejectedExecution implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
return;
}
try {
runnable.run();
} catch (Throwable t) {
// We are stealing another's thread because we have hit max
// pool size, so we cannot let the runnable's exception
// propagate back up this thread.
try {
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler()
.uncaughtException(thread, t);
} catch (Throwable for_real) {
// we will ignore this
}
}
}
}

private final ThreadPoolExecutor executor;
private final ScheduledThreadPoolExecutor scheduledExecutor;
// Use dedicated ScheduledThreadPoolExecutor for the promise factory
private final ScheduledThreadPoolExecutor promiseScheduledExecutor;
private final PromiseFactory promiseFactory;

public ExecutorGroup() {
this(2, Integer.getInteger("bnd.executor.maximumPoolSize", 256)
.intValue());
}

public ExecutorGroup(int corePoolSize, int maximumPoolSize) {
ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecution();
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), new ExecutorThreadFactory(defaultThreadFactory, "Bnd-Executor,"),
rejectedExecutionHandler);
scheduledExecutor = new ScheduledThreadPoolExecutor(corePoolSize,
new ExecutorThreadFactory(defaultThreadFactory, "Bnd-ScheduledExecutor,"), rejectedExecutionHandler);
// Use dedicated ScheduledThreadPoolExecutor for the promise factory
promiseScheduledExecutor = new ScheduledThreadPoolExecutor(corePoolSize,
new ExecutorThreadFactory(defaultThreadFactory, "Bnd-PromiseScheduledExecutor,"), rejectedExecutionHandler);
promiseFactory = new PromiseFactory(executor, promiseScheduledExecutor);

List<ThreadPoolExecutor> executors = Lists.of(scheduledExecutor, promiseScheduledExecutor, executor);

// Handle shutting down executors via shutdown hook
AtomicBoolean shutdownHookInstalled = new AtomicBoolean();
Function<ThreadPoolExecutor, ThreadFactory> shutdownHookInstaller = threadPoolExecutor -> {
ThreadFactory threadFactory = threadPoolExecutor.getThreadFactory();
return (Runnable r) -> {
threadPoolExecutor.setThreadFactory(threadFactory);
if (shutdownHookInstalled.compareAndSet(false, true)) {
Thread shutdownThread = new Thread(() -> {
executors.forEach(executor -> {
executor.shutdown();
try {
executor.awaitTermination(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
}
});
}, "Bnd-ExecutorShutdownHook");
try {
Runtime.getRuntime()
.addShutdownHook(shutdownThread);
} catch (IllegalStateException e) {
// VM is already shutting down...
executors.forEach(ThreadPoolExecutor::shutdown);
}
}
return threadFactory.newThread(r);
};
};
executors.forEach(executor -> {
executor.setThreadFactory(shutdownHookInstaller.apply(executor));
if (executor instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor scheduledExecutor = (ScheduledThreadPoolExecutor) executor;
scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
}
});
}

public Executor getExecutor() {
return executor;
}

public ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}

public PromiseFactory getPromiseFactory() {
return promiseFactory;
}
}
103 changes: 9 additions & 94 deletions biz.aQute.bndlib/src/aQute/bnd/osgi/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.jar.Attributes;
import java.util.jar.Manifest;
Expand Down Expand Up @@ -107,90 +100,9 @@ public class Processor extends Domain implements Reporter, Registry, Constants,
static final int BUFFER_SIZE = IOConstants.PAGE_SIZE * 1;

final static ThreadLocal<Processor> current = new ThreadLocal<>();
private final static ScheduledThreadPoolExecutor scheduledExecutor;
private final static ThreadPoolExecutor executor;
static {
Function<String, ThreadFactory> threadFactoryFactory = prefix -> {
ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
return (Runnable r) -> {
Thread t = defaultThreadFactory.newThread(r);
t.setName(prefix + t.getName());
t.setDaemon(true);
return t;
};
};
ThreadFactory executorThreadFactory = threadFactoryFactory.apply("Bnd-Executor,");
ThreadFactory scheduledExecutorThreadFactory = threadFactoryFactory.apply("Bnd-ScheduledExecutor,");
RejectedExecutionHandler rejectedExecutionHandler = (Runnable r, ThreadPoolExecutor e) -> {
if (e.isShutdown()) {
return;
}
try {
r.run();
} catch (Throwable t) {
/*
* We are stealing another's thread because we have hit max pool
* size, so we cannot let the runnable's exception propagate
* back up this thread.
*/
try {
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler()
.uncaughtException(thread, t);
} catch (Throwable for_real) {
// we will ignore this
}
}
};
final int corePoolSize = 2;
final int maximumPoolSize = Integer.getInteger("bnd.executor.maximumPoolSize", 256)
.intValue();
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), executorThreadFactory, rejectedExecutionHandler);
scheduledExecutor = new ScheduledThreadPoolExecutor(corePoolSize, scheduledExecutorThreadFactory,
rejectedExecutionHandler);
scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);

// Handle shutting down executors via shutdown hook
AtomicBoolean shutdownHookInstalled = new AtomicBoolean();
Function<ThreadFactory, ThreadFactory> shutdownHookInstaller = threadFactory -> (Runnable r) -> {
if (shutdownHookInstalled.compareAndSet(false, true)) {
executor.setThreadFactory(executorThreadFactory);
scheduledExecutor.setThreadFactory(scheduledExecutorThreadFactory);
Thread shutdownThread = new Thread(() -> {
// limit new thread creation
executor.setMaximumPoolSize(Math.max(corePoolSize, executor.getPoolSize()));
scheduledExecutor.shutdown();
try {
scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
}
executor.shutdown();
try {
executor.awaitTermination(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
}
}, "Bnd-ExecutorShutdownHook");
try {
Runtime.getRuntime()
.addShutdownHook(shutdownThread);
} catch (IllegalStateException e) {
// VM is already shutting down...
executor.shutdown();
scheduledExecutor.shutdown();
}
}
return threadFactory.newThread(r);
};
executor.setThreadFactory(shutdownHookInstaller.apply(executorThreadFactory));
scheduledExecutor.setThreadFactory(shutdownHookInstaller.apply(scheduledExecutorThreadFactory));
}
private static final PromiseFactory promiseFactory = new PromiseFactory(executor, scheduledExecutor);

private static final Memoize<ExecutorGroup> executors = Memoize.supplier(ExecutorGroup::new);

private static final Memoize<Random> random = Memoize.supplier(Random::new);
public final static String LIST_SPLITTER = "\\s*,\\s*";
final List<String> errors = new ArrayList<>();
Expand Down Expand Up @@ -1956,15 +1868,18 @@ protected void endHandleErrors(Processor previous) {
}

public static Executor getExecutor() {
return executor;
return executors.get()
.getExecutor();
}

public static ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
return executors.get()
.getScheduledExecutor();
}

public static PromiseFactory getPromiseFactory() {
return promiseFactory;
return executors.get()
.getPromiseFactory();
}

/**
Expand Down

0 comments on commit 1df9181

Please sign in to comment.