Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename the bulk thread pool to write thread pool #29593

Merged
merged 4 commits into from
Apr 19, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/reference/cat.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ Responds with:
// TESTRESPONSE[s/9300 27 sLBaIGK/\\d+ \\d+ .+/ _cat]

You can also request multiple columns using simple wildcards like
`/_cat/thread_pool?h=ip,bulk.*` to get all headers (or aliases) starting
with `bulk.`.
`/_cat/thread_pool?h=ip,queue*` to get all headers (or aliases) starting
with `queue`.

[float]
[[numeric-formats]]
Expand Down
4 changes: 2 additions & 2 deletions docs/reference/cat/thread_pool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Which looks like:
[source,txt]
--------------------------------------------------
node-0 analyze 0 0 0
node-0 bulk 0 0 0
node-0 fetch_shard_started 0 0 0
node-0 fetch_shard_store 0 0 0
node-0 flush 0 0 0
Expand All @@ -29,6 +28,7 @@ node-0 refresh 0 0 0
node-0 search 0 0 0
node-0 snapshot 0 0 0
node-0 warmer 0 0 0
node-0 write 0 0 0
--------------------------------------------------
// TESTRESPONSE[s/\d+/\\d+/ _cat]

Expand All @@ -45,7 +45,6 @@ The second column is the thread pool name
--------------------------------------------------
name
analyze
bulk
fetch_shard_started
fetch_shard_store
flush
Expand All @@ -59,6 +58,7 @@ refresh
search
snapshot
warmer
write
--------------------------------------------------


Expand Down
16 changes: 16 additions & 0 deletions docs/reference/migration/migrate_6_3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,19 @@ users we think this is fine as analyze requests are useful for debugging and so
probably do not see high concurrency. If you are impacted by this you can
increase the size of the thread pool by using the `thread_pool.analyze.size`
setting.

==== Renaming the bulk thread pool

The `bulk` thread pool has been renamed to the `write` thread pool. This change
was made to reflect the fact that this thread pool is used to execute all write
operations: single-document index/delete/update requests, as well as bulk
requests. The settings `thread_pool.bulk.size` and `thread_pool.bulk.queue_size`
are still supported as fallback settings although you should transition to
`thread_pool.write.size` and `thread_pool.write.queue_size`, respectively. The
fallback settings will be removed in 7.0.0. Additionally, this means that some
APIs (e.g., the node stats API) will now display the name of this thread pool as
`write`. If this change impacts you (e.g., for monitoring that you have in
place) you can start Elasticsearch with the JVM option
`-Des.thread_pool.write.use_bulk_as_display_name=true` to have Elasticsearch
continue to display the name of this thread pool as `bulk`. Elasticsearch will
stop observing this system property in 7.0.0.
9 changes: 4 additions & 5 deletions docs/reference/modules/threadpool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ There are several thread pools, but the important ones include:
`analyze`::
For analyze requests. Thread pool type is `fixed` with a size of 1, queue size of 16.

`bulk`::
For bulk operations. Thread pool type is `fixed`
with a size of `# of available processors`,
queue_size of `200`. The maximum size for this pool
is `1 + # of available processors`.
`write`::
For single-document index/delete/update and bulk requests. Thread pool type
is `fixed` with a size of `# of available processors`, queue_size of `200`.
The maximum size for this pool is `1 + # of available processors`.

`snapshot`::
For snapshot/restore operations. Thread pool type is `scaling` with a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ private void testCase(

final Settings nodeSettings = Settings.builder()
// use pools of size 1 so we can block them
.put("thread_pool.bulk.size", 1)
.put("thread_pool.write.size", 1)
.put("thread_pool.search.size", 1)
// use queues of size 1 because size 0 is broken and because search requests need the queue to function
.put("thread_pool.bulk.queue_size", 1)
.put("thread_pool.write.queue_size", 1)
.put("thread_pool.search.queue_size", 1)
.put("node.attr.color", "blue")
.build();
Expand Down Expand Up @@ -203,7 +203,7 @@ private void testCase(
assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L)));

logger.info("Blocking bulk and unblocking search so we start to get bulk rejections");
CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.BULK, node);
CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node);
initialSearchBlock.await();

logger.info("Waiting for bulk rejections");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
"Test cat thread_pool output":
- skip:
version: " - 6.2.99"
reason: the write thread pool was renamed from bulk in 6.3.0

- do:
cat.thread_pool: {}
Expand Down Expand Up @@ -29,30 +32,30 @@

- do:
cat.thread_pool:
thread_pool_patterns: bulk,management,flush,index,generic,force_merge
thread_pool_patterns: write,management,flush,index,generic,force_merge
h: id,name,active
v: true

- match:
$body: |
/^ id \s+ name \s+ active \n
(\S+\s+ bulk \s+ \d+ \n
\S+\s+ flush \s+ \d+ \n
(\S+\s+ flush \s+ \d+ \n
\S+\s+ force_merge \s+ \d+ \n
\S+\s+ generic \s+ \d+ \n
\S+\s+ index \s+ \d+ \n
\S+\s+ management \s+ \d+ \n)+ $/
\S+\s+ management \s+ \d+ \n
\S+\s+ write \s+ \d+ \n)+ $/

- do:
cat.thread_pool:
thread_pool_patterns: bulk
thread_pool_patterns: write
h: id,name,type,active,size,queue,queue_size,rejected,largest,completed,min,max,keep_alive
v: true

- match:
$body: |
/^ id \s+ name \s+ type \s+ active \s+ size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ min \s+ max \s+ keep_alive \n
(\S+ \s+ bulk \s+ fixed \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \n)+ $/
/^ id \s+ name \s+ type \s+ active \s+ size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ min \s+ max \s+ keep_alive \n
(\S+ \s+ write \s+ fixed \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \n)+ $/

- do:
cat.thread_pool:
Expand All @@ -68,12 +71,12 @@

- do:
cat.thread_pool:
thread_pool_patterns: bulk,index,search
thread_pool_patterns: write,index,search
size: ""

- match:
$body: |
/ #node_name name active queue rejected
^ (\S+ \s+ bulk \s+ \d+ \s+ \d+ \s+ \d+ \n
\S+ \s+ index \s+ \d+ \s+ \d+ \s+ \d+ \n
\S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
^ (\S+ \s+ index \s+ \d+ \s+ \d+ \s+ \d+ \n
\S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n
\S+ \s+ write \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public TransportShardBulkAction(Settings settings, TransportService transportSer
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.BULK);
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.BULK);
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public PipelineExecutionService(PipelineStore store, ThreadPool threadPool) {
public void executeBulkRequest(Iterable<DocWriteRequest> actionRequests,
BiConsumer<IndexRequest, Exception> itemFailureHandler,
Consumer<Exception> completionHandler) {
threadPool.executor(ThreadPool.Names.BULK).execute(new AbstractRunnable() {
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {

@Override
public void onFailure(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected static String settingsKey(final String prefix, final String key) {
}

protected int applyHardSizeLimit(final Settings settings, final String name) {
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
if (name.equals("bulk") || name.equals(ThreadPool.Names.INDEX) || name.equals(ThreadPool.Names.WRITE)) {
return 1 + EsExecutors.numberOfProcessors(settings);
} else {
return Integer.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.threadpool;

import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
Expand All @@ -38,7 +39,9 @@
public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBuilder.FixedExecutorSettings> {

private final Setting<Integer> sizeSetting;
private final Setting<Integer> fallbackSizeSetting;
private final Setting<Integer> queueSizeSetting;
private final Setting<Integer> fallbackQueueSizeSetting;

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
Expand All @@ -52,6 +55,19 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
this(settings, name, size, queueSize, false);
}

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param fallbackName the fallback name of the executor (used for transitioning the name of a setting)
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
*/
FixedExecutorBuilder(final Settings settings, final String name, final String fallbackName, final int size, final int queueSize) {
this(settings, name, fallbackName, size, queueSize, "thread_pool." + name, "thread_pool." + fallbackName, false);
}

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
*
Expand All @@ -62,7 +78,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
* @param deprecated whether or not the thread pool is deprecated
*/
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final boolean deprecated) {
this(settings, name, size, queueSize, "thread_pool." + name, deprecated);
this(settings, name, null, size, queueSize, "thread_pool." + name, null, deprecated);
}

/**
Expand All @@ -75,7 +91,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
* @param prefix the prefix for the settings keys
*/
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
this(settings, name, size, queueSize, prefix, false);
this(settings, name, null, size, queueSize, prefix, null, false);
}

/**
Expand All @@ -90,31 +106,73 @@ public FixedExecutorBuilder(final Settings settings, final String name, final in
private FixedExecutorBuilder(
final Settings settings,
final String name,
final String fallbackName,
final int size,
final int queueSize,
final String prefix,
final String fallbackPrefix,
final boolean deprecated) {
super(name);
final String sizeKey = settingsKey(prefix, "size");
final Setting.Property[] properties;
if (deprecated) {
properties = new Setting.Property[]{Setting.Property.NodeScope, Setting.Property.Deprecated};
final String queueSizeKey = settingsKey(prefix, "queue_size");
if (fallbackName == null) {
final Setting.Property[] properties;
if (deprecated) {
properties = new Setting.Property[]{Setting.Property.NodeScope, Setting.Property.Deprecated};
} else {
properties = new Setting.Property[]{Setting.Property.NodeScope};
}
assert fallbackPrefix == null;
this.sizeSetting = sizeSetting(settings, name, size, prefix, properties);
this.fallbackSizeSetting = null;
this.queueSizeSetting = queueSizeSetting(prefix, queueSize, properties);
this.fallbackQueueSizeSetting = null;
} else {
properties = new Setting.Property[]{Setting.Property.NodeScope};
assert fallbackPrefix != null;
assert deprecated == false;
final Setting.Property[] properties = { Setting.Property.NodeScope };
final Setting.Property[] fallbackProperties = { Setting.Property.NodeScope, Setting.Property.Deprecated };
final Setting<Integer> fallbackSizeSetting = sizeSetting(settings, fallbackName, size, fallbackPrefix, fallbackProperties);
this.sizeSetting =
new Setting<>(
new Setting.SimpleKey(sizeKey),
fallbackSizeSetting,
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
properties);
this.fallbackSizeSetting = fallbackSizeSetting;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this could be set above instead of creating a local?

final Setting<Integer> fallbackQueueSizeSetting = queueSizeSetting(fallbackPrefix, queueSize, fallbackProperties);
this.queueSizeSetting =
new Setting<>(
new Setting.SimpleKey(queueSizeKey),
fallbackQueueSizeSetting,
s -> Setting.parseInt(s, Integer.MIN_VALUE, queueSizeKey),
properties);
this.fallbackQueueSizeSetting = fallbackQueueSizeSetting;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

}
this.sizeSetting =
new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
properties);
final String queueSizeKey = settingsKey(prefix, "queue_size");
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, properties);
}

private Setting<Integer> sizeSetting(
final Settings settings, final String name, final int size, final String prefix, final Setting.Property[] properties) {
final String sizeKey = settingsKey(prefix, "size");
return new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
properties);
}

private Setting<Integer> queueSizeSetting(final String prefix, final int queueSize, final Setting.Property[] properties) {
return Setting.intSetting(settingsKey(prefix, "queue_size"), queueSize, properties);
}

@Override
public List<Setting<?>> getRegisteredSettings() {
return Arrays.asList(sizeSetting, queueSizeSetting);
if (fallbackSizeSetting == null && fallbackQueueSizeSetting == null) {
return Arrays.asList(sizeSetting, queueSizeSetting);
} else {
assert fallbackSizeSetting != null && fallbackQueueSizeSetting != null;
return Arrays.asList(sizeSetting, fallbackSizeSetting, queueSizeSetting, fallbackQueueSizeSetting);
}
}

@Override
Expand All @@ -132,8 +190,14 @@ ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final Thre
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
final ExecutorService executor =
EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);
final String name;
if ("write".equals(name()) && Booleans.parseBoolean(System.getProperty("es.thread_pool.write.use_bulk_as_display_name", "false"))) {
name = "bulk";
} else {
name = name();
}
final ThreadPool.Info info =
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
return new ThreadPool.ExecutorHolder(executor, info);
}

Expand Down
Loading