From 19f2fa21c22382e012049c2d5c2458cedd2d1773 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Wed, 10 Jan 2024 16:34:43 +0100 Subject: [PATCH] feat(jdbc): safely deserialize subflow executions Fixes #2300 --- .../DeserializationIssuesCaseTest.java | 61 +++++++++++++++++++ .../PostgresSubflowExecutionStorageTest.java | 12 +++- .../AbstractJdbcSubflowExecutionStorage.java | 19 +++++- .../io/kestra/jdbc/runner/JdbcExecutor.java | 4 +- .../runner/AbstractSubflowExecutionTest.java | 23 +++++++ 5 files changed, 114 insertions(+), 5 deletions(-) diff --git a/core/src/test/java/io/kestra/core/runners/DeserializationIssuesCaseTest.java b/core/src/test/java/io/kestra/core/runners/DeserializationIssuesCaseTest.java index 8da685845d7..2aeb63f51d7 100644 --- a/core/src/test/java/io/kestra/core/runners/DeserializationIssuesCaseTest.java +++ b/core/src/test/java/io/kestra/core/runners/DeserializationIssuesCaseTest.java @@ -149,6 +149,67 @@ public class DeserializationIssuesCaseTest { } """; + public static final String INVALID_SUBFLOW_EXECUTION_KEY = "1XKpihp8y2m3KEHR0hVEKN"; + public static final String INVALID_SUBFLOW_EXECUTION_VALUE = """ + { + "execution": { + "id": "1XKpihp8y2m3KEHR0hVEKN", + "state": { + "current": "CREATED", + "duration": 0.000201173, + "histories": [ + { + "date": "2024-01-10T13:48:32.752Z", + "state": "CREATED" + } + ], + "startDate": "2024-01-10T13:48:32.752Z" + }, + "flowId": "hello-world", + "deleted": false, + "trigger": { + "id": "subflow", + "type": "io.kestra.notfound.Invalid", + "variables": { + "flowId": "subflox", + "namespace": "company.team", + "executionId": "4NzSyOQBYj1CxVg3bTghbZ", + "flowRevision": 1 + } + }, + "namespace": "company.team", + "originalId": "1XKpihp8y2m3KEHR0hVEKN", + "flowRevision": 2 + }, + "parentTask": { + "id": "subflow", + "type": "io.kestra.notfound.Invalid" + }, + "parentTaskRun": { + "id": "6Gc6Dkk7medsWtg1WJfZpN", + "state": { + "current": "RUNNING", + "duration": 0.039446974, + "histories": [ + { + "date": "2024-01-10T13:48:32.713Z", + "state": "CREATED" + }, + { + "date": "2024-01-10T13:48:32.752Z", + "state": "RUNNING" + } + ], + "startDate": "2024-01-10T13:48:32.713Z" + }, + "flowId": "subflox", + "taskId": "subflow", + "namespace": "company.team", + "executionId": "4NzSyOQBYj1CxVg3bTghbZ" + } + } + """; + @Inject @Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED) protected QueueInterface workerTaskResultQueue; diff --git a/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorageTest.java b/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorageTest.java index ef8ae3e41ce..9dde7a46f47 100644 --- a/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorageTest.java +++ b/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorageTest.java @@ -1,7 +1,17 @@ package io.kestra.runner.postgres; +import io.kestra.core.runners.DeserializationIssuesCaseTest; import io.kestra.jdbc.runner.AbstractSubflowExecutionTest; +import org.jooq.Field; +import org.jooq.JSONB; +import org.jooq.impl.DSL; -class PostgresSubflowExecutionStorageTest extends AbstractSubflowExecutionTest { +import java.util.Map; +class PostgresSubflowExecutionStorageTest extends AbstractSubflowExecutionTest { + @Override + protected Map, Object> persistFields() { + return Map.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"), + DSL.val(JSONB.valueOf(DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_VALUE))); + } } \ No newline at end of file diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcSubflowExecutionStorage.java b/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcSubflowExecutionStorage.java index 04bafcdd1c4..aac82d75c06 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcSubflowExecutionStorage.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcSubflowExecutionStorage.java @@ -1,6 +1,11 @@ package io.kestra.jdbc.runner; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.TaskRun; import io.kestra.core.runners.SubflowExecution; +import io.kestra.core.serializers.JacksonMapper; import io.kestra.jdbc.repository.AbstractJdbcRepository; import org.jooq.DSLContext; import org.jooq.Field; @@ -13,6 +18,7 @@ import java.util.Optional; public abstract class AbstractJdbcSubflowExecutionStorage extends AbstractJdbcRepository { + private final static ObjectMapper MAPPER = JacksonMapper.ofJson(); protected io.kestra.jdbc.AbstractJdbcRepository> jdbcRepository; @SuppressWarnings({"unchecked", "rawtypes"}) @@ -32,7 +38,18 @@ public Optional> get(String executionId) { AbstractJdbcRepository.field("key").eq(executionId) ); - return this.jdbcRepository.fetchOne(select); + try { + return this.jdbcRepository.fetchOne(select); + } catch (DeserializationException deserializationException) { + // we may fail to deserialize a SubflowExecution if we fail to deserialize its task + var jsonNode = MAPPER.readTree(deserializationException.getRecord()); + var taskRun = MAPPER.treeToValue(jsonNode.get("parentTaskRun"), TaskRun.class); + var execution = MAPPER.treeToValue(jsonNode.get("execution"), Execution.class); + return Optional.of(SubflowExecution.builder() + .parentTaskRun(taskRun) + .execution(execution) + .build()); + } }); } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index 1a302efafd4..31d52a91a58 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -37,7 +37,6 @@ import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository; import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.Value; -import io.micronaut.http.HttpResponse; import io.micronaut.transaction.exceptions.CannotCreateTransactionException; import jakarta.inject.Inject; import jakarta.inject.Named; @@ -49,7 +48,6 @@ import java.io.IOException; import java.time.Duration; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.*; @@ -468,7 +466,7 @@ private void executionQueue(Either either) subflowExecutionStorage.get(execution.getId()) .ifPresent(subflowExecution -> { // If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service. - if (subflowExecution.getParentTask().waitForExecution()) { + if (subflowExecution.getParentTask() != null && subflowExecution.getParentTask().waitForExecution()) { sendSubflowExecutionResult(execution, subflowExecution, subflowExecution.getParentTaskRun().withState(execution.getState().getCurrent())); } diff --git a/jdbc/src/test/java/io/kestra/jdbc/runner/AbstractSubflowExecutionTest.java b/jdbc/src/test/java/io/kestra/jdbc/runner/AbstractSubflowExecutionTest.java index 27b11976ec9..066af5ee73e 100644 --- a/jdbc/src/test/java/io/kestra/jdbc/runner/AbstractSubflowExecutionTest.java +++ b/jdbc/src/test/java/io/kestra/jdbc/runner/AbstractSubflowExecutionTest.java @@ -2,16 +2,20 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.runners.DeserializationIssuesCaseTest; import io.kestra.core.runners.SubflowExecution; import io.kestra.core.tasks.flows.Subflow; import io.kestra.core.utils.IdUtils; import io.kestra.jdbc.JdbcTestUtils; import io.micronaut.test.extensions.junit5.annotation.MicronautTest; import jakarta.inject.Inject; +import org.jooq.Field; +import org.jooq.impl.DSL; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.List; +import java.util.Map; import java.util.Optional; import static org.hamcrest.MatcherAssert.assertThat; @@ -48,6 +52,25 @@ void suite() throws Exception { assertThat(find.isPresent(), is(false)); } + @Test + void deserializationIssue() { + // insert an invalid subflowExecution + var subflowExecution = SubflowExecution.builder() + .execution(Execution.builder().id(DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_KEY).build()) + .build(); + Map, Object> fields = persistFields(); + subflowExecutionStorage.jdbcRepository.persist(subflowExecution, fields); + + // load it + Optional> find = subflowExecutionStorage.get(DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_KEY); + assertThat(find.isPresent(), is(true)); + } + + protected Map, Object> persistFields() { + return Map.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"), + DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_VALUE); + } + @BeforeEach protected void init() { jdbcTestUtils.drop();