diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java index f5057a4fd0d8..bca1052b3d79 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -49,8 +50,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Stream; import static org.awaitility.Awaitility.await; @@ -329,11 +328,12 @@ public void testMultiTableWithRestore(TestContainer container) clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1); clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2); + String jobId = JobIdGenerator.newJobId(); CompletableFuture.supplyAsync( () -> { try { return container.executeJob( - "/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf"); + "/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf", jobId); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -365,18 +365,6 @@ public void testMultiTableWithRestore(TestContainer container) .pollInterval(1000, TimeUnit.MILLISECONDS) .until(() -> getConnectionStatus("st_user_sink").size() == 1); - Pattern jobIdPattern = - Pattern.compile( - ".*Init JobMaster for Job mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*", - Pattern.DOTALL); - Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); - String jobId; - if (matcher.matches()) { - jobId = matcher.group(1); - } else { - throw new RuntimeException("Can not find jobId"); - } - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); // Restore job with add a new table diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java index 5529c823966c..d638b611750b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -288,6 +289,7 @@ public void testOpengaussCdcMultiTableE2e(TestContainer container) { disabledReason = "Currently SPARK and FLINK do not support restore") public void testMultiTableWithRestore(TestContainer container) throws IOException, InterruptedException { + String jobId = JobIdGenerator.newJobId(); try { CompletableFuture.supplyAsync( () -> { @@ -319,18 +321,6 @@ public void testMultiTableWithRestore(TestContainer container) OPENGAUSS_SCHEMA, SINK_TABLE_1))))); - Pattern jobIdPattern = - Pattern.compile( - ".*Init JobMaster for Job opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*", - Pattern.DOTALL); - Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); - String jobId; - if (matcher.matches()) { - jobId = matcher.group(1); - } else { - throw new RuntimeException("Can not find jobId"); - } - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); // Restore job with add a new table @@ -397,6 +387,7 @@ public void testMultiTableWithRestore(TestContainer container) disabledReason = "Currently SPARK and FLINK do not support restore") public void testAddFiledWithRestore(TestContainer container) throws IOException, InterruptedException { + String jobId = JobIdGenerator.newJobId(); try { CompletableFuture.supplyAsync( () -> { @@ -425,18 +416,6 @@ public void testAddFiledWithRestore(TestContainer container) OPENGAUSS_SCHEMA, SINK_TABLE_3))))); - Pattern jobIdPattern = - Pattern.compile( - ".*Init JobMaster for Job opengausscdc_to_opengauss_test_add_Filed.conf \\(([0-9]*)\\).*", - Pattern.DOTALL); - Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); - String jobId; - if (matcher.matches()) { - jobId = matcher.group(1); - } else { - throw new RuntimeException("Can not find jobId"); - } - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); // add filed add insert source table data diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java index 03cd2039b034..77e9e4c2e77b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -390,11 +391,12 @@ public void testMultiTableWithRestore(TestContainer container) insertSourceTable(DATABASE, SOURCE_TABLE1); insertSourceTable(DATABASE, SOURCE_TABLE2); + String jobId = JobIdGenerator.newJobId(); CompletableFuture.supplyAsync( () -> { try { return container.executeJob( - "/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf"); + "/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf", jobId); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -432,18 +434,6 @@ public void testMultiTableWithRestore(TestContainer container) getSourceQuerySQL( DATABASE, SINK_TABLE1))))); - Pattern jobIdPattern = - Pattern.compile( - ".*Init JobMaster for Job oraclecdc_to_oracle_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*", - Pattern.DOTALL); - Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); - String jobId; - if (matcher.matches()) { - jobId = matcher.group(1); - } else { - throw new RuntimeException("Can not find jobId"); - } - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); // Restore job with add a new table diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index 5b6d810de7fc..11e0301ef5b6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -274,6 +275,7 @@ public void testPostgresCdcMultiTableE2e(TestContainer container) { disabledReason = "Currently SPARK and FLINK do not support restore") public void testMultiTableWithRestore(TestContainer container) throws IOException, InterruptedException { + String jobId = JobIdGenerator.newJobId(); try { CompletableFuture.supplyAsync( () -> { @@ -305,18 +307,6 @@ public void testMultiTableWithRestore(TestContainer container) POSTGRESQL_SCHEMA, SINK_TABLE_1))))); - Pattern jobIdPattern = - Pattern.compile( - ".*Init JobMaster for Job pgcdc_to_pg_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*", - Pattern.DOTALL); - Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); - String jobId; - if (matcher.matches()) { - jobId = matcher.group(1); - } else { - throw new RuntimeException("Can not find jobId"); - } - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); // Restore job with add a new table @@ -382,6 +372,7 @@ public void testMultiTableWithRestore(TestContainer container) disabledReason = "Currently SPARK and FLINK do not support restore") public void testAddFiledWithRestore(TestContainer container) throws IOException, InterruptedException { + String jobId = JobIdGenerator.newJobId(); try { CompletableFuture.supplyAsync( () -> { @@ -410,18 +401,6 @@ public void testAddFiledWithRestore(TestContainer container) POSTGRESQL_SCHEMA, SINK_TABLE_3))))); - Pattern jobIdPattern = - Pattern.compile( - ".*Init JobMaster for Job postgrescdc_to_postgres_test_add_Filed.conf \\(([0-9]*)\\).*", - Pattern.DOTALL); - Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); - String jobId; - if (matcher.matches()) { - jobId = matcher.group(1); - } else { - throw new RuntimeException("Can not find jobId"); - } - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); // add filed add insert source table data diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java index f13226453265..02b344120772 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -41,8 +42,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static org.awaitility.Awaitility.await; @@ -167,11 +166,11 @@ public void testMultiTableWithRestore(TestContainer container) // Clear related content to ensure that multiple operations are not affected clearTable(TIDB_DATABASE, SOURCE_TABLE); clearTable(TIDB_DATABASE, SINK_TABLE); - + String jobId = JobIdGenerator.newJobId(); CompletableFuture.supplyAsync( () -> { try { - container.executeJob("/tidb/tidbcdc_to_tidb.conf"); + container.executeJob("/tidb/tidbcdc_to_tidb.conf", jobId); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -192,17 +191,6 @@ public void testMultiTableWithRestore(TestContainer container) query(getSinkQuerySQL(TIDB_DATABASE, SINK_TABLE)))); }); - Pattern jobIdPattern = - Pattern.compile( - ".*Init JobMaster for Job tidbcdc_to_tidb.conf \\(([0-9]*)\\).*", - Pattern.DOTALL); - Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); - String jobId; - if (matcher.matches()) { - jobId = matcher.group(1); - } else { - throw new RuntimeException("Can not find jobId"); - } Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); // Restore job diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java index 5ee0a1e7bdcf..35ca8d9413c1 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainerId; import org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; @@ -96,12 +97,14 @@ public void testZetaBatchCheckpointEnable(TestContainer container) public void testZetaStreamingCheckpointInterval(TestContainer container) throws IOException, InterruptedException, ExecutionException { // start job + String jobId = JobIdGenerator.newJobId(); CompletableFuture startFuture = CompletableFuture.supplyAsync( () -> { try { return container.executeJob( - "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf"); + "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf", + jobId); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -109,24 +112,9 @@ public void testZetaStreamingCheckpointInterval(TestContainer container) }); // wait obtain job id - AtomicReference jobId = new AtomicReference<>(); - await().atMost(60000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> { - Pattern jobIdPattern = - Pattern.compile( - ".*Init JobMaster for Job stream_fakesource_to_localfile_interval.conf \\(([0-9]*)\\).*", - Pattern.DOTALL); - Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); - if (matcher.matches()) { - jobId.set(matcher.group(1)); - } - Assertions.assertNotNull(jobId.get()); - }); - Thread.sleep(15000); Assertions.assertTrue(container.getServerLogs().contains("checkpoint is enabled")); - Assertions.assertEquals(0, container.savepointJob(jobId.get()).getExitCode()); + Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); Assertions.assertEquals(0, startFuture.get().getExitCode()); // restore job CompletableFuture.supplyAsync( @@ -134,7 +122,7 @@ public void testZetaStreamingCheckpointInterval(TestContainer container) try { return container.restoreJob( "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf", - jobId.get()); + jobId); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -164,36 +152,22 @@ public void testZetaStreamingCheckpointInterval(TestContainer container) public void testZetaStreamingCheckpointNoInterval(TestContainer container) throws IOException, InterruptedException { // start job + String jobId = JobIdGenerator.newJobId(); CompletableFuture.supplyAsync( () -> { try { return container.executeJob( - "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf"); + "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf", + jobId); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); } }); - // wait obtain job id - AtomicReference jobId = new AtomicReference<>(); - await().atMost(60000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> { - Pattern jobIdPattern = - Pattern.compile( - ".*Init JobMaster for Job stream_fakesource_to_localfile.conf \\(([0-9]*)\\).*", - Pattern.DOTALL); - Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); - if (matcher.matches()) { - jobId.set(matcher.group(1)); - } - Assertions.assertNotNull(jobId.get()); - }); - Thread.sleep(15000); Assertions.assertTrue(container.getServerLogs().contains("checkpoint is enabled")); - Assertions.assertEquals(0, container.savepointJob(jobId.get()).getExitCode()); + Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); // restore job CompletableFuture.supplyAsync( @@ -202,7 +176,7 @@ public void testZetaStreamingCheckpointNoInterval(TestContainer container) return container .restoreJob( "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf", - jobId.get()) + jobId) .getExitCode(); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage());