Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][E2E] modify the method of obtaining JobId #7880

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand All @@ -49,8 +50,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;
Expand Down Expand Up @@ -329,11 +328,12 @@ public void testMultiTableWithRestore(TestContainer container)
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1);
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2);

String jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf");
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down Expand Up @@ -365,18 +365,6 @@ public void testMultiTableWithRestore(TestContainer container)
.pollInterval(1000, TimeUnit.MILLISECONDS)
.until(() -> getConnectionStatus("st_user_sink").size() == 1);

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// Restore job with add a new table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -288,12 +289,14 @@ public void testOpengaussCdcMultiTableE2e(TestContainer container) {
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf");
"/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf",
jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -319,18 +322,6 @@ public void testMultiTableWithRestore(TestContainer container)
OPENGAUSS_SCHEMA,
SINK_TABLE_1)))));

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// Restore job with add a new table
Expand Down Expand Up @@ -397,12 +388,13 @@ public void testMultiTableWithRestore(TestContainer container)
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testAddFiledWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/opengausscdc_to_opengauss_test_add_Filed.conf");
"/opengausscdc_to_opengauss_test_add_Filed.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -425,18 +417,6 @@ public void testAddFiledWithRestore(TestContainer container)
OPENGAUSS_SCHEMA,
SINK_TABLE_3)))));

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job opengausscdc_to_opengauss_test_add_Filed.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// add filed add insert source table data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -390,11 +391,12 @@ public void testMultiTableWithRestore(TestContainer container)
insertSourceTable(DATABASE, SOURCE_TABLE1);
insertSourceTable(DATABASE, SOURCE_TABLE2);

String jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf");
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down Expand Up @@ -432,18 +434,6 @@ public void testMultiTableWithRestore(TestContainer container)
getSourceQuerySQL(
DATABASE, SINK_TABLE1)))));

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job oraclecdc_to_oracle_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// Restore job with add a new table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -274,12 +275,13 @@ public void testPostgresCdcMultiTableE2e(TestContainer container) {
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf");
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -305,18 +307,6 @@ public void testMultiTableWithRestore(TestContainer container)
POSTGRESQL_SCHEMA,
SINK_TABLE_1)))));

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job pgcdc_to_pg_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// Restore job with add a new table
Expand Down Expand Up @@ -382,12 +372,13 @@ public void testMultiTableWithRestore(TestContainer container)
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testAddFiledWithRestore(TestContainer container)
throws IOException, InterruptedException {
String jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/postgrescdc_to_postgres_test_add_Filed.conf");
"/postgrescdc_to_postgres_test_add_Filed.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -410,18 +401,6 @@ public void testAddFiledWithRestore(TestContainer container)
POSTGRESQL_SCHEMA,
SINK_TABLE_3)))));

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job postgrescdc_to_postgres_test_add_Filed.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}

Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// add filed add insert source table data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand All @@ -41,8 +42,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.awaitility.Awaitility.await;

Expand Down Expand Up @@ -167,11 +166,11 @@ public void testMultiTableWithRestore(TestContainer container)
// Clear related content to ensure that multiple operations are not affected
clearTable(TIDB_DATABASE, SOURCE_TABLE);
clearTable(TIDB_DATABASE, SINK_TABLE);

String jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/tidb/tidbcdc_to_tidb.conf");
container.executeJob("/tidb/tidbcdc_to_tidb.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand All @@ -192,17 +191,6 @@ public void testMultiTableWithRestore(TestContainer container)
query(getSinkQuerySQL(TIDB_DATABASE, SINK_TABLE))));
});

Pattern jobIdPattern =
Pattern.compile(
".*Init JobMaster for Job tidbcdc_to_tidb.conf \\(([0-9]*)\\).*",
Pattern.DOTALL);
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
String jobId;
if (matcher.matches()) {
jobId = matcher.group(1);
} else {
throw new RuntimeException("Can not find jobId");
}
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

// Restore job
Expand Down
Loading
Loading