Skip to content

Commit

Permalink
perf(common/core): add executor impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Alice52 committed Feb 28, 2023
1 parent e145537 commit 8caa7ad
Show file tree
Hide file tree
Showing 8 changed files with 303 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package common.core.executor;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
* handle thread context of child-and-parent.
*
* @author T04856 <br>
* @create 2023-02-28 9:01 AM <br>
* @project system-design <br>
*/
@Slf4j
public abstract class AbstractThreadPoolExecutor extends ThreadPoolExecutor {

protected static final Long KEEP_ALIVE_TIME_SECONDS = 120L;

protected AbstractThreadPoolExecutor(
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

/**
* 在子线程中使用父线程的 MDC#log 上线文
*
* @param runnable
* @return
*/
private static Runnable executor(final Runnable runnable) {

final Map<String, String> context = MDC.getCopyOfContextMap();

return () -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}

@Override
public void execute(Runnable command) {
super.execute(executor(command));
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package common.core.executor;

import org.slf4j.MDC;
import common.core.executor.factory.ExecutorThreadFactory;

import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 固定队列大小的执行器
*
* @author zack <br>
* @create 2022-04-08 11:56 <br>
* @project mc-platform <br>
*/
public class FixedThreadPoolExecutor extends ThreadPoolExecutor {
public class FixedThreadPoolExecutor extends AbstractThreadPoolExecutor {

private static final Long KEEP_ALIVE_TIME_SECONDS = 120L;

Expand Down Expand Up @@ -42,53 +42,7 @@ public static FixedThreadPoolExecutor newFixedThreadPoolExecutor(
KEEP_ALIVE_TIME_SECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueSize),
new FixedThreadFactory(),
new ExecutorThreadFactory(ExecutorThreadFactory.ExecutorEnum.Fixed),
handler);
}

/**
* 在子线程中使用父线程的 MDC#log 上线文
*
* @param runnable
* @return
*/
private static Runnable executor(final Runnable runnable) {

final Map<String, String> context = MDC.getCopyOfContextMap();

return () -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}

@Override
public void execute(Runnable command) {
super.execute(executor(command));
}

static class FixedThreadFactory implements ThreadFactory {

private final AtomicInteger threadNumber = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("fixed-thread-" + threadNumber.getAndIncrement());
return thread;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package common.core.executor;

import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import java.util.concurrent.*;

/**
* 多个线程池并行, 但是某线程池内任务按顺序执行
*
* <pre>
* 1. [不推荐] 直接创建单个线程的线程池, 每个维度都是一个线程池, Map 获取对应的执行线程池 {@code javase#SeqParallelPool}
* 2. [推荐] 直接创建单个线程的线程池
* <pre>
* - 本质上还是单个线程池的执行器: new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, queue, threadFactory);
* - 将统一 key 的所有任务都分配到同一执行器: hash && equals 计算 key
* - 多个执行器可以提高不同key的执行并行度
* </pre>
* </pre>
*
* @see
* @author T04856 <br>
* @create 2023-02-28 9:19 AM <br>
* @project system-design <br>
*/
@Slf4j
public class KeyAffinityExecutor extends AbstractThreadPoolExecutor {
private KeyAffinityExecutor(
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

public static <K> KeyAffinityExecutor newSerializingExecutor(
int parallelism, int queueBufferSize, String threadName) {

return null;
}

public static void main(String[] args) {

KeyAffinityExecutor executorService =
KeyAffinityExecutor.newSerializingExecutor(1, 200, "MY-POOL-%d");
executorService.executeEx("key", () -> log.info("xx"));
}

public <T, K> ListenableFuture<T> submit(K key, @Nonnull Callable<T> task) {
return null;
}

public <K> void executeEx(K key, Runnable task) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package common.core.executor;

import common.core.executor.factory.ExecutorThreadFactory;
import common.core.executor.reject.CallerBlocksPolicy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;

import java.util.concurrent.*;

/**
* 激进执行器:
*
* <pre>
* 1. 优先创建线程
* 2. 达到最大线程后, 放入队列
* 3. 队列满则执行拒绝策略
* </pre>
*
* @author T04856 <br>
* @create 2023-02-28 8:53 AM <br>
* @project system-design <br>
*/
@Slf4j
public class RadicalThreadPoolExecutor extends AbstractThreadPoolExecutor {
private static final int threadCount = 200;
private static final int queueSize = 2000;

private RadicalThreadPoolExecutor(
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

public static RadicalThreadPoolExecutor newRadicalThreadPoolExecutor() {

return new RadicalThreadPoolExecutor(
0,
threadCount,
30,
TimeUnit.SECONDS,
new RadicalBlockingQueue<>(queueSize),
new ExecutorThreadFactory(ExecutorThreadFactory.ExecutorEnum.Radical),
new CallerBlocksPolicy());
}

/**
* @param corePoolSize 处于空闲状态也会保留在池中的线程数
* @param maxPoolSize 线程池中允许的最大线程数
* @param queueSize queue size
* @return
*/
public static RadicalThreadPoolExecutor newRadicalThreadPoolExecutor(
int corePoolSize, int maxPoolSize, int queueSize) {

return new RadicalThreadPoolExecutor(
0,
maxPoolSize,
30,
TimeUnit.SECONDS,
new RadicalBlockingQueue<>(queueSize),
new ExecutorThreadFactory(ExecutorThreadFactory.ExecutorEnum.Radical),
new CallerBlocksPolicy());
}

@Bean("radicalThreadPool")
public ThreadPoolExecutor executor() {

return new ThreadPoolExecutor(
0,
threadCount,
30,
TimeUnit.SECONDS,
new RadicalBlockingQueue<>(queueSize),
new ExecutorThreadFactory(ExecutorThreadFactory.ExecutorEnum.Radical),
new CallerBlocksPolicy());
}
}

@Slf4j
class RadicalBlockingQueue<E> extends LinkedBlockingQueue<E> {

public RadicalBlockingQueue(int capacity) {
super(capacity);
}

@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
return super.offer(e, timeout, unit);
}

/**
* 必须和 CallerBlocksPolicy 结合使用, 否则就没有队列概念了, 会导致任务的丢失.
*
* <pre>
* 1. 先返回 false: 造成队列满的假象, 让线程池优先扩容
* 2. 此时显示队列已满, 则尝试添加工作线程
* 3. 若工作线程达到最大值, 则会执行拒绝策略.
* </pre>
*
* @param e
* @return
*/
@Override
public boolean offer(E e) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package common.core.executor.factory;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import static cn.hutool.core.text.CharSequenceUtil.format;

/**
* @author T04856 <br>
* @create 2023-02-28 8:58 AM <br>
* @project system-design <br>
*/
@Slf4j
public class ExecutorThreadFactory implements ThreadFactory {
private static Thread.UncaughtExceptionHandler defaultExceptionHandler =
(t, e) -> log.error("handle exception in child thread: ", e);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ExecutorEnum threadPrefix;

public ExecutorThreadFactory(ExecutorEnum threadPrefix) {
this(threadPrefix, defaultExceptionHandler);
}

public ExecutorThreadFactory(
ExecutorEnum threadPrefix, Thread.UncaughtExceptionHandler exceptionHandler) {
this.threadPrefix = threadPrefix;
if (Objects.nonNull(exceptionHandler)) {
ExecutorThreadFactory.defaultExceptionHandler = exceptionHandler;
}
}

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(format("{}-thread-{}", threadPrefix, threadNumber.getAndIncrement()));
thread.setUncaughtExceptionHandler(defaultExceptionHandler);
return thread;
}

@Getter
public enum ExecutorEnum {
Fixed,
KeyAffinity,
Radical,
;
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package common.core.executor;
package common.core.executor.queue;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* this class have been built-in RadicalThreadPoolExecutor.
*
* @author asd <br>
* @create 2022-08-25 10:41 AM <br>
* @project project-cloud-custom <br>
*/
@Slf4j
public class RadicalBlockingQueue<E> extends LinkedBlockingQueue<E> {
@Deprecated
class RadicalBlockingQueue<E> extends LinkedBlockingQueue<E> {

public RadicalBlockingQueue(int capacity) {
super(capacity);
Expand Down
Loading

0 comments on commit 8caa7ad

Please sign in to comment.