[Feature][st-engine] Support for transform row data stream using transform-v2 API

Release binary package:
- Output transform-* jars to the `transforms/` dir

Plugin discovery:
- Support loading transform-* jars from the `transforms/*` dir

Maven modules:
- Add seatunnel-transforms-v2 module
- Add seatunnel-transform-v2-e2e module

ST-Engine:
- Support Split, Replace, and Filter transforms

E2E:
- Support copying transform jars into the container
hailin0 committed Oct 20, 2022
1 parent f27d68c commit 9d5254c
Showing 35 changed files with 1,726 additions and 59 deletions.
3 changes: 2 additions & 1 deletion docs/en/contribution/coding-guide.md
@@ -22,7 +22,8 @@ This guide documents an overview of the current Apache SeaTunnel modules and bes
| seatunnel-engine | SeaTunnel engine module, seatunnel-engine is a new computational engine developed by the SeaTunnel Community that focuses on data synchronization. |
| seatunnel-formats | SeaTunnel formats module, used to offer the ability of formatting data |
| seatunnel-plugin-discovery | SeaTunnel plugin discovery module, used to offer the ability of loading SPI plugins from classpath |
| seatunnel-transforms | SeaTunnel transform plugin module |
| seatunnel-transforms | SeaTunnel transform V1 module. Transform V1 is currently in a stable state; the community will continue to maintain it, but there will be no major feature updates |
| seatunnel-transforms-v2 | SeaTunnel transform V2 module. Transform V2 is currently under development and the community will focus on it |
| seatunnel-translation | SeaTunnel translation module, used to adapt Connector V2 to other computing engines such as Spark, Flink, etc. |

## How to submit a high quality pull request
142 changes: 142 additions & 0 deletions docs/en/contribution/contribute-transform-v2.md
@@ -0,0 +1,142 @@
# Contribute Transform-v2 Plugins

There are two parent modules for Spark plugins:

1. [seatunnel-connectors-spark](https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-connectors/seatunnel-connectors-spark)
2. [seatunnel-transforms-spark](https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-transforms/seatunnel-transforms-spark)

When you want to contribute a new plugin, you need to:

## Create plugin module
Create your plugin module under the corresponding parent plugin module.
For example, if you want to add a new Spark connector plugin, you need to create a new module under the `seatunnel-connectors-spark` module.

```xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connectors-spark</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>seatunnel-connector-spark-hello</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api-spark</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
```
## Add plugin implementation

You need to implement the `Connector` service provider interface, e.g. `BaseSource`/`BaseSink`.

Conveniently, there are some abstract classes that can help you create your plugin easily. If you want to create a source connector,
you can extend `SparkBatchSource`/`SparkStreamingSource`. If you want to create a sink connector, you can extend `SparkBatchSink`/`SparkStreamingSink`.

The methods defined in `SparkBatchSource` are lifecycle methods that will be executed by the SeaTunnel engine.
The execution order of the lifecycle methods is: `checkConfig` -> `prepare` -> `getData` -> `close`.

```java
// imports omitted for brevity: BaseSparkSource, SparkBatchSource, SparkEnvironment,
// CheckResult, Dataset, Row and AutoService

@AutoService(BaseSparkSource.class)
public class Hello extends SparkBatchSource {
@Override
public Dataset<Row> getData(SparkEnvironment env) {
// do your logic here to generate data
Dataset<Row> dataset = null;
return dataset;
}

@Override
public CheckResult checkConfig() {
return super.checkConfig();
}

@Override
public void prepare(SparkEnvironment env) {
super.prepare(env);
}

@Override
public void close() throws Exception {
super.close();
}

@Override
public String getPluginName() {
return "hello";
}
}
```

- The `getPluginName` method is used to identify the plugin name.
- The `@AutoService` annotation is used to generate the `META-INF/services/org.apache.seatunnel.spark.BaseSparkSource` file
automatically.

Since this process does not work with Scala, if you use Scala to implement your plugin, you need to add the service provider file
to `META-INF/services` manually. The file name should be `org.apache.seatunnel.spark.BaseSparkSource`
or `org.apache.seatunnel.spark.BaseSparkSink`, depending on the plugin type. The content of the file should be the
fully qualified class name of your implementation.
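
For example, assuming the `Hello` source above is declared in a hypothetical `org.apache.seatunnel.spark.source` package, the service file would contain a single line (the `#` line is only a path hint):

```
# src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
org.apache.seatunnel.spark.source.Hello
```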

## Add plugin to the distribution

You need to add your plugin to the `seatunnel-connectors-spark-dist` module, then the plugin will be included in the distribution.

```xml
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connector-spark-hello</artifactId>
<version>${project.version}</version>
</dependency>
```

After you run `mvn package` to build the distribution, you can find the plugin under `${distribution}/connectors/spark`.

## Add information to plugin-mapping.properties file

SeaTunnel uses the `plugin-mapping.properties` file to locate the name of the jar package. The file is under the `seatunnel-connectors` module, and the key/value rule in the
properties file is: `engineName.pluginType.pluginName=artifactId`, e.g. `spark.source.hello=seatunnel-connector-spark-hello`.
This way SeaTunnel can find the plugin jar according to the user's config file.

# Contribute Flink Plugins

The steps to contribute a Flink plugin are similar to the steps to contribute a Spark plugin. The difference is that you
need to add your plugin to the Flink plugin modules.

# Add e2e tests for your plugin

Once you add a new plugin, it is recommended to add e2e tests for it. We have a `seatunnel-e2e` module to help you do
this.

For example, if you want to add an e2e test for your Flink connector, you can create a new test in the `seatunnel-flink-e2e`
module and extend the `FlinkContainer` class in the test.

```java
public class HelloSourceIT extends FlinkContainer {

    @Test
    public void testHelloSource() throws IOException, InterruptedException {
        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/hello/hellosource.conf");
        Assertions.assertEquals(0, execResult.getExitCode());
        // do some other assertion here
    }
}

```
Once your test class extends `FlinkContainer`, it will automatically create a Flink cluster in Docker, and you just need to
execute the `executeSeaTunnelFlinkJob` method with your SeaTunnel configuration file; it will submit the SeaTunnel job.

In most cases, you need to start a third-party data source in your test. For example, if you add a ClickHouse connector, you may need to
start a ClickHouse database in your test. You can use `GenericContainer` to start a container.
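
A minimal sketch of how such a container could be started with Testcontainers; the image tag and port are illustrative assumptions, not values taken from this repository:

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

public class ClickhouseContainerSketch {
    public static void main(String[] args) {
        // Image tag and port are assumptions for illustration only.
        try (GenericContainer<?> clickhouse =
                 new GenericContainer<>(DockerImageName.parse("clickhouse/clickhouse-server:22.3"))
                         .withExposedPorts(8123)) {
            clickhouse.start();
            // Build a JDBC URL from the dynamically mapped host/port.
            String jdbcUrl = "jdbc:clickhouse://" + clickhouse.getHost()
                    + ":" + clickhouse.getMappedPort(8123) + "/default";
            System.out.println("ClickHouse is reachable at " + jdbcUrl);
        }
    }
}
```

In a real e2e test you would typically start the container in a setup method (e.g. annotated with `@BeforeAll`) rather than in `main`, and stop it after the test.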

Note that your e2e test class should be named with an `IT` suffix. By default, we will not execute a test if its class name ends with `IT`.
You can add `-DskipIT=false` to execute the e2e tests; they rely on a Docker environment.
8 changes: 7 additions & 1 deletion plugin-mapping.properties
@@ -132,4 +132,10 @@ seatunnel.sink.MongoDB = connector-mongodb
seatunnel.source.Iceberg = connector-iceberg
seatunnel.source.InfluxDB = connector-influxdb
seatunnel.source.S3File = connector-file-s3
seatunnel.sink.S3File = connector-file-s3
seatunnel.sink.S3File = connector-file-s3

# SeaTunnel new transform API

seatunnel.transform.Split = transform-basic
seatunnel.transform.Replace = transform-basic
seatunnel.transform.Filter = transform-basic
1 change: 1 addition & 0 deletions pom.xml
@@ -81,6 +81,7 @@
<module>seatunnel-apis</module>
<module>seatunnel-core</module>
<module>seatunnel-transforms</module>
<module>seatunnel-transforms-v2</module>
<module>seatunnel-connectors</module>
<module>seatunnel-connectors-v2</module>
<module>seatunnel-api</module>
@@ -24,13 +24,28 @@

import java.io.Serializable;

public interface SeaTunnelTransform<T> extends Serializable, PluginIdentifierInterface,
SeaTunnelPluginLifeCycle, SeaTunnelJobAware {

T map(T row);

void setTypeInfo(SeaTunnelDataType<T> seaTunnelRowType);

public interface SeaTunnelTransform<T> extends Serializable,
PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelJobAware {

/**
* Set the data type info of the input data. This method will be called automatically by the translation layer.
*
* @param dataType The data type info of input.
*/
void setTypeInfo(SeaTunnelDataType<T> dataType);

/**
* Get the data type of the records produced by this transform.
*
* @return SeaTunnel data type.
*/
SeaTunnelDataType<T> getProducedType();

/**
* Transform the input data into data of the {@link this#getProducedType()} type.
*
* @param row the data that needs to be transformed.
* @return the transformed data.
*/
T map(T row);
}
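
To make the new contract concrete, here is a minimal, hypothetical transform operating on `SeaTunnelRow` data. The class name and its upper-casing logic are illustrative assumptions, not part of this commit, and the methods inherited from `SeaTunnelPluginLifeCycle`/`SeaTunnelJobAware` are omitted for brevity:

```java
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.util.Arrays;

public class UpperCaseTransform implements SeaTunnelTransform<SeaTunnelRow> {

    private SeaTunnelDataType<SeaTunnelRow> inputType;

    @Override
    public String getPluginName() {
        return "UpperCase";
    }

    @Override
    public void setTypeInfo(SeaTunnelDataType<SeaTunnelRow> dataType) {
        // Remember the input schema; called automatically by the translation layer.
        this.inputType = dataType;
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        // This transform does not change the schema.
        return inputType;
    }

    @Override
    public SeaTunnelRow map(SeaTunnelRow row) {
        // Upper-case every String field and leave other fields untouched.
        Object[] fields = Arrays.copyOf(row.getFields(), row.getFields().length);
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] instanceof String) {
                fields[i] = ((String) fields[i]).toUpperCase();
            }
        }
        return new SeaTunnelRow(fields);
    }

    // prepare(...) and other inherited lifecycle methods are omitted here for brevity.
}
```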
@@ -129,6 +129,17 @@ public static Path connectorDir() {
}
}

/**
* Plugin Transform Dir
*/
public static Path transformDir() {
String seatunnelHome = System.getProperty("SEATUNNEL_HOME");
if (StringUtils.isBlank(seatunnelHome)) {
seatunnelHome = appRootDir().toString();
}
return Paths.get(seatunnelHome, "transforms");
}

public static Path pluginTarball() {
return appRootDir().resolve("plugins.tar.gz");
}
7 changes: 7 additions & 0 deletions seatunnel-dist/pom.xml
@@ -103,6 +103,13 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- transforms -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>transform-basic</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
16 changes: 16 additions & 0 deletions seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -113,6 +113,10 @@
<source>../plugin-mapping.properties</source>
<outputDirectory>/connectors</outputDirectory>
</file>
<file>
<source>../plugin-mapping.properties</source>
<outputDirectory>/transforms</outputDirectory>
</file>
</files>
<dependencySets>
<!-- ============ Starter Jars ============ -->
@@ -189,5 +193,17 @@
<outputDirectory>/connectors/spark</outputDirectory>
<scope>provided</scope>
</dependencySet>

<!-- ============ SeaTunnel Transforms Jars ============ -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
<unpack>false</unpack>
<includes>
<include>org.apache.seatunnel:transform-*:jar</include>
</includes>
<outputDirectory>/transforms</outputDirectory>
<scope>provided</scope>
</dependencySet>
</dependencySets>
</assembly>
16 changes: 16 additions & 0 deletions seatunnel-dist/src/main/assembly/assembly-bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@
<source>../plugin-mapping.properties</source>
<outputDirectory>/connectors</outputDirectory>
</file>
<file>
<source>../plugin-mapping.properties</source>
<outputDirectory>/transforms</outputDirectory>
</file>
</files>

<dependencySets>
@@ -158,5 +162,17 @@
<outputDirectory>/lib</outputDirectory>
<scope>provided</scope>
</dependencySet>

<!-- ============ SeaTunnel Transforms Jars ============ -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
<unpack>false</unpack>
<includes>
<include>org.apache.seatunnel:transform-*:jar</include>
</includes>
<outputDirectory>/transforms</outputDirectory>
<scope>provided</scope>
</dependencySet>
</dependencySets>
</assembly>
1 change: 1 addition & 0 deletions seatunnel-e2e/pom.xml
@@ -35,6 +35,7 @@
<module>seatunnel-spark-connector-v2-e2e</module>
<module>seatunnel-spark-e2e</module>
<module>seatunnel-engine-e2e</module>
<module>seatunnel-transform-v2-e2e</module>
</modules>

<properties>
@@ -21,6 +21,7 @@
import static org.apache.seatunnel.e2e.common.util.ContainerUtil.adaptPathForWin;
import static org.apache.seatunnel.e2e.common.util.ContainerUtil.copyConfigFileToContainer;
import static org.apache.seatunnel.e2e.common.util.ContainerUtil.copyConnectorJarToContainer;
import static org.apache.seatunnel.e2e.common.util.ContainerUtil.copyTransformJarToContainer;

import org.apache.seatunnel.e2e.common.util.ContainerUtil;

@@ -88,6 +89,9 @@ protected Container.ExecResult executeJob(GenericContainer<?> container, String
getConnectorNamePrefix(),
getConnectorType(),
SEATUNNEL_HOME);
copyTransformJarToContainer(container,
confFile,
SEATUNNEL_HOME);
return executeCommand(container, confInContainerPath);
}

