Skip to content

Commit

Permalink
Thread pool metrics (elastic#104500)
Browse files Browse the repository at this point in the history
This implements metrics for the threadpools.

The aim is to emit metrics for the various threadpools. The metric callback should be created when the threadpool is created, and removed before the threadpool is shut down.

The PR also includes a test for the new metrics, and some additions to the metrics test plugin.

Finally, the metric name check has been modified to allow some of the non-compliant threadpool names (too long, or containing `-`).
Co-authored-by: Przemyslaw Gomulka <[email protected]>
  • Loading branch information
gareth-ellis authored Jan 18, 2024
1 parent c011fe5 commit 764269b
Show file tree
Hide file tree
Showing 33 changed files with 329 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -166,7 +167,7 @@ public void setUp() throws Exception {
.build();

Settings settings = Settings.builder().put("node.name", ShardsAvailabilityHealthIndicatorBenchmark.class.getSimpleName()).build();
ThreadPool threadPool = new ThreadPool(settings);
ThreadPool threadPool = new ThreadPool(settings, MeterRegistry.NOOP);

ClusterService clusterService = new ClusterService(
Settings.EMPTY,
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/102371.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 102371
summary: Adding threadpool metrics
area: Infra/Core
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/104500.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 104500
summary: Thread pool metrics
area: Infra/Core
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ public class MetricNameValidator {
static final int MAX_ELEMENT_LENGTH = 30;
static final int MAX_NUMBER_OF_ELEMENTS = 10;

/**
 * Name fragments that exempt a metric name from validation for backwards-compatibility reasons.
 * A metric name that contains any of these strings is returned by {@code validate} without further checks.
 * Package-private so that tests can iterate over the entries.
 */
static final Set<String> SKIP_VALIDATION_METRIC_NAMES_DUE_TO_BWC = Set.of(
"searchable_snapshots_cache_fetch_async",
"searchable_snapshots_cache_prewarming",
"security-token-key",
"security-crypto"
);

private MetricNameValidator() {}

/**
Expand All @@ -42,6 +49,10 @@ private MetricNameValidator() {}
*/
public static String validate(String metricName) {
Objects.requireNonNull(metricName);

if (skipValidationToBWC(metricName)) {
return metricName;
}
validateMaxMetricNameLength(metricName);

String[] elements = metricName.split("\\.");
Expand All @@ -53,6 +64,19 @@ public static String validate(String metricName) {
return metricName;
}

/**
 * Decides whether a metric name is exempt from validation for backwards-compatibility reasons.
 * <p>
 * Some existing thread pool names are too long or contain {@code -}, so metric names derived from them
 * cannot pass the regular checks. Keeping the thread pool name inside the metric name makes it easy to
 * locate the pool in the code base from an alert or a dashboard, and renaming a thread pool would be a
 * breaking change.
 * <p>
 * NOTE: only add an exemption when the refactor needed to comply would itself be a breaking change.
 *
 * @param metricName the full metric name being validated
 * @return {@code true} if the name contains any of the exempted fragments
 */
private static boolean skipValidationToBWC(String metricName) {
    for (String exemptFragment : SKIP_VALIDATION_METRIC_NAMES_DUE_TO_BWC) {
        if (metricName.contains(exemptFragment)) {
            return true;
        }
    }
    return false;
}

private static void validateMaxMetricNameLength(String metricName) {
if (metricName.length() > MAX_METRIC_NAME_LENGTH) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -108,6 +132,7 @@ private static void hasESPrefix(String[] elements, String name) {

private static void perElementValidations(String[] elements, String name) {
for (String element : elements) {

hasOnlyAllowedCharacters(element, name);
hasNotBreachLengthLimit(element, name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ public void testLastElementAllowList() {
expectThrows(IllegalArgumentException.class, () -> MetricNameValidator.validate("es.somemodule.somemetric.some_other_suffix"));
}

/**
 * Checks that a metric name embedding any of the BWC-exempt fragments is accepted by the validator.
 */
public void testSkipValidationDueToBWC() {
    // the metric names are fake, but each one embeds a fragment that makes the validator skip its checks
    MetricNameValidator.SKIP_VALIDATION_METRIC_NAMES_DUE_TO_BWC.forEach(
        exemptFragment -> MetricNameValidator.validate("es.threadpool." + exemptFragment + ".total")
    );
}

public static String metricNameWithLength(int length) {
int prefixAndSuffix = "es.".length() + ".utilization".length();
assert length > prefixAndSuffix : "length too short";
Expand All @@ -99,4 +106,5 @@ public static String metricNameWithLength(int length) {
metricName.append("utilization");
return metricName.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -78,7 +79,7 @@ public class GeoIpDownloaderTests extends ESTestCase {
public void setup() {
httpClient = mock(HttpClient.class);
clusterService = mock(ClusterService.class);
threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build());
threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), MeterRegistry.NOOP);
when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(
Settings.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.mocksocket.MockSocket;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportSettings;
Expand Down Expand Up @@ -51,7 +52,7 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {

@Before
public void startThreadPool() {
threadPool = new ThreadPool(settings);
threadPool = new ThreadPool(settings, MeterRegistry.NOOP);
NetworkService networkService = new NetworkService(Collections.emptyList());
PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
nettyTransport = new Netty4Transport(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
Expand All @@ -19,12 +24,18 @@
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.in;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class SimpleThreadPoolIT extends ESIntegTestCase {
Expand All @@ -33,6 +44,11 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder().build();
}

/**
 * Adds the {@link TestTelemetryPlugin} to the node plugins so that tests in this class can read back recorded metrics.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final List<Class<? extends Plugin>> telemetryOnly = List.of(TestTelemetryPlugin.class);
    return telemetryOnly;
}

public void testThreadNames() throws Exception {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
Set<String> preNodeStartThreadNames = new HashSet<>();
Expand Down Expand Up @@ -95,4 +111,66 @@ public void testThreadNames() throws Exception {
}
}

/**
 * Checks that, for every thread pool reported by {@link ThreadPool#stats()}, the corresponding metrics are
 * registered with the {@link TestTelemetryPlugin} and that the first collected measurement of each metric is
 * greater than or equal to the value seen in the stats snapshot taken just before collection.
 */
public void testThreadPoolMetrics() throws Exception {
    internalCluster().startNode();

    final String dataNodeName = internalCluster().getRandomNodeName();
    final TestTelemetryPlugin plugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
        .filterPlugins(TestTelemetryPlugin.class)
        .findFirst()
        .orElseThrow();

    // generate some load so that the thread pools have something to report
    logger.info("do some indexing, flushing, optimize, and searches");
    int numDocs = randomIntBetween(2, 100);
    IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
    for (int i = 0; i < numDocs; ++i) {
        builders[i] = prepareIndex("idx").setSource(
            jsonBuilder().startObject()
                .field("str_value", "s" + i)
                .array("str_values", new String[] { "s" + (i * 2), "s" + (i * 2 + 1) })
                .field("l_value", i)
                .array("l_values", new int[] { i * 2, i * 2 + 1 })
                .field("d_value", i)
                .array("d_values", new double[] { i * 2, i * 2 + 1 })
                .endObject()
        );
    }
    indexRandom(true, builders);
    int numSearches = randomIntBetween(2, 100);
    for (int i = 0; i < numSearches; i++) {
        assertNoFailures(prepareSearch("idx").setQuery(QueryBuilders.termQuery("str_value", "s" + i)));
        assertNoFailures(prepareSearch("idx").setQuery(QueryBuilders.termQuery("l_value", i)));
    }

    final var tp = internalCluster().getInstance(ThreadPool.class, dataNodeName);
    // the stats snapshot is taken before the metrics are collected; the assertions below compare against it
    ThreadPoolStats tps = tp.stats();
    plugin.collect();

    // copy into a list owned by the test rather than appending to whatever list the plugin handed out
    List<String> registeredMetrics = new ArrayList<>(plugin.getRegisteredMetrics(InstrumentType.LONG_GAUGE));
    registeredMetrics.addAll(plugin.getRegisteredMetrics(InstrumentType.LONG_ASYNC_COUNTER));

    tps.forEach(stats -> {
        Map<String, Long> threadPoolMetrics = Map.of(
            ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED,
            stats.completed(),
            ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE,
            (long) stats.active(),
            ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT,
            (long) stats.threads(),
            ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST,
            (long) stats.largest(),
            ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE,
            (long) stats.queue()
        );
        threadPoolMetrics.forEach((suffix, value) -> {
            String metricName = ThreadPool.THREAD_POOL_METRIC_PREFIX + stats.name() + suffix;
            List<Measurement> measurements;
            // the completed count is read as an async counter, every other value as a gauge
            if (suffix.equals(ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED)) {
                measurements = plugin.getLongAsyncCounterMeasurement(metricName);
            } else {
                measurements = plugin.getLongGaugeMeasurement(metricName);
            }
            assertThat(metricName, in(registeredMetrics));
            // fail with the metric name instead of a bare IndexOutOfBoundsException from get(0)
            assertFalse("no measurements collected for " + metricName, measurements.isEmpty());
            assertThat(measurements.get(0).getLong(), greaterThanOrEqualTo(value));
        });
    });
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public abstract class EsRejectedExecutionHandler implements RejectedExecutionHandler {

private final CounterMetric rejected = new CounterMetric();
private LongCounter rejectionCounter = null;

/**
* The number of rejected executions.
Expand All @@ -26,6 +29,14 @@ public long rejected() {

/**
 * Records one rejected execution: increments the internal rejection count and, if a telemetry counter
 * has been registered, increments that counter as well.
 */
protected void incrementRejections() {
    rejected.inc();
    final LongCounter telemetryCounter = rejectionCounter;
    if (telemetryCounter == null) {
        // no telemetry counter has been registered for this handler
        return;
    }
    telemetryCounter.increment();
}

/**
 * Registers a long counter named {@code prefix + ".rejected.total"} with the given registry and brings it
 * up to date by adding the number of rejections counted so far.
 *
 * @param meterRegistry the registry in which the counter is registered
 * @param prefix        the metric name prefix; {@code ".rejected.total"} is appended to it
 * @param name          the name used in the counter's description
 */
public void registerCounter(MeterRegistry meterRegistry, String prefix, String name) {
    final String counterName = prefix + ".rejected.total";
    final String description = "number of rejected threads for " + name;
    this.rejectionCounter = meterRegistry.registerLongCounter(counterName, description, "count");
    // carry over the rejections that were counted before the telemetry counter existed
    final long alreadyRejected = rejected();
    this.rejectionCounter.incrementBy(alreadyRejected);
}

protected static EsRejectedExecutionException newRejectedException(
Expand Down
21 changes: 14 additions & 7 deletions server/src/main/java/org/elasticsearch/node/NodeConstruction.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -242,8 +243,8 @@ static NodeConstruction prepareConstruction(
NodeConstruction constructor = new NodeConstruction(closeables);

Settings settings = constructor.createEnvironment(initialEnvironment, serviceProvider);

ThreadPool threadPool = constructor.createThreadPool(settings);
TelemetryProvider telemetryProvider = constructor.createTelemetryProvider(settings);
ThreadPool threadPool = constructor.createThreadPool(settings, telemetryProvider.getMeterRegistry());
SettingsModule settingsModule = constructor.validateSettings(initialEnvironment.settings(), settings, threadPool);

SearchModule searchModule = constructor.createSearchModule(settingsModule.getSettings(), threadPool);
Expand All @@ -258,7 +259,8 @@ static NodeConstruction prepareConstruction(
scriptService,
constructor.createAnalysisRegistry(),
serviceProvider,
forbidPrivateIndexSettings
forbidPrivateIndexSettings,
telemetryProvider
);

return constructor;
Expand Down Expand Up @@ -449,9 +451,14 @@ private Settings createEnvironment(Environment initialEnvironment, NodeServicePr
return settings;
}

private ThreadPool createThreadPool(Settings settings) throws IOException {
/**
 * Obtains the telemetry provider from the single {@link TelemetryPlugin}, if there is one, and otherwise
 * falls back to {@link TelemetryProvider#NOOP}.
 *
 * @param settings the settings handed to the plugin when asking for its provider
 * @return the plugin's telemetry provider, or the no-op provider
 */
private TelemetryProvider createTelemetryProvider(Settings settings) {
    return getSinglePlugin(TelemetryPlugin.class)
        .map(telemetryPlugin -> telemetryPlugin.getTelemetryProvider(settings))
        .orElse(TelemetryProvider.NOOP);
}

private ThreadPool createThreadPool(Settings settings, MeterRegistry meterRegistry) throws IOException {
ThreadPool threadPool = new ThreadPool(
settings,
meterRegistry,
pluginsService.flatMap(p -> p.getExecutorBuilders(settings)).toArray(ExecutorBuilder<?>[]::new)
);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
Expand Down Expand Up @@ -581,13 +588,12 @@ private void construct(
ScriptService scriptService,
AnalysisRegistry analysisRegistry,
NodeServiceProvider serviceProvider,
boolean forbidPrivateIndexSettings
boolean forbidPrivateIndexSettings,
TelemetryProvider telemetryProvider
) throws IOException {

Settings settings = settingsModule.getSettings();

TelemetryProvider telemetryProvider = getSinglePlugin(TelemetryPlugin.class).map(p -> p.getTelemetryProvider(settings))
.orElse(TelemetryProvider.NOOP);
modules.bindToInstance(Tracer.class, telemetryProvider.getTracer());

TaskManager taskManager = new TaskManager(
Expand All @@ -599,6 +605,7 @@ private void construct(
).collect(Collectors.toSet()),
telemetryProvider.getTracer()
);
final Tracer tracer = telemetryProvider.getTracer();

ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
clusterService.addStateApplier(scriptService);
Expand Down
Loading

0 comments on commit 764269b

Please sign in to comment.