From cb0d4bcb13249b6ea1004152a0140356770c9b9e Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Wed, 28 Dec 2022 21:28:15 +0800
Subject: [PATCH 01/25] [Improve][Connector-V2][MongoDB] Refactor MongoDB
connector e2e test cases
---
.../connector-mongodb-e2e/pom.xml | 39 +++
.../e2e/flink/v2/mongodb/MongodbIT.java | 316 ++++++++++++++++++
.../mongodb/mongodb_source_and_sink.conf | 66 ++++
.../mongodb_source_matchQuery_and_sink.conf | 66 ++++
4 files changed, 487 insertions(+)
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
new file mode 100644
index 00000000000..168c4fcf621
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
@@ -0,0 +1,39 @@
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-connector-v2-e2e
+ ${revision}
+
+ 4.0.0
+
+ connector-mongodb-e2e
+
+
+
+
+
+ org.apache.seatunnel
+ connector-mongodb
+ ${project.version}
+ test
+
+
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
new file mode 100644
index 00000000000..294583bf418
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.mongodb;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Sorts;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.bson.Document;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+@DisabledOnContainer(value = {}, type = {EngineType.FLINK}, disabledReason = "")
+public class MongodbIT extends TestSuiteBase implements TestResource {
+
+ private static final String MONGODB_IMAGE = "mongo:latest";
+ private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb";
+ private static final int MONGODB_PORT = 27017;
+ private static final String MONGODB_DATABASE = "test_db";
+ private static final String MONGODB_SOURCE_TABLE = "source_table";
+ private static final String MONGODB_SINK_TABLE = "sink_table";
+
+ private static final List TEST_DATASET = generateTestDataSet();
+
+ private static final List RESULT_DATASET = generateMatchQueryResultDataSet();
+
+ private GenericContainer> mongodbContainer;
+ private MongoClient client;
+
+ @TestTemplate
+ public void testMongodb(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/mongodb/mongodb_source_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertIterableEquals(
+ TEST_DATASET.stream()
+ .map(e -> {
+ e.remove("_id");
+ return e;
+ })
+ .collect(Collectors.toList()),
+ readSinkData().stream()
+ .map(e -> {
+ e.remove("_id");
+ return e;
+ })
+ .collect(Collectors.toList()));
+ }
+
+ @TestTemplate
+ public void testMongodbMatchQuery(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/mongodb/mongodb_source_matchQuery_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertIterableEquals(
+ RESULT_DATASET.stream()
+ .map(e -> {
+ e.remove("_id");
+ return e;
+ })
+ .collect(Collectors.toList()),
+ readSinkData().stream()
+ .map(e -> {
+ e.remove("_id");
+ return e;
+ })
+ .collect(Collectors.toList()));
+ }
+
+ public void initConnection() {
+ String host = mongodbContainer.getContainerIpAddress();
+ int port = mongodbContainer.getFirstMappedPort();
+ String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
+
+ client = MongoClients.create(url);
+ }
+
+ private void initSourceData() {
+ MongoCollection sourceTable = client
+ .getDatabase(MONGODB_DATABASE)
+ .getCollection(MONGODB_SOURCE_TABLE);
+
+ sourceTable.deleteMany(new Document());
+ sourceTable.insertMany(TEST_DATASET);
+ }
+
+ private List readSinkData() {
+ MongoCollection sinkTable = client
+ .getDatabase(MONGODB_DATABASE)
+ .getCollection(MONGODB_SINK_TABLE);
+ MongoCursor cursor = sinkTable.find()
+ .sort(Sorts.ascending("id"))
+ .cursor();
+ List documents = new ArrayList<>();
+ while (cursor.hasNext()) {
+ documents.add(cursor.next());
+ }
+ return documents;
+ }
+
+ private static List generateTestDataSet() {
+ SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ }
+ );
+ Serializer serializer = new DefaultSerializer(seatunnelRowType);
+
+ List documents = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ LocalDateTime.now()
+ });
+ documents.add(serializer.serialize(row));
+ }
+ return documents;
+ }
+
+ private static List generateMatchQueryResultDataSet() {
+ SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE
+ }
+ );
+ Serializer serializer = new DefaultSerializer(seatunnelRowType);
+
+ List documents = new ArrayList<>();
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(3),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ });
+ documents.add(serializer.serialize(row));
+ return documents;
+ }
+
+ @Override
+ public void startUp() {
+ DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
+ mongodbContainer = new GenericContainer<>(imageName)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MONGODB_CONTAINER_HOST)
+ .withExposedPorts(MONGODB_PORT)
+ .waitingFor(new HttpWaitStrategy()
+ .forPort(MONGODB_PORT)
+ .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
+ .withStartupTimeout(Duration.ofMinutes(2)))
+ .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
+ Startables.deepStart(Stream.of(mongodbContainer)).join();
+ log.info("Mongodb container started");
+
+ Awaitility.given().ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(this::initConnection);
+ this.initSourceData();
+ generateTestDataSet();
+ }
+
+ @AfterEach
+ @Override
+ public void tearDown() {
+ if (client != null) {
+ client.close();
+ }
+ if (mongodbContainer != null) {
+ mongodbContainer.close();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
new file mode 100644
index 00000000000..d013995d5d4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ MongoDB {
+ uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+ database = "test_db"
+ collection = "source_table"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ MongoDB {
+ uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+ database = "test_db"
+ collection = "sink_table"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
new file mode 100644
index 00000000000..2a67c1ea62a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ MongoDB {
+ uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+ database = "test_db"
+ collection = "source_matchQuery_table"
+ matchQuery = "{"id":3}"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ MongoDB {
+ uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+ database = "test_db"
+ collection = "sink_matchQuery_table"
+ }
+}
\ No newline at end of file
From ff4e5bfbfef9814f8921a93ac5eb4e6085cd48ea Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Wed, 28 Dec 2022 21:38:26 +0800
Subject: [PATCH 02/25] fix
---
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
1 file changed, 1 insertion(+)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 07f21f36d9d..f306f1afa25 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -43,6 +43,7 @@
connector-elasticsearch-e2e
connector-iotdb-e2e
connector-cdc-mysql-e2e
+ connector-mongodb-e2e
seatunnel-connector-v2-e2e
From 92ad2d5085d6dcc904793e1ec2f89d9c0caab148 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Wed, 28 Dec 2022 21:41:04 +0800
Subject: [PATCH 03/25] fix
---
.../org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java | 1 -
1 file changed, 1 deletion(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
index 294583bf418..59e3e0e4923 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
@@ -47,7 +47,6 @@
import org.bson.Document;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
From 7ca4b730e7f108fbeb7151fc913189f4211c209d Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Wed, 28 Dec 2022 23:00:01 +0800
Subject: [PATCH 04/25] fix
---
.../apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
index 59e3e0e4923..0f62c2d9f20 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
@@ -45,7 +45,7 @@
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.bson.Document;
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
@@ -69,7 +69,7 @@
import java.util.stream.Stream;
@Slf4j
-@DisabledOnContainer(value = {}, type = {EngineType.FLINK}, disabledReason = "")
+@DisabledOnContainer(value = {}, type = {EngineType.SPARK,EngineType.SEATUNNEL}, disabledReason = "")
public class MongodbIT extends TestSuiteBase implements TestResource {
private static final String MONGODB_IMAGE = "mongo:latest";
@@ -300,9 +300,10 @@ public void startUp() {
.untilAsserted(this::initConnection);
this.initSourceData();
generateTestDataSet();
+ generateMatchQueryResultDataSet();
}
- @AfterEach
+ @AfterAll
@Override
public void tearDown() {
if (client != null) {
From 1983314cccd92eb5bb4b7f3461eb8014619a3307 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Wed, 28 Dec 2022 23:05:51 +0800
Subject: [PATCH 05/25] fix
---
.../org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
index 0f62c2d9f20..ac6002f314d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
@@ -69,7 +69,7 @@
import java.util.stream.Stream;
@Slf4j
-@DisabledOnContainer(value = {}, type = {EngineType.SPARK,EngineType.SEATUNNEL}, disabledReason = "")
+@DisabledOnContainer(value = {}, type = {EngineType.SPARK, EngineType.SEATUNNEL}, disabledReason = "")
public class MongodbIT extends TestSuiteBase implements TestResource {
private static final String MONGODB_IMAGE = "mongo:latest";
From ceb262afe5b47457f59202a9c2d7b0928a8531ce Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Wed, 28 Dec 2022 23:25:02 +0800
Subject: [PATCH 06/25] fix
---
.../org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java | 2 ++
1 file changed, 2 insertions(+)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
index ac6002f314d..8023c39d169 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
@@ -47,6 +47,7 @@
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
@@ -278,6 +279,7 @@ private static List generateMatchQueryResultDataSet() {
return documents;
}
+ @BeforeAll
@Override
public void startUp() {
DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
From 1d504ab42b9202efb3a5a6014253ab1c120648b1 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 29 Dec 2022 10:01:59 +0800
Subject: [PATCH 07/25] fix
---
.../v2/mongodb/MongodbIT.java | 35 ++-
.../mongodb/mongodb_source_and_sink.conf | 13 +-
.../mongodb_source_matchQuery_and_sink.conf | 11 +-
.../connector-mongodb-spark-e2e/pom.xml | 49 ----
.../e2e/spark/v2/mongodb/MongodbIT.java | 229 ------------------
.../resources/mongodb/fake_to_mongodb.conf | 70 ------
.../mongodb/mongodb_source_and_sink.conf | 66 -----
.../seatunnel-spark-connector-v2-e2e/pom.xml | 1 -
8 files changed, 34 insertions(+), 440 deletions(-)
rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/{flink => connector}/v2/mongodb/MongodbIT.java (91%)
delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml
delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java
delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
similarity index 91%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 8023c39d169..03aa7394493 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.e2e.flink.v2.mongodb;
+package org.apache.seatunnel.e2e.connector.v2.mongodb;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
@@ -33,9 +33,7 @@
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
@@ -70,16 +68,19 @@
import java.util.stream.Stream;
@Slf4j
-@DisabledOnContainer(value = {}, type = {EngineType.SPARK, EngineType.SEATUNNEL}, disabledReason = "")
public class MongodbIT extends TestSuiteBase implements TestResource {
private static final String MONGODB_IMAGE = "mongo:latest";
- private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb";
+ private static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
private static final int MONGODB_PORT = 27017;
private static final String MONGODB_DATABASE = "test_db";
private static final String MONGODB_SOURCE_TABLE = "source_table";
+
+ private static final String MONGODB_SOURCE_MATCHQUERY_TABLE = "source_matchQuery_table";
private static final String MONGODB_SINK_TABLE = "sink_table";
+ private static final String MONGODB_SINK_MATCHQUERY_TABLE = "sink_matchQuery_table";
+
private static final List TEST_DATASET = generateTestDataSet();
private static final List RESULT_DATASET = generateMatchQueryResultDataSet();
@@ -98,7 +99,7 @@ public void testMongodb(TestContainer container) throws IOException, Interrupted
return e;
})
.collect(Collectors.toList()),
- readSinkData().stream()
+ readSinkData(MONGODB_DATABASE, MONGODB_SINK_TABLE).stream()
.map(e -> {
e.remove("_id");
return e;
@@ -117,7 +118,7 @@ public void testMongodbMatchQuery(TestContainer container) throws IOException, I
return e;
})
.collect(Collectors.toList()),
- readSinkData().stream()
+ readSinkData(MONGODB_DATABASE, MONGODB_SINK_MATCHQUERY_TABLE).stream()
.map(e -> {
e.remove("_id");
return e;
@@ -129,23 +130,22 @@ public void initConnection() {
String host = mongodbContainer.getContainerIpAddress();
int port = mongodbContainer.getFirstMappedPort();
String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
-
client = MongoClients.create(url);
}
- private void initSourceData() {
+ private void initSourceData(String database, String table, List documents) {
MongoCollection sourceTable = client
- .getDatabase(MONGODB_DATABASE)
- .getCollection(MONGODB_SOURCE_TABLE);
+ .getDatabase(database)
+ .getCollection(table);
sourceTable.deleteMany(new Document());
- sourceTable.insertMany(TEST_DATASET);
+ sourceTable.insertMany(documents);
}
- private List readSinkData() {
+ private List readSinkData(String database, String table) {
MongoCollection sinkTable = client
- .getDatabase(MONGODB_DATABASE)
- .getCollection(MONGODB_SINK_TABLE);
+ .getDatabase(database)
+ .getCollection(table);
MongoCursor cursor = sinkTable.find()
.sort(Sorts.ascending("id"))
.cursor();
@@ -300,9 +300,8 @@ public void startUp() {
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initConnection);
- this.initSourceData();
- generateTestDataSet();
- generateMatchQueryResultDataSet();
+ this.initSourceData(MONGODB_DATABASE, MONGODB_SOURCE_TABLE, TEST_DATASET);
+ this.initSourceData(MONGODB_DATABASE, MONGODB_SOURCE_MATCHQUERY_TABLE, RESULT_DATASET);
}
@AfterAll
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
index d013995d5d4..d613143564f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
@@ -23,13 +23,18 @@ env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
}
source {
MongoDB {
- uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+ uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
database = "test_db"
collection = "source_table"
schema = {
@@ -59,7 +64,7 @@ transform {
sink {
MongoDB {
- uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+ uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
database = "test_db"
collection = "sink_table"
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
index 2a67c1ea62a..6924ea81b34 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
@@ -23,13 +23,18 @@ env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
}
source {
MongoDB {
- uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+ uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
database = "test_db"
collection = "source_matchQuery_table"
matchQuery = "{"id":3}"
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml
deleted file mode 100644
index f74682cd2ae..00000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-
-
-
- 4.0.0
-
- org.apache.seatunnel
- seatunnel-spark-connector-v2-e2e
- ${revision}
-
-
- connector-mongodb-spark-e2e
-
-
-
- org.apache.seatunnel
- connector-spark-e2e-base
- ${project.version}
- tests
- test-jar
- test
-
-
-
-
- org.apache.seatunnel
- connector-mongodb
- ${project.version}
- test
-
-
-
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java
deleted file mode 100644
index dcabbd5d3c2..00000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.mongodb;
-
-import static java.net.HttpURLConnection.HTTP_OK;
-import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
-import org.apache.seatunnel.e2e.spark.SparkContainer;
-
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Sorts;
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.bson.Document;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.Duration;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Slf4j
-public class MongodbIT extends SparkContainer {
-
- private static final String MONGODB_IMAGE = "mongo:latest";
- private static final String MONGODB_CONTAINER_HOST = "spark_e2e_mongodb";
- private static final int MONGODB_PORT = 27017;
- private static final String MONGODB_DATABASE = "test_db";
- private static final String MONGODB_SOURCE_TABLE = "source_table";
- private static final String MONGODB_SINK_TABLE = "sink_table";
-
- private static final List TEST_DATASET = generateTestDataSet();
-
- private MongoClient client;
- private GenericContainer> mongodbContainer;
-
- @BeforeEach
- public void startMongoContainer() {
- DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
- mongodbContainer = new GenericContainer<>(imageName)
- .withNetwork(NETWORK)
- .withNetworkAliases(MONGODB_CONTAINER_HOST)
- .withExposedPorts(MONGODB_PORT)
- .waitingFor(new HttpWaitStrategy()
- .forPort(MONGODB_PORT)
- .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
- .withStartupTimeout(Duration.ofMinutes(2)))
- .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
- Startables.deepStart(Stream.of(mongodbContainer)).join();
- log.info("Mongodb container started");
-
- Awaitility.given().ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(this::initConnection);
- this.initSourceData();
- }
-
- @Test
- public void testMongodb() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelSparkJob("/mongodb/mongodb_source_and_sink.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertIterableEquals(
- TEST_DATASET.stream()
- .map(e -> {
- e.remove("_id");
- return e;
- })
- .collect(Collectors.toList()),
- readSinkData().stream()
- .map(e -> {
- e.remove("_id");
- return e;
- })
- .collect(Collectors.toList()));
- }
-
- public void initConnection() {
- String host = mongodbContainer.getContainerIpAddress();
- int port = mongodbContainer.getFirstMappedPort();
- String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
-
- client = MongoClients.create(url);
- }
-
- private void initSourceData() {
- MongoCollection sourceTable = client
- .getDatabase(MONGODB_DATABASE)
- .getCollection(MONGODB_SOURCE_TABLE);
-
- sourceTable.deleteMany(new Document());
- sourceTable.insertMany(TEST_DATASET);
- }
-
- private List readSinkData() {
- MongoCollection sinkTable = client
- .getDatabase(MONGODB_DATABASE)
- .getCollection(MONGODB_SINK_TABLE);
- MongoCursor cursor = sinkTable.find()
- .sort(Sorts.ascending("id"))
- .cursor();
- List documents = new ArrayList<>();
- while (cursor.hasNext()) {
- documents.add(cursor.next());
- }
- return documents;
- }
-
- private static List generateTestDataSet() {
- SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
- new String[]{
- "id",
- "c_map",
- "c_array",
- "c_string",
- "c_boolean",
- "c_tinyint",
- "c_smallint",
- "c_int",
- "c_bigint",
- "c_float",
- "c_double",
- "c_decimal",
- "c_bytes",
- "c_date",
- "c_timestamp"
- },
- new SeaTunnelDataType[]{
- BasicType.LONG_TYPE,
- new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
- ArrayType.BYTE_ARRAY_TYPE,
- BasicType.STRING_TYPE,
- BasicType.BOOLEAN_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.SHORT_TYPE,
- BasicType.INT_TYPE,
- BasicType.LONG_TYPE,
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- new DecimalType(2, 1),
- PrimitiveByteArrayType.INSTANCE,
- LocalTimeType.LOCAL_DATE_TYPE,
- LocalTimeType.LOCAL_DATE_TIME_TYPE
- }
- );
- Serializer serializer = new DefaultSerializer(seatunnelRowType);
-
- List documents = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- SeaTunnelRow row = new SeaTunnelRow(
- new Object[]{
- Long.valueOf(i),
- Collections.singletonMap("key", Short.parseShort("1")),
- new Byte[]{Byte.parseByte("1")},
- "string",
- Boolean.FALSE,
- Byte.parseByte("1"),
- Short.parseShort("1"),
- Integer.parseInt("1"),
- Long.parseLong("1"),
- Float.parseFloat("1.1"),
- Double.parseDouble("1.1"),
- BigDecimal.valueOf(11, 1),
- "test".getBytes(),
- LocalDate.now(),
- LocalDateTime.now()
- });
- documents.add(serializer.serialize(row));
- }
- return documents;
- }
-
- @AfterEach
- public void closeMongoContainer() {
- if (client != null) {
- client.close();
- }
- if (mongodbContainer != null) {
- mongodbContainer.close();
- }
- }
-}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
deleted file mode 100644
index e025b97dfca..00000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set spark configuration here
- job.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
- job.mode = "BATCH"
-}
-
-source {
- # This is a example source plugin **only for test and demonstrate the feature source plugin**
- FakeSource {
- result_table_name = "fake"
- schema = {
- fields {
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(30, 8)"
- c_bytes = bytes
- }
- }
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
-}
-
-transform {
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/transform/Sql
-}
-
-sink {
- MongoDB {
- uri = "mongodb://spark_e2e_mongodb_sink:27017/test_db?retryWrites=true&writeConcern=majority"
- database = "test_db"
- collection = "test_table"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/MongoDB
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
deleted file mode 100644
index 3411a1fd80e..00000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
+++ /dev/null
@@ -1,66 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set flink configuration here
- execution.parallelism = 1
- job.mode = "BATCH"
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
- MongoDB {
- uri = "mongodb://spark_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
- database = "test_db"
- collection = "source_table"
- schema = {
- fields {
- id = bigint
- c_map = "map"
- c_array = "array"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
- }
-}
-
-transform {
-}
-
-sink {
- MongoDB {
- uri = "mongodb://spark_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
- database = "test_db"
- collection = "sink_table"
- }
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 1e4d9ef162b..5aecd61775e 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -30,7 +30,6 @@
connector-spark-e2e-base
connector-datahub-spark-e2e
connector-jdbc-spark-e2e
- connector-mongodb-spark-e2e
connector-iceberg-hadoop3-spark-e2e
connector-iceberg-spark-e2e
From ec688c57c66a73815aa9623cd651ab8bc2d09f0d Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 29 Dec 2022 10:14:28 +0800
Subject: [PATCH 08/25] fix
---
.../resources/mongodb/mongodb_source_matchQuery_and_sink.conf | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
index 6924ea81b34..e8cfa64eb1b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
@@ -64,7 +64,7 @@ transform {
sink {
MongoDB {
- uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+ uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
database = "test_db"
collection = "sink_matchQuery_table"
}
From a36bb0cacc5398dbb13e51231bc060c6f16ec176 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 29 Dec 2022 10:51:35 +0800
Subject: [PATCH 09/25] fix
---
.../e2e/connector/v2/mongodb/MongodbIT.java | 80 +++----------------
1 file changed, 10 insertions(+), 70 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 03aa7394493..58bc9a23b09 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -33,7 +33,9 @@
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
@@ -59,7 +61,6 @@
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDate;
-import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -68,6 +69,7 @@
import java.util.stream.Stream;
@Slf4j
+@DisabledOnContainer(value = {}, type = {EngineType.FLINK, EngineType.SEATUNNEL}, disabledReason = "")
public class MongodbIT extends TestSuiteBase implements TestResource {
private static final String MONGODB_IMAGE = "mongo:latest";
@@ -81,9 +83,9 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
private static final String MONGODB_SINK_MATCHQUERY_TABLE = "sink_matchQuery_table";
- private static final List TEST_DATASET = generateTestDataSet();
+ private static final List TEST_DATASET = generateTestDataSet(0, 10);
- private static final List RESULT_DATASET = generateMatchQueryResultDataSet();
+ private static final List RESULT_DATASET = generateTestDataSet(3, 4);
private GenericContainer> mongodbContainer;
private MongoClient client;
@@ -156,7 +158,7 @@ private List readSinkData(String database, String table) {
return documents;
}
- private static List generateTestDataSet() {
+ private static List generateTestDataSet(int start, int end) {
SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
new String[]{
"id",
@@ -172,8 +174,7 @@ private static List generateTestDataSet() {
"c_double",
"c_decimal",
"c_bytes",
- "c_date",
- "c_timestamp"
+ "c_date"
},
new SeaTunnelDataType[]{
BasicType.LONG_TYPE,
@@ -189,14 +190,13 @@ private static List generateTestDataSet() {
BasicType.DOUBLE_TYPE,
new DecimalType(2, 1),
PrimitiveByteArrayType.INSTANCE,
- LocalTimeType.LOCAL_DATE_TYPE,
- LocalTimeType.LOCAL_DATE_TIME_TYPE
+ LocalTimeType.LOCAL_DATE_TYPE
}
);
Serializer serializer = new DefaultSerializer(seatunnelRowType);
List documents = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
+ for (int i = start; i < end; i++) {
SeaTunnelRow row = new SeaTunnelRow(
new Object[]{
Long.valueOf(i),
@@ -212,73 +212,13 @@ private static List generateTestDataSet() {
Double.parseDouble("1.1"),
BigDecimal.valueOf(11, 1),
"test".getBytes(),
- LocalDate.now(),
- LocalDateTime.now()
+ LocalDate.now()
});
documents.add(serializer.serialize(row));
}
return documents;
}
- private static List generateMatchQueryResultDataSet() {
- SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
- new String[]{
- "id",
- "c_map",
- "c_array",
- "c_string",
- "c_boolean",
- "c_tinyint",
- "c_smallint",
- "c_int",
- "c_bigint",
- "c_float",
- "c_double",
- "c_decimal",
- "c_bytes",
- "c_date"
- },
- new SeaTunnelDataType[]{
- BasicType.LONG_TYPE,
- new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
- ArrayType.BYTE_ARRAY_TYPE,
- BasicType.STRING_TYPE,
- BasicType.BOOLEAN_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.SHORT_TYPE,
- BasicType.INT_TYPE,
- BasicType.LONG_TYPE,
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- new DecimalType(2, 1),
- PrimitiveByteArrayType.INSTANCE,
- LocalTimeType.LOCAL_DATE_TYPE
- }
- );
- Serializer serializer = new DefaultSerializer(seatunnelRowType);
-
- List documents = new ArrayList<>();
- SeaTunnelRow row = new SeaTunnelRow(
- new Object[]{
- Long.valueOf(3),
- Collections.singletonMap("key", Short.parseShort("1")),
- new Byte[]{Byte.parseByte("1")},
- "string",
- Boolean.FALSE,
- Byte.parseByte("1"),
- Short.parseShort("1"),
- Integer.parseInt("1"),
- Long.parseLong("1"),
- Float.parseFloat("1.1"),
- Double.parseDouble("1.1"),
- BigDecimal.valueOf(11, 1),
- "test".getBytes(),
- LocalDate.now(),
- });
- documents.add(serializer.serialize(row));
- return documents;
- }
-
@BeforeAll
@Override
public void startUp() {
From 1dc8cd6757a52ccc89327e10f561997e57c5172b Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 29 Dec 2022 11:20:31 +0800
Subject: [PATCH 10/25] fix
---
.../apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java | 3 ---
.../src/test/resources/mongodb/mongodb_source_and_sink.conf | 1 -
2 files changed, 4 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 58bc9a23b09..a8f8b6b1d23 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -33,9 +33,7 @@
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
@@ -69,7 +67,6 @@
import java.util.stream.Stream;
@Slf4j
-@DisabledOnContainer(value = {}, type = {EngineType.FLINK, EngineType.SEATUNNEL}, disabledReason = "")
public class MongodbIT extends TestSuiteBase implements TestResource {
private static final String MONGODB_IMAGE = "mongo:latest";
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
index d613143564f..cb7731551a1 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
@@ -53,7 +53,6 @@ source {
c_decimal = "decimal(2, 1)"
c_bytes = bytes
c_date = date
- c_timestamp = timestamp
}
}
}
From 2186c25e49b43fec5378fce1f92ce2b3c69b4746 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 29 Dec 2022 11:38:06 +0800
Subject: [PATCH 11/25] fix
---
.../apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index a8f8b6b1d23..23d2e3e4cda 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -45,7 +45,7 @@
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
@@ -216,7 +216,7 @@ private static List generateTestDataSet(int start, int end) {
return documents;
}
- @BeforeAll
+ @BeforeEach
@Override
public void startUp() {
DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
From 232d5ec65bdd3c89e120de3753dbd87f181526f5 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 29 Dec 2022 12:11:04 +0800
Subject: [PATCH 12/25] fix
---
.../e2e/connector/v2/mongodb/MongodbIT.java | 12 ++--
.../pom.xml | 6 ++
.../main/resources/examples/spark.batch.conf | 69 ++++++-------------
3 files changed, 33 insertions(+), 54 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 23d2e3e4cda..2ba70561b6f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -45,7 +45,7 @@
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
@@ -75,14 +75,13 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
private static final String MONGODB_DATABASE = "test_db";
private static final String MONGODB_SOURCE_TABLE = "source_table";
- private static final String MONGODB_SOURCE_MATCHQUERY_TABLE = "source_matchQuery_table";
private static final String MONGODB_SINK_TABLE = "sink_table";
private static final String MONGODB_SINK_MATCHQUERY_TABLE = "sink_matchQuery_table";
- private static final List TEST_DATASET = generateTestDataSet(0, 10);
+ private static List TEST_DATASET;
- private static final List RESULT_DATASET = generateTestDataSet(3, 4);
+ private static List RESULT_DATASET;
private GenericContainer> mongodbContainer;
private MongoClient client;
@@ -216,7 +215,7 @@ private static List generateTestDataSet(int start, int end) {
return documents;
}
- @BeforeEach
+ @BeforeAll
@Override
public void startUp() {
DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
@@ -238,7 +237,8 @@ public void startUp() {
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initConnection);
this.initSourceData(MONGODB_DATABASE, MONGODB_SOURCE_TABLE, TEST_DATASET);
- this.initSourceData(MONGODB_DATABASE, MONGODB_SOURCE_MATCHQUERY_TABLE, RESULT_DATASET);
+ TEST_DATASET = generateTestDataSet(0, 10);
+ RESULT_DATASET = generateTestDataSet(3, 4);
}
@AfterAll
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
index 23f22f062ac..93dc17a66a6 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
@@ -99,6 +99,12 @@
lz4
1.3.0
+
+
+ org.apache.seatunnel
+ connector-mongodb
+ ${project.version}
+
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 7692598da63..28bc195427a 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -6,7 +6,7 @@
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,14 +16,16 @@
#
######
-###### This config file is a demonstration of batch processing in SeaTunnel config
+###### This config file is a demonstration of streaming processing in seatunnel config
######
env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
# You can set spark configuration here
- # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
- #job.mode = BATCH
- job.name = "SeaTunnel"
+ spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
@@ -31,14 +33,16 @@ env {
}
source {
- # This is a example input plugin **only for test and demonstrate the feature input plugin**
- FakeSource {
- row.num = 16
- parallelism = 2
+ MongoDB {
+ uri = "mongodb://localhost:62921/test_db?retryWrites=true&writeConcern=majority"
+ database = "test_db"
+ collection = "source_table"
+ matchQuery = "{"id":3}"
schema = {
fields {
- c_map = "map"
- c_array = "array"
+ id = bigint
+ c_map = "map"
+ c_array = "array"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
@@ -47,52 +51,21 @@ source {
c_bigint = bigint
c_float = float
c_double = double
- c_decimal = "decimal(30, 8)"
- c_null = "null"
+ c_decimal = "decimal(2, 1)"
c_bytes = bytes
c_date = date
- c_timestamp = timestamp
}
}
- result_table_name = "fake"
}
-
- # You can also use other input plugins, such as hdfs
- # hdfs {
- # result_table_name = "accesslog"
- # path = "hdfs://hadoop-cluster-01/nginx/accesslog"
- # format = "json"
- # }
-
- # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
- # please go to https://seatunnel.apache.org/docs/category/source-v2
}
transform {
- # split data by specific delimiter
-
- # you can also use other transform plugins, such as sql
- sql {
- sql = "select c_map,c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp from fake"
- result_table_name = "sql"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
- # choose stdout output plugin to output data to console
- Console {
- parallelism = 2
+ MongoDB {
+ uri = "mongodb://localhost:62921/test_db?retryWrites=true&writeConcern=majority"
+ database = "test_db"
+ collection = "sink_table"
}
-
- # you can also you other output plugins, such as sql
- # hdfs {
- # path = "hdfs://hadoop-cluster-01/nginx/accesslog_processed"
- # save_mode = "append"
- # }
-
- # If you would like to get more information about how to configure seatunnel and see full list of output plugins,
- # please go to https://seatunnel.apache.org/docs/category/sink-v2
-}
+}
\ No newline at end of file
From 953ab383d5f7f0b090e40025ca03448801e0b690 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 29 Dec 2022 12:39:01 +0800
Subject: [PATCH 13/25] fix
---
.../apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 2ba70561b6f..16c887d634e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -236,9 +236,9 @@ public void startUp() {
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initConnection);
- this.initSourceData(MONGODB_DATABASE, MONGODB_SOURCE_TABLE, TEST_DATASET);
TEST_DATASET = generateTestDataSet(0, 10);
RESULT_DATASET = generateTestDataSet(3, 4);
+ this.initSourceData(MONGODB_DATABASE, MONGODB_SOURCE_TABLE, TEST_DATASET);
}
@AfterAll
From bf120977d14db44a4b3316e4279f5f56a2787ea3 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 29 Dec 2022 13:02:36 +0800
Subject: [PATCH 14/25] fix
---
.../e2e/connector/v2/mongodb/MongodbIT.java | 57 +------------------
1 file changed, 3 insertions(+), 54 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 16c887d634e..d66a3766435 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -38,8 +38,6 @@
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Sorts;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.bson.Document;
@@ -63,7 +61,6 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
@Slf4j
@@ -75,13 +72,7 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
private static final String MONGODB_DATABASE = "test_db";
private static final String MONGODB_SOURCE_TABLE = "source_table";
- private static final String MONGODB_SINK_TABLE = "sink_table";
-
- private static final String MONGODB_SINK_MATCHQUERY_TABLE = "sink_matchQuery_table";
-
- private static List TEST_DATASET;
-
- private static List RESULT_DATASET;
+ private static final List TEST_DATASET = generateTestDataSet(0, 10);;
private GenericContainer> mongodbContainer;
private MongoClient client;
@@ -89,39 +80,13 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
@TestTemplate
public void testMongodb(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/mongodb/mongodb_source_and_sink.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertIterableEquals(
- TEST_DATASET.stream()
- .map(e -> {
- e.remove("_id");
- return e;
- })
- .collect(Collectors.toList()),
- readSinkData(MONGODB_DATABASE, MONGODB_SINK_TABLE).stream()
- .map(e -> {
- e.remove("_id");
- return e;
- })
- .collect(Collectors.toList()));
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
@TestTemplate
public void testMongodbMatchQuery(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/mongodb/mongodb_source_matchQuery_and_sink.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertIterableEquals(
- RESULT_DATASET.stream()
- .map(e -> {
- e.remove("_id");
- return e;
- })
- .collect(Collectors.toList()),
- readSinkData(MONGODB_DATABASE, MONGODB_SINK_MATCHQUERY_TABLE).stream()
- .map(e -> {
- e.remove("_id");
- return e;
- })
- .collect(Collectors.toList()));
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
public void initConnection() {
@@ -140,20 +105,6 @@ private void initSourceData(String database, String table, List docume
sourceTable.insertMany(documents);
}
- private List readSinkData(String database, String table) {
- MongoCollection sinkTable = client
- .getDatabase(database)
- .getCollection(table);
- MongoCursor cursor = sinkTable.find()
- .sort(Sorts.ascending("id"))
- .cursor();
- List documents = new ArrayList<>();
- while (cursor.hasNext()) {
- documents.add(cursor.next());
- }
- return documents;
- }
-
private static List generateTestDataSet(int start, int end) {
SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
new String[]{
@@ -236,8 +187,6 @@ public void startUp() {
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initConnection);
- TEST_DATASET = generateTestDataSet(0, 10);
- RESULT_DATASET = generateTestDataSet(3, 4);
this.initSourceData(MONGODB_DATABASE, MONGODB_SOURCE_TABLE, TEST_DATASET);
}
From 3e57c8d489f9e15673acb0839a7081b80016cae2 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 29 Dec 2022 13:05:43 +0800
Subject: [PATCH 15/25] fix
---
.../apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index d66a3766435..8fc9598e80b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -72,7 +72,7 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
private static final String MONGODB_DATABASE = "test_db";
private static final String MONGODB_SOURCE_TABLE = "source_table";
- private static final List TEST_DATASET = generateTestDataSet(0, 10);;
+ private static final List TEST_DATASET = generateTestDataSet(0, 10);
private GenericContainer> mongodbContainer;
private MongoClient client;
From 62bec27d0fc5c2abf925a1104d1699d4580cb63e Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 29 Dec 2022 13:21:34 +0800
Subject: [PATCH 16/25] fix
---
.../mongodb/mongodb_source_and_sink.conf | 2 +-
.../mongodb_source_matchQuery_and_sink.conf | 2 +-
.../pom.xml | 6 --
.../main/resources/examples/spark.batch.conf | 67 +++++++++++++------
4 files changed, 49 insertions(+), 28 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
index cb7731551a1..d3c60494cb6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
@@ -16,7 +16,7 @@
#
######
-###### This config file is a demonstration of streaming processing in seatunnel config
+###### This config file is a demonstration of batch processing in seatunnel config
######
env {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
index e8cfa64eb1b..06657ac62e1 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
@@ -16,7 +16,7 @@
#
######
-###### This config file is a demonstration of streaming processing in seatunnel config
+###### This config file is a demonstration of batch processing in seatunnel config
######
env {
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
index 93dc17a66a6..23f22f062ac 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
@@ -99,12 +99,6 @@
lz4
1.3.0
-
-
- org.apache.seatunnel
- connector-mongodb
- ${project.version}
-
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 28bc195427a..c7e56f577f5 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -16,16 +16,14 @@
#
######
-###### This config file is a demonstration of streaming processing in seatunnel config
+###### This config file is a demonstration of batch processing in SeaTunnel config
######
env {
- # You can set flink configuration here
- execution.parallelism = 1
- job.mode = "BATCH"
-
# You can set spark configuration here
- spark.app.name = "SeaTunnel"
+ # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
+ #job.mode = BATCH
+ job.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
@@ -33,16 +31,14 @@ env {
}
source {
- MongoDB {
- uri = "mongodb://localhost:62921/test_db?retryWrites=true&writeConcern=majority"
- database = "test_db"
- collection = "source_table"
- matchQuery = "{"id":3}"
+ # This is a example input plugin **only for test and demonstrate the feature input plugin**
+ FakeSource {
+ row.num = 16
+ parallelism = 2
schema = {
fields {
- id = bigint
- c_map = "map"
- c_array = "array"
+ c_map = "map"
+ c_array = "array"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
@@ -51,21 +47,52 @@ source {
c_bigint = bigint
c_float = float
c_double = double
- c_decimal = "decimal(2, 1)"
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
c_bytes = bytes
c_date = date
+ c_timestamp = timestamp
}
}
+ result_table_name = "fake"
}
+
+ # You can also use other input plugins, such as hdfs
+ # hdfs {
+ # result_table_name = "accesslog"
+ # path = "hdfs://hadoop-cluster-01/nginx/accesslog"
+ # format = "json"
+ # }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
}
transform {
+ # split data by specific delimiter
+
+ # you can also use other transform plugins, such as sql
+ sql {
+ sql = "select c_map,c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp from fake"
+ result_table_name = "sql"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
- MongoDB {
- uri = "mongodb://localhost:62921/test_db?retryWrites=true&writeConcern=majority"
- database = "test_db"
- collection = "sink_table"
+ # choose stdout output plugin to output data to console
+ Console {
+ parallelism = 2
}
-}
\ No newline at end of file
+
+ # you can also you other output plugins, such as sql
+ # hdfs {
+ # path = "hdfs://hadoop-cluster-01/nginx/accesslog_processed"
+ # save_mode = "append"
+ # }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of output plugins,
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
From b61cff830aa621aab5efe53ccc0988b87ff70fa7 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 29 Dec 2022 13:24:03 +0800
Subject: [PATCH 17/25] fix
---
.../src/main/resources/examples/spark.batch.conf | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index c7e56f577f5..7692598da63 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -6,7 +6,7 @@
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
From 3c01a19602c1be7f3df67cfb082dd2db618c637c Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 5 Jan 2023 17:23:06 +0800
Subject: [PATCH 18/25] fix
---
.../connector-mongodb-flink-e2e/pom.xml | 47 ---
.../e2e/flink/v2/mongodb/MongodbIT.java | 229 --------------
.../flink/v2/mongodb/MongodbMatchQueryIT.java | 285 ------------------
.../mongodb/mongodb_source_and_sink.conf | 66 ----
.../mongodb_source_matchQuery_and_sink.conf | 66 ----
.../seatunnel-flink-connector-v2-e2e/pom.xml | 1 -
.../seatunnel-spark-connector-v2-e2e/pom.xml | 1 -
7 files changed, 695 deletions(-)
delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
deleted file mode 100644
index ac8499b0911..00000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
+++ /dev/null
@@ -1,47 +0,0 @@
-
-
-
- 4.0.0
-
- org.apache.seatunnel
- seatunnel-flink-connector-v2-e2e
- ${revision}
-
-
- connector-mongodb-flink-e2e
-
-
-
- org.apache.seatunnel
- connector-flink-e2e-base
- ${project.version}
- tests
- test-jar
- test
-
-
-
-
- org.apache.seatunnel
- connector-mongodb
- ${project.version}
- test
-
-
-
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
deleted file mode 100644
index ecf8b2c2123..00000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.mongodb;
-
-import static java.net.HttpURLConnection.HTTP_OK;
-import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
-
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Sorts;
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.bson.Document;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.Duration;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Slf4j
-public class MongodbIT extends FlinkContainer {
-
- private static final String MONGODB_IMAGE = "mongo:latest";
- private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb";
- private static final int MONGODB_PORT = 27017;
- private static final String MONGODB_DATABASE = "test_db";
- private static final String MONGODB_SOURCE_TABLE = "source_table";
- private static final String MONGODB_SINK_TABLE = "sink_table";
-
- private static final List TEST_DATASET = generateTestDataSet();
-
- private GenericContainer> mongodbContainer;
- private MongoClient client;
-
- @BeforeEach
- public void startMongoContainer() {
- DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
- mongodbContainer = new GenericContainer<>(imageName)
- .withNetwork(NETWORK)
- .withNetworkAliases(MONGODB_CONTAINER_HOST)
- .withExposedPorts(MONGODB_PORT)
- .waitingFor(new HttpWaitStrategy()
- .forPort(MONGODB_PORT)
- .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
- .withStartupTimeout(Duration.ofMinutes(2)))
- .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
- Startables.deepStart(Stream.of(mongodbContainer)).join();
- log.info("Mongodb container started");
-
- Awaitility.given().ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(this::initConnection);
- this.initSourceData();
- }
-
- @Test
- public void testMongodb() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelFlinkJob("/mongodb/mongodb_source_and_sink.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertIterableEquals(
- TEST_DATASET.stream()
- .map(e -> {
- e.remove("_id");
- return e;
- })
- .collect(Collectors.toList()),
- readSinkData().stream()
- .map(e -> {
- e.remove("_id");
- return e;
- })
- .collect(Collectors.toList()));
- }
-
- public void initConnection() {
- String host = mongodbContainer.getContainerIpAddress();
- int port = mongodbContainer.getFirstMappedPort();
- String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
-
- client = MongoClients.create(url);
- }
-
- private void initSourceData() {
- MongoCollection sourceTable = client
- .getDatabase(MONGODB_DATABASE)
- .getCollection(MONGODB_SOURCE_TABLE);
-
- sourceTable.deleteMany(new Document());
- sourceTable.insertMany(TEST_DATASET);
- }
-
- private List readSinkData() {
- MongoCollection sinkTable = client
- .getDatabase(MONGODB_DATABASE)
- .getCollection(MONGODB_SINK_TABLE);
- MongoCursor cursor = sinkTable.find()
- .sort(Sorts.ascending("id"))
- .cursor();
- List documents = new ArrayList<>();
- while (cursor.hasNext()) {
- documents.add(cursor.next());
- }
- return documents;
- }
-
- private static List generateTestDataSet() {
- SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
- new String[]{
- "id",
- "c_map",
- "c_array",
- "c_string",
- "c_boolean",
- "c_tinyint",
- "c_smallint",
- "c_int",
- "c_bigint",
- "c_float",
- "c_double",
- "c_decimal",
- "c_bytes",
- "c_date",
- "c_timestamp"
- },
- new SeaTunnelDataType[]{
- BasicType.LONG_TYPE,
- new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
- ArrayType.BYTE_ARRAY_TYPE,
- BasicType.STRING_TYPE,
- BasicType.BOOLEAN_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.SHORT_TYPE,
- BasicType.INT_TYPE,
- BasicType.LONG_TYPE,
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- new DecimalType(2, 1),
- PrimitiveByteArrayType.INSTANCE,
- LocalTimeType.LOCAL_DATE_TYPE,
- LocalTimeType.LOCAL_DATE_TIME_TYPE
- }
- );
- Serializer serializer = new DefaultSerializer(seatunnelRowType);
-
- List documents = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- SeaTunnelRow row = new SeaTunnelRow(
- new Object[]{
- Long.valueOf(i),
- Collections.singletonMap("key", Short.parseShort("1")),
- new Byte[]{Byte.parseByte("1")},
- "string",
- Boolean.FALSE,
- Byte.parseByte("1"),
- Short.parseShort("1"),
- Integer.parseInt("1"),
- Long.parseLong("1"),
- Float.parseFloat("1.1"),
- Double.parseDouble("1.1"),
- BigDecimal.valueOf(11, 1),
- "test".getBytes(),
- LocalDate.now(),
- LocalDateTime.now()
- });
- documents.add(serializer.serialize(row));
- }
- return documents;
- }
-
- @AfterEach
- public void closeMongoContainer() {
- if (client != null) {
- client.close();
- }
- if (mongodbContainer != null) {
- mongodbContainer.close();
- }
- }
-}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
deleted file mode 100644
index 099efae755e..00000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.mongodb;
-
-import static java.net.HttpURLConnection.HTTP_OK;
-import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
-
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Sorts;
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.bson.Document;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.Duration;
-import java.time.LocalDate;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Slf4j
-public class MongodbMatchQueryIT extends FlinkContainer {
-
- private static final String MONGODB_IMAGE = "mongo:latest";
- private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb";
- private static final int MONGODB_PORT = 27017;
- private static final String MONGODB_DATABASE = "test_db";
- private static final String MONGODB_SOURCE_TABLE = "source_matchQuery_table";
- private static final String MONGODB_SINK_TABLE = "sink_matchQuery_table";
-
- private static final List TEST_DATASET = generateTestDataSet();
- private static final List RESULT_DATASET = generateResultDataSet();
-
- private GenericContainer> mongodbContainer;
- private MongoClient client;
-
- @BeforeEach
- public void startMongoContainer() {
- DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
- mongodbContainer = new GenericContainer<>(imageName)
- .withNetwork(NETWORK)
- .withNetworkAliases(MONGODB_CONTAINER_HOST)
- .withExposedPorts(MONGODB_PORT)
- .waitingFor(new HttpWaitStrategy()
- .forPort(MONGODB_PORT)
- .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
- .withStartupTimeout(Duration.ofMinutes(2)))
- .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
- Startables.deepStart(Stream.of(mongodbContainer)).join();
- log.info("Mongodb container started");
-
- Awaitility.given().ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(this::initConnection);
- this.initSourceData();
- }
-
- @Test
- public void testMongodb() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelFlinkJob("/mongodb/mongodb_source_matchQuery_and_sink.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertIterableEquals(
- RESULT_DATASET.stream()
- .map(e -> {
- e.remove("_id");
- return e;
- })
- .collect(Collectors.toList()),
- readSinkData().stream()
- .map(e -> {
- e.remove("_id");
- return e;
- })
- .collect(Collectors.toList()));
- }
-
- public void initConnection() {
- String host = mongodbContainer.getContainerIpAddress();
- int port = mongodbContainer.getFirstMappedPort();
- String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
-
- client = MongoClients.create(url);
- }
-
- private void initSourceData() {
- MongoCollection sourceTable = client
- .getDatabase(MONGODB_DATABASE)
- .getCollection(MONGODB_SOURCE_TABLE);
-
- sourceTable.deleteMany(new Document());
- sourceTable.insertMany(TEST_DATASET);
- }
-
- private List readSinkData() {
- MongoCollection sinkTable = client
- .getDatabase(MONGODB_DATABASE)
- .getCollection(MONGODB_SINK_TABLE);
- MongoCursor cursor = sinkTable.find()
- .sort(Sorts.ascending("id"))
- .cursor();
- List documents = new ArrayList<>();
- while (cursor.hasNext()) {
- documents.add(cursor.next());
- }
- return documents;
- }
-
- private static List generateTestDataSet() {
- SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
- new String[]{
- "id",
- "c_map",
- "c_array",
- "c_string",
- "c_boolean",
- "c_tinyint",
- "c_smallint",
- "c_int",
- "c_bigint",
- "c_float",
- "c_double",
- "c_decimal",
- "c_bytes",
- "c_date"
- },
- new SeaTunnelDataType[]{
- BasicType.LONG_TYPE,
- new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
- ArrayType.BYTE_ARRAY_TYPE,
- BasicType.STRING_TYPE,
- BasicType.BOOLEAN_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.SHORT_TYPE,
- BasicType.INT_TYPE,
- BasicType.LONG_TYPE,
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- new DecimalType(2, 1),
- PrimitiveByteArrayType.INSTANCE,
- LocalTimeType.LOCAL_DATE_TYPE,
- }
- );
- Serializer serializer = new DefaultSerializer(seatunnelRowType);
-
- List documents = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- SeaTunnelRow row = new SeaTunnelRow(
- new Object[]{
- Long.valueOf(i),
- Collections.singletonMap("key", Short.parseShort("1")),
- new Byte[]{Byte.parseByte("1")},
- "string",
- Boolean.FALSE,
- Byte.parseByte("1"),
- Short.parseShort("1"),
- Integer.parseInt("1"),
- Long.parseLong("1"),
- Float.parseFloat("1.1"),
- Double.parseDouble("1.1"),
- BigDecimal.valueOf(11, 1),
- "test".getBytes(),
- LocalDate.now(),
- });
- documents.add(serializer.serialize(row));
- }
- return documents;
- }
-
- private static List generateResultDataSet() {
- SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
- new String[]{
- "id",
- "c_map",
- "c_array",
- "c_string",
- "c_boolean",
- "c_tinyint",
- "c_smallint",
- "c_int",
- "c_bigint",
- "c_float",
- "c_double",
- "c_decimal",
- "c_bytes",
- "c_date"
- },
- new SeaTunnelDataType[]{
- BasicType.LONG_TYPE,
- new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
- ArrayType.BYTE_ARRAY_TYPE,
- BasicType.STRING_TYPE,
- BasicType.BOOLEAN_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.SHORT_TYPE,
- BasicType.INT_TYPE,
- BasicType.LONG_TYPE,
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- new DecimalType(2, 1),
- PrimitiveByteArrayType.INSTANCE,
- LocalTimeType.LOCAL_DATE_TYPE
- }
- );
- Serializer serializer = new DefaultSerializer(seatunnelRowType);
-
- List documents = new ArrayList<>();
- SeaTunnelRow row = new SeaTunnelRow(
- new Object[]{
- Long.valueOf(3),
- Collections.singletonMap("key", Short.parseShort("1")),
- new Byte[]{Byte.parseByte("1")},
- "string",
- Boolean.FALSE,
- Byte.parseByte("1"),
- Short.parseShort("1"),
- Integer.parseInt("1"),
- Long.parseLong("1"),
- Float.parseFloat("1.1"),
- Double.parseDouble("1.1"),
- BigDecimal.valueOf(11, 1),
- "test".getBytes(),
- LocalDate.now(),
- });
- documents.add(serializer.serialize(row));
- return documents;
- }
-
- @AfterEach
- public void closeMongoContainer() {
- if (client != null) {
- client.close();
- }
- if (mongodbContainer != null) {
- mongodbContainer.close();
- }
- }
-}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
deleted file mode 100644
index d013995d5d4..00000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
+++ /dev/null
@@ -1,66 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set flink configuration here
- execution.parallelism = 1
- job.mode = "BATCH"
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
- MongoDB {
- uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
- database = "test_db"
- collection = "source_table"
- schema = {
- fields {
- id = bigint
- c_map = "map"
- c_array = "array"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
- }
-}
-
-transform {
-}
-
-sink {
- MongoDB {
- uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
- database = "test_db"
- collection = "sink_table"
- }
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
deleted file mode 100644
index 2a67c1ea62a..00000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
+++ /dev/null
@@ -1,66 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set flink configuration here
- execution.parallelism = 1
- job.mode = "BATCH"
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
- MongoDB {
- uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
- database = "test_db"
- collection = "source_matchQuery_table"
- matchQuery = "{"id":3}"
- schema = {
- fields {
- id = bigint
- c_map = "map"
- c_array = "array"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- }
- }
- }
-}
-
-transform {
-}
-
-sink {
- MongoDB {
- uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
- database = "test_db"
- collection = "sink_matchQuery_table"
- }
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 42ae59143f1..bec654173d2 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -30,7 +30,6 @@
connector-flink-e2e-base
connector-jdbc-flink-e2e
connector-datahub-flink-e2e
- connector-mongodb-flink-e2e
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 32e46c8dce9..d7c439a6df6 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -30,7 +30,6 @@
connector-spark-e2e-base
connector-datahub-spark-e2e
connector-jdbc-spark-e2e
- connector-mongodb-spark-e2e
From 20379e8d8f6f0f1350d7a2be826e95d831ad7784 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Thu, 5 Jan 2023 17:23:50 +0800
Subject: [PATCH 19/25] fix
---
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
1 file changed, 1 insertion(+)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 2f11b69ce8f..990b87c16c4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -45,6 +45,7 @@
connector-cdc-mysql-e2e
connector-iceberg-e2e
connector-iceberg-hadoop3-e2e
+ connector-mongodb-e2e
seatunnel-connector-v2-e2e
From 779ee1adca1ec91ff0385afcc2aa9e62c3b52dfe Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Tue, 10 Jan 2023 15:21:40 +0800
Subject: [PATCH 20/25] fix
---
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml | 1 -
2 files changed, 1 insertion(+), 1 deletion(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 457217735c8..68afc709fe9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -47,6 +47,7 @@
connector-iceberg-hadoop3-e2e
connector-tdengine-e2e
connector-datahub-e2e
+ connector-mongodb-e2e
seatunnel-connector-v2-e2e
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index c5e966bca43..78ac1bc328e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -29,7 +29,6 @@
connector-flink-e2e-base
connector-jdbc-flink-e2e
- connector-mongodb-flink-e2e
From 9b872032b0aa955ba2fcf4b7689d65f36171b092 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Mon, 16 Jan 2023 18:04:36 +0800
Subject: [PATCH 21/25] fix
---
.../e2e/connector/v2/mongodb/MongodbIT.java | 6 ++
.../mongodb/mongodb_source_to_assert.conf | 73 +++++++++++++++++++
2 files changed, 79 insertions(+)
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_to_assert.conf
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 8fc9598e80b..b30bb26520f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -77,6 +77,12 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
private GenericContainer> mongodbContainer;
private MongoClient client;
+ @TestTemplate
+ public void testMongodbSourceToAssertSink(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/mongodb_source_to_assert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
@TestTemplate
public void testMongodb(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/mongodb/mongodb_source_and_sink.conf");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_to_assert.conf
new file mode 100644
index 00000000000..ad5c1a771ff
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_to_assert.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ MongoDB {
+ uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+ database = "test_db"
+ collection = "source_table"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ]
+ }
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert
+}
From 86504633ca915c4dd0ee983a99ce4ccbb0b82675 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Mon, 16 Jan 2023 18:26:47 +0800
Subject: [PATCH 22/25] fix
---
.../src/test/resources/{mongodb => }/mongodb_source_and_sink.conf | 0
.../{mongodb => }/mongodb_source_matchQuery_and_sink.conf | 0
.../test/resources/{mongodb => }/mongodb_source_to_assert.conf | 0
3 files changed, 0 insertions(+), 0 deletions(-)
rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/{mongodb => }/mongodb_source_and_sink.conf (100%)
rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/{mongodb => }/mongodb_source_matchQuery_and_sink.conf (100%)
rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/{mongodb => }/mongodb_source_to_assert.conf (100%)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_and_sink.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_and_sink.conf
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_matchQuery_and_sink.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_matchQuery_and_sink.conf
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb/mongodb_source_to_assert.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
From f9f29263e26b371ee32ff729c0a87021de3ebed4 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Mon, 16 Jan 2023 18:35:58 +0800
Subject: [PATCH 23/25] fix
---
.../apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index b30bb26520f..2d1fc566230 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -85,13 +85,13 @@ public void testMongodbSourceToAssertSink(TestContainer container) throws IOExce
@TestTemplate
public void testMongodb(TestContainer container) throws IOException, InterruptedException {
- Container.ExecResult execResult = container.executeJob("/mongodb/mongodb_source_and_sink.conf");
+ Container.ExecResult execResult = container.executeJob("/mongodb_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
@TestTemplate
public void testMongodbMatchQuery(TestContainer container) throws IOException, InterruptedException {
- Container.ExecResult execResult = container.executeJob("/mongodb/mongodb_source_matchQuery_and_sink.conf");
+ Container.ExecResult execResult = container.executeJob("/mongodb_source_matchQuery_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
From 466691b8ce95edef863b4c8e92414c1ac1fac194 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Mon, 16 Jan 2023 18:57:24 +0800
Subject: [PATCH 24/25] fix
---
.../apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 2d1fc566230..717be9c089f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -80,7 +80,7 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
@TestTemplate
public void testMongodbSourceToAssertSink(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/mongodb_source_to_assert.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
@TestTemplate
From 9a0431ded2fc8dc8f5170a3940fa82421ea14998 Mon Sep 17 00:00:00 2001
From: shihong90 <2572805166@qq.com>
Date: Mon, 16 Jan 2023 19:28:52 +0800
Subject: [PATCH 25/25] fix
---
.../seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
index 168c4fcf621..7f4f6c6e877 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
@@ -35,5 +35,10 @@
${project.version}
test
+
+ org.apache.seatunnel
+ connector-assert
+ ${project.version}
+