Skip to content

Commit

Permalink
[Fix-4004] Fix unable to execute statements such as create database (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo authored Feb 8, 2025
1 parent ade81bd commit 4072b0e
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static Executor getDefaultExecutor() {
}

public static Executor buildExecutor(ExecutorConfig executorConfig, DinkyClassLoader classLoader) {
if (executorConfig.isRemote()) {
if (executorConfig.isRemote() && !executorConfig.isPlan()) {
return buildRemoteExecutor(executorConfig, classLoader);
} else {
return buildLocalExecutor(executorConfig, classLoader);
Expand Down
2 changes: 1 addition & 1 deletion dinky-core/src/main/java/org/dinky/job/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class Job {
private Executor executor;
private boolean useGateway;
private List<String> jids;
private boolean isPipeline = true;
private boolean isPipeline = false;

@Getter
public enum JobStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public JobJarRunner(JobManager jobManager) {

@Override
public void run(JobStatement jobStatement) throws Exception {
jobManager.getJob().setPipeline(true);
if (!jobManager.isUseGateway()) {
submitNormal(jobStatement);
} else {
Expand Down Expand Up @@ -169,7 +170,12 @@ private GatewayResult submitNormalWithGateway(JobStatement jobStatement) {
private Pipeline getPipeline(JobStatement jobStatement) {
Pipeline pipeline = getJarStreamGraph(jobStatement.getStatement(), jobManager.getDinkyClassLoader());
if (pipeline instanceof StreamGraph) {
if (Asserts.isNotNullString(jobManager.getConfig().getSavePointPath())) {
if (Asserts.isNotNullString(jobManager.getConfig().getSavePointPath())
|| (Asserts.isNotNull(jobManager.getConfig().getConfigJson())
&& Asserts.isNotNullString(jobManager
.getConfig()
.getConfigJson()
.get(SavepointConfigOptions.SAVEPOINT_PATH)))) {
((StreamGraph) pipeline)
.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
jobManager.getConfig().getSavePointPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public Optional<JobClient> execute(JobStatement jobStatement) throws Exception {

@Override
public void run(JobStatement jobStatement) throws Exception {
jobManager.getJob().setPipeline(true);
if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
JobJarRunner jobJarRunner = new JobJarRunner(jobManager);
jobJarRunner.run(jobStatement);
Expand Down
11 changes: 7 additions & 4 deletions dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public Optional<JobClient> execute(JobStatement jobStatement) throws Exception {
public void run(JobStatement jobStatement) throws Exception {
statements.add(jobStatement);
if (jobStatement.isFinalExecutableStatement()) {
jobManager.getJob().setPipeline(isPipeline());
if (inferStatementSet()) {
handleStatementSet();
} else {
Expand Down Expand Up @@ -225,6 +226,11 @@ public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) {
throw new DinkyException("None jobs in statement.");
}

private boolean isPipeline() {
return statements.stream()
.anyMatch(jobStatement -> jobStatement.getSqlType().isPipeline());
}

private boolean inferStatementSet() {
boolean hasInsert = false;
for (JobStatement item : statements) {
Expand Down Expand Up @@ -266,7 +272,6 @@ private void processWithoutGateway() {

private void processSingleInsertWithGateway() {
List<JobStatement> singleInsert = Collections.singletonList(statements.get(0));
jobManager.getJob().setPipeline(statements.get(0).getSqlType().isPipeline());
GatewayResult gatewayResult = submitByGateway(singleInsert);
setJobResultFromGatewayResult(gatewayResult);
}
Expand All @@ -276,9 +281,7 @@ private void processFirstStatement() throws Exception {
return;
}
// Only process the first statement when not using statement set
JobStatement item = statements.get(0);
jobManager.getJob().setPipeline(item.getSqlType().isPipeline());
processSingleStatement(item);
processSingleStatement(statements.get(0));
}

private void processSingleStatement(JobStatement item) {
Expand Down

0 comments on commit 4072b0e

Please sign in to comment.