[Feature] Split transform and move jar into connectors directory (apa…
liunaijie authored Aug 10, 2024
1 parent 862e205 commit d46cf16
Showing 12 changed files with 108 additions and 124 deletions.
13 changes: 12 additions & 1 deletion plugin-mapping.properties
@@ -129,4 +129,15 @@ seatunnel.source.ObsFile = connector-file-obs
seatunnel.sink.ObsFile = connector-file-obs
seatunnel.source.Milvus = connector-milvus
seatunnel.sink.Milvus = connector-milvus
seatunnel.sink.ActiveMQ = connector-activemq
seatunnel.sink.ActiveMQ = connector-activemq

seatunnel.transform.Sql = seatunnel-transforms-v2
seatunnel.transform.FieldMapper = seatunnel-transforms-v2
seatunnel.transform.Filter = seatunnel-transforms-v2
seatunnel.transform.FilterRowKind = seatunnel-transforms-v2
seatunnel.transform.JsonPath = seatunnel-transforms-v2
seatunnel.transform.Replace = seatunnel-transforms-v2
seatunnel.transform.Split = seatunnel-transforms-v2
seatunnel.transform.Copy = seatunnel-transforms-v2
seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
seatunnel.transform.LLM = seatunnel-transforms-v2
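
For illustration only, a minimal sketch of how these new mappings resolve a transform name to its jar artifact. The class name and the classpath location of the file are assumptions, not part of this commit; only the property keys and the seatunnel-transforms-v2 value come from the mapping above.

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PluginMappingLookup {
    public static void main(String[] args) throws IOException {
        Properties mapping = new Properties();
        // Assumption: plugin-mapping.properties is reachable on the classpath.
        try (InputStream in =
                PluginMappingLookup.class
                        .getClassLoader()
                        .getResourceAsStream("plugin-mapping.properties")) {
            if (in == null) {
                throw new IOException("plugin-mapping.properties not found on classpath");
            }
            mapping.load(in);
        }
        // Keys follow the pattern seatunnel.<pluginType>.<PluginName>; with this commit
        // every transform resolves to the single seatunnel-transforms-v2 artifact.
        String artifact = mapping.getProperty("seatunnel.transform.Split");
        System.out.println("Split transform -> " + artifact); // seatunnel-transforms-v2
    }
}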
@@ -31,7 +31,6 @@
import org.apache.seatunnel.api.table.factory.FactoryException;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.SeaTunnelException;
@@ -49,7 +48,6 @@

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;

/** The util used by Spark/Flink to create the SeaTunnelSource etc. */
public class PluginUtil {
@@ -130,21 +128,21 @@ private static SeaTunnelSource fallbackCreate(
return source;
}

public static TableTransformFactory createTransformFactory(
public static Optional<? extends Factory> createTransformFactory(
SeaTunnelFactoryDiscovery factoryDiscovery,
SeaTunnelTransformPluginDiscovery transformPluginDiscovery,
Config transformConfig,
List<URL> pluginJars) {
PluginIdentifier pluginIdentifier =
PluginIdentifier.of(
ENGINE_TYPE, "transform", transformConfig.getString(PLUGIN_NAME.key()));
final ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(transformConfig);
final String factoryId = readonlyConfig.get(PLUGIN_NAME);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
final TableTransformFactory factory =
discoverFactory(classLoader, TableTransformFactory.class, factoryId);
pluginJars.addAll(
transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
return factory;
try {
return factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
} catch (FactoryException e) {
return Optional.empty();
}
}
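
A condensed, hedged sketch of how callers consume the new Optional-returning signature; it mirrors the Flink and Spark TransformExecuteProcessor changes later in this commit, and the variable names are illustrative only.

// Sketch only: transformConfigs, factoryDiscovery, transformPluginDiscovery and
// jarPaths are assumed to be set up as in the processors shown below.
List<TableTransformFactory> factories =
        transformConfigs.stream()
                .map(cfg ->
                        PluginUtil.createTransformFactory(
                                factoryDiscovery, transformPluginDiscovery, cfg, jarPaths))
                .distinct()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .map(f -> (TableTransformFactory) f)
                .collect(Collectors.toList());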

public static Optional<? extends Factory> createSinkFactory(
@@ -29,6 +29,7 @@
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
@@ -41,6 +42,7 @@
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
@@ -59,15 +61,23 @@ protected TransformExecuteProcessor(
@Override
protected List<TableTransformFactory> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {

SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableTransformFactory.class, ADD_URL_TO_CLASSLOADER);
SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
new SeaTunnelTransformPluginDiscovery();

return pluginConfigs.stream()
.map(
transformConfig ->
PluginUtil.createTransformFactory(
transformPluginDiscovery, transformConfig, jarPaths))
factoryDiscovery,
transformPluginDiscovery,
transformConfig,
jarPaths))
.distinct()
.filter(Optional::isPresent)
.map(Optional::get)
.map(e -> (TableTransformFactory) e)
.collect(Collectors.toList());
}

@@ -29,6 +29,7 @@
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
@@ -50,6 +51,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
@@ -69,16 +71,23 @@ protected TransformExecuteProcessor(
protected List<TableTransformFactory> initializePlugins(List<? extends Config> pluginConfigs) {
SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
new SeaTunnelTransformPluginDiscovery();

SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableTransformFactory.class);
List<URL> pluginJars = new ArrayList<>();
List<TableTransformFactory> transforms =
pluginConfigs.stream()
.map(
transformConfig ->
PluginUtil.createTransformFactory(
factoryDiscovery,
transformPluginDiscovery,
transformConfig,
pluginJars))
new ArrayList<>()))
.distinct()
.filter(Optional::isPresent)
.map(Optional::get)
.map(e -> (TableTransformFactory) e)
.collect(Collectors.toList());
sparkRuntimeEnvironment.registerPlugin(pluginJars);
return transforms;
48 changes: 17 additions & 31 deletions seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -140,14 +140,15 @@
<scope>provided</scope>
</dependencySet>

<!-- ============ Connectors Jars ============ -->
<!-- ============ Connectors Jars And Transforms V2 Jar ============ -->
<!-- SeaTunnel connectors -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
<unpack>false</unpack>
<includes>
<include>org.apache.seatunnel:connector-*:jar</include>
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
</includes>
<excludes>
<exclude>org.apache.seatunnel:connector-common</exclude>
@@ -160,36 +161,7 @@
<scope>provided</scope>
</dependencySet>

<!-- ============ SeaTunnel Transforms V2 Jars And SeaTunnel Hadoop3 Uber Jar ============ -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
<unpack>false</unpack>
<includes>
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
<include>org.apache.hadoop:hadoop-aws:jar</include>
<include>com.amazonaws:aws-java-sdk-bundle:jar</include>
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
<!--Add hadoop aliyun jar -->
<include>org.apache.hadoop:hadoop-aliyun:jar</include>
<include>com.aliyun.oss:aliyun-sdk-oss:jar</include>
<include>org.jdom:jdom:jar</include>

<!--Add netty buffer jar -->
<include>io.netty:netty-buffer:jar</include>
<include>io.netty:netty-common:jar</include>

<!--Add hive exec jar -->
<include>org.apache.hive:hive-exec:jar</include>
<include>org.apache.hive:hive-service:jar</include>
<include>org.apache.thrift:libfb303:jar</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
<outputDirectory>/lib</outputDirectory>
<scope>provided</scope>
</dependencySet>

<!-- =================== JDBC Connector Drivers =================== -->
<!-- =================== JDBC Connector Drivers And SeaTunnel Hadoop3 Uber Jar =================== -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
@@ -209,6 +181,20 @@
<include>com.amazon.redshift:redshift-jdbc42:jar</include>
<include>net.snowflake.snowflake-jdbc:jar</include>
<include>com.xugudb:xugu-jdbc:jar</include>
<include>org.apache.hadoop:hadoop-aws:jar</include>
<include>com.amazonaws:aws-java-sdk-bundle:jar</include>
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
<!--Add hadoop aliyun jar -->
<include>org.apache.hadoop:hadoop-aliyun:jar</include>
<include>com.aliyun.oss:aliyun-sdk-oss:jar</include>
<include>org.jdom:jdom:jar</include>
<!--Add netty buffer jar -->
<include>io.netty:netty-buffer:jar</include>
<include>io.netty:netty-common:jar</include>
<!--Add hive exec jar -->
<include>org.apache.hive:hive-exec:jar</include>
<include>org.apache.hive:hive-service:jar</include>
<include>org.apache.thrift:libfb303:jar</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
<outputDirectory>/lib</outputDirectory>
6 changes: 3 additions & 3 deletions seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -161,21 +161,20 @@
<scope>provided</scope>
</dependencySet>

<!-- ============ SeaTunnel Transforms V2 Jars And SeaTunnel Hadoop3 Uber Jar============ -->
<!-- ============ SeaTunnel Hadoop3 Uber Jar============ -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
<unpack>false</unpack>
<includes>
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
<outputDirectory>/lib</outputDirectory>
<scope>provided</scope>
</dependencySet>

<!-- ============ Connectors Jars ============ -->
<!-- ============ Connectors Jars And Transforms V2 Jar ============ -->
<!-- SeaTunnel connectors for Demo -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
@@ -184,6 +183,7 @@
<includes>
<include>org.apache.seatunnel:connector-fake:jar</include>
<include>org.apache.seatunnel:connector-console:jar</include>
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
</includes>
<outputDirectory>/connectors</outputDirectory>
<scope>provided</scope>
@@ -195,13 +195,13 @@ public static void copySeaTunnelStarterToContainer(
MountableFile.forHostPath(startJarPath),
Paths.get(seatunnelHomeInContainer, "starter", startJarName).toString());

// copy lib
// copy transform
String transformJar = "seatunnel-transforms-v2.jar";
Path transformJarPath =
Paths.get(PROJECT_ROOT_PATH, "seatunnel-transforms-v2", "target", transformJar);
container.withCopyFileToContainer(
MountableFile.forHostPath(transformJarPath),
Paths.get(seatunnelHomeInContainer, "lib", transformJar).toString());
Paths.get(seatunnelHomeInContainer, "connectors", transformJar).toString());

// copy bin
final String startBinPath = startModulePath + File.separator + "src/main/bin/";
@@ -182,7 +182,7 @@ public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoade
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "sink", Collections.emptyList());

List<URL> connectorJars = getConnectorJarList(sourceConfigs, sinkConfigs);
List<URL> connectorJars = getConnectorJarList(sourceConfigs, transformConfigs, sinkConfigs);
if (!commonPluginJars.isEmpty()) {
connectorJars.addAll(commonPluginJars);
}
@@ -238,18 +238,32 @@ public Set<URL> getUsedFactoryUrls(List<Action> sinkActions) {
}

private List<URL> getConnectorJarList(
List<? extends Config> sourceConfigs, List<? extends Config> sinkConfigs) {
List<? extends Config> sourceConfigs,
List<? extends Config> transformConfigs,
List<? extends Config> sinkConfigs) {
List<PluginIdentifier> factoryIds =
Stream.concat(
sourceConfigs.stream()
.map(ConfigParserUtil::getFactoryId)
.map(
factory ->
PluginIdentifier.of(
CollectionConstants
.SEATUNNEL_PLUGIN,
CollectionConstants.SOURCE_PLUGIN,
factory)),
Stream.concat(
sourceConfigs.stream()
.map(ConfigParserUtil::getFactoryId)
.map(
factory ->
PluginIdentifier.of(
CollectionConstants
.SEATUNNEL_PLUGIN,
CollectionConstants
.SOURCE_PLUGIN,
factory)),
transformConfigs.stream()
.map(ConfigParserUtil::getFactoryId)
.map(
factory ->
PluginIdentifier.of(
CollectionConstants
.SEATUNNEL_PLUGIN,
CollectionConstants
.TRANSFORM_PLUGIN,
factory))),
sinkConfigs.stream()
.map(ConfigParserUtil::getFactoryId)
.map(
@@ -34,7 +34,7 @@
public class SeaTunnelTransformPluginDiscovery extends AbstractPluginDiscovery<SeaTunnelTransform> {

public SeaTunnelTransformPluginDiscovery() {
super(Common.libDir());
super(Common.connectorDir());
}

@Override
@@ -20,10 +20,12 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;

import lombok.NonNull;

public abstract class AbstractCatalogSupportTransform extends AbstractSeaTunnelTransform {
public abstract class AbstractCatalogSupportTransform implements SeaTunnelTransform<SeaTunnelRow> {
protected CatalogTable inputCatalogTable;

protected volatile CatalogTable outputCatalogTable;
@@ -32,6 +34,18 @@ public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable)
this.inputCatalogTable = inputCatalogTable;
}

@Override
public SeaTunnelRow map(SeaTunnelRow row) {
return transformRow(row);
}

/**
* Transforms one upstream input row into an output row.
*
* @param inputRow upstream input row data
* @return the transformed row
*/
protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);

@Override
public CatalogTable getProducedCatalogTable() {
if (outputCatalogTable == null) {
Expand Down