Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8348] Allow custom fast-failure queues to be added in BrokerFastFailure #8347

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2519,4 +2519,6 @@ public ColdDataCgCtrService getColdDataCgCtrService() {
public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
this.coldDataCgCtrService = coldDataCgCtrService;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blank line


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*/
package org.apache.rocketmq.broker.latency;

import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
Expand All @@ -42,13 +46,26 @@ public class BrokerFastFailure {

private volatile long jstackTime = System.currentTimeMillis();

private final List<Pair<BlockingQueue<Runnable>, Supplier<Long>>> cleanExpiredRequestQueueList = new ArrayList<>();

public BrokerFastFailure(final BrokerController brokerController) {
this.brokerController = brokerController;
initCleanExpiredRequestQueueList();
this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true,
brokerController == null ? null : brokerController.getBrokerConfig()));
}

private void initCleanExpiredRequestQueueList() {
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getSendThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getPullThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getLitePullThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInLitePullQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getHeartbeatThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getEndTransactionThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getAckThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInAckQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getAdminBrokerThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInAdminBrokerQueue()));
}

public static RequestTask castRunnable(final Runnable runnable) {
try {
if (runnable instanceof FutureTaskExt) {
Expand Down Expand Up @@ -98,26 +115,9 @@ private void cleanExpiredRequest() {
}
}

cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());

cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());

cleanExpiredRequestInQueue(this.brokerController.getLitePullThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInLitePullQueue());

cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());

cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());

cleanExpiredRequestInQueue(this.brokerController.getAckThreadPoolQueue(),
brokerController.getBrokerConfig().getWaitTimeMillsInAckQueue());

cleanExpiredRequestInQueue(this.brokerController.getAdminBrokerThreadPoolQueue(),
brokerController.getBrokerConfig().getWaitTimeMillsInAdminBrokerQueue());
for (Pair<BlockingQueue<Runnable>, Supplier<Long>> pair : cleanExpiredRequestQueueList) {
cleanExpiredRequestInQueue(pair.getObject1(), pair.getObject2().get());
}
}

void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
Expand Down Expand Up @@ -154,6 +154,11 @@ void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, fin
}
}

public synchronized void addCleanExpiredRequestQueue(BlockingQueue<Runnable> cleanExpiredRequestQueue,
Supplier<Long> maxWaitTimeMillsInQueueSupplier) {
cleanExpiredRequestQueueList.add(new Pair<>(cleanExpiredRequestQueue, maxWaitTimeMillsInQueueSupplier));
}

public void shutdown() {
this.scheduledExecutorService.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,46 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import static org.assertj.core.api.Assertions.assertThat;

public class BrokerFastFailureTest {

private BrokerController brokerController;

private final BrokerConfig brokerConfig = new BrokerConfig();

private MessageStore messageStore;

@Before
public void setUp() {
brokerController = Mockito.mock(BrokerController.class);
messageStore = Mockito.mock(DefaultMessageStore.class);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
Mockito.when(brokerController.getSendThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getPullThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getLitePullThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getHeartbeatThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getEndTransactionThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getAdminBrokerThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getAckThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
Mockito.when(messageStore.isOSPageCacheBusy()).thenReturn(false);
Mockito.when(brokerController.getMessageStore()).thenReturn(messageStore);
}

@Test
public void testCleanExpiredRequestInQueue() throws Exception {
BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null);
BrokerFastFailure brokerFastFailure = new BrokerFastFailure(brokerController);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
Expand Down Expand Up @@ -63,4 +93,40 @@ public void run() {
assertThat(((FutureTaskExt) queue.peek()).getRunnable()).isEqualTo(requestTask);
}

@Test
public void testCleanExpiredCustomRequestInQueue() throws Exception {
BrokerFastFailure brokerFastFailure = new BrokerFastFailure(brokerController);
brokerFastFailure.start();
brokerConfig.setWaitTimeMillsInAckQueue(10);
BlockingQueue<Runnable> customThreadPoolQueue = new LinkedBlockingQueue<>();
brokerFastFailure.addCleanExpiredRequestQueue(customThreadPoolQueue, () -> brokerConfig.getWaitTimeMillsInAckQueue());

Runnable runnable = new Runnable() {
@Override
public void run() {

}
};
RequestTask requestTask = new RequestTask(runnable, null, null);
customThreadPoolQueue.add(new FutureTaskExt<>(requestTask, null));

Thread.sleep(2000);

assertThat(customThreadPoolQueue.size()).isEqualTo(0);
assertThat(requestTask.isStopRun()).isEqualTo(true);

brokerConfig.setWaitTimeMillsInAckQueue(10000);

RequestTask requestTask2 = new RequestTask(runnable, null, null);
customThreadPoolQueue.add(new FutureTaskExt<>(requestTask2, null));

Thread.sleep(1000);

assertThat(customThreadPoolQueue.size()).isEqualTo(1);
assertThat(((FutureTaskExt) customThreadPoolQueue.peek()).getRunnable()).isEqualTo(requestTask2);

brokerFastFailure.shutdown();

}

}
Loading