From 9ec76b8e14354d85c202005c5dddd29a5fc439e9 Mon Sep 17 00:00:00 2001 From: Michael Hunger Date: Wed, 30 Aug 2017 14:21:27 +0200 Subject: [PATCH] Make default thread pool size configurable --- docs/overview.adoc | 1 + docs/periodic.adoc | 8 ++++++-- src/main/java/apoc/Pools.java | 23 +++++++++++++++++------ src/main/java/apoc/util/Util.java | 9 +++++++++ 4 files changed, 33 insertions(+), 8 deletions(-) diff --git a/docs/overview.adoc b/docs/overview.adoc index bcb15df881..7868b8cb56 100644 --- a/docs/overview.adoc +++ b/docs/overview.adoc @@ -59,6 +59,7 @@ All boolean options default to **false**, i.e. they are disabled, unless mention | apoc.mongodb..uri=mongodb-url-with-credentials | store mongodb-urls under a key to be used by mongodb procedures | apoc.couchbase..uri=couchbase-url-with-credentials | store couchbase-urls under a key to be used by couchbase procedures | apoc.jobs.scheduled.num_threads=number-of-threads | Many periodic procedures rely on a scheduled executor that has a pool of threads with a default fixed size. You can configure the pool size using this configuration property +| apoc.jobs.default.num_threads=number-of-threads | Number of threads in the default APOC thread pool used for background executions. |=== diff --git a/docs/periodic.adoc b/docs/periodic.adoc index be8311a64e..5a5e055d54 100644 --- a/docs/periodic.adoc +++ b/docs/periodic.adoc @@ -13,7 +13,11 @@ Also sometimes you want to schedule execution of Cypher statements to run regula The `apoc.periodic.*` procedures provide such capabilities. -Many periodic procedures rely on a scheduled executor that has a pool of threads with a default fixed size. You can configure the pool size using the following configuration property: +Many procedures run in the background or asynchronously. This setting overrides the default thread pool size (processors*2). + +`apoc.jobs.default.num_threads=10` + +Many periodic procedures rely on a scheduled executor that has a pool of threads with a default fixed size (processors/4, at least 1). You can configure the pool size using the following configuration property: `apoc.jobs.scheduled.num_threads=10` @@ -137,4 +141,4 @@ CALL apoc.periodic.iterate(' match (p:Person) return p', 'SET p.lastname=p.name', {batchSize:20000,parallel:true, iterateList:true}) ---- -//// \ No newline at end of file +//// diff --git a/src/main/java/apoc/Pools.java b/src/main/java/apoc/Pools.java index be4e9dbaa3..f9f39d740e 100644 --- a/src/main/java/apoc/Pools.java +++ b/src/main/java/apoc/Pools.java @@ -1,5 +1,6 @@ package apoc; +import apoc.util.Util; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Transaction; import org.neo4j.kernel.impl.util.JobScheduler; @@ -10,11 +11,16 @@ import java.util.function.Consumer; public class Pools { - private final static String DEFAULT_NUM_OF_THREADS = new Integer(Runtime.getRuntime().availableProcessors() / 4).toString(); + + static final String CONFIG_JOBS_SCHEDULED_NUM_THREADS = "jobs.scheduled.num_threads"; + static final String CONFIG_JOBS_POOL_NUM_THREADS = "jobs.pool.num_threads"; + + private final static int DEFAULT_SCHEDULED_THREADS = Runtime.getRuntime().availableProcessors() / 4; + private final static int DEFAULT_POOL_THREADS = Runtime.getRuntime().availableProcessors() * 2; public final static ExecutorService SINGLE = createSinglePool(); public final static ExecutorService DEFAULT = createDefaultPool(); - public final static ScheduledExecutorService SCHEDULED = createScheduledPool(Integer.parseInt(ApocConfiguration.get("jobs.scheduled.num_threads", DEFAULT_NUM_OF_THREADS))); + public final static ScheduledExecutorService SCHEDULED = createScheduledPool(); public static JobScheduler NEO4J_SCHEDULER = null; private Pools() { @@ -22,7 +28,7 @@ private Pools() { } public static ExecutorService createDefaultPool() { - int threads = Runtime.getRuntime().availableProcessors()*2; + int threads = getNoThreadsInDefaultPool(); int queueSize = threads * 25; return new ThreadPoolExecutor(threads / 2, threads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize), new CallerBlocksPolicy()); @@ -45,15 +51,20 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { } public static int getNoThreadsInDefaultPool() { - return Runtime.getRuntime().availableProcessors(); + Integer maxThreads = Util.toInteger(ApocConfiguration.get(CONFIG_JOBS_POOL_NUM_THREADS, DEFAULT_POOL_THREADS)); + return Math.max(1, maxThreads == null ? DEFAULT_POOL_THREADS : maxThreads); + } + public static int getNoThreadsInScheduledPool() { + Integer maxThreads = Util.toInteger(ApocConfiguration.get(CONFIG_JOBS_SCHEDULED_NUM_THREADS, DEFAULT_SCHEDULED_THREADS)); + return Math.max(1, maxThreads == null ? DEFAULT_POOL_THREADS : maxThreads); } private static ExecutorService createSinglePool() { return Executors.newSingleThreadExecutor(); } - private static ScheduledExecutorService createScheduledPool(int numOfThreads) { - return Executors.newScheduledThreadPool(Math.max(1, numOfThreads)); + private static ScheduledExecutorService createScheduledPool() { + return Executors.newScheduledThreadPool(getNoThreadsInScheduledPool()); } public static Future processBatch(List batch, GraphDatabaseService db, Consumer action) { diff --git a/src/main/java/apoc/util/Util.java b/src/main/java/apoc/util/Util.java index 3108b35a54..31473d8026 100644 --- a/src/main/java/apoc/util/Util.java +++ b/src/main/java/apoc/util/Util.java @@ -253,6 +253,15 @@ public static Long toLong(Object value) { return null; } } + public static Integer toInteger(Object value) { + if (value == null) return null; + if (value instanceof Number) return ((Number)value).intValue(); + try { + return Integer.parseInt(value.toString()); + } catch (NumberFormatException e) { + return null; + } + } public static URLConnection openUrlConnection(String url, Map headers) throws IOException { URL src = new URL(url);