diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java index f41f76bd55a..59d3bcf3329 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java @@ -51,7 +51,6 @@ import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.Single; -import io.swagger.v3.oas.annotations.Hidden; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.media.Content; @@ -80,7 +79,7 @@ @Slf4j @Validated -@Controller("/api/v1/") +@Controller("/api/v1/executions") public class ExecutionController { @Nullable @Value("${micronaut.server.context-path}") @@ -119,7 +118,7 @@ public class ExecutionController { private RunContextFactory runContextFactory; @ExecuteOn(TaskExecutors.IO) - @Get(uri = "executions/search", produces = MediaType.TEXT_JSON) + @Get(uri = "/search", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Search for executions") public PagedResults find( @Parameter(description = "The current page") @QueryValue(defaultValue = "1") int page, @@ -146,39 +145,7 @@ public PagedResults find( } @ExecuteOn(TaskExecutors.IO) - @Get(uri = "taskruns/search", produces = MediaType.TEXT_JSON) - @Operation(tags = {"Executions"}, summary = "Search for taskruns") - public PagedResults findTaskRun( - @Parameter(description = "The current page") @QueryValue(defaultValue = "1") int page, - @Parameter(description = "The current page size") @QueryValue(defaultValue = "10") int size, - @Parameter(description = "The sort of current page") @Nullable @QueryValue List sort, - @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query, - @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace, - @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId, - @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate, - @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate, - @Parameter(description = "A state filter") @Nullable @QueryValue List state - ) { - return PagedResults.of(executionRepository.findTaskRun( - PageableUtils.from(page, size, sort, executionRepository.sortMapping()), - query, - namespace, - flowId, - startDate, - endDate, - state - )); - } - - @ExecuteOn(TaskExecutors.IO) - @Get(uri = "taskruns/maxTaskRunSetting") - @Hidden - public Integer maxTaskRunSetting() { - return executionRepository.maxTaskRunSetting(); - } - - @ExecuteOn(TaskExecutors.IO) - @Get(uri = "executions/{executionId}/graph", produces = MediaType.TEXT_JSON) + @Get(uri = "/{executionId}/graph", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Generate a graph for an execution") public FlowGraph flowGraph( @Parameter(description = "The execution id") @PathVariable String executionId @@ -200,7 +167,7 @@ public FlowGraph flowGraph( } @ExecuteOn(TaskExecutors.IO) - @Post(uri = "executions/{executionId}/eval/{taskRunId}", produces = MediaType.TEXT_JSON, consumes = MediaType.TEXT_PLAIN) + @Post(uri = "/{executionId}/eval/{taskRunId}", produces = MediaType.TEXT_JSON, consumes = MediaType.TEXT_PLAIN) @Operation(tags = {"Executions"}, summary = "Evaluate a variable expression for this taskrun") public EvalResult eval( @Parameter(description = "The execution id") @PathVariable String executionId, @@ -243,7 +210,7 @@ public static class EvalResult { } @ExecuteOn(TaskExecutors.IO) - @Get(uri = "executions/{executionId}", produces = MediaType.TEXT_JSON) + @Get(uri = "/{executionId}", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Get an execution") public Execution get( @Parameter(description = "The execution id") @PathVariable String executionId @@ -253,7 +220,7 @@ public Execution get( .orElse(null); } - @Delete(uri = "executions/{executionId}", produces = MediaType.TEXT_JSON) + @Delete(uri = "/{executionId}", produces = MediaType.TEXT_JSON) @ExecuteOn(TaskExecutors.IO) @Operation(tags = {"Executions"}, summary = "Delete an execution") @ApiResponse(responseCode = "204", description = "On success") @@ -269,7 +236,7 @@ public HttpResponse delete( } } - @Delete(uri = "executions/by-ids", produces = MediaType.TEXT_JSON) + @Delete(uri = "/by-ids", produces = MediaType.TEXT_JSON) @ExecuteOn(TaskExecutors.IO) @Operation(tags = {"Executions"}, summary = "Delete a list of executions") @ApiResponse(responseCode = "200", description = "On success", content = {@Content(schema = @Schema(implementation = BulkResponse.class))}) @@ -310,7 +277,7 @@ public MutableHttpResponse deleteByIds( return HttpResponse.ok(BulkResponse.builder().count(executions.size()).build()); } - @Delete(uri = "executions/by-query", produces = MediaType.TEXT_JSON) + @Delete(uri = "/by-query", produces = MediaType.TEXT_JSON) @ExecuteOn(TaskExecutors.IO) @Operation(tags = {"Executions"}, summary = "Delete executions filter by query parameters") public HttpResponse deleteByQuery( @@ -344,7 +311,7 @@ public HttpResponse deleteByQuery( @ExecuteOn(TaskExecutors.IO) - @Get(uri = "executions", produces = MediaType.TEXT_JSON) + @Get(produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Search for executions for a flow") public PagedResults findByFlowId( @Parameter(description = "The flow namespace") @QueryValue String namespace, @@ -359,7 +326,7 @@ public PagedResults findByFlowId( } @ExecuteOn(TaskExecutors.IO) - @Post(uri = "executions/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON) + @Post(uri = "/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Trigger a new execution by POST webhook trigger") public Execution webhookTriggerPost( @Parameter(description = "The flow namespace") @PathVariable String namespace, @@ -371,7 +338,7 @@ public Execution webhookTriggerPost( } @ExecuteOn(TaskExecutors.IO) - @Get(uri = "executions/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON) + @Get(uri = "/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Trigger a new execution by GET webhook trigger") public Execution webhookTriggerGet( @Parameter(description = "The flow namespace") @PathVariable String namespace, @@ -383,7 +350,7 @@ public Execution webhookTriggerGet( } @ExecuteOn(TaskExecutors.IO) - @Put(uri = "executions/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON) + @Put(uri = "/webhook/{namespace}/{id}/{key}", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Trigger a new execution by PUT webhook trigger") public Execution webhookTriggerPut( @Parameter(description = "The flow namespace") @PathVariable String namespace, @@ -448,7 +415,7 @@ private Execution webhook( } @ExecuteOn(TaskExecutors.IO) - @Post(uri = "executions/trigger/{namespace}/{id}", produces = MediaType.TEXT_JSON, consumes = MediaType.MULTIPART_FORM_DATA) + @Post(uri = "/trigger/{namespace}/{id}", produces = MediaType.TEXT_JSON, consumes = MediaType.MULTIPART_FORM_DATA) @Operation(tags = {"Executions"}, summary = "Trigger a new execution for a flow") @ApiResponse(responseCode = "409", description = "if the flow is disabled") public Execution trigger( @@ -558,7 +525,7 @@ protected HttpResponse validateFile(String executionId, URI path, String } @ExecuteOn(TaskExecutors.IO) - @Get(uri = "executions/{executionId}/file", produces = MediaType.APPLICATION_OCTET_STREAM) + @Get(uri = "/{executionId}/file", produces = MediaType.APPLICATION_OCTET_STREAM) @Operation(tags = {"Executions"}, summary = "Download file for an execution") public HttpResponse file( @Parameter(description = "The execution id") @PathVariable String executionId, @@ -576,7 +543,7 @@ public HttpResponse file( } @ExecuteOn(TaskExecutors.IO) - @Get(uri = "executions/{executionId}/file/metas", produces = MediaType.TEXT_JSON) + @Get(uri = "/{executionId}/file/metas", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Get file meta information for an execution") public HttpResponse filesize( @Parameter(description = "The execution id") @PathVariable String executionId, @@ -594,7 +561,7 @@ public HttpResponse filesize( } @ExecuteOn(TaskExecutors.IO) - @Post(uri = "executions/{executionId}/restart", produces = MediaType.TEXT_JSON) + @Post(uri = "/{executionId}/restart", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Restart a new execution from an old one") public Execution restart( @Parameter(description = "The execution id") @PathVariable String executionId, @@ -614,7 +581,7 @@ public Execution restart( } @ExecuteOn(TaskExecutors.IO) - @Post(uri = "executions/restart/by-ids", produces = MediaType.TEXT_JSON) + @Post(uri = "/restart/by-ids", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Restart a list of executions") @ApiResponse(responseCode = "200", description = "On success", content = {@Content(schema = @Schema(implementation = BulkResponse.class))}) @ApiResponse(responseCode = "422", description = "Restarted with errors", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))}) @@ -665,7 +632,7 @@ public MutableHttpResponse restartByIds( } @ExecuteOn(TaskExecutors.IO) - @Post(uri = "executions/restart/by-query", produces = MediaType.TEXT_JSON) + @Post(uri = "/restart/by-query", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Restart executions filter by query parameters") public HttpResponse restartByQuery( @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query, @@ -699,7 +666,7 @@ public HttpResponse restartByQuery( } @ExecuteOn(TaskExecutors.IO) - @Post(uri = "executions/{executionId}/replay", produces = MediaType.TEXT_JSON) + @Post(uri = "/{executionId}/replay", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Create a new execution from an old one and start it from a specified task run id") public Execution replay( @Parameter(description = "the original execution id to clone") @PathVariable String executionId, @@ -737,7 +704,7 @@ private void controlRevision(Execution execution, Integer revision) { } @ExecuteOn(TaskExecutors.IO) - @Post(uri = "executions/{executionId}/state", produces = MediaType.TEXT_JSON) + @Post(uri = "/{executionId}/state", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Change state for a taskrun in an execution") public Execution changeState( @Parameter(description = "The execution id") @PathVariable String executionId, @@ -762,7 +729,7 @@ public static class StateRequest { } @ExecuteOn(TaskExecutors.IO) - @Delete(uri = "executions/{executionId}/kill", produces = MediaType.TEXT_JSON) + @Delete(uri = "/{executionId}/kill", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Kill an execution") @ApiResponse(responseCode = "204", description = "On success") @ApiResponse(responseCode = "409", description = "if the executions is already finished") @@ -797,7 +764,7 @@ public HttpResponse kill( } @ExecuteOn(TaskExecutors.IO) - @Post(uri = "executions/{executionId}/resume", produces = MediaType.TEXT_JSON) + @Post(uri = "/{executionId}/resume", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Resume a paused execution.") @ApiResponse(responseCode = "204", description = "On success") @ApiResponse(responseCode = "409", description = "if the executions is not paused") @@ -820,7 +787,7 @@ public HttpResponse resume( } @ExecuteOn(TaskExecutors.IO) - @Delete(uri = "executions/kill/by-ids", produces = MediaType.TEXT_JSON) + @Delete(uri = "/kill/by-ids", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Kill a list of executions") @ApiResponse(responseCode = "200", description = "On success", content = {@Content(schema = @Schema(implementation = BulkResponse.class))}) @ApiResponse(responseCode = "422", description = "Killed with errors", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))}) @@ -884,7 +851,7 @@ public MutableHttpResponse killByIds( } @ExecuteOn(TaskExecutors.IO) - @Delete(uri = "executions/kill/by-query", produces = MediaType.TEXT_JSON) + @Delete(uri = "/kill/by-query", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Kill executions filter by query parameters") public HttpResponse killByQuery( @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query, @@ -918,7 +885,7 @@ private boolean isStopFollow(Flow flow, Execution execution) { } @ExecuteOn(TaskExecutors.IO) - @Get(uri = "executions/{executionId}/follow", produces = MediaType.TEXT_EVENT_STREAM) + @Get(uri = "/{executionId}/follow", produces = MediaType.TEXT_EVENT_STREAM) @Operation(tags = {"Executions"}, summary = "Follow an execution") public Flowable> follow( @Parameter(description = "The execution id") @PathVariable String executionId @@ -971,7 +938,7 @@ public Flowable> follow( } @ExecuteOn(TaskExecutors.IO) - @Get(uri = "executions/{executionId}/file/preview", produces = MediaType.APPLICATION_JSON) + @Get(uri = "/{executionId}/file/preview", produces = MediaType.APPLICATION_JSON) @Operation(tags = {"Executions"}, summary = "Get file preview for an execution") public HttpResponse filePreview( @Parameter(description = "The execution id") @PathVariable String executionId, diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/TaskRunController.java b/webserver/src/main/java/io/kestra/webserver/controllers/TaskRunController.java new file mode 100644 index 00000000000..09d5534efd7 --- /dev/null +++ b/webserver/src/main/java/io/kestra/webserver/controllers/TaskRunController.java @@ -0,0 +1,62 @@ +package io.kestra.webserver.controllers; + +import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.flows.State; +import io.kestra.core.repositories.ExecutionRepositoryInterface; +import io.kestra.webserver.responses.PagedResults; +import io.kestra.webserver.utils.PageableUtils; +import io.micronaut.context.annotation.Requires; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.convert.format.Format; +import io.micronaut.http.MediaType; +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Get; +import io.micronaut.http.annotation.QueryValue; +import io.micronaut.scheduling.TaskExecutors; +import io.micronaut.scheduling.annotation.ExecuteOn; +import io.swagger.v3.oas.annotations.Hidden; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import jakarta.inject.Inject; + +import java.time.ZonedDateTime; +import java.util.List; + +@Controller("/api/v1/taskruns") +@Requires(property = "kestra.repository.type", value = "elasticsearch") +public class TaskRunController { + @Inject + protected ExecutionRepositoryInterface executionRepository; + + @ExecuteOn(TaskExecutors.IO) + @Get(uri = "/search", produces = MediaType.TEXT_JSON) + @Operation(tags = {"Executions"}, summary = "Search for taskruns") + public PagedResults findTaskRun( + @Parameter(description = "The current page") @QueryValue(defaultValue = "1") int page, + @Parameter(description = "The current page size") @QueryValue(defaultValue = "10") int size, + @Parameter(description = "The sort of current page") @Nullable @QueryValue List sort, + @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query, + @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace, + @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId, + @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate, + @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate, + @Parameter(description = "A state filter") @Nullable @QueryValue List state + ) { + return PagedResults.of(executionRepository.findTaskRun( + PageableUtils.from(page, size, sort, executionRepository.sortMapping()), + query, + namespace, + flowId, + startDate, + endDate, + state + )); + } + + @ExecuteOn(TaskExecutors.IO) + @Get(uri = "/maxTaskRunSetting") + @Hidden + public Integer maxTaskRunSetting() { + return executionRepository.maxTaskRunSetting(); + } +} diff --git a/webserver/src/test/java/io/kestra/webserver/controllers/TaskRunControllerTest.java b/webserver/src/test/java/io/kestra/webserver/controllers/TaskRunControllerTest.java new file mode 100644 index 00000000000..2c2c9f6bdb3 --- /dev/null +++ b/webserver/src/test/java/io/kestra/webserver/controllers/TaskRunControllerTest.java @@ -0,0 +1,40 @@ +package io.kestra.webserver.controllers; + +import io.kestra.webserver.controllers.h2.JdbcH2ControllerTest; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpStatus; +import io.micronaut.http.client.annotation.Client; +import io.micronaut.http.client.exceptions.HttpClientResponseException; +import io.micronaut.rxjava2.http.client.RxHttpClient; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.*; + +class TaskRunControllerTest extends JdbcH2ControllerTest { + @Inject + @Client("/") + private RxHttpClient client; + + @Test + void search() { + HttpClientResponseException e = assertThrows( + HttpClientResponseException.class, + () -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/taskruns/search")) + ); + + assertThat(e.getStatus(), is(HttpStatus.NOT_FOUND)); + } + + @Test + void maxTaskRunSetting() { + HttpClientResponseException e = assertThrows( + HttpClientResponseException.class, + () -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/taskruns/maxTaskRunSetting")) + ); + + assertThat(e.getStatus(), is(HttpStatus.NOT_FOUND)); + } +} \ No newline at end of file