From 1151ec0e06d23597e8b37cb22582943adf19f281 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Wed, 8 Nov 2023 11:49:57 +0100 Subject: [PATCH] feat(cli): SubmitQueuedCommand to submit queued executions --- .../cli/commands/sys/SubmitQueuedCommand.java | 64 +++++++++++++++++++ .../kestra/cli/commands/sys/SysCommand.java | 3 +- .../AbstractJdbcExecutionQueuedStorage.java | 16 +++++ 3 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 cli/src/main/java/io/kestra/cli/commands/sys/SubmitQueuedCommand.java diff --git a/cli/src/main/java/io/kestra/cli/commands/sys/SubmitQueuedCommand.java b/cli/src/main/java/io/kestra/cli/commands/sys/SubmitQueuedCommand.java new file mode 100644 index 00000000000..5f0c175c71f --- /dev/null +++ b/cli/src/main/java/io/kestra/cli/commands/sys/SubmitQueuedCommand.java @@ -0,0 +1,64 @@ +package io.kestra.cli.commands.sys; + +import io.kestra.cli.AbstractCommand; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.State; +import io.kestra.core.queues.QueueFactoryInterface; +import io.kestra.core.queues.QueueInterface; +import io.kestra.core.runners.ExecutionQueued; +import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import lombok.extern.slf4j.Slf4j; +import picocli.CommandLine; + +import java.util.Optional; + +@CommandLine.Command( + name = "submit-queued-execution", + description = {"Submit all queued execution to the executor", + "All queued execution will be submitted to the executor. Warning, if there is still running executions and concurrency limit configured, the executions may be queued again." + } +) +@Slf4j +public class SubmitQueuedCommand extends AbstractCommand { + @Inject + private ApplicationContext applicationContext; + + @Inject + @Named(QueueFactoryInterface.EXECUTION_NAMED) + private QueueInterface executionQueue; + + @Override + public Integer call() throws Exception { + super.call(); + + Optional queueType = applicationContext.getProperty("kestra.queue.type", String.class); + if (queueType.isEmpty()) { + stdOut("Unable to submit queued executions, the 'kestra.queue.type' configuration is not set"); + return 0; + } + + int cpt = 0; + if (queueType.get().equals("kafka")) { + stdOut("Unable to submit queued executions, the 'kestra.queue.type' configuration is set to 'kafka', use the corresponding sys-ee command"); + return 1; + } + else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) { + var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class); + + for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) { + executionQueuedStorage.pop(queued.getTenantId(), queued.getNamespace(), queued.getFlowId(), execution -> executionQueue.emit(execution.withState(State.Type.CREATED))); + cpt++; + } + } + else { + stdOut("Unable to submit queued executions, the 'kestra.queue.type' is set to an unknown type '{0}'", queueType.get()); + return 1; + } + + stdOut("Successfully submitted {0} queued executions", cpt); + return 0; + } +} diff --git a/cli/src/main/java/io/kestra/cli/commands/sys/SysCommand.java b/cli/src/main/java/io/kestra/cli/commands/sys/SysCommand.java index 6cf801cc314..08535047c54 100644 --- a/cli/src/main/java/io/kestra/cli/commands/sys/SysCommand.java +++ b/cli/src/main/java/io/kestra/cli/commands/sys/SysCommand.java @@ -13,7 +13,8 @@ mixinStandardHelpOptions = true, subcommands = { ReindexCommand.class, - DatabaseCommand.class + DatabaseCommand.class, + SubmitQueuedCommand.class } ) @Slf4j diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcExecutionQueuedStorage.java b/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcExecutionQueuedStorage.java index 434429531f3..a2c26044c8a 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcExecutionQueuedStorage.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcExecutionQueuedStorage.java @@ -6,6 +6,7 @@ import org.jooq.Field; import org.jooq.impl.DSL; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Consumer; @@ -45,4 +46,19 @@ public void pop(String tenantId, String namespace, String flowId, Consumer getAllForAllTenants() { + return this.jdbcRepository + .getDslContextWrapper() + .transactionResult(configuration -> { + var select = DSL + .using(configuration) + .select(AbstractJdbcRepository.field("value")) + .from(this.jdbcRepository.getTable()); + + return this.jdbcRepository.fetch(select); + }); + } }