From 5499b8b8b22a4403b857337383e6478ee50f7a78 Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Thu, 19 Oct 2023 09:47:01 -0700 Subject: [PATCH] Increase remote recovery thread pool size The remote recovery thread pool does blocking I/O when downloading files, so the previous "half processor count max 10" sizing was definitely too small. This can be shown by triggering recoveries on a node that is also doing segment replication: the replication lag increases due to contention on that thread pool. Some amount of contention is inevitable, but this change helps by increasing the download thread pool to twice the allocated processor count and by limiting, by default, the concurrent usage of that thread pool by any single recovery/replication to half the allocated processor count, i.e. about 25% of the threads. Long term, we can improve this even further by moving to fully async I/O to avoid blocking application threads on draining InputStreams. Signed-off-by: Andrew Ross --- .../org/opensearch/indices/recovery/RecoverySettings.java | 7 ++++--- .../main/java/org/opensearch/threadpool/ThreadPool.java | 7 ++++++- .../org/opensearch/threadpool/ScalingThreadPoolTests.java | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 44dfb2f4cb00a..0f3025369833d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -41,6 +41,7 @@ import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; @@ -87,10 +88,10 @@ public class RecoverySettings { /** * Controls the maximum number of streams that can be started concurrently per recovery 
when downloading from the remote store. */ - public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING = Setting.intSetting( + public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING = new Setting<>( "indices.recovery.max_concurrent_remote_store_streams", - 10, - 1, + (s) -> Integer.toString(Math.max(1, OpenSearchExecutors.allocatedProcessors(s) / 2)), + (s) -> Setting.parseInt(s, 1, "indices.recovery.max_concurrent_remote_store_streams"), Property.Dynamic, Property.NodeScope ); diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index fab7620292dd2..5f10986239300 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -273,7 +273,12 @@ public ThreadPool( ); builders.put( Names.REMOTE_RECOVERY, - new ScalingExecutorBuilder(Names.REMOTE_RECOVERY, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) + new ScalingExecutorBuilder( + Names.REMOTE_RECOVERY, + 1, + twiceAllocatedProcessors(allocatedProcessors), + TimeValue.timeValueMinutes(5) + ) ); if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { builders.put( diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index ba2d4b8c247bb..19271bbf30e80 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -154,7 +154,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n); sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive); sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessorsMaxTen); - 
sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::halfAllocatedProcessorsMaxTen); + sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::twiceAllocatedProcessors); return sizes.get(threadPoolName).apply(numberOfProcessors); }