Skip to content

Commit

Permalink
rename configs and reformat codes
Browse files Browse the repository at this point in the history
  • Loading branch information
MeihanLi committed Apr 18, 2023
1 parent a2496ed commit 0175c27
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ protected void configure() {
bind(accessFactory).to(AccessControlFactory.class);
}
});
boolean enableBoundedThreadPool = brokerConf
.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_BOUNDED_THREAD_POOL,
CommonConstants.Broker.DEFAULT_ENABLE_BOUNDED_THREAD_POOL);
if (enableBoundedThreadPool) {
boolean enableBoundedJerseyThreadPoolExecutor = brokerConf
.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR,
CommonConstants.Broker.DEFAULT_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR);
if (enableBoundedJerseyThreadPoolExecutor) {
register(buildBrokerManagedAsyncExecutorProvider(brokerConf, brokerMetrics));
}
register(JacksonFeature.class);
Expand Down Expand Up @@ -159,14 +159,16 @@ private void setupSwagger() {
_httpServer.getServerConfiguration().addHttpHandler(swaggerDist, "/swaggerui-dist/");
}

private BrokerManagedAsyncExecutorProvider buildBrokerManagedAsyncExecutorProvider(
PinotConfiguration brokerConf, BrokerMetrics brokerMetrics) {
int corePoolSize = brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_CORE_POOL_SIZE,
CommonConstants.Broker.DEFAULT_CORE_POOL_SIZE);
int maximumPoolSize = brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_POOL_SIZE,
CommonConstants.Broker.DEFAULT_MAX_POOL_SIZE);
int queueSize = brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_QUEUE_SIZE,
CommonConstants.Broker.DEFAULT_QUEUE_SIZE);
private BrokerManagedAsyncExecutorProvider buildBrokerManagedAsyncExecutorProvider(PinotConfiguration brokerConf,
BrokerMetrics brokerMetrics) {
int corePoolSize = brokerConf
.getProperty(CommonConstants.Broker.CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE,
CommonConstants.Broker.DEFAULT_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE);
int maximumPoolSize = brokerConf
.getProperty(CommonConstants.Broker.CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE,
CommonConstants.Broker.DEFAULT_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE);
int queueSize = brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE,
CommonConstants.Broker.DEFAULT_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE);
return new BrokerManagedAsyncExecutorProvider(corePoolSize, maximumPoolSize, queueSize, brokerMetrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.broker.broker;


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
Expand All @@ -32,78 +31,74 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* BrokerManagedAsyncExecutorProvider provides a bounded thread pool.
*/
@ManagedAsyncExecutor
public class BrokerManagedAsyncExecutorProvider extends ThreadPoolExecutorProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerManagedAsyncExecutorProvider.class);
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerManagedAsyncExecutorProvider.class);

private static final String NAME = "broker-managed-async-executor";
private static final String NAME = "broker-managed-async-executor";

private final BrokerMetrics _brokerMetrics;
private final BrokerMetrics _brokerMetrics;

private final int _maximumPoolSize;
private final int _corePoolSize;
private final int _queueSize;
private final int _maximumPoolSize;
private final int _corePoolSize;
private final int _queueSize;

public BrokerManagedAsyncExecutorProvider(
int corePoolSize,
int maximumPoolSize,
int queueSize,
BrokerMetrics brokerMetrics) {
super(NAME);
_corePoolSize = corePoolSize;
_maximumPoolSize = maximumPoolSize;
_queueSize = queueSize;
_brokerMetrics = brokerMetrics;
}
public BrokerManagedAsyncExecutorProvider(int corePoolSize, int maximumPoolSize, int queueSize,
BrokerMetrics brokerMetrics) {
super(NAME);
_corePoolSize = corePoolSize;
_maximumPoolSize = maximumPoolSize;
_queueSize = queueSize;
_brokerMetrics = brokerMetrics;
}

@Override
protected int getMaximumPoolSize() {
return _maximumPoolSize;
}
@Override
protected int getMaximumPoolSize() {
return _maximumPoolSize;
}

@Override
protected int getCorePoolSize() {
return _corePoolSize;
}
@Override
protected int getCorePoolSize() {
return _corePoolSize;
}

@Override
protected BlockingQueue<Runnable> getWorkQueue() {
return new ArrayBlockingQueue(_queueSize);
}
@Override
protected BlockingQueue<Runnable> getWorkQueue() {
return new ArrayBlockingQueue(_queueSize);
}

@Override
protected RejectedExecutionHandler getRejectedExecutionHandler() {
return new BrokerThreadPoolRejectExecutionHandler(_brokerMetrics);
}
@Override
protected RejectedExecutionHandler getRejectedExecutionHandler() {
return new BrokerThreadPoolRejectExecutionHandler(_brokerMetrics);
}

static class BrokerThreadPoolRejectExecutionHandler implements RejectedExecutionHandler {
private final BrokerMetrics _brokerMetrics;
static class BrokerThreadPoolRejectExecutionHandler implements RejectedExecutionHandler {
private final BrokerMetrics _brokerMetrics;

public BrokerThreadPoolRejectExecutionHandler(BrokerMetrics brokerMetrics) {
_brokerMetrics = brokerMetrics;
}
public BrokerThreadPoolRejectExecutionHandler(BrokerMetrics brokerMetrics) {
_brokerMetrics = brokerMetrics;
}

/**
* Reject the runnable if it can’t be accommodated by the thread pool.
*
* <p> Response returned will have SERVICE_UNAVAILABLE(503) error code with error msg.
*
* @param r Runnable
* @param executor ThreadPoolExecutor
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_REJECTION_EXCEPTIONS, 1L);
LOGGER.error("Task " + r + " rejected from " + executor);
/**
* Reject the runnable if it can’t be accommodated by the thread pool.
*
* <p> Response returned will have SERVICE_UNAVAILABLE(503) error code with error msg.
*
* @param r Runnable
* @param executor ThreadPoolExecutor
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_REJECTED_EXCEPTIONS, 1L);
LOGGER.error("Task " + r + " rejected from " + executor);

throw new ServiceUnavailableException(Response.status(Response.Status.SERVICE_UNAVAILABLE)
.entity("Pinot Broker thread pool can not accommodate more requests now. "
+ "Request is rejected from "
+ executor)
.build());
}
throw new ServiceUnavailableException(Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(
"Pinot Broker thread pool can not accommodate more requests now. " + "Request is rejected from " + executor)
.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,92 +41,94 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

public class BrokerManagedAsyncExecutorProviderTest {

public static BrokerMetrics _brokerMetrics;

@BeforeClass
public void setUp() {
_brokerMetrics = new BrokerMetrics(
CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX,
PinotMetricUtils.getPinotMetricsRegistry(new PinotConfiguration()),
CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS,
Collections.emptyList());
}

@Test
public void testExecutorService() throws InterruptedException, ExecutionException {
// create a new instance of the executor provider
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(2, 2, 2, _brokerMetrics);

// get the executor service
ThreadPoolExecutor executor = (ThreadPoolExecutor) provider.getExecutorService();

// submit a task to the executor service and wait for it to complete
Future<Integer> futureResult = executor.submit(() -> 1 + 1);
Integer result = futureResult.get();

// verify that the task was executed and returned the expected result
assertNotNull(result);
assertEquals((int) result, 2);

// wait for the executor service to shutdown
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
}
public class BrokerManagedAsyncExecutorProviderTest {

@Test
public void testGet() throws InterruptedException {
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
ExecutorService executorService = provider.getExecutorService();

// verify that the executor has the expected properties
assertNotNull(executorService);
assertTrue(executorService instanceof ThreadPoolExecutor);

ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;

assertEquals(1, threadPoolExecutor.getCorePoolSize());
assertEquals(1, threadPoolExecutor.getMaximumPoolSize());

BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
assertNotNull(blockingQueue);
assertTrue(blockingQueue instanceof ArrayBlockingQueue);
assertEquals(0, blockingQueue.size());
assertEquals(1, blockingQueue.remainingCapacity());

RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
assertNotNull(rejectedExecutionHandler);
assertTrue(rejectedExecutionHandler
instanceof BrokerManagedAsyncExecutorProvider.BrokerThreadPoolRejectExecutionHandler);

// test that the executor actually executes tasks
AtomicInteger counter = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 1; i++) {
threadPoolExecutor.execute(() -> {
counter.incrementAndGet();
latch.countDown();
});
}
latch.await();
assertEquals(counter.get(), 1);
public static BrokerMetrics _brokerMetrics;

@BeforeClass
public void setUp() {
_brokerMetrics = new BrokerMetrics(CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX,
PinotMetricUtils.getPinotMetricsRegistry(new PinotConfiguration()),
CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS, Collections.emptyList());
}

@Test
public void testExecutorService()
throws InterruptedException, ExecutionException {
// create a new instance of the executor provider
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(2, 2, 2, _brokerMetrics);

// get the executor service
ThreadPoolExecutor executor = (ThreadPoolExecutor) provider.getExecutorService();

// submit a task to the executor service and wait for it to complete
Future<Integer> futureResult = executor.submit(() -> 1 + 1);
Integer result = futureResult.get();

// verify that the task was executed and returned the expected result
assertNotNull(result);
assertEquals((int) result, 2);

// wait for the executor service to shutdown
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
}

@Test
public void testGet()
throws InterruptedException {
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
ExecutorService executorService = provider.getExecutorService();

// verify that the executor has the expected properties
assertNotNull(executorService);
assertTrue(executorService instanceof ThreadPoolExecutor);

ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;

assertEquals(1, threadPoolExecutor.getCorePoolSize());
assertEquals(1, threadPoolExecutor.getMaximumPoolSize());

BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
assertNotNull(blockingQueue);
assertTrue(blockingQueue instanceof ArrayBlockingQueue);
assertEquals(0, blockingQueue.size());
assertEquals(1, blockingQueue.remainingCapacity());

RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
assertNotNull(rejectedExecutionHandler);
assertTrue(
rejectedExecutionHandler instanceof BrokerManagedAsyncExecutorProvider.BrokerThreadPoolRejectExecutionHandler);

// test that the executor actually executes tasks
AtomicInteger counter = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 1; i++) {
threadPoolExecutor.execute(() -> {
counter.incrementAndGet();
latch.countDown();
});
}

@Test(expectedExceptions = ServiceUnavailableException.class)
public void testRejectHandler() throws InterruptedException {
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) provider.getExecutorService();

// test the rejection policy
AtomicInteger counter = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(() -> {
counter.incrementAndGet();
latch.countDown();
});
}
latch.await();
latch.await();
assertEquals(counter.get(), 1);
}

@Test(expectedExceptions = ServiceUnavailableException.class)
public void testRejectHandler()
throws InterruptedException {
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) provider.getExecutorService();

// test the rejection policy
AtomicInteger counter = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(() -> {
counter.incrementAndGet();
latch.countDown();
});
}
latch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
QUERIES("queries", false),

// These metrics track the exceptions caught during query execution in broker side.
// Query rejected by Jersey thread pool
QUERY_REJECTION_EXCEPTIONS("exceptions", true),
// Query rejected by Jersey thread pool executor
QUERY_REJECTED_EXCEPTIONS("exceptions", true),
// Query compile phase.
REQUEST_COMPILATION_EXCEPTIONS("exceptions", true),
// Get resource phase.
Expand Down
Loading

0 comments on commit 0175c27

Please sign in to comment.