diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
similarity index 85%
rename from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
index ac8499b0911..7f4f6c6e877 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
@@ -17,31 +17,28 @@
          xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
+    <modelVersion>4.0.0</modelVersion>

-    <artifactId>connector-mongodb-flink-e2e</artifactId>
+    <artifactId>connector-mongodb-e2e</artifactId>
+
+    <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-flink-e2e-base</artifactId>
+            <artifactId>connector-mongodb</artifactId>
             <version>${project.version}</version>
-            <classifier>tests</classifier>
-            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-mongodb</artifactId>
+        <dependency>
+            <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
similarity index 72%
rename from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index dcabbd5d3c2..717be9c089f 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.seatunnel.e2e.spark.v2.mongodb;
+package org.apache.seatunnel.e2e.connector.v2.mongodb;

 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
@@ -31,20 +31,20 @@
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
-import org.apache.seatunnel.e2e.spark.SparkContainer;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;

 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Sorts;
 import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.bson.Document;
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -57,103 +57,61 @@
 import java.math.BigDecimal;
 import java.time.Duration;
 import java.time.LocalDate;
-import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;

 @Slf4j
-public class MongodbIT extends SparkContainer {
+public class MongodbIT extends TestSuiteBase implements TestResource {

     private static final String MONGODB_IMAGE = "mongo:latest";
-    private static final String MONGODB_CONTAINER_HOST = "spark_e2e_mongodb";
+    private static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
     private static final int MONGODB_PORT = 27017;
     private static final String MONGODB_DATABASE = "test_db";
     private static final String MONGODB_SOURCE_TABLE = "source_table";
-    private static final String MONGODB_SINK_TABLE = "sink_table";

-    private static final List<Document> TEST_DATASET = generateTestDataSet();
+    private static final List<Document> TEST_DATASET = generateTestDataSet(0, 10);

-    private MongoClient client;
     private GenericContainer<?> mongodbContainer;
+    private MongoClient client;

-    @BeforeEach
-    public void startMongoContainer() {
-        DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
-        mongodbContainer = new GenericContainer<>(imageName)
-            .withNetwork(NETWORK)
-            .withNetworkAliases(MONGODB_CONTAINER_HOST)
-            .withExposedPorts(MONGODB_PORT)
-            .waitingFor(new HttpWaitStrategy()
-                .forPort(MONGODB_PORT)
-                .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
-                .withStartupTimeout(Duration.ofMinutes(2)))
-            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
-        Startables.deepStart(Stream.of(mongodbContainer)).join();
-        log.info("Mongodb container started");
+    @TestTemplate
+    public void testMongodbSourceToAssertSink(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/mongodb_source_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
-        Awaitility.given().ignoreExceptions()
-            .atLeast(100, TimeUnit.MILLISECONDS)
-            .pollInterval(500, TimeUnit.MILLISECONDS)
-            .atMost(180, TimeUnit.SECONDS)
-            .untilAsserted(this::initConnection);
-        this.initSourceData();
+    @TestTemplate
+    public void testMongodb(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/mongodb_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }

-    @Test
-    public void testMongodb() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelSparkJob("/mongodb/mongodb_source_and_sink.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertIterableEquals(
-            TEST_DATASET.stream()
-                .map(e -> {
-                    e.remove("_id");
-                    return e;
-                })
-                .collect(Collectors.toList()),
-            readSinkData().stream()
-                .map(e -> {
-                    e.remove("_id");
-                    return e;
-                })
-                .collect(Collectors.toList()));
+    @TestTemplate
+    public void testMongodbMatchQuery(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/mongodb_source_matchQuery_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }

     public void initConnection() {
         String host = mongodbContainer.getContainerIpAddress();
         int port = mongodbContainer.getFirstMappedPort();
         String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
-
         client = MongoClients.create(url);
     }

-    private void initSourceData() {
+    private void initSourceData(String database, String table, List<Document> documents) {
         MongoCollection<Document> sourceTable = client
-            .getDatabase(MONGODB_DATABASE)
-            .getCollection(MONGODB_SOURCE_TABLE);
+            .getDatabase(database)
+            .getCollection(table);

         sourceTable.deleteMany(new Document());
-        sourceTable.insertMany(TEST_DATASET);
-    }
-
-    private List<Document> readSinkData() {
-        MongoCollection<Document> sinkTable = client
-            .getDatabase(MONGODB_DATABASE)
-            .getCollection(MONGODB_SINK_TABLE);
-        MongoCursor<Document> cursor = sinkTable.find()
-            .sort(Sorts.ascending("id"))
-            .cursor();
-        List<Document> documents = new ArrayList<>();
-        while (cursor.hasNext()) {
-            documents.add(cursor.next());
-        }
-        return documents;
+        sourceTable.insertMany(documents);
     }

-    private static List<Document> generateTestDataSet() {
+    private static List<Document> generateTestDataSet(int start, int end) {
         SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
             new String[]{
                 "id",
@@ -169,8 +127,7 @@ private static List<Document> generateTestDataSet() {
                 "c_double",
                 "c_decimal",
                 "c_bytes",
-                "c_date",
-                "c_timestamp"
+                "c_date"
             },
             new SeaTunnelDataType[]{
                 BasicType.LONG_TYPE,
@@ -186,14 +143,13 @@
                 BasicType.DOUBLE_TYPE,
                 new DecimalType(2, 1),
                 PrimitiveByteArrayType.INSTANCE,
-                LocalTimeType.LOCAL_DATE_TYPE,
-                LocalTimeType.LOCAL_DATE_TIME_TYPE
+                LocalTimeType.LOCAL_DATE_TYPE
             }
         );
         Serializer serializer = new DefaultSerializer(seatunnelRowType);

         List<Document> documents = new ArrayList<>();
-        for (int i = 0; i < 100; i++) {
+        for (int i = start; i < end; i++) {
             SeaTunnelRow row = new SeaTunnelRow(
                 new Object[]{
                     Long.valueOf(i),
@@ -209,16 +165,40 @@
                     Double.parseDouble("1.1"),
                     BigDecimal.valueOf(11, 1),
                     "test".getBytes(),
-                    LocalDate.now(),
-                    LocalDateTime.now()
+                    LocalDate.now()
                 });
             documents.add(serializer.serialize(row));
         }
         return documents;
     }

-    @AfterEach
-    public void closeMongoContainer() {
+    @BeforeAll
+    @Override
+    public void startUp() {
+        DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
+        mongodbContainer = new GenericContainer<>(imageName)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MONGODB_CONTAINER_HOST)
+            .withExposedPorts(MONGODB_PORT)
+            .waitingFor(new HttpWaitStrategy()
+                .forPort(MONGODB_PORT)
+                .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
+                .withStartupTimeout(Duration.ofMinutes(2)))
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
+        Startables.deepStart(Stream.of(mongodbContainer)).join();
+        log.info("Mongodb container started");
+
+        Awaitility.given().ignoreExceptions()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+        this.initSourceData(MONGODB_DATABASE, MONGODB_SOURCE_TABLE, TEST_DATASET);
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
         if (client != null) {
             client.close();
         }
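The rewritten MongodbIT above moves container management from a per-test lifecycle (@BeforeEach/@AfterEach) to a suite-level one (@BeforeAll/@AfterAll via TestResource), and startUp() gates everything on an Awaitility readiness probe: initConnection() is retried, with exceptions swallowed, until the driver can actually reach mongod. A minimal self-contained sketch of that probe pattern follows; it assumes a reachable mongod URL, and the class and method names (MongoReadinessProbe, waitForMongo) are illustrative, not part of this PR:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.awaitility.Awaitility;
import org.bson.Document;

import java.util.concurrent.TimeUnit;

public class MongoReadinessProbe {

    // Block until mongod answers a "ping" command; connection errors are
    // ignored and retried until the timeout elapses.
    public static MongoClient waitForMongo(String url) {
        MongoClient client = MongoClients.create(url);
        Awaitility.given().ignoreExceptions()
            .pollInterval(500, TimeUnit.MILLISECONDS)
            .atMost(180, TimeUnit.SECONDS)
            .untilAsserted(() -> client.getDatabase("admin").runCommand(new Document("ping", 1)));
        return client;
    }
}

Once the probe returns, the suite can seed test data exactly as initSourceData() does above.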
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_and_sink.conf
similarity index 77%
rename from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_and_sink.conf
index 3411a1fd80e..d3c60494cb6 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_and_sink.conf
@@ -16,20 +16,25 @@
 #
 ######
-###### This config file is a demonstration of streaming processing in seatunnel config
+###### This config file is a demonstration of batch processing in seatunnel config
 ######

 env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }

 source {
   MongoDB {
-    uri = "mongodb://spark_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
     database = "test_db"
     collection = "source_table"
     schema = {
@@ -48,7 +53,6 @@
       c_decimal = "decimal(2, 1)"
       c_bytes = bytes
       c_date = date
-      c_timestamp = timestamp
     }
   }
 }
@@ -59,7 +63,7 @@ transform {

 sink {
   MongoDB {
-    uri = "mongodb://spark_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
     database = "test_db"
     collection = "sink_table"
   }
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_matchQuery_and_sink.conf
similarity index 77%
rename from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_matchQuery_and_sink.conf
index 2a67c1ea62a..06657ac62e1 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_matchQuery_and_sink.conf
@@ -16,20 +16,25 @@
 #
 ######
-###### This config file is a demonstration of streaming processing in seatunnel config
+###### This config file is a demonstration of batch processing in seatunnel config
 ######

 env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }

 source {
   MongoDB {
-    uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
     database = "test_db"
     collection = "source_matchQuery_table"
     matchQuery = "{"id":3}"
@@ -59,7 +64,7 @@ transform {

 sink {
   MongoDB {
-    uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
     database = "test_db"
     collection = "sink_matchQuery_table"
   }
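The matchQuery option pushes a JSON filter down to the MongoDB source, so this job reads only documents whose id equals 3 before writing them to sink_matchQuery_table. On the driver side the filter is roughly equivalent to the following hedged sketch (connector internals may differ; the host and collection names come from the config above, and id is compared as a long because the test data serializes it from BasicType.LONG_TYPE):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.Filters;
import org.bson.Document;

public class MatchQuerySketch {

    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://e2e_mongodb:27017")) {
            // find(Filters.eq("id", 3L)) is the driver-side analogue of the
            // matchQuery filter in the source block above.
            for (Document doc : client.getDatabase("test_db")
                    .getCollection("source_matchQuery_table")
                    .find(Filters.eq("id", 3L))) {
                System.out.println(doc.toJson());
            }
        }
    }
}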
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
similarity index 75%
rename from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
index d013995d5d4..ad5c1a771ff 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 ######
 ###### This config file is a demonstration of streaming processing in seatunnel config
 ######
@@ -29,7 +28,7 @@ env {

 source {
   MongoDB {
-    uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
     database = "test_db"
     collection = "source_table"
     schema = {
@@ -48,19 +47,27 @@
       c_decimal = "decimal(2, 1)"
       c_bytes = bytes
       c_date = date
-      c_timestamp = timestamp
     }
   }
 }

-transform {
-}
-
 sink {
-  MongoDB {
-    uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
-    database = "test_db"
-    collection = "sink_table"
+  Assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 10
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 10
+          }
+        ]
+      }
   }
-}
\ No newline at end of file
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert
+}
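The Assert sink replaces the old read-back-and-compare verification: with MIN_ROW and MAX_ROW both set to 10, the job must emit exactly the ten rows produced by generateTestDataSet(0, 10). The row_rules encode a check along the lines of this sketch (assuming the emitted rows have been collected into a list; the class name is illustrative):

import java.util.List;

public class RowCountAssertSketch {

    // MAX_ROW caps the row count and MIN_ROW sets the floor; with both
    // at 10 the job output must contain exactly ten rows.
    static void check(List<?> rows) {
        if (rows.size() > 10) {
            throw new AssertionError("MAX_ROW violated: got " + rows.size() + " rows");
        }
        if (rows.size() < 10) {
            throw new AssertionError("MIN_ROW violated: got " + rows.size() + " rows");
        }
    }
}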
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 457217735c8..68afc709fe9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -47,6 +47,7 @@
         <module>connector-iceberg-hadoop3-e2e</module>
         <module>connector-tdengine-e2e</module>
         <module>connector-datahub-e2e</module>
+        <module>connector-mongodb-e2e</module>
     </modules>

     <artifactId>seatunnel-connector-v2-e2e</artifactId>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
deleted file mode 100644
index ecf8b2c2123..00000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.mongodb;
-
-import static java.net.HttpURLConnection.HTTP_OK;
-import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
-
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Sorts;
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.bson.Document;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.Duration;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Slf4j
-public class MongodbIT extends FlinkContainer {
-
-    private static final String MONGODB_IMAGE = "mongo:latest";
-    private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb";
-    private static final int MONGODB_PORT = 27017;
-    private static final String MONGODB_DATABASE = "test_db";
-    private static final String MONGODB_SOURCE_TABLE = "source_table";
-    private static final String MONGODB_SINK_TABLE = "sink_table";
-
-    private static final List<Document> TEST_DATASET = generateTestDataSet();
-
-    private GenericContainer<?> mongodbContainer;
-    private MongoClient client;
-
-    @BeforeEach
-    public void startMongoContainer() {
-        DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
-        mongodbContainer = new GenericContainer<>(imageName)
-            .withNetwork(NETWORK)
-            .withNetworkAliases(MONGODB_CONTAINER_HOST)
-            .withExposedPorts(MONGODB_PORT)
-            .waitingFor(new HttpWaitStrategy()
-                .forPort(MONGODB_PORT)
-                .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
-                .withStartupTimeout(Duration.ofMinutes(2)))
-            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
-        Startables.deepStart(Stream.of(mongodbContainer)).join();
-        log.info("Mongodb container started");
-
-        Awaitility.given().ignoreExceptions()
-            .atLeast(100, TimeUnit.MILLISECONDS)
-            .pollInterval(500, TimeUnit.MILLISECONDS)
-            .atMost(180, TimeUnit.SECONDS)
-            .untilAsserted(this::initConnection);
-        this.initSourceData();
-    }
-
-    @Test
-    public void testMongodb() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/mongodb/mongodb_source_and_sink.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertIterableEquals(
-            TEST_DATASET.stream()
-                .map(e -> {
-                    e.remove("_id");
-                    return e;
-                })
-                .collect(Collectors.toList()),
-            readSinkData().stream()
-                .map(e -> {
-                    e.remove("_id");
-                    return e;
-                })
-                .collect(Collectors.toList()));
-    }
-
-    public void initConnection() {
-        String host = mongodbContainer.getContainerIpAddress();
-        int port = mongodbContainer.getFirstMappedPort();
-        String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
-
-        client = MongoClients.create(url);
-    }
-
-    private void initSourceData() {
-        MongoCollection<Document> sourceTable = client
-            .getDatabase(MONGODB_DATABASE)
-            .getCollection(MONGODB_SOURCE_TABLE);
-
-        sourceTable.deleteMany(new Document());
-        sourceTable.insertMany(TEST_DATASET);
-    }
-
-    private List<Document> readSinkData() {
-        MongoCollection<Document> sinkTable = client
-            .getDatabase(MONGODB_DATABASE)
-            .getCollection(MONGODB_SINK_TABLE);
-        MongoCursor<Document> cursor = sinkTable.find()
-            .sort(Sorts.ascending("id"))
-            .cursor();
-        List<Document> documents = new ArrayList<>();
-        while (cursor.hasNext()) {
-            documents.add(cursor.next());
-        }
-        return documents;
-    }
-
-    private static List<Document> generateTestDataSet() {
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-            new String[]{
-                "id",
-                "c_map",
-                "c_array",
-                "c_string",
-                "c_boolean",
-                "c_tinyint",
-                "c_smallint",
-                "c_int",
-                "c_bigint",
-                "c_float",
-                "c_double",
-                "c_decimal",
-                "c_bytes",
-                "c_date",
-                "c_timestamp"
-            },
-            new SeaTunnelDataType[]{
-                BasicType.LONG_TYPE,
-                new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
-                ArrayType.BYTE_ARRAY_TYPE,
-                BasicType.STRING_TYPE,
-                BasicType.BOOLEAN_TYPE,
-                BasicType.BYTE_TYPE,
-                BasicType.SHORT_TYPE,
-                BasicType.INT_TYPE,
-                BasicType.LONG_TYPE,
-                BasicType.FLOAT_TYPE,
-                BasicType.DOUBLE_TYPE,
-                new DecimalType(2, 1),
-                PrimitiveByteArrayType.INSTANCE,
-                LocalTimeType.LOCAL_DATE_TYPE,
-                LocalTimeType.LOCAL_DATE_TIME_TYPE
-            }
-        );
-        Serializer serializer = new DefaultSerializer(seatunnelRowType);
-
-        List<Document> documents = new ArrayList<>();
-        for (int i = 0; i < 100; i++) {
-            SeaTunnelRow row = new SeaTunnelRow(
-                new Object[]{
-                    Long.valueOf(i),
-                    Collections.singletonMap("key", Short.parseShort("1")),
-                    new Byte[]{Byte.parseByte("1")},
-                    "string",
-                    Boolean.FALSE,
-                    Byte.parseByte("1"),
-                    Short.parseShort("1"),
-                    Integer.parseInt("1"),
-                    Long.parseLong("1"),
-                    Float.parseFloat("1.1"),
-                    Double.parseDouble("1.1"),
-                    BigDecimal.valueOf(11, 1),
-                    "test".getBytes(),
-                    LocalDate.now(),
-                    LocalDateTime.now()
-                });
-            documents.add(serializer.serialize(row));
-        }
-        return documents;
-    }
-
-    @AfterEach
-    public void closeMongoContainer() {
-        if (client != null) {
-            client.close();
-        }
-        if (mongodbContainer != null) {
-            mongodbContainer.close();
-        }
-    }
-}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
deleted file mode 100644
index 099efae755e..00000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.mongodb;
-
-import static java.net.HttpURLConnection.HTTP_OK;
-import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
-
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Sorts;
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.bson.Document;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.Duration;
-import java.time.LocalDate;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Slf4j
-public class MongodbMatchQueryIT extends FlinkContainer {
-
-    private static final String MONGODB_IMAGE = "mongo:latest";
-    private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb";
-    private static final int MONGODB_PORT = 27017;
-    private static final String MONGODB_DATABASE = "test_db";
-    private static final String MONGODB_SOURCE_TABLE = "source_matchQuery_table";
"sink_matchQuery_table"; - - private static final List TEST_DATASET = generateTestDataSet(); - private static final List RESULT_DATASET = generateResultDataSet(); - - private GenericContainer mongodbContainer; - private MongoClient client; - - @BeforeEach - public void startMongoContainer() { - DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE); - mongodbContainer = new GenericContainer<>(imageName) - .withNetwork(NETWORK) - .withNetworkAliases(MONGODB_CONTAINER_HOST) - .withExposedPorts(MONGODB_PORT) - .waitingFor(new HttpWaitStrategy() - .forPort(MONGODB_PORT) - .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED) - .withStartupTimeout(Duration.ofMinutes(2))) - .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE))); - Startables.deepStart(Stream.of(mongodbContainer)).join(); - log.info("Mongodb container started"); - - Awaitility.given().ignoreExceptions() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(this::initConnection); - this.initSourceData(); - } - - @Test - public void testMongodb() throws IOException, InterruptedException { - Container.ExecResult execResult = executeSeaTunnelFlinkJob("/mongodb/mongodb_source_matchQuery_and_sink.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - Assertions.assertIterableEquals( - RESULT_DATASET.stream() - .map(e -> { - e.remove("_id"); - return e; - }) - .collect(Collectors.toList()), - readSinkData().stream() - .map(e -> { - e.remove("_id"); - return e; - }) - .collect(Collectors.toList())); - } - - public void initConnection() { - String host = mongodbContainer.getContainerIpAddress(); - int port = mongodbContainer.getFirstMappedPort(); - String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE); - - client = MongoClients.create(url); - } - - private void initSourceData() { - MongoCollection sourceTable = client - .getDatabase(MONGODB_DATABASE) - .getCollection(MONGODB_SOURCE_TABLE); - - sourceTable.deleteMany(new Document()); - sourceTable.insertMany(TEST_DATASET); - } - - private List readSinkData() { - MongoCollection sinkTable = client - .getDatabase(MONGODB_DATABASE) - .getCollection(MONGODB_SINK_TABLE); - MongoCursor cursor = sinkTable.find() - .sort(Sorts.ascending("id")) - .cursor(); - List documents = new ArrayList<>(); - while (cursor.hasNext()) { - documents.add(cursor.next()); - } - return documents; - } - - private static List generateTestDataSet() { - SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( - new String[]{ - "id", - "c_map", - "c_array", - "c_string", - "c_boolean", - "c_tinyint", - "c_smallint", - "c_int", - "c_bigint", - "c_float", - "c_double", - "c_decimal", - "c_bytes", - "c_date" - }, - new SeaTunnelDataType[]{ - BasicType.LONG_TYPE, - new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), - ArrayType.BYTE_ARRAY_TYPE, - BasicType.STRING_TYPE, - BasicType.BOOLEAN_TYPE, - BasicType.BYTE_TYPE, - BasicType.SHORT_TYPE, - BasicType.INT_TYPE, - BasicType.LONG_TYPE, - BasicType.FLOAT_TYPE, - BasicType.DOUBLE_TYPE, - new DecimalType(2, 1), - PrimitiveByteArrayType.INSTANCE, - LocalTimeType.LOCAL_DATE_TYPE, - } - ); - Serializer serializer = new DefaultSerializer(seatunnelRowType); - - List documents = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - SeaTunnelRow row = new SeaTunnelRow( - new Object[]{ - Long.valueOf(i), - Collections.singletonMap("key", Short.parseShort("1")), - new 
-                    new Byte[]{Byte.parseByte("1")},
-                    "string",
-                    Boolean.FALSE,
-                    Byte.parseByte("1"),
-                    Short.parseShort("1"),
-                    Integer.parseInt("1"),
-                    Long.parseLong("1"),
-                    Float.parseFloat("1.1"),
-                    Double.parseDouble("1.1"),
-                    BigDecimal.valueOf(11, 1),
-                    "test".getBytes(),
-                    LocalDate.now(),
-                });
-            documents.add(serializer.serialize(row));
-        }
-        return documents;
-    }
-
-    private static List<Document> generateResultDataSet() {
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-            new String[]{
-                "id",
-                "c_map",
-                "c_array",
-                "c_string",
-                "c_boolean",
-                "c_tinyint",
-                "c_smallint",
-                "c_int",
-                "c_bigint",
-                "c_float",
-                "c_double",
-                "c_decimal",
-                "c_bytes",
-                "c_date"
-            },
-            new SeaTunnelDataType[]{
-                BasicType.LONG_TYPE,
-                new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
-                ArrayType.BYTE_ARRAY_TYPE,
-                BasicType.STRING_TYPE,
-                BasicType.BOOLEAN_TYPE,
-                BasicType.BYTE_TYPE,
-                BasicType.SHORT_TYPE,
-                BasicType.INT_TYPE,
-                BasicType.LONG_TYPE,
-                BasicType.FLOAT_TYPE,
-                BasicType.DOUBLE_TYPE,
-                new DecimalType(2, 1),
-                PrimitiveByteArrayType.INSTANCE,
-                LocalTimeType.LOCAL_DATE_TYPE
-            }
-        );
-        Serializer serializer = new DefaultSerializer(seatunnelRowType);
-
-        List<Document> documents = new ArrayList<>();
-        SeaTunnelRow row = new SeaTunnelRow(
-            new Object[]{
-                Long.valueOf(3),
-                Collections.singletonMap("key", Short.parseShort("1")),
-                new Byte[]{Byte.parseByte("1")},
-                "string",
-                Boolean.FALSE,
-                Byte.parseByte("1"),
-                Short.parseShort("1"),
-                Integer.parseInt("1"),
-                Long.parseLong("1"),
-                Float.parseFloat("1.1"),
-                Double.parseDouble("1.1"),
-                BigDecimal.valueOf(11, 1),
-                "test".getBytes(),
-                LocalDate.now(),
-            });
-        documents.add(serializer.serialize(row));
-        return documents;
-    }
-
-    @AfterEach
-    public void closeMongoContainer() {
-        if (client != null) {
-            client.close();
-        }
-        if (mongodbContainer != null) {
-            mongodbContainer.close();
-        }
-    }
-}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index c5e966bca43..78ac1bc328e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -29,7 +29,6 @@
         <module>connector-flink-e2e-base</module>
         <module>connector-jdbc-flink-e2e</module>
-        <module>connector-mongodb-flink-e2e</module>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml
deleted file mode 100644
index f74682cd2ae..00000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements. See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License. You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
-        <version>${revision}</version>
-    </parent>
-
-    <artifactId>connector-mongodb-spark-e2e</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-spark-e2e-base</artifactId>
-            <version>${project.version}</version>
-            <classifier>tests</classifier>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-mongodb</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
deleted file mode 100644
index e025b97dfca..00000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
-  # You can set spark configuration here
-  job.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-  job.mode = "BATCH"
-}
-
-source {
-  # This is a example source plugin **only for test and demonstrate the feature source plugin**
-  FakeSource {
-    result_table_name = "fake"
-    schema = {
-      fields {
-        c_string = string
-        c_boolean = boolean
-        c_tinyint = tinyint
-        c_smallint = smallint
-        c_int = int
-        c_bigint = bigint
-        c_float = float
-        c_double = double
-        c_decimal = "decimal(30, 8)"
-        c_bytes = bytes
-      }
-    }
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
-}
-
-transform {
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/transform/Sql
-}
-
-sink {
-  MongoDB {
-    uri = "mongodb://spark_e2e_mongodb_sink:27017/test_db?retryWrites=true&writeConcern=majority"
-    database = "test_db"
-    collection = "test_table"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/MongoDB
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 65b432d9008..db53fc8acc1 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -29,7 +29,6 @@
         <module>connector-spark-e2e-base</module>
        <module>connector-jdbc-spark-e2e</module>
-        <module>connector-mongodb-spark-e2e</module>