Skip to content

Commit

Permalink
[Feature][st-engine] Support for transform-v2 API
Browse files Browse the repository at this point in the history
Release binary package:
- Move engine starter jar from `lib/` to `starter/` dir
- Outpout `seatunnel-transforms-v2.jar` to `lib/` dir

Maven modules:
- Add seatunnel-transforms-v2 module
- Add seatunnel-transforms-v2-e2e module

Engine Starter:
- Add `lib/` to task execute classpath

ST-Engine:
- Support for Split、Replace、Filter transform

E2E:
- Copy `seatunnel-transforms-v2.jar` into container `lib/` dir
- Improve get project root path for e2e test module
  • Loading branch information
hailin0 committed Oct 22, 2022
1 parent 9e49638 commit 0196c32
Show file tree
Hide file tree
Showing 49 changed files with 1,616 additions and 62 deletions.
3 changes: 2 additions & 1 deletion docs/en/contribution/coding-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ This guide documents an overview of the current Apache SeaTunnel modules and bes
| seatunnel-engine | SeaTunnel engine module, seatunnel-engine is a new computational engine developed by the SeaTunnel Community that focuses on data synchronization. |
| seatunnel-formats | SeaTunnel formats module, used to offer the ability of formatting data |
| seatunnel-plugin-discovery | SeaTunnel plugin discovery module, used to offer the ability of loading SPI plugins from classpath |
| seatunnel-transforms | SeaTunnel transform plugin module |
| seatunnel-transforms | SeaTunnel transform V1 module, currently transform V1 is in a stable state, the community will continue to maintain it, but there will be no major feature updates |
| seatunnel-transforms-v2 | SeaTunnel transform V2 module, currently transform V2 is under development and the community will focus on it |
| seatunnel-translation | SeaTunnel translation module, used to adapt Connector V2 and other computing engines such as Spark Flink etc... |

## How to submit a high quality pull request
Expand Down
90 changes: 90 additions & 0 deletions docs/en/contribution/contribute-transform-v2-guide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Contribute Transform Guide

This document describes how to understand, develop and contribute a transform.

We also provide the [transform e2e test](../../../seatunnel-e2e/seatunnel-transforms-v2-e2e)
to verify the data input and output by the transform.

## Concepts

Using SeaTunnel you can read or write data through the connector, but if you need to
process your data after reading or before writing, then need to use transform.

Use transform to make simple edits to your data rows or fields, such as split field,
change field values, add or remove field.

### DataType transform

Transform receives datatype input from upstream(source or transform) and outputs new datatype to
downstream(sink or transform), this process is datatype transform.


| A | B | C |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |

| C | B | A |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |

| C | B | A |
|-----------|-----------|-----------|
| BOOLEAN | INT | STRING |




| B | C |
|-----------|-----------|
| INT | BOOLEAN |

| C | B |
|-----------|-----------|
| BOOLEAN | INT |

| C | B | D |
|-----------|-----------|-----------|
| BOOLEAN | INT | FLOAT |


### Data transform

### Translation

## Core APIs

### SeaTunnelTransform

```java
/**
* Set the data type info of input data. This method will be automatically called by translation.
*
* @param dataType The data type info of input.
*/
void setTypeInfo(SeaTunnelDataType<T> dataType);

/**
* Get the data type of the records produced by this transform.
*
* @return SeaTunnel data type.
*/
SeaTunnelDataType<T> getProducedType();

/**
* Transform input data to {@link this#getProducedType()} type data.
*
* @param row the data need be transform.
* @return transformed data.
*/
T map(T row);
```

### Abstract Templates

#### SingleFieldOutputTransform

#### MultipleFieldOutputTransform

## Develop a Transform

## Transform Test Tool
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
<module>seatunnel-apis</module>
<module>seatunnel-core</module>
<module>seatunnel-transforms</module>
<module>seatunnel-transforms-v2</module>
<module>seatunnel-connectors</module>
<module>seatunnel-connectors-v2</module>
<module>seatunnel-api</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,28 @@

import java.io.Serializable;

public interface SeaTunnelTransform<T> extends Serializable, PluginIdentifierInterface,
SeaTunnelPluginLifeCycle, SeaTunnelJobAware {

T map(T row);

void setTypeInfo(SeaTunnelDataType<T> seaTunnelRowType);

public interface SeaTunnelTransform<T> extends Serializable,
PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelJobAware {

/**
* Set the data type info of input data. This method will be automatically called by translation.
*
* @param dataType The data type info of input.
*/
void setTypeInfo(SeaTunnelDataType<T> dataType);

/**
* Get the data type of the records produced by this transform.
*
* @return SeaTunnel data type.
*/
SeaTunnelDataType<T> getProducedType();

/**
* Transform input data to {@link this#getProducedType()} type data.
*
* @param row the data need be transform.
* @return transformed data.
*/
T map(T row);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -43,6 +44,8 @@ private Common() {
*/
public static final int COLLECTION_SIZE = 16;

private static final int LIB_DIR_DEPTH = 1;

private static final int PLUGIN_LIB_DIR_DEPTH = 3;

private static DeployMode MODE;
Expand Down Expand Up @@ -87,8 +90,8 @@ public static Path appRootDir() {
}
}

public static Path appLibDir() {
return appRootDir().resolve("lib");
public static Path appStarterDir() {
return appRootDir().resolve("starter");
}

/**
Expand Down Expand Up @@ -129,6 +132,30 @@ public static Path connectorDir() {
}
}

/**
* lib Dir
*/
public static Path libDir() {
String seatunnelHome = System.getProperty("SEATUNNEL_HOME");
if (StringUtils.isBlank(seatunnelHome)) {
seatunnelHome = appRootDir().toString();
}
return Paths.get(seatunnelHome, "lib");
}

/**
* return lib jars, which located in 'lib/*'.
*/
public static List<Path> getLibJars() {
Path libRootDir = Common.libDir();
if (!Files.exists(libRootDir) || !Files.isDirectory(libRootDir)) {
return Collections.emptyList();
}
return Arrays.stream(libRootDir.toFile().listFiles((dir, name) -> name.endsWith(".jar")))
.map(file -> file.toPath())
.collect(Collectors.toList());
}

public static Path pluginTarball() {
return appRootDir().resolve("plugins.tar.gz");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class CommonTest {

@Test
public void appLibDir() {
assertEquals(Common.appRootDir().toString() + File.separator + "lib", Common.appLibDir().toString());
assertEquals(Common.appRootDir().toString() + File.separator + "starter", Common.appStarterDir().toString());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-core-flink-sql.jar
APP_JAR=${APP_DIR}/starter/seatunnel-core-flink-sql.jar
APP_MAIN="org.apache.seatunnel.core.sql.FlinkSqlStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class FlinkSqlStarter implements Starter {
// set the deployment mode, used to get the job jar path.
Common.setStarter(true);
Common.setDeployMode(flinkCommandArgs.getDeployMode());
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-core-flink.jar
APP_JAR=${APP_DIR}/starter/seatunnel-core-flink.jar
ENV_PARAMETERS_MAIN="org.apache.seatunnel.core.flink.FlinkEnvParameterParser"
APP_MAIN="org.apache.seatunnel.core.flink.FlinkStarter"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ENV FLINK_HOME=/flink

WORKDIR /seatunnel

COPY target/seatunnel-core-flink.jar /seatunnel/lib/
COPY target/seatunnel-core-flink.jar /seatunnel/starter/
COPY src/main/bin /seatunnel/bin/

ENTRYPOINT [ "/seatunnel/bin/start-seatunnel-flink.sh" ]
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class FlinkStarter implements Starter {
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode());
Common.setStarter(true);
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
}

@SuppressWarnings("checkstyle:RegexpSingleline")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-core-spark.jar
APP_JAR=${APP_DIR}/starter/seatunnel-core-spark.jar
APP_MAIN="org.apache.seatunnel.core.spark.SparkStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ENV SPARK_HOME=/spark

WORKDIR /seatunnel

COPY target/seatunnel-core-spark.jar /seatunnel/lib/
COPY target/seatunnel-core-spark.jar /seatunnel/starter/
COPY src/main/bin /seatunnel/bin/

ENTRYPOINT [ "/seatunnel/bin/start-seatunnel-spark.sh" ]
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ protected void appendArgs(List<String> commands, String[] args) {
* append appJar to StringBuilder
*/
protected void appendAppJar(List<String> commands) {
commands.add(Common.appLibDir().resolve("seatunnel-core-spark.jar").toString());
commands.add(Common.appStarterDir().resolve("seatunnel-core-spark.jar").toString());
}

@SuppressWarnings("checkstyle:Indentation")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-flink-starter.jar
APP_JAR=${APP_DIR}/starter/seatunnel-flink-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ENV FLINK_HOME=/flink

WORKDIR /seatunnel

COPY target/seatunnel-core-flink.jar /seatunnel/lib/
COPY target/seatunnel-core-flink.jar /seatunnel/starter/
COPY src/main/bin /seatunnel/bin/

ENTRYPOINT [ "/seatunnel/bin/start-seatunnel-flink-connector-v2.sh" ]
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class FlinkStarter implements Starter {
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode());
Common.setStarter(true);
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
}

@SuppressWarnings("checkstyle:RegexpSingleline")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ private void registerPlugin() {

pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));

List<URL> libJarDependencies = Common.getLibJars().stream()
.map(Path::toUri)
.map(uri -> {
try {
return uri.toURL();
} catch (MalformedURLException e) {
throw new RuntimeException("the uri of jar illegal:" + uri, e);
}
})
.collect(Collectors.toList());

libJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));

flinkEnvironment.registerPlugin(pluginsJarDependencies);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-spark-starter.jar
APP_JAR=${APP_DIR}/starter/seatunnel-spark-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.spark.SparkStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ENV SPARK_HOME=/spark

WORKDIR /seatunnel

COPY target/seatunnel-core-spark.jar /seatunnel/lib/
COPY target/seatunnel-core-spark.jar /seatunnel/starter/
COPY src/main/bin /seatunnel/bin/

ENTRYPOINT [ "/seatunnel/bin/start-seatunnel-spark-connector-v2.sh" ]
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public List<String> buildCommands() throws IOException {
Common.setDeployMode(commandArgs.getDeployMode());
Common.setStarter(true);
this.jars.addAll(Common.getPluginsJarDependencies());
this.jars.addAll(Common.getLibJars());
this.jars.addAll(getConnectorJarDependencies());
return buildFinal();
}
Expand Down Expand Up @@ -260,7 +261,7 @@ protected void appendArgs(List<String> commands, String[] args) {
* append appJar to StringBuilder
*/
protected void appendAppJar(List<String> commands) {
commands.add(Common.appLibDir().resolve("seatunnel-spark-starter.jar").toString());
commands.add(Common.appStarterDir().resolve("seatunnel-spark-starter.jar").toString());
}

@SuppressWarnings("checkstyle:Indentation")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/lib/seatunnel-starter.jar
APP_JAR=${APP_DIR}/starter/seatunnel-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
Expand All @@ -60,4 +60,9 @@ if [ -z $SEATUNNEL_CONFIG ]; then
SEATUNNEL_CONFIG=${CONF_DIR}/seatunnel.yaml
fi

java -Dseatunnel.config=${SEATUNNEL_CONFIG} -Dhazelcast.config=${HAZELCAST_CONFIG} -cp ${APP_JAR} ${APP_MAIN} ${args}
JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.config=${SEATUNNEL_CONFIG}"
JAVA_OPTS="${JAVA_OPTS} -Dhazelcast.config=${HAZELCAST_CONFIG}"

CLASS_PATH=${APP_DIR}/lib/*:${APP_JAR}

java ${JAVA_OPTS} -cp ${CLASS_PATH} ${APP_MAIN} ${args}
Loading

0 comments on commit 0196c32

Please sign in to comment.