Skip to content

Commit

Permalink
feat(jdbc): safely deserialize subflow executions
Browse files Browse the repository at this point in the history
Fixes #2300
  • Loading branch information
loicmathieu committed Jan 12, 2024
1 parent b781db5 commit 19f2fa2
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,67 @@ public class DeserializationIssuesCaseTest {
}
""";

public static final String INVALID_SUBFLOW_EXECUTION_KEY = "1XKpihp8y2m3KEHR0hVEKN";
public static final String INVALID_SUBFLOW_EXECUTION_VALUE = """
{
"execution": {
"id": "1XKpihp8y2m3KEHR0hVEKN",
"state": {
"current": "CREATED",
"duration": 0.000201173,
"histories": [
{
"date": "2024-01-10T13:48:32.752Z",
"state": "CREATED"
}
],
"startDate": "2024-01-10T13:48:32.752Z"
},
"flowId": "hello-world",
"deleted": false,
"trigger": {
"id": "subflow",
"type": "io.kestra.notfound.Invalid",
"variables": {
"flowId": "subflox",
"namespace": "company.team",
"executionId": "4NzSyOQBYj1CxVg3bTghbZ",
"flowRevision": 1
}
},
"namespace": "company.team",
"originalId": "1XKpihp8y2m3KEHR0hVEKN",
"flowRevision": 2
},
"parentTask": {
"id": "subflow",
"type": "io.kestra.notfound.Invalid"
},
"parentTaskRun": {
"id": "6Gc6Dkk7medsWtg1WJfZpN",
"state": {
"current": "RUNNING",
"duration": 0.039446974,
"histories": [
{
"date": "2024-01-10T13:48:32.713Z",
"state": "CREATED"
},
{
"date": "2024-01-10T13:48:32.752Z",
"state": "RUNNING"
}
],
"startDate": "2024-01-10T13:48:32.713Z"
},
"flowId": "subflox",
"taskId": "subflow",
"namespace": "company.team",
"executionId": "4NzSyOQBYj1CxVg3bTghbZ"
}
}
""";

@Inject
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
protected QueueInterface<WorkerTaskResult> workerTaskResultQueue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
package io.kestra.runner.postgres;

import io.kestra.core.runners.DeserializationIssuesCaseTest;
import io.kestra.jdbc.runner.AbstractSubflowExecutionTest;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.impl.DSL;

class PostgresSubflowExecutionStorageTest extends AbstractSubflowExecutionTest {
import java.util.Map;

class PostgresSubflowExecutionStorageTest extends AbstractSubflowExecutionTest {
@Override
protected Map<Field<Object>, Object> persistFields() {
return Map.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"),
DSL.val(JSONB.valueOf(DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_VALUE)));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package io.kestra.jdbc.runner;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
Expand All @@ -13,6 +18,7 @@
import java.util.Optional;

public abstract class AbstractJdbcSubflowExecutionStorage extends AbstractJdbcRepository {
private final static ObjectMapper MAPPER = JacksonMapper.ofJson();
protected io.kestra.jdbc.AbstractJdbcRepository<SubflowExecution<?>> jdbcRepository;

@SuppressWarnings({"unchecked", "rawtypes"})
Expand All @@ -32,7 +38,18 @@ public Optional<SubflowExecution<?>> get(String executionId) {
AbstractJdbcRepository.field("key").eq(executionId)
);

return this.jdbcRepository.fetchOne(select);
try {
return this.jdbcRepository.fetchOne(select);
} catch (DeserializationException deserializationException) {
// we may fail to deserialize a SubflowExecution if we fail to deserialize its task
var jsonNode = MAPPER.readTree(deserializationException.getRecord());
var taskRun = MAPPER.treeToValue(jsonNode.get("parentTaskRun"), TaskRun.class);
var execution = MAPPER.treeToValue(jsonNode.get("execution"), Execution.class);
return Optional.of(SubflowExecution.builder()
.parentTaskRun(taskRun)
.execution(execution)
.build());
}
});
}

Expand Down
4 changes: 1 addition & 3 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.http.HttpResponse;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import jakarta.inject.Inject;
import jakarta.inject.Named;
Expand All @@ -49,7 +48,6 @@

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
Expand Down Expand Up @@ -468,7 +466,7 @@ private void executionQueue(Either<Execution, DeserializationException> either)
subflowExecutionStorage.get(execution.getId())
.ifPresent(subflowExecution -> {
// If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service.
if (subflowExecution.getParentTask().waitForExecution()) {
if (subflowExecution.getParentTask() != null && subflowExecution.getParentTask().waitForExecution()) {
sendSubflowExecutionResult(execution, subflowExecution, subflowExecution.getParentTaskRun().withState(execution.getState().getCurrent()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.runners.DeserializationIssuesCaseTest;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.tasks.flows.Subflow;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcTestUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.jooq.Field;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -48,6 +52,25 @@ void suite() throws Exception {
assertThat(find.isPresent(), is(false));
}

@Test
void deserializationIssue() {
// insert an invalid subflowExecution
var subflowExecution = SubflowExecution.builder()
.execution(Execution.builder().id(DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_KEY).build())
.build();
Map<Field<Object>, Object> fields = persistFields();
subflowExecutionStorage.jdbcRepository.persist(subflowExecution, fields);

// load it
Optional<SubflowExecution<?>> find = subflowExecutionStorage.get(DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_KEY);
assertThat(find.isPresent(), is(true));
}

protected Map<Field<Object>, Object> persistFields() {
return Map.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"),
DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_VALUE);
}

@BeforeEach
protected void init() {
jdbcTestUtils.drop();
Expand Down

0 comments on commit 19f2fa2

Please sign in to comment.