Skip to content

Commit

Permalink
chore(webserver): move taskrun operations to a dedicated endpoint (#1759
Browse files Browse the repository at this point in the history
)
  • Loading branch information
loicmathieu authored Aug 24, 2023
1 parent 123a977 commit 9fd38a2
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
Expand Down Expand Up @@ -80,7 +79,7 @@

@Slf4j
@Validated
@Controller("/api/v1/")
@Controller("/api/v1/executions")
public class ExecutionController {
@Nullable
@Value("${micronaut.server.context-path}")
Expand Down Expand Up @@ -119,7 +118,7 @@ public class ExecutionController {
private RunContextFactory runContextFactory;

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/search", produces = MediaType.TEXT_JSON)
@Get(uri = "/search", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Search for executions")
public PagedResults<Execution> find(
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") int page,
Expand All @@ -146,39 +145,7 @@ public PagedResults<Execution> find(
}

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "taskruns/search", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Search for taskruns")
public PagedResults<TaskRun> findTaskRun(
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") int page,
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state
) {
return PagedResults.of(executionRepository.findTaskRun(
PageableUtils.from(page, size, sort, executionRepository.sortMapping()),
query,
namespace,
flowId,
startDate,
endDate,
state
));
}

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "taskruns/maxTaskRunSetting")
@Hidden
public Integer maxTaskRunSetting() {
return executionRepository.maxTaskRunSetting();
}

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/{executionId}/graph", produces = MediaType.TEXT_JSON)
@Get(uri = "/{executionId}/graph", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Generate a graph for an execution")
public FlowGraph flowGraph(
@Parameter(description = "The execution id") @PathVariable String executionId
Expand All @@ -200,7 +167,7 @@ public FlowGraph flowGraph(
}

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/{executionId}/eval/{taskRunId}", produces = MediaType.TEXT_JSON, consumes = MediaType.TEXT_PLAIN)
@Post(uri = "/{executionId}/eval/{taskRunId}", produces = MediaType.TEXT_JSON, consumes = MediaType.TEXT_PLAIN)
@Operation(tags = {"Executions"}, summary = "Evaluate a variable expression for this taskrun")
public EvalResult eval(
@Parameter(description = "The execution id") @PathVariable String executionId,
Expand Down Expand Up @@ -243,7 +210,7 @@ public static class EvalResult {
}

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/{executionId}", produces = MediaType.TEXT_JSON)
@Get(uri = "/{executionId}", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Get an execution")
public Execution get(
@Parameter(description = "The execution id") @PathVariable String executionId
Expand All @@ -253,7 +220,7 @@ public Execution get(
.orElse(null);
}

@Delete(uri = "executions/{executionId}", produces = MediaType.TEXT_JSON)
@Delete(uri = "/{executionId}", produces = MediaType.TEXT_JSON)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Executions"}, summary = "Delete an execution")
@ApiResponse(responseCode = "204", description = "On success")
Expand All @@ -269,7 +236,7 @@ public HttpResponse<Void> delete(
}
}

@Delete(uri = "executions/by-ids", produces = MediaType.TEXT_JSON)
@Delete(uri = "/by-ids", produces = MediaType.TEXT_JSON)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Executions"}, summary = "Delete a list of executions")
@ApiResponse(responseCode = "200", description = "On success", content = {@Content(schema = @Schema(implementation = BulkResponse.class))})
Expand Down Expand Up @@ -310,7 +277,7 @@ public MutableHttpResponse<?> deleteByIds(
return HttpResponse.ok(BulkResponse.builder().count(executions.size()).build());
}

@Delete(uri = "executions/by-query", produces = MediaType.TEXT_JSON)
@Delete(uri = "/by-query", produces = MediaType.TEXT_JSON)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Executions"}, summary = "Delete executions filter by query parameters")
public HttpResponse<BulkResponse> deleteByQuery(
Expand Down Expand Up @@ -344,7 +311,7 @@ public HttpResponse<BulkResponse> deleteByQuery(


@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions", produces = MediaType.TEXT_JSON)
@Get(produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Search for executions for a flow")
public PagedResults<Execution> findByFlowId(
@Parameter(description = "The flow namespace") @QueryValue String namespace,
Expand All @@ -359,7 +326,7 @@ public PagedResults<Execution> findByFlowId(
}

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON)
@Post(uri = "/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Trigger a new execution by POST webhook trigger")
public Execution webhookTriggerPost(
@Parameter(description = "The flow namespace") @PathVariable String namespace,
Expand All @@ -371,7 +338,7 @@ public Execution webhookTriggerPost(
}

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON)
@Get(uri = "/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Trigger a new execution by GET webhook trigger")
public Execution webhookTriggerGet(
@Parameter(description = "The flow namespace") @PathVariable String namespace,
Expand All @@ -383,7 +350,7 @@ public Execution webhookTriggerGet(
}

@ExecuteOn(TaskExecutors.IO)
@Put(uri = "executions/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON)
@Put(uri = "/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Trigger a new execution by PUT webhook trigger")
public Execution webhookTriggerPut(
@Parameter(description = "The flow namespace") @PathVariable String namespace,
Expand Down Expand Up @@ -448,7 +415,7 @@ private Execution webhook(
}

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/trigger/{namespace}/{id}", produces = MediaType.TEXT_JSON, consumes = MediaType.MULTIPART_FORM_DATA)
@Post(uri = "/trigger/{namespace}/{id}", produces = MediaType.TEXT_JSON, consumes = MediaType.MULTIPART_FORM_DATA)
@Operation(tags = {"Executions"}, summary = "Trigger a new execution for a flow")
@ApiResponse(responseCode = "409", description = "if the flow is disabled")
public Execution trigger(
Expand Down Expand Up @@ -558,7 +525,7 @@ protected <T> HttpResponse<T> validateFile(String executionId, URI path, String
}

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/{executionId}/file", produces = MediaType.APPLICATION_OCTET_STREAM)
@Get(uri = "/{executionId}/file", produces = MediaType.APPLICATION_OCTET_STREAM)
@Operation(tags = {"Executions"}, summary = "Download file for an execution")
public HttpResponse<StreamedFile> file(
@Parameter(description = "The execution id") @PathVariable String executionId,
Expand All @@ -576,7 +543,7 @@ public HttpResponse<StreamedFile> file(
}

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/{executionId}/file/metas", produces = MediaType.TEXT_JSON)
@Get(uri = "/{executionId}/file/metas", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Get file meta information for an execution")
public HttpResponse<FileMetas> filesize(
@Parameter(description = "The execution id") @PathVariable String executionId,
Expand All @@ -594,7 +561,7 @@ public HttpResponse<FileMetas> filesize(
}

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/{executionId}/restart", produces = MediaType.TEXT_JSON)
@Post(uri = "/{executionId}/restart", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Restart a new execution from an old one")
public Execution restart(
@Parameter(description = "The execution id") @PathVariable String executionId,
Expand All @@ -614,7 +581,7 @@ public Execution restart(
}

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/restart/by-ids", produces = MediaType.TEXT_JSON)
@Post(uri = "/restart/by-ids", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Restart a list of executions")
@ApiResponse(responseCode = "200", description = "On success", content = {@Content(schema = @Schema(implementation = BulkResponse.class))})
@ApiResponse(responseCode = "422", description = "Restarted with errors", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
Expand Down Expand Up @@ -665,7 +632,7 @@ public MutableHttpResponse<?> restartByIds(
}

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/restart/by-query", produces = MediaType.TEXT_JSON)
@Post(uri = "/restart/by-query", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Restart executions filter by query parameters")
public HttpResponse<BulkResponse> restartByQuery(
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
Expand Down Expand Up @@ -699,7 +666,7 @@ public HttpResponse<BulkResponse> restartByQuery(
}

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/{executionId}/replay", produces = MediaType.TEXT_JSON)
@Post(uri = "/{executionId}/replay", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Create a new execution from an old one and start it from a specified task run id")
public Execution replay(
@Parameter(description = "the original execution id to clone") @PathVariable String executionId,
Expand Down Expand Up @@ -737,7 +704,7 @@ private void controlRevision(Execution execution, Integer revision) {
}

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/{executionId}/state", produces = MediaType.TEXT_JSON)
@Post(uri = "/{executionId}/state", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Change state for a taskrun in an execution")
public Execution changeState(
@Parameter(description = "The execution id") @PathVariable String executionId,
Expand All @@ -762,7 +729,7 @@ public static class StateRequest {
}

@ExecuteOn(TaskExecutors.IO)
@Delete(uri = "executions/{executionId}/kill", produces = MediaType.TEXT_JSON)
@Delete(uri = "/{executionId}/kill", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Kill an execution")
@ApiResponse(responseCode = "204", description = "On success")
@ApiResponse(responseCode = "409", description = "if the executions is already finished")
Expand Down Expand Up @@ -797,7 +764,7 @@ public HttpResponse<?> kill(
}

@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/{executionId}/resume", produces = MediaType.TEXT_JSON)
@Post(uri = "/{executionId}/resume", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Resume a paused execution.")
@ApiResponse(responseCode = "204", description = "On success")
@ApiResponse(responseCode = "409", description = "if the executions is not paused")
Expand All @@ -820,7 +787,7 @@ public HttpResponse<?> resume(
}

@ExecuteOn(TaskExecutors.IO)
@Delete(uri = "executions/kill/by-ids", produces = MediaType.TEXT_JSON)
@Delete(uri = "/kill/by-ids", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Kill a list of executions")
@ApiResponse(responseCode = "200", description = "On success", content = {@Content(schema = @Schema(implementation = BulkResponse.class))})
@ApiResponse(responseCode = "422", description = "Killed with errors", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
Expand Down Expand Up @@ -884,7 +851,7 @@ public MutableHttpResponse<?> killByIds(
}

@ExecuteOn(TaskExecutors.IO)
@Delete(uri = "executions/kill/by-query", produces = MediaType.TEXT_JSON)
@Delete(uri = "/kill/by-query", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Kill executions filter by query parameters")
public HttpResponse<?> killByQuery(
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
Expand Down Expand Up @@ -918,7 +885,7 @@ private boolean isStopFollow(Flow flow, Execution execution) {
}

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/{executionId}/follow", produces = MediaType.TEXT_EVENT_STREAM)
@Get(uri = "/{executionId}/follow", produces = MediaType.TEXT_EVENT_STREAM)
@Operation(tags = {"Executions"}, summary = "Follow an execution")
public Flowable<Event<Execution>> follow(
@Parameter(description = "The execution id") @PathVariable String executionId
Expand Down Expand Up @@ -971,7 +938,7 @@ public Flowable<Event<Execution>> follow(
}

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/{executionId}/file/preview", produces = MediaType.APPLICATION_JSON)
@Get(uri = "/{executionId}/file/preview", produces = MediaType.APPLICATION_JSON)
@Operation(tags = {"Executions"}, summary = "Get file preview for an execution")
public HttpResponse<?> filePreview(
@Parameter(description = "The execution id") @PathVariable String executionId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.kestra.webserver.controllers;

import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.webserver.responses.PagedResults;
import io.kestra.webserver.utils.PageableUtils;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.format.Format;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.QueryValue;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import jakarta.inject.Inject;

import java.time.ZonedDateTime;
import java.util.List;

@Controller("/api/v1/taskruns")
@Requires(property = "kestra.repository.type", value = "elasticsearch")
public class TaskRunController {
@Inject
protected ExecutionRepositoryInterface executionRepository;

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "/search", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Search for taskruns")
public PagedResults<TaskRun> findTaskRun(
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") int page,
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state
) {
return PagedResults.of(executionRepository.findTaskRun(
PageableUtils.from(page, size, sort, executionRepository.sortMapping()),
query,
namespace,
flowId,
startDate,
endDate,
state
));
}

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "/maxTaskRunSetting")
@Hidden
public Integer maxTaskRunSetting() {
return executionRepository.maxTaskRunSetting();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.kestra.webserver.controllers;

import io.kestra.webserver.controllers.h2.JdbcH2ControllerTest;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.rxjava2.http.client.RxHttpClient;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.*;

class TaskRunControllerTest extends JdbcH2ControllerTest {
@Inject
@Client("/")
private RxHttpClient client;

@Test
void search() {
HttpClientResponseException e = assertThrows(
HttpClientResponseException.class,
() -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/taskruns/search"))
);

assertThat(e.getStatus(), is(HttpStatus.NOT_FOUND));
}

@Test
void maxTaskRunSetting() {
HttpClientResponseException e = assertThrows(
HttpClientResponseException.class,
() -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/taskruns/maxTaskRunSetting"))
);

assertThat(e.getStatus(), is(HttpStatus.NOT_FOUND));
}
}

0 comments on commit 9fd38a2

Please sign in to comment.