[Feature][st-engine] Support for transform-v2 API
Release binary package:
- Move engine starter jar from `lib/` to `starter/` dir
- Output `seatunnel-transforms-v2.jar` to `lib/` dir

Plugin discovery:
- Support for loading transform-* jar from `transform/*` dir

Maven modules:
- Add seatunnel-transforms-v2 module
- Add seatunnel-transform-v2-e2e module

Engine Starter:
- Add `lib/` to task execute classpath

ST-Engine:
- Support for Split, Replace, and Filter transforms

E2E:
- Support copy transform jar into container `lib/` dir
hailin0 committed Oct 21, 2022
1 parent 9e49638 commit 7ad1b54
Showing 50 changed files with 1,607 additions and 61 deletions.
3 changes: 2 additions & 1 deletion docs/en/contribution/coding-guide.md
Expand Up @@ -22,7 +22,8 @@ This guide documents an overview of the current Apache SeaTunnel modules and bes
| seatunnel-engine | SeaTunnel engine module, seatunnel-engine is a new computational engine developed by the SeaTunnel Community that focuses on data synchronization. |
| seatunnel-formats | SeaTunnel formats module, used to offer the ability of formatting data |
| seatunnel-plugin-discovery | SeaTunnel plugin discovery module, used to offer the ability of loading SPI plugins from classpath |
| seatunnel-transforms | SeaTunnel transform plugin module |
| seatunnel-transforms | SeaTunnel transform V1 module. Transform V1 is stable; the community will continue to maintain it, but no major feature updates are planned. |
| seatunnel-transforms-v2 | SeaTunnel transform V2 module. Transform V2 is under development and is where the community will focus its effort. |
| seatunnel-translation | SeaTunnel translation module, used to adapt Connector V2 and other computing engines such as Spark Flink etc... |

## How to submit a high quality pull request
51 changes: 51 additions & 0 deletions docs/en/contribution/contribute-transform-v2-guide.md
@@ -0,0 +1,51 @@
# Contribute Transform Guide

This document describes how to understand, develop and contribute a transform.

We also provide the [transform end to end test tool](../../../seatunnel-e2e/seatunnel-transform-v2-e2e) to verify the data input and output by the transform.

## Concepts

### Input

### Output

### Translation

## Core APIs

### SeaTunnelTransform

```java
/**
* Set the data type info of input data. This method will be automatically called by translation.
*
* @param dataType The data type info of input.
*/
void setTypeInfo(SeaTunnelDataType<T> dataType);

/**
* Get the data type of the records produced by this transform.
*
* @return SeaTunnel data type.
*/
SeaTunnelDataType<T> getProducedType();

/**
* Transform input data into data of the type returned by {@link #getProducedType()}.
*
* @param row the data to be transformed.
* @return transformed data.
*/
T map(T row);
```
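
As an illustration of how the three methods fit together, here is a minimal, self-contained sketch. `SimpleTransform` and `ReplaceFieldTransform` are hypothetical stand-ins, not SeaTunnel classes, and a plain `String[]` of field names is assumed in place of `SeaTunnelDataType` so that the example compiles without SeaTunnel on the classpath.

```java
import java.util.Arrays;

/** Hypothetical, simplified stand-in for the SeaTunnelTransform contract (not the real API). */
interface SimpleTransform<T> {
    /** Receives the input schema; here just the field names (an assumption for this sketch). */
    void setTypeInfo(String[] inputFieldNames);

    /** Describes the schema of the rows returned by map(). */
    String[] getProducedType();

    /** Transforms one input row into one output row. */
    T map(T row);
}

/** Hypothetical transform: replaces a literal pattern in one named field of an Object[] row. */
class ReplaceFieldTransform implements SimpleTransform<Object[]> {
    private final String fieldName;
    private final String pattern;
    private final String replacement;
    private String[] fieldNames;
    private int fieldIndex = -1;

    ReplaceFieldTransform(String fieldName, String pattern, String replacement) {
        this.fieldName = fieldName;
        this.pattern = pattern;
        this.replacement = replacement;
    }

    @Override
    public void setTypeInfo(String[] inputFieldNames) {
        // Resolve the target field once, before any row is processed.
        this.fieldNames = inputFieldNames.clone();
        this.fieldIndex = Arrays.asList(fieldNames).indexOf(fieldName);
        if (fieldIndex < 0) {
            throw new IllegalArgumentException("Unknown field: " + fieldName);
        }
    }

    @Override
    public String[] getProducedType() {
        // This transform keeps the row shape, so the output schema equals the input schema.
        return fieldNames.clone();
    }

    @Override
    public Object[] map(Object[] row) {
        // Copy the row so the caller's input is left untouched.
        Object[] out = row.clone();
        Object value = out[fieldIndex];
        if (value != null) {
            out[fieldIndex] = value.toString().replace(pattern, replacement);
        }
        return out;
    }
}
```

In this sketch `setTypeInfo` must be called before `map`, mirroring the note above that the type info is set by the translation layer before rows are processed.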

### Abstract Templates

#### AbstractSeaTunnelTransform
#### SingleFieldOutputTransform
#### MultipleFieldOutputTransform

## Develop a Transform

## Transform Test Tool
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -81,6 +81,7 @@
<module>seatunnel-apis</module>
<module>seatunnel-core</module>
<module>seatunnel-transforms</module>
<module>seatunnel-transforms-v2</module>
<module>seatunnel-connectors</module>
<module>seatunnel-connectors-v2</module>
<module>seatunnel-api</module>
Expand Up @@ -24,13 +24,28 @@

import java.io.Serializable;

public interface SeaTunnelTransform<T> extends Serializable, PluginIdentifierInterface,
SeaTunnelPluginLifeCycle, SeaTunnelJobAware {

T map(T row);

void setTypeInfo(SeaTunnelDataType<T> seaTunnelRowType);

public interface SeaTunnelTransform<T> extends Serializable,
PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelJobAware {

/**
* Set the data type info of input data. This method will be automatically called by translation.
*
* @param dataType The data type info of input.
*/
void setTypeInfo(SeaTunnelDataType<T> dataType);

/**
* Get the data type of the records produced by this transform.
*
* @return SeaTunnel data type.
*/
SeaTunnelDataType<T> getProducedType();

/**
* Transform input data into data of the type returned by {@link #getProducedType()}.
*
* @param row the data to be transformed.
* @return transformed data.
*/
T map(T row);
}
Expand Up @@ -27,6 +27,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -43,6 +44,8 @@ private Common() {
*/
public static final int COLLECTION_SIZE = 16;

private static final int LIB_DIR_DEPTH = 1;

private static final int PLUGIN_LIB_DIR_DEPTH = 3;

private static DeployMode MODE;
Expand Down Expand Up @@ -87,8 +90,8 @@ public static Path appRootDir() {
}
}

public static Path appLibDir() {
return appRootDir().resolve("lib");
public static Path appStarterDir() {
return appRootDir().resolve("starter");
}

/**
Expand Down Expand Up @@ -129,6 +132,30 @@ public static Path connectorDir() {
}
}

/**
* lib Dir
*/
public static Path libDir() {
String seatunnelHome = System.getProperty("SEATUNNEL_HOME");
if (StringUtils.isBlank(seatunnelHome)) {
seatunnelHome = appRootDir().toString();
}
return Paths.get(seatunnelHome, "lib");
}

/**
* return lib jars, which located in 'lib/*'.
*/
public static List<Path> getLibJars() {
Path libRootDir = Common.libDir();
if (!Files.exists(libRootDir) || !Files.isDirectory(libRootDir)) {
return Collections.emptyList();
}
return Arrays.stream(libRootDir.toFile().listFiles((dir, name) -> name.endsWith(".jar")))
.map(file -> file.toPath())
.collect(Collectors.toList());
}

public static Path pluginTarball() {
return appRootDir().resolve("plugins.tar.gz");
}
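
Note on `getLibJars()`: `File.listFiles` returns `null` when an I/O error occurs, in which case `Arrays.stream` would throw a `NullPointerException`. One possible alternative is sketched here using `Files.newDirectoryStream` with a `*.jar` glob. The class name `LibJars` and the sorted result are assumptions made for this illustration and are not part of the commit.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: lists the jar files that sit directly under a lib directory. */
final class LibJars {
    private LibJars() {
    }

    /** Returns the entries matching *.jar in libDir, or an empty list if libDir is not a directory. */
    static List<Path> list(Path libDir) {
        if (!Files.isDirectory(libDir)) {
            return Collections.emptyList();
        }
        List<Path> jars = new ArrayList<>();
        // The glob is matched against each entry's file name; subdirectories are not traversed.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : stream) {
                jars.add(jar);
            }
        } catch (IOException e) {
            // Report listing failures explicitly instead of failing later with a NullPointerException.
            throw new UncheckedIOException("Failed to list jars in " + libDir, e);
        }
        // Sort for a deterministic classpath order (an assumption of this sketch).
        Collections.sort(jars);
        return jars;
    }
}
```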
Expand Up @@ -31,7 +31,7 @@ public class CommonTest {

@Test
public void appLibDir() {
assertEquals(Common.appRootDir().toString() + File.separator + "lib", Common.appLibDir().toString());
assertEquals(Common.appRootDir().toString() + File.separator + "starter", Common.appStarterDir().toString());
}

@Test
Expand Up @@ -36,7 +36,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-core-flink-sql.jar
APP_JAR=${APP_DIR}/starter/seatunnel-core-flink-sql.jar
APP_MAIN="org.apache.seatunnel.core.sql.FlinkSqlStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
Expand Up @@ -43,7 +43,7 @@ public class FlinkSqlStarter implements Starter {
// set the deployment mode, used to get the job jar path.
Common.setStarter(true);
Common.setDeployMode(flinkCommandArgs.getDeployMode());
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
}

@Override
Expand Up @@ -36,7 +36,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-core-flink.jar
APP_JAR=${APP_DIR}/starter/seatunnel-core-flink.jar
ENV_PARAMETERS_MAIN="org.apache.seatunnel.core.flink.FlinkEnvParameterParser"
APP_MAIN="org.apache.seatunnel.core.flink.FlinkStarter"

Expand Up @@ -31,7 +31,7 @@ ENV FLINK_HOME=/flink

WORKDIR /seatunnel

COPY target/seatunnel-core-flink.jar /seatunnel/lib/
COPY target/seatunnel-core-flink.jar /seatunnel/starter/
COPY src/main/bin /seatunnel/bin/

ENTRYPOINT [ "/seatunnel/bin/start-seatunnel-flink.sh" ]
Expand Up @@ -50,7 +50,7 @@ public class FlinkStarter implements Starter {
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode());
Common.setStarter(true);
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
}

@SuppressWarnings("checkstyle:RegexpSingleline")
Expand Up @@ -35,7 +35,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-core-spark.jar
APP_JAR=${APP_DIR}/starter/seatunnel-core-spark.jar
APP_MAIN="org.apache.seatunnel.core.spark.SparkStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
Expand Up @@ -31,7 +31,7 @@ ENV SPARK_HOME=/spark

WORKDIR /seatunnel

COPY target/seatunnel-core-spark.jar /seatunnel/lib/
COPY target/seatunnel-core-spark.jar /seatunnel/starter/
COPY src/main/bin /seatunnel/bin/

ENTRYPOINT [ "/seatunnel/bin/start-seatunnel-spark.sh" ]
Expand Up @@ -276,7 +276,7 @@ protected void appendArgs(List<String> commands, String[] args) {
* append appJar to StringBuilder
*/
protected void appendAppJar(List<String> commands) {
commands.add(Common.appLibDir().resolve("seatunnel-core-spark.jar").toString());
commands.add(Common.appStarterDir().resolve("seatunnel-core-spark.jar").toString());
}

@SuppressWarnings("checkstyle:Indentation")
Expand Up @@ -36,7 +36,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-flink-starter.jar
APP_JAR=${APP_DIR}/starter/seatunnel-flink-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
Expand Up @@ -31,7 +31,7 @@ ENV FLINK_HOME=/flink

WORKDIR /seatunnel

COPY target/seatunnel-core-flink.jar /seatunnel/lib/
COPY target/seatunnel-core-flink.jar /seatunnel/starter/
COPY src/main/bin /seatunnel/bin/

ENTRYPOINT [ "/seatunnel/bin/start-seatunnel-flink-connector-v2.sh" ]
Expand Up @@ -50,7 +50,7 @@ public class FlinkStarter implements Starter {
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode());
Common.setStarter(true);
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
}

@SuppressWarnings("checkstyle:RegexpSingleline")
Expand Up @@ -90,6 +90,19 @@ private void registerPlugin() {

pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));

List<URL> libJarDependencies = Common.getLibJars().stream()
.map(Path::toUri)
.map(uri -> {
try {
return uri.toURL();
} catch (MalformedURLException e) {
throw new RuntimeException("the uri of jar illegal:" + uri, e);
}
})
.collect(Collectors.toList());

libJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));

flinkEnvironment.registerPlugin(pluginsJarDependencies);
}
}
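
Note on the `Path` to `URL` conversion in `registerPlugin()`: the stream that maps each jar path to a `URL` could be factored into a small helper so that plugin jars and `lib/` jars share one code path. The sketch below assumes a hypothetical class `JarUrls`; it is not part of the commit and only uses JDK calls.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: converts jar paths into URLs suitable for a class loader. */
final class JarUrls {
    private JarUrls() {
    }

    /** Converts every path in the list, preserving order. */
    static List<URL> toUrls(List<Path> jars) {
        return jars.stream().map(JarUrls::toUrl).collect(Collectors.toList());
    }

    /** Converts one path; Path.toUri() resolves relative paths against the working directory. */
    static URL toUrl(Path jar) {
        try {
            return jar.toUri().toURL();
        } catch (MalformedURLException e) {
            // Same failure mode as in the diff, wrapped in an unchecked exception.
            throw new IllegalArgumentException("Illegal jar URI: " + jar.toUri(), e);
        }
    }
}
```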
Expand Up @@ -36,7 +36,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-spark-starter.jar
APP_JAR=${APP_DIR}/starter/seatunnel-spark-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.spark.SparkStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
Expand Up @@ -31,7 +31,7 @@ ENV SPARK_HOME=/spark

WORKDIR /seatunnel

COPY target/seatunnel-core-spark.jar /seatunnel/lib/
COPY target/seatunnel-core-spark.jar /seatunnel/starter/
COPY src/main/bin /seatunnel/bin/

ENTRYPOINT [ "/seatunnel/bin/start-seatunnel-spark-connector-v2.sh" ]
Expand Up @@ -122,6 +122,7 @@ public List<String> buildCommands() throws IOException {
Common.setDeployMode(commandArgs.getDeployMode());
Common.setStarter(true);
this.jars.addAll(Common.getPluginsJarDependencies());
this.jars.addAll(Common.getLibJars());
this.jars.addAll(getConnectorJarDependencies());
return buildFinal();
}
Expand Down Expand Up @@ -260,7 +261,7 @@ protected void appendArgs(List<String> commands, String[] args) {
* append appJar to StringBuilder
*/
protected void appendAppJar(List<String> commands) {
commands.add(Common.appLibDir().resolve("seatunnel-spark-starter.jar").toString());
commands.add(Common.appStarterDir().resolve("seatunnel-spark-starter.jar").toString());
}

@SuppressWarnings("checkstyle:Indentation")
Expand Up @@ -36,7 +36,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-starter.jar
APP_JAR=${APP_DIR}/starter/seatunnel-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
Expand All @@ -60,4 +60,9 @@ if [ -z $SEATUNNEL_CONFIG ]; then
SEATUNNEL_CONFIG=${CONF_DIR}/seatunnel.yaml
fi

java -Dseatunnel.config=${SEATUNNEL_CONFIG} -Dhazelcast.config=${HAZELCAST_CONFIG} -cp ${APP_JAR} ${APP_MAIN} ${args}
JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.config=${SEATUNNEL_CONFIG}"
JAVA_OPTS="${JAVA_OPTS} -Dhazelcast.config=${HAZELCAST_CONFIG}"

CLASS_PATH=${APP_DIR}/lib/*:${APP_JAR}

java ${JAVA_OPTS} -cp ${CLASS_PATH} ${APP_MAIN} ${args}
10 changes: 8 additions & 2 deletions seatunnel-core/seatunnel-starter/src/main/bin/seatunnel.sh
Expand Up @@ -36,7 +36,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-starter.jar
APP_JAR=${APP_DIR}/starter/seatunnel-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
Expand Down Expand Up @@ -64,4 +64,10 @@ if [ -z $SEATUNNEL_CONFIG ]; then
SEATUNNEL_CONFIG=${CONF_DIR}/seatunnel.yaml
fi

java -Dhazelcast.client.config=${HAZELCAST_CLIENT_CONFIG} -Dseatunnel.config=${SEATUNNEL_CONFIG} -Dhazelcast.config=${HAZELCAST_CONFIG} -cp ${APP_JAR} ${APP_MAIN} ${args}
JAVA_OPTS="${JAVA_OPTS} -Dhazelcast.client.config=${HAZELCAST_CLIENT_CONFIG}"
JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.config=${SEATUNNEL_CONFIG}"
JAVA_OPTS="${JAVA_OPTS} -Dhazelcast.config=${HAZELCAST_CONFIG}"

CLASS_PATH=${APP_DIR}/lib/*:${APP_JAR}

java ${JAVA_OPTS} -cp ${CLASS_PATH} ${APP_MAIN} ${args}
7 changes: 7 additions & 0 deletions seatunnel-dist/pom.xml
Expand Up @@ -103,6 +103,13 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- transforms -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-transforms-v2</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>