Skip to content

Commit

Permalink
[Fix][Zeta] Fix thread classloader be set to null when use cache mode (
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored and chaorongzhi committed Aug 21, 2024
1 parent bcb6975 commit a2da875
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 44 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,27 @@ public synchronized void releaseClassLoader(long jobId, Collection<URL> jars) {
return;
}
if (referenceCount == 0) {
classLoaderMap.remove(key);
ClassLoader classLoader = classLoaderMap.remove(key);
log.info("Release classloader for job {} with jars {}", jobId, jars);
classLoaderReferenceCount.get(jobId).remove(key);
recycleClassLoaderFromThread(classLoader);
}
if (classLoaderMap.isEmpty()) {
classLoaderCache.remove(jobId);
classLoaderReferenceCount.remove(jobId);
}
}

private static void recycleClassLoaderFromThread(ClassLoader classLoader) {
Thread.getAllStackTraces().keySet().stream()
.filter(thread -> thread.getContextClassLoader() == classLoader)
.forEach(
thread -> {
log.info("recycle classloader for thread " + thread.getName());
thread.setContextClassLoader(null);
});
}

private String covertJarsToKey(Collection<URL> jars) {
return jars.stream().map(URL::toString).sorted().reduce((a, b) -> a + b).orElse("");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
Expand Down Expand Up @@ -209,7 +208,6 @@ public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoade
classLoaderService.releaseClassLoader(
Long.parseLong(jobConfig.getJobContext().getJobId()), connectorJars);
}
ClassLoaderUtil.recycleClassLoaderFromThread(classLoader);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,35 @@ void testSameJarInDifferentJob() throws MalformedURLException {
Lists.newArrayList(new URL("file:///console.jar"), new URL("file:///fake.jar")));
Assertions.assertEquals(0, classLoaderService.queryClassLoaderCount());
}

@Test
void testRecycleClassLoaderFromThread() throws MalformedURLException, InterruptedException {
ClassLoader classLoader =
classLoaderService.getClassLoader(
3L,
Lists.newArrayList(
new URL("file:///console.jar"), new URL("file:///fake.jar")));
ClassLoader appClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
Thread thread =
new Thread(
() -> {
while (Thread.currentThread().getContextClassLoader() != null) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
thread.start();
Thread.currentThread().setContextClassLoader(appClassLoader);
Assertions.assertEquals(classLoader, thread.getContextClassLoader());
classLoaderService.releaseClassLoader(
3L,
Lists.newArrayList(new URL("file:///console.jar"), new URL("file:///fake.jar")));
Assertions.assertNull(thread.getContextClassLoader());
Thread.sleep(2000);
Assertions.assertFalse(thread.isAlive());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;
import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
Expand Down Expand Up @@ -919,9 +918,7 @@ void taskDone(Task task) {

private void recycleClassLoader(TaskGroupLocation taskGroupLocation) {
TaskGroupContext context = executionContexts.get(taskGroupLocation);
ClassLoader classLoader = context.getClassLoader();
executionContexts.get(taskGroupLocation).setClassLoader(null);
ClassLoaderUtil.recycleClassLoaderFromThread(classLoader);
classLoaderService.releaseClassLoader(taskGroupLocation.getJobId(), context.getJars());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
Expand Down Expand Up @@ -217,8 +216,6 @@ public void init(long initializationTimestamp, boolean restart) throws Exception
jobImmutableInformation.getJobId(),
jobImmutableInformation.getPluginJarsUrls());

ClassLoaderUtil.recycleClassLoaderFromThread(classLoader);

final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
PlanUtils.fromLogicalDAG(
logicalDag,
Expand Down

0 comments on commit a2da875

Please sign in to comment.