Skip to content

Commit

Permalink
[Fix][Zeta] Introduce SeaTunnel CompletableFuture to prevent ForkJoin…
Browse files Browse the repository at this point in the history
…Pool thread shortage (#8445)
  • Loading branch information
Hisoka-X authored Jan 8, 2025
1 parent eaa15e4 commit b8b8ca1
Show file tree
Hide file tree
Showing 57 changed files with 467 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.apache.seatunnel.api.ImportShadeClassCheckTest.isWindows;
import static org.apache.seatunnel.api.ImportClassCheckTest.isWindows;

@Slf4j
public class ChineseCharacterCheckTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import static java.nio.file.StandardOpenOption.READ;

@Slf4j
public class ImportShadeClassCheckTest {
public class ImportClassCheckTest {

private static Map<String, NodeList<ImportDeclaration>> importsMap = new HashMap<>();
private final String SEATUNNEL_SHADE_PREFIX = "org.apache.seatunnel.shade.";
Expand Down Expand Up @@ -85,58 +85,95 @@ public static void beforeAll() {
@Test
public void guavaShadeCheck() {
Map<String, List<String>> errorMap =
checkShade(Collections.singletonList("com.google.common"));
Assertions.assertEquals(0, errorMap.size(), errorMsg("guava", errorMap));
checkImportClassPrefixWithAll(Collections.singletonList("com.google.common"));
Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("guava", errorMap));
log.info("check guava shade successfully");
}

@Test
public void jacksonShadeCheck() {
Map<String, List<String>> errorMap =
checkShade(
checkImportClassPrefixWithExclude(
Collections.singletonList("com.fasterxml.jackson"),
Arrays.asList(
"org.apache.seatunnel.format.compatible.debezium.json",
"org.apache.seatunnel.format.compatible.kafka.connect.json",
"org.apache.seatunnel.connectors.druid.sink",
"org.apache.seatunnel.connectors.seatunnel.typesense.client"));
Assertions.assertEquals(0, errorMap.size(), errorMsg("jackson", errorMap));
Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("jackson", errorMap));
log.info("check jackson shade successfully");
}

@Test
public void jettyShadeCheck() {
Map<String, List<String>> errorMap =
checkShade(Collections.singletonList("org.eclipse.jetty"));
Assertions.assertEquals(0, errorMap.size(), errorMsg("jetty", errorMap));
checkImportClassPrefixWithAll(Collections.singletonList("org.eclipse.jetty"));
Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("jetty", errorMap));
log.info("check jetty shade successfully");
}

@Test
public void janinoShadeCheck() {
Map<String, List<String>> errorMap =
checkShade(Arrays.asList("org.codehaus.janino", "org.codehaus.commons"));
Assertions.assertEquals(0, errorMap.size(), errorMsg("janino", errorMap));
checkImportClassPrefixWithAll(
Arrays.asList("org.codehaus.janino", "org.codehaus.commons"));
Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("janino", errorMap));
log.info("check janino shade successfully");
}

private Map<String, List<String>> checkShade(List<String> prefixList) {
return checkShade(prefixList, Collections.emptyList());
@Test
public void javaUtilCompletableFutureCheck() {
Map<String, List<String>> errorMap =
checkImportClassPrefix(
Collections.singletonList("java.util.concurrent.CompletableFuture"),
Collections.singletonList("org.apache.seatunnel.engine"),
Collections.singletonList("org.apache.seatunnel.engine.e2e"));
Assertions.assertEquals(
0,
errorMap.size(),
errorMsg(
"Can not use java.util.concurrent.CompletableFuture, please use org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture instead.",
errorMap));
log.info("check java concurrent CompletableFuture successfully");
}

private Map<String, List<String>> checkImportClassPrefixWithAll(List<String> prefixList) {
return checkImportClassPrefix(prefixList, Collections.emptyList(), Collections.emptyList());
}

private Map<String, List<String>> checkShade(
private Map<String, List<String>> checkImportClassPrefixWithExclude(
List<String> prefixList, List<String> packageWhiteList) {
return checkImportClassPrefix(prefixList, Collections.emptyList(), packageWhiteList);
}

private Map<String, List<String>> checkImportClassPrefixWithInclude(
List<String> prefixList, List<String> packageCheckList) {
return checkImportClassPrefix(prefixList, packageCheckList, Collections.emptyList());
}

private Map<String, List<String>> checkImportClassPrefix(
List<String> prefixList, List<String> packageCheckList, List<String> packageWhiteList) {
List<String> pathWhiteList =
packageWhiteList.stream()
.map(whitePackage -> whitePackage.replace(".", isWindows ? "\\" : "/"))
.collect(Collectors.toList());
List<String> pathCheckList =
packageCheckList.stream()
.map(whitePackage -> whitePackage.replace(".", isWindows ? "\\" : "/"))
.collect(Collectors.toList());
Map<String, List<String>> errorMap = new HashMap<>();
importsMap.forEach(
(clazzPath, imports) -> {
boolean match =
packageWhiteList.stream()
.map(
whitePackage ->
whitePackage.replace(
".", isWindows ? "\\" : "/"))
.anyMatch(clazzPath::contains);
if (!match) {
boolean match;
if (pathCheckList.isEmpty()) {
match = pathWhiteList.stream().noneMatch(clazzPath::contains);
} else {
match =
pathCheckList.stream().anyMatch(clazzPath::contains)
&& pathWhiteList.stream().noneMatch(clazzPath::contains);
}

if (match) {
List<String> collect =
imports.stream()
.filter(
Expand All @@ -156,11 +193,17 @@ private Map<String, List<String>> checkShade(
return errorMap;
}

private String errorMsg(String checkType, Map<String, List<String>> errorMap) {
private String shadeErrorMsg(String checkType, Map<String, List<String>> errorMap) {
String msg =
String.format("%s shade is not up to code, need add prefix [", checkType)
+ SEATUNNEL_SHADE_PREFIX
+ "]. \n";
return errorMsg(msg, errorMap);
}

private String errorMsg(String message, Map<String, List<String>> errorMap) {
StringBuilder msg = new StringBuilder();
msg.append(String.format("%s shade is not up to code, need add prefix [", checkType))
.append(SEATUNNEL_SHADE_PREFIX)
.append("]. \n");
msg.append(message).append("\n");
errorMap.forEach(
(key, value) -> {
msg.append(key).append("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.seatunnel.api.ImportShadeClassCheckTest.isWindows;
import static org.apache.seatunnel.api.ImportClassCheckTest.isWindows;

@Slf4j
public class UTClassNameCheckTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ private static boolean isSystemThread(String s) {
|| s.startsWith("seatunnel-coordinator-service")
|| s.startsWith("GC task thread")
|| s.contains("CompilerThread")
|| s.startsWith("SeaTunnel-CompletableFuture-Thread-")
|| s.contains("NioNetworking-closeListenerExecutor")
|| s.contains("ForkJoinPool.commonPool")
|| s.contains("DestroyJavaVM")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.ConnectorJarType;
import org.apache.seatunnel.engine.core.job.JobStatus;
Expand Down Expand Up @@ -68,7 +69,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
Expand All @@ -57,7 +58,6 @@
import java.util.List;
import java.util.Map;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@

package org.apache.seatunnel.engine.common.utils;

import java.util.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;

/** A future which prevents completion by outside caller */
public class PassiveCompletableFuture<T> extends CompletableFuture<T> {

public PassiveCompletableFuture() {}

public PassiveCompletableFuture(java.util.concurrent.CompletableFuture<T> chainedFuture) {
this(new CompletableFuture<>(chainedFuture));
}

public PassiveCompletableFuture(CompletableFuture<T> chainedFuture) {
if (chainedFuture != null) {
chainedFuture.whenComplete(
Expand Down
Loading

0 comments on commit b8b8ca1

Please sign in to comment.