/*
 * Copyright 2021 VMware, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package com.vmware.taurus.datajobs.it;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.controlplane.model.data.DataJobVersion;
import com.vmware.taurus.datajobs.it.common.BaseIT;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.StringUtils;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;

import java.util.ArrayList;
import java.util.UUID;

import static com.vmware.taurus.datajobs.it.common.WebHookServerMockExtension.TEST_TEAM_NAME;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.user;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.header;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

/**
 * Integration tests for cancelling a data job execution through the REST API ({@code DELETE
 * /data-jobs/for-team/{team}/jobs/{job}/executions/{executionId}}).
 *
 * <p>Every test runs against a data job that is created in {@link #setup()} and cleaned up in
 * {@link #cleanUp()}. The job name carries a random suffix that is generated once per class load.
 */
@Slf4j
@Import({DataJobDeploymentCrudIT.TaskExecutorConfig.class})
@SpringBootTest(
    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
    classes = ControlplaneApplication.class)
public class DataJobCancellationIT extends BaseIT {

  private static final String TEST_JOB_NAME =
      "cancellation-test-" + UUID.randomUUID().toString().substring(0, 8);

  /** Classpath resource holding the zipped data job that is uploaded and executed. */
  private static final String JOB_ZIP_RESOURCE = "simple_job_cancel.zip";

  /**
   * Fixed delay between starting an execution and listing executions.
   *
   * <p>NOTE(review): a fixed sleep is timing dependent; presumably it gives the execution time to
   * be registered as running - confirm whether polling the executions endpoint is viable instead.
   */
  private static final long POD_INITIALIZATION_WAIT_MS = 10_000L;

  /** Body of the "start execution" request. */
  private static final String START_EXECUTION_REQUEST_BODY =
      "{\n"
          + " \"args\": {\n"
          + " \"key\": \"value\"\n"
          + " },\n"
          + " \"started_by\": \"schedule/runtime\"\n"
          + "}";

  /**
   * Removes the sources and the deployment of the test job after each test.
   *
   * @throws Exception if a request cannot be performed or an expectation fails
   */
  @AfterEach
  public void cleanUp() throws Exception {
    // Delete the job's sources.
    // NOTE(review): this removes the sources only, the job created in setup() is not deleted
    // here - confirm that setup() can re-create a job with the same name for the next test.
    mockMvc.perform(delete(jobUrl("/sources")).with(user("user"))).andExpect(status().isOk());

    // Delete the deployment; the same deployment id is used as for starting/listing executions.
    mockMvc
        .perform(
            delete(jobUrl("/deployments/" + TEST_JOB_DEPLOYMENT_ID))
                .with(user("user"))
                .contentType(MediaType.APPLICATION_JSON))
        .andExpect(status().isAccepted());
  }

  /**
   * Creates the test data job and checks that the {@code Location} header points to it.
   *
   * @throws Exception if the request cannot be performed or an expectation fails
   */
  @BeforeEach
  public void setup() throws Exception {
    String dataJobRequestBody = getDataJobRequestBody(TEST_TEAM_NAME, TEST_JOB_NAME);

    mockMvc
        .perform(
            post(String.format("/data-jobs/for-team/%s/jobs", TEST_TEAM_NAME))
                .with(user("user"))
                .content(dataJobRequestBody)
                .contentType(MediaType.APPLICATION_JSON))
        .andExpect(status().isCreated())
        .andExpect(
            header().string(HttpHeaders.LOCATION, lambdaMatcher(s -> s.endsWith(jobUrl("")))));
  }

  /**
   * Uploads and deploys the test job, starts an execution, and expects that cancelling the first
   * listed execution answers with HTTP 200.
   *
   * @throws Exception if a request cannot be performed or an expectation fails
   */
  @Test
  public void testJobCancellation_createDeployExecuteAndCancelJob() throws Exception {
    // Upload the zipped job sources.
    MvcResult jobUploadResult =
        mockMvc
            .perform(
                post(jobUrl("/sources"))
                    .with(user("user"))
                    .content(readTestJobZip())
                    .contentType(MediaType.APPLICATION_OCTET_STREAM))
            .andExpect(status().isOk())
            .andReturn();

    DataJobVersion testDataJobVersion =
        new ObjectMapper()
            .readValue(jobUploadResult.getResponse().getContentAsString(), DataJobVersion.class);
    Assertions.assertNotNull(testDataJobVersion);

    String testJobVersionSha = testDataJobVersion.getVersionSha();
    Assertions.assertFalse(StringUtils.isBlank(testJobVersionSha));

    // Build and deploy the uploaded version.
    mockMvc
        .perform(
            post(jobUrl("/deployments"))
                .with(user("user"))
                .content(getDataJobDeploymentRequestBody(testJobVersionSha))
                .contentType(MediaType.APPLICATION_JSON))
        .andExpect(status().isAccepted());

    // Manually start a job execution.
    String executionsUrl = jobUrl("/deployments/" + TEST_JOB_DEPLOYMENT_ID + "/executions");
    mockMvc
        .perform(
            post(executionsUrl)
                .with(user("user"))
                .contentType(MediaType.APPLICATION_JSON)
                .content(START_EXECUTION_REQUEST_BODY))
        .andExpect(status().isAccepted());

    // Wait for the pod to initialize.
    Thread.sleep(POD_INITIALIZATION_WAIT_MS);

    // Retrieve the id of the started execution.
    MvcResult executionsResult =
        mockMvc
            .perform(
                get(executionsUrl).with(user("user")).contentType(MediaType.APPLICATION_JSON))
            .andExpect(status().isOk())
            .andReturn();
    String executionId = firstExecutionId(executionsResult.getResponse().getContentAsString());

    // Cancel the running execution.
    mockMvc
        .perform(
            delete(jobUrl("/executions/" + executionId))
                .with(user("user"))
                .contentType(MediaType.APPLICATION_JSON))
        .andExpect(status().isOk());
  }

  /**
   * Expects HTTP 404 when cancelling an execution id that was never started. The data job itself
   * exists at this point because it is created in {@link #setup()}.
   *
   * @throws Exception if the request cannot be performed or the expectation fails
   */
  @Test
  public void testJobCancellation_nonExistingJob() throws Exception {
    mockMvc
        .perform(
            delete(jobUrl("/executions/" + "executionId"))
                .with(user("user"))
                .contentType(MediaType.APPLICATION_JSON))
        .andExpect(status().isNotFound());
  }

  /**
   * Builds a URL below the test job's resource.
   *
   * @param suffix path appended to {@code /data-jobs/for-team/{team}/jobs/{job}}; may be empty
   * @return the request path for the test team and test job
   */
  private static String jobUrl(String suffix) {
    return String.format("/data-jobs/for-team/%s/jobs/%s%s", TEST_TEAM_NAME, TEST_JOB_NAME, suffix);
  }

  /**
   * Reads the zipped test job from the classpath and closes the stream afterwards.
   *
   * @return the content of {@link #JOB_ZIP_RESOURCE}
   * @throws Exception if the resource cannot be read
   */
  private byte[] readTestJobZip() throws Exception {
    try (var jobZip = getClass().getClassLoader().getResourceAsStream(JOB_ZIP_RESOURCE)) {
      // getResourceAsStream returns null for a missing resource; fail with a readable message.
      Assertions.assertNotNull(jobZip, "Test resource not found on classpath: " + JOB_ZIP_RESOURCE);
      return IOUtils.toByteArray(jobZip);
    }
  }

  /**
   * Extracts the {@code id} of the first element of a JSON array of executions.
   *
   * @param executionsJson response body of the "list executions" request
   * @return the id of the first listed execution
   */
  private static String firstExecutionId(String executionsJson) {
    // Gson maps untyped JSON objects to LinkedTreeMap, hence the unchecked element type.
    @SuppressWarnings("unchecked")
    ArrayList<LinkedTreeMap<String, Object>> executions =
        new Gson().fromJson(executionsJson, ArrayList.class);

    Assertions.assertNotNull(executions, "Executions response body was empty");
    Assertions.assertFalse(
        executions.isEmpty(), "No execution was listed for data job " + TEST_JOB_NAME);

    String executionId = (String) executions.get(0).get("id");
    Assertions.assertFalse(StringUtils.isBlank(executionId));
    return executionId;
  }
}
b/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/JobExecutionRepository.java @@ -53,4 +53,13 @@ List findDataJobExecutionsByStatusInAndStartTimeBefore( + "GROUP BY dje.status, dje.dataJob") List countDataJobExecutionStatuses( @Param("statuses") List statuses, @Param("dataJobs") List dataJobs); + + @Query( + "SELECT dje from DataJobExecution dje " + + "LEFT JOIN DataJob dj ON dje.dataJob = dj.name " + + "WHERE dje.id = :jobExecutionId " + + "AND dj.name = :jobName " + + "AND dj.jobConfig.team = :jobTeam") + Optional findDataJobExecutionByIdAndTeamAndName( + String jobExecutionId, String jobName, String jobTeam); } diff --git a/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/KubernetesService.java b/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/KubernetesService.java index d2576e93ce..5a0b61fa82 100644 --- a/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/KubernetesService.java +++ b/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/KubernetesService.java @@ -708,34 +708,35 @@ private void addExtraJobArgumentsToVdkContainer( public void cancelRunningCronJob(String teamName, String jobName, String executionId) throws ApiException { log.info( - "K8S deleting job for team: {} data job name: {} execution: {}", + "K8S deleting job for team: {} data job name: {} execution: {} namespace: {}", teamName, jobName, - executionId); + executionId, + namespace); try { var operationResponse = initBatchV1Api() - .deleteNamespacedJob( + .deleteNamespacedJobWithHttpInfo( executionId, namespace, null, null, null, null, "Foreground", null); - // Status of the operation. 
One of: "Success" or "Failure" - if (operationResponse == null || operationResponse.getStatus() == null) { + if (operationResponse == null || operationResponse.getStatusCode() == 404) { log.info( "Execution: {} for data job: {} with team: {} not found! The data job has likely" + " completed before it could be cancelled.", executionId, - teamName, - jobName); + jobName, + teamName); throw new DataJobExecutionCannotBeCancelledException( executionId, ExecutionCancellationFailureReason.DataJobExecutionNotFound); - } else if (operationResponse.getStatus().equals("Failure")) { + } else if (operationResponse.getStatusCode() != 200) { log.warn( "Failed to delete K8S job. Reason: {} Details: {}", - operationResponse.getReason(), - operationResponse.getDetails().toString()); + operationResponse.getData().getReason(), + operationResponse.getData().getDetails()); throw new KubernetesException( - operationResponse.getMessage(), - new ApiException(operationResponse.getCode(), operationResponse.getMessage())); + operationResponse.getData().getMessage(), + new ApiException( + operationResponse.getStatusCode(), operationResponse.getData().getMessage())); } } catch (JsonSyntaxException e) { if (e.getCause() instanceof IllegalStateException) { diff --git a/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/execution/JobExecutionService.java b/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/execution/JobExecutionService.java index 5dc6dcaf2e..d3d1486335 100644 --- a/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/execution/JobExecutionService.java +++ b/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/execution/JobExecutionService.java @@ -142,14 +142,16 @@ public void cancelDataJobExecution(String teamName, String jobName, String execu executionId, 
ExecutionCancellationFailureReason.DataJobNotFound); } - var jobExecutionOptional = jobExecutionRepository.findById(executionId); + var jobExecutionOptional = + jobExecutionRepository.findDataJobExecutionByIdAndTeamAndName( + executionId, jobName, teamName); if (jobExecutionOptional.isEmpty()) { log.info( "Execution: {} for data job: {} with team: {} not found!", executionId, - teamName, - jobName); + jobName, + teamName); throw new DataJobExecutionCannotBeCancelledException( executionId, ExecutionCancellationFailureReason.DataJobExecutionNotFound); } @@ -162,8 +164,8 @@ public void cancelDataJobExecution(String teamName, String jobName, String execu log.info( "Trying to cancel execution: {} for data job: {} with team: {} but job has status {}!", executionId, - teamName, jobName, + teamName, jobStatus.toString()); throw new DataJobExecutionCannotBeCancelledException( executionId, ExecutionCancellationFailureReason.ExecutionNotRunning); diff --git a/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/KubernetesServiceCancelRunningCronJobTest.java b/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/KubernetesServiceCancelRunningCronJobTest.java index 7a9cec86db..c6e91a2332 100644 --- a/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/KubernetesServiceCancelRunningCronJobTest.java +++ b/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/KubernetesServiceCancelRunningCronJobTest.java @@ -6,6 +6,7 @@ package com.vmware.taurus.service; import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.ApiResponse; import io.kubernetes.client.openapi.apis.BatchV1Api; import io.kubernetes.client.openapi.models.V1Status; import io.kubernetes.client.openapi.models.V1StatusDetails; @@ -36,7 +37,8 @@ public void 
testIsRunningJob_nullResponse_shouldThrowDataJobExecutionCannotBeCan public void testIsRunningJob_notNullResponseAndNullStatus_shouldThrowDataJobExecutionCannotBeCancelledException() throws ApiException { - KubernetesService kubernetesService = mockKubernetesService(new V1Status().status(null)); + KubernetesService kubernetesService = + mockKubernetesService(new V1Status().status(null).code(404)); Assertions.assertThrows( DataJobExecutionCannotBeCancelledException.class, @@ -48,7 +50,7 @@ public void testIsRunningJob_nullResponse_shouldThrowDataJobExecutionCannotBeCan @Test public void testIsRunningJob_notNullResponseAndStatusSuccess_shouldNotThrowException() throws ApiException { - KubernetesService kubernetesService = mockKubernetesService(new V1Status().status("Success")); + KubernetesService kubernetesService = mockKubernetesService(new V1Status().code(200)); Assertions.assertDoesNotThrow( () -> @@ -85,9 +87,13 @@ private KubernetesService mockKubernetesService(V1Status v1Status) throws ApiExc .when(kubernetesService) .cancelRunningCronJob(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); + ApiResponse response = Mockito.mock(ApiResponse.class); + Mockito.when(response.getData()).thenReturn(v1Status); + Mockito.when(response.getStatusCode()).thenReturn(v1Status == null ? 404 : v1Status.getCode()); + BatchV1Api batchV1Api = Mockito.mock(BatchV1Api.class); Mockito.when( - batchV1Api.deleteNamespacedJob( + batchV1Api.deleteNamespacedJobWithHttpInfo( Mockito.anyString(), Mockito.isNull(), Mockito.isNull(), @@ -96,7 +102,7 @@ private KubernetesService mockKubernetesService(V1Status v1Status) throws ApiExc Mockito.isNull(), Mockito.anyString(), Mockito.isNull())) - .thenReturn(v1Status); + .thenReturn(response); Mockito.when(kubernetesService.initBatchV1Api()).thenReturn(batchV1Api);