Skip to content

Commit

Permalink
[Improve][Connector-V2][iceberg] Refactor iceberg connector e2e test …
Browse files Browse the repository at this point in the history
…cases (apache#3820)

* [Improve][Connector-V2][ioTdb] Refactor iceberg connector e2e test cases

* [Improve][Connector-V2][ioTdb] Refactor iceberg connector e2e test cases

* [Improve][Connector-V2][ioTdb] Refactor iceberg connector e2e test cases

* [Improve][Connector-V2][iceberg] Refactor iceberg connector e2e test cases

* [Improve][Connector-V2][iceberg] Refactor iceberg connector e2e test cases

Co-authored-by: zhouyao <[email protected]>
  • Loading branch information
2 people authored and lhyundeadsoul committed Jan 3, 2023
1 parent fe2bca9 commit eff8ef4
Show file tree
Hide file tree
Showing 17 changed files with 71 additions and 699 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,29 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
<artifactId>seatunnel-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-iceberg-flink-e2e</artifactId>
<artifactId>connector-iceberg-e2e</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-flink-e2e-base</artifactId>
<artifactId>connector-iceberg</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-iceberg</artifactId>
<artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-console</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
* limitations under the License.
*/

package org.apache.seatunnel.e2e.flink.v2.icegerg.hadoop3;
package org.apache.seatunnel.e2e.connector.iceberg;

import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;

import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
import org.apache.seatunnel.e2e.flink.FlinkContainer;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -42,9 +46,10 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.utility.MountableFile;

Expand All @@ -61,7 +66,7 @@
import java.util.List;

@Slf4j
public class IcebergSourceIT extends FlinkContainer {
public class IcebergSourceIT extends TestSuiteBase implements TestResource {

private static final TableIdentifier TABLE = TableIdentifier.of(
Namespace.of("database1"), "source");
Expand Down Expand Up @@ -90,22 +95,31 @@ public class IcebergSourceIT extends FlinkContainer {

private static final String CATALOG_NAME = "seatunnel";
private static final IcebergCatalogType CATALOG_TYPE = HADOOP;
private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/flink/";
private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/hadoop/";
private static final String WAREHOUSE = "file://" + CATALOG_DIR;
private static Catalog CATALOG;

@TestContainerExtension
private final ContainerExtendedFactory extendedFactory = container -> {
container.copyFileToContainer(MountableFile.forHostPath(CATALOG_DIR), CATALOG_DIR);
};

@BeforeEach
public void start() {
@Override
public void startUp() throws Exception {
initializeIcebergTable();
batchInsertData();
MountableFile catalogPath = MountableFile.forHostPath(CATALOG_DIR);
jobManager.copyFileToContainer(catalogPath, CATALOG_DIR);
taskManager.copyFileToContainer(catalogPath, CATALOG_DIR);
}

@Test
public void testIcebergSource() throws IOException, InterruptedException {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/iceberg/iceberg_source.conf");
@AfterAll
@Override
public void tearDown() throws Exception {

}

@TestTemplate
public void testIcebergSource(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/iceberg/iceberg_source.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ source {
f5 = "float"
f6 = "double"
f7 = "date"
f8 = "time"
f9 = "timestamp"
f10 = "timestamp"
f11 = "string"
Expand All @@ -46,7 +45,7 @@ source {
}
catalog_name = "seatunnel"
catalog_type = "hadoop"
warehouse = "file:///tmp/seatunnel/iceberg/flink/"
warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
namespace = "database1"
table = "source"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,36 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
<artifactId>seatunnel-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-iceberg-spark-e2e</artifactId>
<artifactId>connector-iceberg-hadoop3-e2e</artifactId>

<properties>
<hadoop-client.version>3.3.4</hadoop-client.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-spark-e2e-base</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-iceberg</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-console</artifactId>
<artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
<artifactId>connector-console</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
Expand All @@ -72,5 +61,4 @@
</exclusions>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
* limitations under the License.
*/

package org.apache.seatunnel.e2e.flink.v2.iceberg;
package org.apache.seatunnel.e2e.connector.iceberg.hadoop3;

import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;

import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
import org.apache.seatunnel.e2e.flink.FlinkContainer;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -42,9 +46,10 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.utility.MountableFile;

Expand All @@ -61,7 +66,7 @@
import java.util.List;

@Slf4j
public class IcebergSourceIT extends FlinkContainer {
public class IcebergSourceIT extends TestSuiteBase implements TestResource {

private static final TableIdentifier TABLE = TableIdentifier.of(
Namespace.of("database1"), "source");
Expand Down Expand Up @@ -90,22 +95,31 @@ public class IcebergSourceIT extends FlinkContainer {

private static final String CATALOG_NAME = "seatunnel";
private static final IcebergCatalogType CATALOG_TYPE = HADOOP;
private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/flink/";
private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/hadoop3/";
private static final String WAREHOUSE = "file://" + CATALOG_DIR;
private static Catalog CATALOG;

@TestContainerExtension
private final ContainerExtendedFactory extendedFactory = container -> {
container.copyFileToContainer(MountableFile.forHostPath(CATALOG_DIR), CATALOG_DIR);
};

@BeforeEach
public void start() {
@Override
public void startUp() throws Exception {
initializeIcebergTable();
batchInsertData();
MountableFile catalogPath = MountableFile.forHostPath(CATALOG_DIR);
jobManager.copyFileToContainer(catalogPath, CATALOG_DIR);
taskManager.copyFileToContainer(catalogPath, CATALOG_DIR);
}

@Test
public void testIcebergSource() throws IOException, InterruptedException {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/iceberg/iceberg_source.conf");
@AfterAll
@Override
public void tearDown() throws Exception {

}

@TestTemplate
public void testIcebergSource(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/iceberg/iceberg_source.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ source {
}
catalog_name = "seatunnel"
catalog_type = "hadoop"
warehouse = "file:///tmp/seatunnel/iceberg/spark/"
warehouse = "file:///tmp/seatunnel/iceberg/hadoop3/"
namespace = "database1"
table = "source"
}
Expand Down
3 changes: 3 additions & 0 deletions seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
<module>connector-elasticsearch-e2e</module>
<module>connector-iotdb-e2e</module>
<module>connector-cdc-mysql-e2e</module>
<module>connector-tdengine-e2e</module>
<module>connector-iceberg-e2e</module>
<module>connector-iceberg-hadoop3-e2e</module>
</modules>

<artifactId>seatunnel-connector-v2-e2e</artifactId>
Expand Down

This file was deleted.

Loading

0 comments on commit eff8ef4

Please sign in to comment.