Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[New Feature] Add new configuration options which allows broker to use a bounded Jersey ThreadPool #10614

Merged
merged 4 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pinot-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-common</artifactId>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-jaxrs</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ protected void configure() {
bind(accessFactory).to(AccessControlFactory.class);
}
});
boolean enableBoundedJerseyThreadPoolExecutor = brokerConf
.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR,
CommonConstants.Broker.DEFAULT_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR);
if (enableBoundedJerseyThreadPoolExecutor) {
register(buildBrokerManagedAsyncExecutorProvider(brokerConf, brokerMetrics));
}
register(JacksonFeature.class);
registerClasses(io.swagger.jaxrs.listing.ApiListingResource.class);
registerClasses(io.swagger.jaxrs.listing.SwaggerSerializers.class);
Expand Down Expand Up @@ -153,6 +159,19 @@ private void setupSwagger() {
_httpServer.getServerConfiguration().addHttpHandler(swaggerDist, "/swaggerui-dist/");
}

private BrokerManagedAsyncExecutorProvider buildBrokerManagedAsyncExecutorProvider(PinotConfiguration brokerConf,
BrokerMetrics brokerMetrics) {
int corePoolSize = brokerConf
.getProperty(CommonConstants.Broker.CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE,
CommonConstants.Broker.DEFAULT_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE);
int maximumPoolSize = brokerConf
.getProperty(CommonConstants.Broker.CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE,
CommonConstants.Broker.DEFAULT_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE);
int queueSize = brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE,
CommonConstants.Broker.DEFAULT_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE);
return new BrokerManagedAsyncExecutorProvider(corePoolSize, maximumPoolSize, queueSize, brokerMetrics);
}

public void stop() {
if (_httpServer != null) {
LOGGER.info("Shutting down http server");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.broker;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.core.Response;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.glassfish.jersey.server.ManagedAsyncExecutor;
import org.glassfish.jersey.spi.ThreadPoolExecutorProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* BrokerManagedAsyncExecutorProvider provides a bounded thread pool.
*/
@ManagedAsyncExecutor
public class BrokerManagedAsyncExecutorProvider extends ThreadPoolExecutorProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerManagedAsyncExecutorProvider.class);

private static final String NAME = "broker-managed-async-executor";

private final BrokerMetrics _brokerMetrics;

private final int _maximumPoolSize;
private final int _corePoolSize;
private final int _queueSize;

public BrokerManagedAsyncExecutorProvider(int corePoolSize, int maximumPoolSize, int queueSize,
BrokerMetrics brokerMetrics) {
super(NAME);
_corePoolSize = corePoolSize;
_maximumPoolSize = maximumPoolSize;
_queueSize = queueSize;
_brokerMetrics = brokerMetrics;
}

@Override
protected int getMaximumPoolSize() {
return _maximumPoolSize;
}

@Override
protected int getCorePoolSize() {
return _corePoolSize;
}

@Override
protected BlockingQueue<Runnable> getWorkQueue() {
if (_queueSize == Integer.MAX_VALUE) {
return new LinkedBlockingQueue();
}
return new ArrayBlockingQueue(_queueSize);
}

@Override
protected RejectedExecutionHandler getRejectedExecutionHandler() {
return new BrokerThreadPoolRejectExecutionHandler(_brokerMetrics);
}

static class BrokerThreadPoolRejectExecutionHandler implements RejectedExecutionHandler {
private final BrokerMetrics _brokerMetrics;

public BrokerThreadPoolRejectExecutionHandler(BrokerMetrics brokerMetrics) {
_brokerMetrics = brokerMetrics;
}

/**
* Reject the runnable if it can’t be accommodated by the thread pool.
*
* <p> Response returned will have SERVICE_UNAVAILABLE(503) error code with error msg.
*
* @param r Runnable
* @param executor ThreadPoolExecutor
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_REJECTED_EXCEPTIONS, 1L);
LOGGER.error("Task " + r + " rejected from " + executor);

throw new ServiceUnavailableException(Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(
"Pinot Broker thread pool can not accommodate more requests now. " + "Request is rejected from " + executor)
.build());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.broker;

import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.ServiceUnavailableException;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;


public class BrokerManagedAsyncExecutorProviderTest {

public static BrokerMetrics _brokerMetrics;

@BeforeClass
public void setUp() {
_brokerMetrics = new BrokerMetrics(CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX,
PinotMetricUtils.getPinotMetricsRegistry(new PinotConfiguration()),
CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS, Collections.emptyList());
}

@Test
public void testExecutorService()
throws InterruptedException, ExecutionException {
// create a new instance of the executor provider
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(2, 2, 2, _brokerMetrics);

// get the executor service
ThreadPoolExecutor executor = (ThreadPoolExecutor) provider.getExecutorService();

// submit a task to the executor service and wait for it to complete
Future<Integer> futureResult = executor.submit(() -> 1 + 1);
Integer result = futureResult.get();

// verify that the task was executed and returned the expected result
assertNotNull(result);
assertEquals((int) result, 2);

// wait for the executor service to shutdown
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
}

@Test
public void testGet()
throws InterruptedException {
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
ExecutorService executorService = provider.getExecutorService();

// verify that the executor has the expected properties
assertNotNull(executorService);
assertTrue(executorService instanceof ThreadPoolExecutor);

ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;

assertEquals(1, threadPoolExecutor.getCorePoolSize());
assertEquals(1, threadPoolExecutor.getMaximumPoolSize());

BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
assertNotNull(blockingQueue);
assertTrue(blockingQueue instanceof ArrayBlockingQueue);
assertEquals(0, blockingQueue.size());
assertEquals(1, blockingQueue.remainingCapacity());

RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
assertNotNull(rejectedExecutionHandler);
assertTrue(
rejectedExecutionHandler instanceof BrokerManagedAsyncExecutorProvider.BrokerThreadPoolRejectExecutionHandler);

// test that the executor actually executes tasks
AtomicInteger counter = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 1; i++) {
threadPoolExecutor.execute(() -> {
counter.incrementAndGet();
latch.countDown();
});
}
latch.await();
assertEquals(counter.get(), 1);
}

@Test(expectedExceptions = ServiceUnavailableException.class)
public void testRejectHandler()
throws InterruptedException {
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) provider.getExecutorService();

// test the rejection policy
AtomicInteger counter = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(() -> {
counter.incrementAndGet();
latch.countDown();
});
}
latch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
QUERIES("queries", false),

// These metrics track the exceptions caught during query execution in broker side.
// Query rejected by Jersey thread pool executor
QUERY_REJECTED_EXCEPTIONS("exceptions", true),
// Query compile phase.
REQUEST_COMPILATION_EXCEPTIONS("exceptions", true),
// Get resource phase.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,25 @@ public static class Broker {
Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2));
// Same logic as CombineOperatorUtils

// Config for Jersey ThreadPoolExecutorProvider.
// By default, Jersey uses the default unbounded thread pool to process queries.
// By enabling it, BrokerManagedAsyncExecutorProvider will be used to create a bounded thread pool.
public static final String CONFIG_OF_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR =
"pinot.broker.enable.bounded.jersey.threadpool.executor";
public static final boolean DEFAULT_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR = false;
// Default capacities for the bounded thread pool
public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE =
"pinot.broker.jersey.threadpool.executor.max.pool.size";
public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE =
Runtime.getRuntime().availableProcessors() * 2;
public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE =
"pinot.broker.jersey.threadpool.executor.core.pool.size";
public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE =
Runtime.getRuntime().availableProcessors() * 2;
public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE =
"pinot.broker.jersey.threadpool.executor.queue.size";
public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE = Integer.MAX_VALUE;

// used for SQL GROUP BY during broker reduce
public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = "pinot.broker.groupby.trim.threshold";
public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;
Expand Down