Skip to content

Commit

Permalink
[New Feature] Add new configuration options which allows broker to us…
Browse files Browse the repository at this point in the history
…e a bounded Jersey ThreadPool (#10614)
  • Loading branch information
MeihanLi authored Apr 18, 2023
1 parent 8903920 commit f2afe21
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pinot-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-common</artifactId>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-jaxrs</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ protected void configure() {
bind(accessFactory).to(AccessControlFactory.class);
}
});
boolean enableBoundedJerseyThreadPoolExecutor = brokerConf
.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR,
CommonConstants.Broker.DEFAULT_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR);
if (enableBoundedJerseyThreadPoolExecutor) {
register(buildBrokerManagedAsyncExecutorProvider(brokerConf, brokerMetrics));
}
register(JacksonFeature.class);
registerClasses(io.swagger.jaxrs.listing.ApiListingResource.class);
registerClasses(io.swagger.jaxrs.listing.SwaggerSerializers.class);
Expand Down Expand Up @@ -153,6 +159,19 @@ private void setupSwagger() {
_httpServer.getServerConfiguration().addHttpHandler(swaggerDist, "/swaggerui-dist/");
}

private BrokerManagedAsyncExecutorProvider buildBrokerManagedAsyncExecutorProvider(PinotConfiguration brokerConf,
BrokerMetrics brokerMetrics) {
int corePoolSize = brokerConf
.getProperty(CommonConstants.Broker.CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE,
CommonConstants.Broker.DEFAULT_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE);
int maximumPoolSize = brokerConf
.getProperty(CommonConstants.Broker.CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE,
CommonConstants.Broker.DEFAULT_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE);
int queueSize = brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE,
CommonConstants.Broker.DEFAULT_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE);
return new BrokerManagedAsyncExecutorProvider(corePoolSize, maximumPoolSize, queueSize, brokerMetrics);
}

public void stop() {
if (_httpServer != null) {
LOGGER.info("Shutting down http server");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.broker;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.core.Response;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.glassfish.jersey.server.ManagedAsyncExecutor;
import org.glassfish.jersey.spi.ThreadPoolExecutorProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* BrokerManagedAsyncExecutorProvider provides a bounded thread pool.
*/
@ManagedAsyncExecutor
public class BrokerManagedAsyncExecutorProvider extends ThreadPoolExecutorProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerManagedAsyncExecutorProvider.class);

private static final String NAME = "broker-managed-async-executor";

private final BrokerMetrics _brokerMetrics;

private final int _maximumPoolSize;
private final int _corePoolSize;
private final int _queueSize;

public BrokerManagedAsyncExecutorProvider(int corePoolSize, int maximumPoolSize, int queueSize,
BrokerMetrics brokerMetrics) {
super(NAME);
_corePoolSize = corePoolSize;
_maximumPoolSize = maximumPoolSize;
_queueSize = queueSize;
_brokerMetrics = brokerMetrics;
}

@Override
protected int getMaximumPoolSize() {
return _maximumPoolSize;
}

@Override
protected int getCorePoolSize() {
return _corePoolSize;
}

@Override
protected BlockingQueue<Runnable> getWorkQueue() {
if (_queueSize == Integer.MAX_VALUE) {
return new LinkedBlockingQueue();
}
return new ArrayBlockingQueue(_queueSize);
}

@Override
protected RejectedExecutionHandler getRejectedExecutionHandler() {
return new BrokerThreadPoolRejectExecutionHandler(_brokerMetrics);
}

static class BrokerThreadPoolRejectExecutionHandler implements RejectedExecutionHandler {
private final BrokerMetrics _brokerMetrics;

public BrokerThreadPoolRejectExecutionHandler(BrokerMetrics brokerMetrics) {
_brokerMetrics = brokerMetrics;
}

/**
* Reject the runnable if it can’t be accommodated by the thread pool.
*
* <p> Response returned will have SERVICE_UNAVAILABLE(503) error code with error msg.
*
* @param r Runnable
* @param executor ThreadPoolExecutor
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_REJECTED_EXCEPTIONS, 1L);
LOGGER.error("Task " + r + " rejected from " + executor);

throw new ServiceUnavailableException(Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(
"Pinot Broker thread pool can not accommodate more requests now. " + "Request is rejected from " + executor)
.build());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.broker;

import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.ServiceUnavailableException;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;


public class BrokerManagedAsyncExecutorProviderTest {

public static BrokerMetrics _brokerMetrics;

@BeforeClass
public void setUp() {
_brokerMetrics = new BrokerMetrics(CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX,
PinotMetricUtils.getPinotMetricsRegistry(new PinotConfiguration()),
CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS, Collections.emptyList());
}

@Test
public void testExecutorService()
throws InterruptedException, ExecutionException {
// create a new instance of the executor provider
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(2, 2, 2, _brokerMetrics);

// get the executor service
ThreadPoolExecutor executor = (ThreadPoolExecutor) provider.getExecutorService();

// submit a task to the executor service and wait for it to complete
Future<Integer> futureResult = executor.submit(() -> 1 + 1);
Integer result = futureResult.get();

// verify that the task was executed and returned the expected result
assertNotNull(result);
assertEquals((int) result, 2);

// wait for the executor service to shutdown
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
}

@Test
public void testGet()
throws InterruptedException {
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
ExecutorService executorService = provider.getExecutorService();

// verify that the executor has the expected properties
assertNotNull(executorService);
assertTrue(executorService instanceof ThreadPoolExecutor);

ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;

assertEquals(1, threadPoolExecutor.getCorePoolSize());
assertEquals(1, threadPoolExecutor.getMaximumPoolSize());

BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
assertNotNull(blockingQueue);
assertTrue(blockingQueue instanceof ArrayBlockingQueue);
assertEquals(0, blockingQueue.size());
assertEquals(1, blockingQueue.remainingCapacity());

RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
assertNotNull(rejectedExecutionHandler);
assertTrue(
rejectedExecutionHandler instanceof BrokerManagedAsyncExecutorProvider.BrokerThreadPoolRejectExecutionHandler);

// test that the executor actually executes tasks
AtomicInteger counter = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 1; i++) {
threadPoolExecutor.execute(() -> {
counter.incrementAndGet();
latch.countDown();
});
}
latch.await();
assertEquals(counter.get(), 1);
}

@Test(expectedExceptions = ServiceUnavailableException.class)
public void testRejectHandler()
throws InterruptedException {
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) provider.getExecutorService();

// test the rejection policy
AtomicInteger counter = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(() -> {
counter.incrementAndGet();
latch.countDown();
});
}
latch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
QUERIES("queries", false),

// These metrics track the exceptions caught during query execution in broker side.
// Query rejected by Jersey thread pool executor
QUERY_REJECTED_EXCEPTIONS("exceptions", true),
// Query compile phase.
REQUEST_COMPILATION_EXCEPTIONS("exceptions", true),
// Get resource phase.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,25 @@ public static class Broker {
Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2));
// Same logic as CombineOperatorUtils

// Config for Jersey ThreadPoolExecutorProvider.
// By default, Jersey uses the default unbounded thread pool to process queries.
// By enabling it, BrokerManagedAsyncExecutorProvider will be used to create a bounded thread pool.
public static final String CONFIG_OF_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR =
"pinot.broker.enable.bounded.jersey.threadpool.executor";
public static final boolean DEFAULT_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR = false;
// Default capacities for the bounded thread pool
public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE =
"pinot.broker.jersey.threadpool.executor.max.pool.size";
public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE =
Runtime.getRuntime().availableProcessors() * 2;
public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE =
"pinot.broker.jersey.threadpool.executor.core.pool.size";
public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE =
Runtime.getRuntime().availableProcessors() * 2;
public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE =
"pinot.broker.jersey.threadpool.executor.queue.size";
public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE = Integer.MAX_VALUE;

// used for SQL GROUP BY during broker reduce
public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = "pinot.broker.groupby.trim.threshold";
public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;
Expand Down

0 comments on commit f2afe21

Please sign in to comment.