Skip to content

Commit

Permalink
[Improve][E2E]modify the method of obtaining JobId
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Oct 21, 2024
1 parent c0f27c2 commit 330b98b
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand All @@ -49,8 +50,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;
Expand Down Expand Up @@ -329,11 +328,12 @@ public void testMultiTableWithRestore(TestContainer container)
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1);
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2);

String jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf");
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down Expand Up @@ -365,18 +365,6 @@ public void testMultiTableWithRestore(TestContainer container)
.pollInterval(1000, TimeUnit.MILLISECONDS)
.until(() -> getConnectionStatus("st_user_sink").size() == 1);

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// Restore job with add a new table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -288,12 +289,14 @@ public void testOpengaussCdcMultiTableE2e(TestContainer container) {
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf");
"/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf",
jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -319,18 +322,6 @@ public void testMultiTableWithRestore(TestContainer container)
OPENGAUSS_SCHEMA,
SINK_TABLE_1)))));

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// Restore job with add a new table
Expand Down Expand Up @@ -397,12 +388,13 @@ public void testMultiTableWithRestore(TestContainer container)
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testAddFiledWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/opengausscdc_to_opengauss_test_add_Filed.conf");
"/opengausscdc_to_opengauss_test_add_Filed.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -425,18 +417,6 @@ public void testAddFiledWithRestore(TestContainer container)
OPENGAUSS_SCHEMA,
SINK_TABLE_3)))));

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job opengausscdc_to_opengauss_test_add_Filed.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// add filed add insert source table data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -390,11 +391,12 @@ public void testMultiTableWithRestore(TestContainer container)
insertSourceTable(DATABASE, SOURCE_TABLE1);
insertSourceTable(DATABASE, SOURCE_TABLE2);

String jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf");
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down Expand Up @@ -432,18 +434,6 @@ public void testMultiTableWithRestore(TestContainer container)
getSourceQuerySQL(
DATABASE, SINK_TABLE1)))));

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job oraclecdc_to_oracle_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// Restore job with add a new table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -274,12 +275,13 @@ public void testPostgresCdcMultiTableE2e(TestContainer container) {
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf");
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -305,18 +307,6 @@ public void testMultiTableWithRestore(TestContainer container)
POSTGRESQL_SCHEMA,
SINK_TABLE_1)))));

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job pgcdc_to_pg_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// Restore job with add a new table
Expand Down Expand Up @@ -382,12 +372,13 @@ public void testMultiTableWithRestore(TestContainer container)
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testAddFiledWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/postgrescdc_to_postgres_test_add_Filed.conf");
"/postgrescdc_to_postgres_test_add_Filed.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -410,18 +401,6 @@ public void testAddFiledWithRestore(TestContainer container)
POSTGRESQL_SCHEMA,
SINK_TABLE_3)))));

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job postgrescdc_to_postgres_test_add_Filed.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// add filed add insert source table data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand All @@ -41,8 +42,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.awaitility.Awaitility.await;

Expand Down Expand Up @@ -167,11 +166,11 @@ public void testMultiTableWithRestore(TestContainer container)
// Clear related content to ensure that multiple operations are not affected
clearTable(TIDB_DATABASE, SOURCE_TABLE);
clearTable(TIDB_DATABASE, SINK_TABLE);

String jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/tidb/tidbcdc_to_tidb.conf");
container.executeJob("/tidb/tidbcdc_to_tidb.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -192,17 +191,6 @@ public void testMultiTableWithRestore(TestContainer container)
query(getSinkQuerySQL(TIDB_DATABASE, SINK_TABLE))));
});

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job tidbcdc_to_tidb.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// Restore job
Expand Down
Loading

0 comments on commit 330b98b

Please sign in to comment.