diff --git a/dinky-core/src/main/java/org/dinky/executor/ExecutorFactory.java b/dinky-core/src/main/java/org/dinky/executor/ExecutorFactory.java index 237b462996..9aa2db5579 100644 --- a/dinky-core/src/main/java/org/dinky/executor/ExecutorFactory.java +++ b/dinky-core/src/main/java/org/dinky/executor/ExecutorFactory.java @@ -38,7 +38,7 @@ public static Executor getDefaultExecutor() { } public static Executor buildExecutor(ExecutorConfig executorConfig, DinkyClassLoader classLoader) { - if (executorConfig.isRemote()) { + if (executorConfig.isRemote() && !executorConfig.isPlan()) { return buildRemoteExecutor(executorConfig, classLoader); } else { return buildLocalExecutor(executorConfig, classLoader); diff --git a/dinky-core/src/main/java/org/dinky/job/Job.java b/dinky-core/src/main/java/org/dinky/job/Job.java index 724f46aab3..33bf093ee1 100644 --- a/dinky-core/src/main/java/org/dinky/job/Job.java +++ b/dinky-core/src/main/java/org/dinky/job/Job.java @@ -54,7 +54,7 @@ public class Job { private Executor executor; private boolean useGateway; private List jids; - private boolean isPipeline = true; + private boolean isPipeline = false; @Getter public enum JobStatus { diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java index 4bf812033e..1026962b7c 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java @@ -85,6 +85,7 @@ public JobJarRunner(JobManager jobManager) { @Override public void run(JobStatement jobStatement) throws Exception { + jobManager.getJob().setPipeline(true); if (!jobManager.isUseGateway()) { submitNormal(jobStatement); } else { @@ -169,7 +170,12 @@ private GatewayResult submitNormalWithGateway(JobStatement jobStatement) { private Pipeline getPipeline(JobStatement jobStatement) { Pipeline pipeline = getJarStreamGraph(jobStatement.getStatement(), jobManager.getDinkyClassLoader()); if (pipeline instanceof StreamGraph) { - if (Asserts.isNotNullString(jobManager.getConfig().getSavePointPath())) { + if (Asserts.isNotNullString(jobManager.getConfig().getSavePointPath()) + || (Asserts.isNotNull(jobManager.getConfig().getConfigJson()) + && Asserts.isNotNullString(jobManager + .getConfig() + .getConfigJson() + .get(SavepointConfigOptions.SAVEPOINT_PATH)))) { ((StreamGraph) pipeline) .setSavepointRestoreSettings(SavepointRestoreSettings.forPath( jobManager.getConfig().getSavePointPath(), diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java index 15e341bc2c..b536585ff9 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java @@ -91,6 +91,7 @@ public Optional execute(JobStatement jobStatement) throws Exception { @Override public void run(JobStatement jobStatement) throws Exception { + jobManager.getJob().setPipeline(true); if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) { JobJarRunner jobJarRunner = new JobJarRunner(jobManager); jobJarRunner.run(jobStatement); diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java index af9b864525..4dc2d5f700 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java @@ -105,6 +105,7 @@ public Optional execute(JobStatement jobStatement) throws Exception { public void run(JobStatement jobStatement) throws Exception { statements.add(jobStatement); if (jobStatement.isFinalExecutableStatement()) { + jobManager.getJob().setPipeline(isPipeline()); if (inferStatementSet()) { handleStatementSet(); } else { @@ -225,6 +226,11 @@ public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) { throw new DinkyException("None jobs in statement."); } + private boolean isPipeline() { + return statements.stream() + .anyMatch(jobStatement -> jobStatement.getSqlType().isPipeline()); + } + private boolean inferStatementSet() { boolean hasInsert = false; for (JobStatement item : statements) { @@ -266,7 +272,6 @@ private void processWithoutGateway() { private void processSingleInsertWithGateway() { List singleInsert = Collections.singletonList(statements.get(0)); - jobManager.getJob().setPipeline(statements.get(0).getSqlType().isPipeline()); GatewayResult gatewayResult = submitByGateway(singleInsert); setJobResultFromGatewayResult(gatewayResult); } @@ -276,9 +281,7 @@ private void processFirstStatement() throws Exception { return; } // Only process the first statement when not using statement set - JobStatement item = statements.get(0); - jobManager.getJob().setPipeline(item.getSqlType().isPipeline()); - processSingleStatement(item); + processSingleStatement(statements.get(0)); } private void processSingleStatement(JobStatement item) {