diff --git a/docs/reference/cat.asciidoc b/docs/reference/cat.asciidoc index 3dff5abc52d9a..7a2262b7962bb 100644 --- a/docs/reference/cat.asciidoc +++ b/docs/reference/cat.asciidoc @@ -93,8 +93,8 @@ Responds with: // TESTRESPONSE[s/9300 27 sLBaIGK/\\d+ \\d+ .+/ _cat] You can also request multiple columns using simple wildcards like -`/_cat/thread_pool?h=ip,bulk.*` to get all headers (or aliases) starting -with `bulk.`. +`/_cat/thread_pool?h=ip,queue*` to get all headers (or aliases) starting +with `queue`. [float] [[numeric-formats]] diff --git a/docs/reference/cat/thread_pool.asciidoc b/docs/reference/cat/thread_pool.asciidoc index de41359a23b62..f1c7664ae33f6 100644 --- a/docs/reference/cat/thread_pool.asciidoc +++ b/docs/reference/cat/thread_pool.asciidoc @@ -15,7 +15,6 @@ Which looks like: [source,txt] -------------------------------------------------- node-0 analyze 0 0 0 -node-0 bulk 0 0 0 node-0 fetch_shard_started 0 0 0 node-0 fetch_shard_store 0 0 0 node-0 flush 0 0 0 @@ -29,6 +28,7 @@ node-0 refresh 0 0 0 node-0 search 0 0 0 node-0 snapshot 0 0 0 node-0 warmer 0 0 0 +node-0 write 0 0 0 -------------------------------------------------- // TESTRESPONSE[s/\d+/\\d+/ _cat] @@ -45,7 +45,6 @@ The second column is the thread pool name -------------------------------------------------- name analyze -bulk fetch_shard_started fetch_shard_store flush @@ -59,6 +58,7 @@ refresh search snapshot warmer +write -------------------------------------------------- diff --git a/docs/reference/migration/migrate_6_3.asciidoc b/docs/reference/migration/migrate_6_3.asciidoc index c6165983342d4..a658a24c7c9bc 100644 --- a/docs/reference/migration/migrate_6_3.asciidoc +++ b/docs/reference/migration/migrate_6_3.asciidoc @@ -44,3 +44,19 @@ users we think this is fine as analyze requests are useful for debugging and so probably do not see high concurrency. If you are impacted by this you can increase the size of the thread pool by using the `thread_pool.analyze.size` setting. + +==== Renaming the bulk thread pool + +The `bulk` thread pool has been renamed to the `write` thread pool. This change +was made to reflect the fact that this thread pool is used to execute all write +operations: single-document index/delete/update requests, as well as bulk +requests. The settings `thread_pool.bulk.size` and `thread_pool.bulk.queue_size` +are still supported as fallback settings although you should transition to +`thread_pool.write.size` and `thread_pool.write.queue_size`, respectively. The +fallback settings will be removed in 7.0.0. Additionally, this means that some +APIs (e.g., the node stats API) will now display the name of this thread pool as +`write`. If this change impacts you (e.g., for monitoring that you have in +place) you can start Elasticsearch with the JVM option +`-Des.thread_pool.write.use_bulk_as_display_name=true` to have Elasticsearch +continue to display the name of this thread pool as `bulk`. Elasticsearch will +stop observing this system property in 7.0.0. diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index fa8522ea1cbb0..9519dbf3e03a8 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -33,11 +33,10 @@ There are several thread pools, but the important ones include: `analyze`:: For analyze requests. Thread pool type is `fixed` with a size of 1, queue size of 16. -`bulk`:: - For bulk operations. Thread pool type is `fixed` - with a size of `# of available processors`, - queue_size of `200`. The maximum size for this pool - is `1 + # of available processors`. +`write`:: + For single-document index/delete/update and bulk requests. Thread pool type + is `fixed` with a size of `# of available processors`, queue_size of `200`. + The maximum size for this pool is `1 + # of available processors`. `snapshot`:: For snapshot/restore operations. Thread pool type is `scaling` with a diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java index da0dbf2aae345..131c959af8afc 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java @@ -158,10 +158,10 @@ private void testCase( final Settings nodeSettings = Settings.builder() // use pools of size 1 so we can block them - .put("thread_pool.bulk.size", 1) + .put("thread_pool.write.size", 1) .put("thread_pool.search.size", 1) // use queues of size 1 because size 0 is broken and because search requests need the queue to function - .put("thread_pool.bulk.queue_size", 1) + .put("thread_pool.write.queue_size", 1) .put("thread_pool.search.queue_size", 1) .put("node.attr.color", "blue") .build(); @@ -203,7 +203,7 @@ private void testCase( assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L))); logger.info("Blocking bulk and unblocking search so we start to get bulk rejections"); - CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.BULK, node); + CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node); initialSearchBlock.await(); logger.info("Waiting for bulk rejections"); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml index 9cd970341412a..7a11388d1bf89 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml @@ -1,5 +1,8 @@ --- "Test cat thread_pool output": + - skip: + version: " - 6.2.99" + reason: the write thread pool was renamed from bulk in 6.3.0 - do: cat.thread_pool: {} @@ -29,30 +32,30 @@ - do: cat.thread_pool: - thread_pool_patterns: bulk,management,flush,index,generic,force_merge + thread_pool_patterns: write,management,flush,index,generic,force_merge h: id,name,active v: true - match: $body: | /^ id \s+ name \s+ active \n - (\S+\s+ bulk \s+ \d+ \n - \S+\s+ flush \s+ \d+ \n + (\S+\s+ flush \s+ \d+ \n \S+\s+ force_merge \s+ \d+ \n \S+\s+ generic \s+ \d+ \n \S+\s+ index \s+ \d+ \n - \S+\s+ management \s+ \d+ \n)+ $/ + \S+\s+ management \s+ \d+ \n + \S+\s+ write \s+ \d+ \n)+ $/ - do: cat.thread_pool: - thread_pool_patterns: bulk + thread_pool_patterns: write h: id,name,type,active,size,queue,queue_size,rejected,largest,completed,min,max,keep_alive v: true - match: $body: | - /^ id \s+ name \s+ type \s+ active \s+ size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ min \s+ max \s+ keep_alive \n - (\S+ \s+ bulk \s+ fixed \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \n)+ $/ + /^ id \s+ name \s+ type \s+ active \s+ size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ min \s+ max \s+ keep_alive \n + (\S+ \s+ write \s+ fixed \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \n)+ $/ - do: cat.thread_pool: @@ -68,12 +71,12 @@ - do: cat.thread_pool: - thread_pool_patterns: bulk,index,search + thread_pool_patterns: write,index,search size: "" - match: $body: | / #node_name name active queue rejected - ^ (\S+ \s+ bulk \s+ \d+ \s+ \d+ \s+ \d+ \n - \S+ \s+ index \s+ \d+ \s+ \d+ \s+ \d+ \n - \S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/ + ^ (\S+ \s+ index \s+ \d+ \s+ \d+ \s+ \d+ \n + \S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n + \S+ \s+ write \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 7221118d2ef50..8fb490c4b6531 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -83,7 +83,7 @@ public TransportShardBulkAction(Settings settings, TransportService transportSer MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.BULK); + indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE); this.updateHelper = updateHelper; this.mappingUpdatedAction = mappingUpdatedAction; } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 4e7c66afdcaf0..c182fb24ffb11 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -60,7 +60,7 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran ShardStateAction shardStateAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.BULK); + indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index 69018db2c9365..d2503476d3353 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -56,7 +56,7 @@ public PipelineExecutionService(PipelineStore store, ThreadPool threadPool) { public void executeBulkRequest(Iterable actionRequests, BiConsumer itemFailureHandler, Consumer completionHandler) { - threadPool.executor(ThreadPool.Names.BULK).execute(new AbstractRunnable() { + threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { diff --git a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java index 314eb1df71a4b..b50443a8f9e06 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java @@ -48,7 +48,7 @@ protected static String settingsKey(final String prefix, final String key) { } protected int applyHardSizeLimit(final Settings settings, final String name) { - if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) { + if (name.equals("bulk") || name.equals(ThreadPool.Names.INDEX) || name.equals(ThreadPool.Names.WRITE)) { return 1 + EsExecutors.numberOfProcessors(settings); } else { return Integer.MAX_VALUE; diff --git a/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java index 94db6cb64e2c8..e4ec0ef3d45a4 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java @@ -19,6 +19,7 @@ package org.elasticsearch.threadpool; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.SizeValue; @@ -38,7 +39,9 @@ public final class FixedExecutorBuilder extends ExecutorBuilder { private final Setting sizeSetting; + private final Setting fallbackSizeSetting; private final Setting queueSizeSetting; + private final Setting fallbackQueueSizeSetting; /** * Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name. @@ -52,6 +55,19 @@ public final class FixedExecutorBuilder extends ExecutorBuilder fallbackSizeSetting = sizeSetting(settings, fallbackName, size, fallbackPrefix, fallbackProperties); + this.sizeSetting = + new Setting<>( + new Setting.SimpleKey(sizeKey), + fallbackSizeSetting, + s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), + properties); + this.fallbackSizeSetting = fallbackSizeSetting; + final Setting fallbackQueueSizeSetting = queueSizeSetting(fallbackPrefix, queueSize, fallbackProperties); + this.queueSizeSetting = + new Setting<>( + new Setting.SimpleKey(queueSizeKey), + fallbackQueueSizeSetting, + s -> Setting.parseInt(s, Integer.MIN_VALUE, queueSizeKey), + properties); + this.fallbackQueueSizeSetting = fallbackQueueSizeSetting; } - this.sizeSetting = - new Setting<>( - sizeKey, - s -> Integer.toString(size), - s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), - properties); - final String queueSizeKey = settingsKey(prefix, "queue_size"); - this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, properties); + } + + private Setting sizeSetting( + final Settings settings, final String name, final int size, final String prefix, final Setting.Property[] properties) { + final String sizeKey = settingsKey(prefix, "size"); + return new Setting<>( + sizeKey, + s -> Integer.toString(size), + s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), + properties); + } + + private Setting queueSizeSetting(final String prefix, final int queueSize, final Setting.Property[] properties) { + return Setting.intSetting(settingsKey(prefix, "queue_size"), queueSize, properties); } @Override public List> getRegisteredSettings() { - return Arrays.asList(sizeSetting, queueSizeSetting); + if (fallbackSizeSetting == null && fallbackQueueSizeSetting == null) { + return Arrays.asList(sizeSetting, queueSizeSetting); + } else { + assert fallbackSizeSetting != null && fallbackQueueSizeSetting != null; + return Arrays.asList(sizeSetting, fallbackSizeSetting, queueSizeSetting, fallbackQueueSizeSetting); + } } @Override @@ -132,8 +190,14 @@ ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final Thre final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name())); final ExecutorService executor = EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext); + final String name; + if ("write".equals(name()) && Booleans.parseBoolean(System.getProperty("es.thread_pool.write.use_bulk_as_display_name", "false"))) { + name = "bulk"; + } else { + name = name(); + } final ThreadPool.Info info = - new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize)); + new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize)); return new ThreadPool.ExecutorHolder(executor, info); } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index a8f5cd2c33578..bf3641f18b172 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -69,7 +69,7 @@ public static class Names { public static final String GET = "get"; public static final String ANALYZE = "analyze"; public static final String INDEX = "index"; - public static final String BULK = "bulk"; + public static final String WRITE = "write"; public static final String SEARCH = "search"; public static final String MANAGEMENT = "management"; public static final String FLUSH = "flush"; @@ -126,7 +126,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.GET, ThreadPoolType.FIXED); map.put(Names.ANALYZE, ThreadPoolType.FIXED); map.put(Names.INDEX, ThreadPoolType.FIXED); - map.put(Names.BULK, ThreadPoolType.FIXED); + map.put(Names.WRITE, ThreadPoolType.FIXED); map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE); map.put(Names.MANAGEMENT, ThreadPoolType.SCALING); map.put(Names.FLUSH, ThreadPoolType.SCALING); @@ -170,7 +170,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true)); - builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops + builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, @@ -264,7 +264,7 @@ public Info info(String name) { public ThreadPoolStats stats() { List stats = new ArrayList<>(); for (ExecutorHolder holder : executors.values()) { - String name = holder.info.getName(); + final String name = holder.info.getName(); // no need to have info on "same" thread pool if ("same".equals(name)) { continue; diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 1a07eac1adbd5..4b96f3d17543c 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -54,8 +54,8 @@ protected Settings nodeSettings(int nodeOrdinal) { // (see also ThreadedActionListener which is happily spawning threads even when we already got rejected) //.put("thread_pool.listener.queue_size", 1) .put("thread_pool.get.queue_size", 1) - // default is 50 - .put("thread_pool.bulk.queue_size", 30) + // default is 200 + .put("thread_pool.write.queue_size", 30) .build(); } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java index b6110a85eceb6..33917ceff685b 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutorTests.java @@ -35,8 +35,8 @@ public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase { protected Settings nodeSettings() { return Settings.builder() .put("node.name", "es-thread-pool-executor-tests") - .put("thread_pool.bulk.size", 1) - .put("thread_pool.bulk.queue_size", 0) + .put("thread_pool.write.size", 1) + .put("thread_pool.write.queue_size", 0) .put("thread_pool.search.size", 1) .put("thread_pool.search.queue_size", 1) .build(); @@ -44,7 +44,7 @@ protected Settings nodeSettings() { public void testRejectedExecutionExceptionContainsNodeName() { // we test a fixed and an auto-queue executor but not scaling since it does not reject - runThreadPoolExecutorTest(1, ThreadPool.Names.BULK); + runThreadPoolExecutorTest(1, ThreadPool.Names.WRITE); runThreadPoolExecutorTest(2, ThreadPool.Names.SEARCH); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index a8e282e2c17e5..28f623b23e282 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -69,18 +69,18 @@ public class IndexShardOperationPermitsTests extends ESTestCase { @BeforeClass public static void setupThreadPool() { - int bulkThreadPoolSize = randomIntBetween(1, 2); - int bulkThreadPoolQueueSize = randomIntBetween(1, 2); + int writeThreadPoolSize = randomIntBetween(1, 2); + int writeThreadPoolQueueSize = randomIntBetween(1, 2); threadPool = new TestThreadPool("IndexShardOperationsLockTests", Settings.builder() - .put("thread_pool." + ThreadPool.Names.BULK + ".size", bulkThreadPoolSize) - .put("thread_pool." + ThreadPool.Names.BULK + ".queue_size", bulkThreadPoolQueueSize) + .put("thread_pool." + ThreadPool.Names.WRITE + ".size", writeThreadPoolSize) + .put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", writeThreadPoolQueueSize) .build()); - assertThat(threadPool.executor(ThreadPool.Names.BULK), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getCorePoolSize(), equalTo(bulkThreadPoolSize)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getMaximumPoolSize(), equalTo(bulkThreadPoolSize)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getQueue().remainingCapacity(), - equalTo(bulkThreadPoolQueueSize)); + assertThat(threadPool.executor(ThreadPool.Names.WRITE), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.WRITE)).getCorePoolSize(), equalTo(writeThreadPoolSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.WRITE)).getMaximumPoolSize(), equalTo(writeThreadPoolSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.WRITE)).getQueue().remainingCapacity(), + equalTo(writeThreadPoolQueueSize)); } @AfterClass @@ -110,8 +110,8 @@ class DummyException extends RuntimeException {} CountDownLatch latch = new CountDownLatch(numThreads / 4); boolean forceExecution = randomBoolean(); for (int i = 0; i < numThreads; i++) { - // the bulk thread pool uses a bounded size and can get rejections, see setupThreadPool - String threadPoolName = randomFrom(ThreadPool.Names.BULK, ThreadPool.Names.GENERIC); + // the write thread pool uses a bounded size and can get rejections, see setupThreadPool + String threadPoolName = randomFrom(ThreadPool.Names.WRITE, ThreadPool.Names.GENERIC); boolean failingListener = randomBoolean(); PlainActionFuture future = new PlainActionFuture() { @Override diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java index 3830d10f69b3c..546017f807ac0 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java @@ -87,9 +87,9 @@ public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception { } public void testThatNegativeSettingAllowsToStart() throws InterruptedException { - Settings settings = Settings.builder().put("node.name", "bulk").put("thread_pool.bulk.queue_size", "-1").build(); + Settings settings = Settings.builder().put("node.name", "write").put("thread_pool.write.queue_size", "-1").build(); ThreadPool threadPool = new ThreadPool(settings); - assertThat(threadPool.info("bulk").getQueueSize(), is(nullValue())); + assertThat(threadPool.info("write").getQueueSize(), is(nullValue())); terminate(threadPool); } diff --git a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index f0f8c70a3f9d4..6ae41febf3b19 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -61,7 +61,7 @@ public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedExc } public void testIndexingThreadPoolsMaxSize() throws InterruptedException { - final String name = randomFrom(Names.BULK, Names.INDEX); + final String name = randomFrom(Names.WRITE, Names.INDEX); final int maxSize = 1 + EsExecutors.numberOfProcessors(Settings.EMPTY); final int tooBig = randomIntBetween(1 + maxSize, Integer.MAX_VALUE); @@ -92,7 +92,7 @@ public void testIndexingThreadPoolsMaxSize() throws InterruptedException { } private static int getExpectedThreadPoolSize(Settings settings, String name, int size) { - if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) { + if (name.equals(ThreadPool.Names.WRITE) || name.equals(ThreadPool.Names.INDEX)) { return Math.min(size, EsExecutors.numberOfProcessors(settings)); } else { return size;