Skip to content

Commit

Permalink
Make default thread pool size configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
jexp committed Aug 30, 2017
1 parent 0c48b17 commit 9ec76b8
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 8 deletions.
1 change: 1 addition & 0 deletions docs/overview.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ All boolean options default to **false**, i.e. they are disabled, unless mention
| apoc.mongodb.<key>.uri=mongodb-url-with-credentials | store mongodb-urls under a key to be used by mongodb procedures
| apoc.couchbase.<key>.uri=couchbase-url-with-credentials | store couchbase-urls under a key to be used by couchbase procedures
| apoc.jobs.scheduled.num_threads=number-of-threads | Many periodic procedures rely on a scheduled executor that has a pool of threads with a default fixed size. You can configure the pool size using this configuration property
| apoc.jobs.default.num_threads=number-of-threads | Number of threads in the default APOC thread pool used for background executions.
|===


Expand Down
8 changes: 6 additions & 2 deletions docs/periodic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ Also sometimes you want to schedule execution of Cypher statements to run regula

The `apoc.periodic.*` procedures provide such capabilities.

Many periodic procedures rely on a scheduled executor that has a pool of threads with a default fixed size. You can configure the pool size using the following configuration property:
Many procedures run in the background or asynchronously. This setting overrides the default thread pool size (processors*2).

`apoc.jobs.default.num_threads=10`

Many periodic procedures rely on a scheduled executor that has a pool of threads with a default fixed size (processors/4, at least 1). You can configure the pool size using the following configuration property:

`apoc.jobs.scheduled.num_threads=10`

Expand Down Expand Up @@ -137,4 +141,4 @@ CALL apoc.periodic.iterate('
match (p:Person) return p',
'SET p.lastname=p.name', {batchSize:20000,parallel:true, iterateList:true})
----
////
////
23 changes: 17 additions & 6 deletions src/main/java/apoc/Pools.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package apoc;

import apoc.util.Util;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.impl.util.JobScheduler;
Expand All @@ -10,19 +11,24 @@
import java.util.function.Consumer;

public class Pools {
private final static String DEFAULT_NUM_OF_THREADS = new Integer(Runtime.getRuntime().availableProcessors() / 4).toString();

static final String CONFIG_JOBS_SCHEDULED_NUM_THREADS = "jobs.scheduled.num_threads";
static final String CONFIG_JOBS_POOL_NUM_THREADS = "jobs.pool.num_threads";

private final static int DEFAULT_SCHEDULED_THREADS = Runtime.getRuntime().availableProcessors() / 4;
private final static int DEFAULT_POOL_THREADS = Runtime.getRuntime().availableProcessors() * 2;

public final static ExecutorService SINGLE = createSinglePool();
public final static ExecutorService DEFAULT = createDefaultPool();
public final static ScheduledExecutorService SCHEDULED = createScheduledPool(Integer.parseInt(ApocConfiguration.get("jobs.scheduled.num_threads", DEFAULT_NUM_OF_THREADS)));
public final static ScheduledExecutorService SCHEDULED = createScheduledPool();
public static JobScheduler NEO4J_SCHEDULER = null;

private Pools() {
throw new UnsupportedOperationException();
}

public static ExecutorService createDefaultPool() {
int threads = Runtime.getRuntime().availableProcessors()*2;
int threads = getNoThreadsInDefaultPool();
int queueSize = threads * 25;
return new ThreadPoolExecutor(threads / 2, threads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize),
new CallerBlocksPolicy());
Expand All @@ -45,15 +51,20 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}

public static int getNoThreadsInDefaultPool() {
return Runtime.getRuntime().availableProcessors();
Integer maxThreads = Util.toInteger(ApocConfiguration.get(CONFIG_JOBS_POOL_NUM_THREADS, DEFAULT_POOL_THREADS));
return Math.max(1, maxThreads == null ? DEFAULT_POOL_THREADS : maxThreads);
}
public static int getNoThreadsInScheduledPool() {
Integer maxThreads = Util.toInteger(ApocConfiguration.get(CONFIG_JOBS_SCHEDULED_NUM_THREADS, DEFAULT_SCHEDULED_THREADS));
return Math.max(1, maxThreads == null ? DEFAULT_POOL_THREADS : maxThreads);
}

private static ExecutorService createSinglePool() {
return Executors.newSingleThreadExecutor();
}

private static ScheduledExecutorService createScheduledPool(int numOfThreads) {
return Executors.newScheduledThreadPool(Math.max(1, numOfThreads));
private static ScheduledExecutorService createScheduledPool() {
return Executors.newScheduledThreadPool(getNoThreadsInScheduledPool());
}

public static <T> Future<Void> processBatch(List<T> batch, GraphDatabaseService db, Consumer<T> action) {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/apoc/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,15 @@ public static Long toLong(Object value) {
return null;
}
}
public static Integer toInteger(Object value) {
if (value == null) return null;
if (value instanceof Number) return ((Number)value).intValue();
try {
return Integer.parseInt(value.toString());
} catch (NumberFormatException e) {
return null;
}
}

public static URLConnection openUrlConnection(String url, Map<String, Object> headers) throws IOException {
URL src = new URL(url);
Expand Down

0 comments on commit 9ec76b8

Please sign in to comment.