Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] [Engine] Fix Default Cluster Not Working In Config File #3770

Merged
merged 8 commits into from
Mar 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class ClientCommandArgs extends AbstractCommandArgs {
@Parameter(
names = {"-cn", "--cluster"},
description = "The name of cluster")
private String clusterName = "seatunnel_default_cluster";
private String clusterName;

@Parameter(
names = {"-j", "--job-id"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ServerCommandArgs extends CommandArgs {
@Parameter(
names = {"-cn", "--cluster"},
description = "The name of cluster")
private String clusterName = "seatunnel_default_cluster";
private String clusterName;

@Override
public Command<?> buildCommand() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;

import org.apache.commons.lang3.StringUtils;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
Expand Down Expand Up @@ -76,12 +79,20 @@ public void execute() throws CommandExecuteException {
try {
String clusterName = clientCommandArgs.getClusterName();
if (clientCommandArgs.getMasterType().equals(MasterType.LOCAL)) {
clusterName = creatRandomClusterName(clusterName);
clusterName =
creatRandomClusterName(
StringUtils.isNotEmpty(clusterName)
? clusterName
: Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
instance = createServerInLocal(clusterName);
}
seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
if (StringUtils.isNotEmpty(clusterName)) {
seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
}
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(clusterName);
if (StringUtils.isNotEmpty(clusterName)) {
clientConfig.setClusterName(clusterName);
}
engineClient = new SeaTunnelClient(clientConfig);
if (clientCommandArgs.isListJob()) {
String jobStatus = engineClient.getJobClient().listJobStatus(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;

import org.apache.commons.lang3.StringUtils;

import com.hazelcast.instance.impl.HazelcastInstanceFactory;

/** This command is used to execute the SeaTunnel engine job by SeaTunnel API. */
Expand All @@ -37,7 +39,9 @@ public ServerExecuteCommand(ServerCommandArgs serverCommandArgs) {
@Override
public void execute() {
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName());
if (StringUtils.isNotEmpty(serverCommandArgs.getClusterName())) {
seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName());
}
HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
Thread.currentThread().getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public SeaTunnelConfig() {
.getJoin()
.getMulticastConfig()
.setMulticastPort(Constant.DEFAULT_SEATUNNEL_MULTICAST_PORT);
hazelcastConfig.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
hazelcastConfig
.getHotRestartPersistenceConfig()
.setBaseDir(new File(seatunnelHome(), "recovery").getAbsoluteFile());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,33 +133,35 @@ public void testHandleCheckpointTimeout() throws Exception {
.untilAsserted(
() -> Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));

// call checkpoint timeout
jobMaster.handleCheckpointError(1);

// Because handleCheckpointTimeout is an async method, so we need sleep 5s to waiting job
// status become running again
Thread.sleep(5000);

// test job still run
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));
jobMaster.neverNeedRestore();
// call checkpoint timeout
jobMaster.handleCheckpointError(1);

PassiveCompletableFuture<JobResult> jobMasterCompleteFuture =
jobMaster.getJobMasterCompleteFuture();
// cancel job
jobMaster.cancelJob();

// test job turn to complete
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
// Why equals CANCELED or FAILED? because handleCheckpointError
// should call by CheckpointCoordinator,
// before do this, CheckpointCoordinator should be failed. Anyway,
// use handleCheckpointError not good to test checkpoint timeout.
Assertions.assertTrue(
jobMasterCompleteFuture.isDone()
&& JobStatus.CANCELED.equals(
jobMasterCompleteFuture
.get()
.getStatus())));
&& (JobStatus.CANCELED.equals(
jobMasterCompleteFuture
.get()
.getStatus())
|| JobStatus.FAILED.equals(
jobMasterCompleteFuture
.get()
.getStatus()))));

testIMapRemovedAfterJobComplete(jobMaster);
}
Expand Down