getWorkQueue() {
+ if (_queueSize == Integer.MAX_VALUE) {
+ return new LinkedBlockingQueue();
+ }
+ return new ArrayBlockingQueue(_queueSize);
+ }
+
+ @Override
+ protected RejectedExecutionHandler getRejectedExecutionHandler() {
+ return new BrokerThreadPoolRejectExecutionHandler(_brokerMetrics);
+ }
+
+ static class BrokerThreadPoolRejectExecutionHandler implements RejectedExecutionHandler {
+ private final BrokerMetrics _brokerMetrics;
+
+ public BrokerThreadPoolRejectExecutionHandler(BrokerMetrics brokerMetrics) {
+ _brokerMetrics = brokerMetrics;
+ }
+
+ /**
+ * Reject the runnable if it can’t be accommodated by the thread pool.
+ *
+ * Response returned will have SERVICE_UNAVAILABLE(503) error code with error msg.
+ *
+ * @param r Runnable
+ * @param executor ThreadPoolExecutor
+ */
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_REJECTED_EXCEPTIONS, 1L);
+ LOGGER.error("Task " + r + " rejected from " + executor);
+
+ throw new ServiceUnavailableException(Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(
+ "Pinot Broker thread pool can not accommodate more requests now. " + "Request is rejected from " + executor)
+ .build());
+ }
+ }
+}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.java
new file mode 100644
index 000000000000..25c1d099ca75
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker;
+
+import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.ws.rs.ServiceUnavailableException;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class BrokerManagedAsyncExecutorProviderTest {
+
+ public static BrokerMetrics _brokerMetrics;
+
+ @BeforeClass
+ public void setUp() {
+ _brokerMetrics = new BrokerMetrics(CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX,
+ PinotMetricUtils.getPinotMetricsRegistry(new PinotConfiguration()),
+ CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS, Collections.emptyList());
+ }
+
+ @Test
+ public void testExecutorService()
+ throws InterruptedException, ExecutionException {
+ // create a new instance of the executor provider
+ BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(2, 2, 2, _brokerMetrics);
+
+ // get the executor service
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) provider.getExecutorService();
+
+ // submit a task to the executor service and wait for it to complete
+ Future futureResult = executor.submit(() -> 1 + 1);
+ Integer result = futureResult.get();
+
+ // verify that the task was executed and returned the expected result
+ assertNotNull(result);
+ assertEquals((int) result, 2);
+
+ // wait for the executor service to shutdown
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testGet()
+ throws InterruptedException {
+ BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
+ ExecutorService executorService = provider.getExecutorService();
+
+ // verify that the executor has the expected properties
+ assertNotNull(executorService);
+ assertTrue(executorService instanceof ThreadPoolExecutor);
+
+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
+
+ assertEquals(1, threadPoolExecutor.getCorePoolSize());
+ assertEquals(1, threadPoolExecutor.getMaximumPoolSize());
+
+ BlockingQueue blockingQueue = threadPoolExecutor.getQueue();
+ assertNotNull(blockingQueue);
+ assertTrue(blockingQueue instanceof ArrayBlockingQueue);
+ assertEquals(0, blockingQueue.size());
+ assertEquals(1, blockingQueue.remainingCapacity());
+
+ RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
+ assertNotNull(rejectedExecutionHandler);
+ assertTrue(
+ rejectedExecutionHandler instanceof BrokerManagedAsyncExecutorProvider.BrokerThreadPoolRejectExecutionHandler);
+
+ // test that the executor actually executes tasks
+ AtomicInteger counter = new AtomicInteger();
+ CountDownLatch latch = new CountDownLatch(1);
+ for (int i = 0; i < 1; i++) {
+ threadPoolExecutor.execute(() -> {
+ counter.incrementAndGet();
+ latch.countDown();
+ });
+ }
+ latch.await();
+ assertEquals(counter.get(), 1);
+ }
+
+ @Test(expectedExceptions = ServiceUnavailableException.class)
+ public void testRejectHandler()
+ throws InterruptedException {
+ BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) provider.getExecutorService();
+
+ // test the rejection policy
+ AtomicInteger counter = new AtomicInteger();
+ CountDownLatch latch = new CountDownLatch(10);
+ for (int i = 0; i < 10; i++) {
+ threadPoolExecutor.execute(() -> {
+ counter.incrementAndGet();
+ latch.countDown();
+ });
+ }
+ latch.await();
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index de0cd613bc0c..606eeb05a69f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -32,6 +32,8 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
QUERIES("queries", false),
// These metrics track the exceptions caught during query execution in broker side.
+ // Query rejected by Jersey thread pool executor
+ QUERY_REJECTED_EXCEPTIONS("exceptions", true),
// Query compile phase.
REQUEST_COMPILATION_EXCEPTIONS("exceptions", true),
// Get resource phase.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 0cb3a5467475..0be6c572676e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -243,6 +243,25 @@ public static class Broker {
Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2));
// Same logic as CombineOperatorUtils
+ // Config for Jersey ThreadPoolExecutorProvider.
+ // By default, Jersey uses the default unbounded thread pool to process queries.
+ // By enabling it, BrokerManagedAsyncExecutorProvider will be used to create a bounded thread pool.
+ public static final String CONFIG_OF_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR =
+ "pinot.broker.enable.bounded.jersey.threadpool.executor";
+ public static final boolean DEFAULT_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR = false;
+ // Default capacities for the bounded thread pool
+ public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE =
+ "pinot.broker.jersey.threadpool.executor.max.pool.size";
+ public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE =
+ Runtime.getRuntime().availableProcessors() * 2;
+ public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE =
+ "pinot.broker.jersey.threadpool.executor.core.pool.size";
+ public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE =
+ Runtime.getRuntime().availableProcessors() * 2;
+ public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE =
+ "pinot.broker.jersey.threadpool.executor.queue.size";
+ public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE = Integer.MAX_VALUE;
+
// used for SQL GROUP BY during broker reduce
public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = "pinot.broker.groupby.trim.threshold";
public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;