Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control-service: refactor job cancellation method due to 404 errors #1114

Merged
merged 8 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright 2021 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.datajobs.it;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.controlplane.model.data.DataJobVersion;
import com.vmware.taurus.datajobs.it.common.BaseIT;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.StringUtils;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;

import java.util.ArrayList;
import java.util.UUID;

import static com.vmware.taurus.datajobs.it.common.WebHookServerMockExtension.TEST_TEAM_NAME;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.user;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.header;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

/**
 * Integration test for cancelling data job executions through the REST API.
 *
 * <p>Before each test a data job named {@link #TEST_JOB_NAME} is created for {@code
 * TEST_TEAM_NAME}; after each test DELETE requests are issued for the job sources and for the
 * deployment identified by {@link #DEPLOYMENT_ID}.
 */
@Slf4j
@Import({DataJobDeploymentCrudIT.TaskExecutorConfig.class})
@SpringBootTest(
    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
    classes = ControlplaneApplication.class)
public class DataJobCancellationIT extends BaseIT {

  /** Job name with a random suffix so that repeated runs do not collide on the same job. */
  private static final String TEST_JOB_NAME =
      "cancellation-test-" + UUID.randomUUID().toString().substring(0, 8);

  /**
   * Deployment id used by {@link #cleanUp()}.
   *
   * <p>NOTE(review): the test itself addresses the deployment through {@code
   * TEST_JOB_DEPLOYMENT_ID}, which is not declared in this class (presumably inherited from
   * {@code BaseIT}) - confirm that using two different ids is intended.
   */
  private static final String DEPLOYMENT_ID = "testing-cancellation";

  /** Classpath resource holding the zipped data job that is uploaded by the test. */
  private static final String JOB_ARCHIVE_RESOURCE = "simple_job_cancel.zip";

  /** Fixed delay that gives the execution's pod time to initialize before it is cancelled. */
  private static final long POD_INITIALIZATION_WAIT_MILLIS = 10_000L;

  /**
   * Issues DELETE requests for the job sources and for the job deployment.
   *
   * @throws Exception if a request cannot be performed or an expected status is not returned
   */
  @AfterEach
  public void cleanUp() throws Exception {
    // delete job
    mockMvc
        .perform(
            delete(
                    String.format(
                        "/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_NAME, TEST_JOB_NAME))
                .with(user("user")))
        .andExpect(status().isOk());

    // Execute delete deployment
    mockMvc
        .perform(
            delete(
                    String.format(
                        "/data-jobs/for-team/%s/jobs/%s/deployments/%s",
                        TEST_TEAM_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
                .with(user("user"))
                .contentType(MediaType.APPLICATION_JSON))
        .andExpect(status().isAccepted());
  }

  /**
   * Creates the data job used by the tests and verifies that the {@code Location} header of the
   * response points at the newly created job.
   *
   * @throws Exception if the request cannot be performed or an expectation fails
   */
  @BeforeEach
  public void setup() throws Exception {
    String dataJobRequestBody = getDataJobRequestBody(TEST_TEAM_NAME, TEST_JOB_NAME);

    // Execute create job
    mockMvc
        .perform(
            post(String.format("/data-jobs/for-team/%s/jobs", TEST_TEAM_NAME))
                .with(user("user"))
                .content(dataJobRequestBody)
                .contentType(MediaType.APPLICATION_JSON))
        .andExpect(status().isCreated())
        .andExpect(
            header()
                .string(
                    HttpHeaders.LOCATION,
                    lambdaMatcher(
                        s ->
                            s.endsWith(
                                String.format(
                                    "/data-jobs/for-team/%s/jobs/%s",
                                    TEST_TEAM_NAME, TEST_JOB_NAME)))));
  }

  /**
   * Uploads the job sources, deploys the job, starts an execution, and then cancels that
   * execution, expecting the cancellation request to return 200.
   *
   * @throws Exception if a request cannot be performed or an expectation fails
   */
  @Test
  public void testJobCancellation_createDeployExecuteAndCancelJob() throws Exception {
    // Take the job zip as byte array; the stream is closed as soon as it has been read.
    byte[] jobZipBinary;
    try (var jobZipStream =
        getClass().getClassLoader().getResourceAsStream(JOB_ARCHIVE_RESOURCE)) {
      // getResourceAsStream returns null for a missing resource; fail with a clear message
      // instead of a NullPointerException from IOUtils.
      Assertions.assertNotNull(
          jobZipStream, "Test resource not found on the classpath: " + JOB_ARCHIVE_RESOURCE);
      jobZipBinary = IOUtils.toByteArray(jobZipStream);
    }

    // Execute job upload with user
    MvcResult jobUploadResult =
        mockMvc
            .perform(
                post(String.format(
                        "/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_NAME, TEST_JOB_NAME))
                    .with(user("user"))
                    .content(jobZipBinary)
                    .contentType(MediaType.APPLICATION_OCTET_STREAM))
            .andExpect(status().isOk())
            .andReturn();

    DataJobVersion testDataJobVersion =
        new ObjectMapper()
            .readValue(jobUploadResult.getResponse().getContentAsString(), DataJobVersion.class);
    Assertions.assertNotNull(testDataJobVersion);

    String testJobVersionSha = testDataJobVersion.getVersionSha();
    Assertions.assertFalse(StringUtils.isBlank(testJobVersionSha));

    // Setup
    String dataJobDeploymentRequestBody = getDataJobDeploymentRequestBody(testJobVersionSha);

    // Execute build and deploy job
    mockMvc
        .perform(
            post(String.format(
                    "/data-jobs/for-team/%s/jobs/%s/deployments", TEST_TEAM_NAME, TEST_JOB_NAME))
                .with(user("user"))
                .content(dataJobDeploymentRequestBody)
                .contentType(MediaType.APPLICATION_JSON))
        .andExpect(status().isAccepted());

    // manually start job execution
    mockMvc
        .perform(
            post(String.format(
                    "/data-jobs/for-team/%s/jobs/%s/deployments/%s/executions",
                    TEST_TEAM_NAME, TEST_JOB_NAME, TEST_JOB_DEPLOYMENT_ID))
                .with(user("user"))
                .contentType(MediaType.APPLICATION_JSON)
                .content(
                    "{\n"
                        + "  \"args\": {\n"
                        + "    \"key\": \"value\"\n"
                        + "  },\n"
                        + "  \"started_by\": \"schedule/runtime\"\n"
                        + "}"))
        .andExpect(status().isAccepted());

    // wait for pod to initialize
    Thread.sleep(POD_INITIALIZATION_WAIT_MILLIS);

    // retrieve running job execution id.
    MvcResult executionsResult =
        mockMvc
            .perform(
                get(String.format(
                        "/data-jobs/for-team/%s/jobs/%s/deployments/%s/executions",
                        TEST_TEAM_NAME, TEST_JOB_NAME, TEST_JOB_DEPLOYMENT_ID))
                    .with(user("user"))
                    .contentType(MediaType.APPLICATION_JSON))
            .andExpect(status().isOk())
            .andReturn();

    String executionsJson = executionsResult.getResponse().getContentAsString();
    ArrayList<?> executions = new Gson().fromJson(executionsJson, ArrayList.class);

    // Fail with a readable message instead of an NPE / IndexOutOfBoundsException when the
    // execution has not been registered.
    Assertions.assertNotNull(executions, "Executions response body was empty");
    Assertions.assertFalse(
        executions.isEmpty(),
        "Expected at least one execution for job " + TEST_JOB_NAME + " but got: " + executionsJson);

    LinkedTreeMap<?, ?> firstExecution = (LinkedTreeMap<?, ?>) executions.get(0);
    String executionId = (String) firstExecution.get("id");
    Assertions.assertFalse(
        StringUtils.isBlank(executionId), "Execution id is missing in: " + executionsJson);

    // cancel running execution
    mockMvc
        .perform(
            delete(
                    String.format(
                        "/data-jobs/for-team/%s/jobs/%s/executions/%s",
                        TEST_TEAM_NAME, TEST_JOB_NAME, executionId))
                .with(user("user"))
                .contentType(MediaType.APPLICATION_JSON))
        .andExpect(status().isOk());
  }

  /**
   * Requests cancellation of an execution id that was never started and expects 404.
   *
   * @throws Exception if the request cannot be performed or the expectation fails
   */
  @Test
  public void testJobCancellation_nonExistingJob() throws Exception {

    mockMvc
        .perform(
            delete(
                    String.format(
                        "/data-jobs/for-team/%s/jobs/%s/executions/%s",
                        TEST_TEAM_NAME, TEST_JOB_NAME, "executionId"))
                .with(user("user"))
                .contentType(MediaType.APPLICATION_JSON))
        .andExpect(status().isNotFound());
  }
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,13 @@ List<DataJobExecution> findDataJobExecutionsByStatusInAndStartTimeBefore(
+ "GROUP BY dje.status, dje.dataJob")
List<DataJobExecutionStatusCount> countDataJobExecutionStatuses(
@Param("statuses") List<ExecutionStatus> statuses, @Param("dataJobs") List<String> dataJobs);

/**
 * Looks up a data job execution by its id, restricted to the data job with the given name and
 * team.
 *
 * <p>The named query parameters are bound explicitly with {@code @Param}, matching the other
 * query methods of this repository, so that binding does not rely on parameter names being
 * retained at compile time.
 *
 * @param jobExecutionId id of the execution to look up
 * @param jobName name of the data job the execution must belong to
 * @param jobTeam team that must be set in the data job's configuration
 * @return the matching execution, or an empty {@link Optional} if none matches
 */
@Query(
    "SELECT dje from DataJobExecution dje "
        + "LEFT JOIN DataJob dj ON dje.dataJob = dj.name "
        + "WHERE dje.id = :jobExecutionId "
        + "AND dj.name = :jobName "
        + "AND dj.jobConfig.team = :jobTeam")
Optional<DataJobExecution> findDataJobExecutionByIdAndTeamAndName(
    @Param("jobExecutionId") String jobExecutionId,
    @Param("jobName") String jobName,
    @Param("jobTeam") String jobTeam);
}
Original file line number Diff line number Diff line change
Expand Up @@ -708,34 +708,35 @@ private void addExtraJobArgumentsToVdkContainer(
public void cancelRunningCronJob(String teamName, String jobName, String executionId)
throws ApiException {
log.info(
"K8S deleting job for team: {} data job name: {} execution: {}",
"K8S deleting job for team: {} data job name: {} execution: {} namespace: {}",
teamName,
jobName,
executionId);
executionId,
namespace);
try {
var operationResponse =
initBatchV1Api()
.deleteNamespacedJob(
.deleteNamespacedJobWithHttpInfo(
executionId, namespace, null, null, null, null, "Foreground", null);

// Status of the operation. One of: "Success" or "Failure"
if (operationResponse == null || operationResponse.getStatus() == null) {
if (operationResponse == null || operationResponse.getStatusCode() == 404) {
log.info(
"Execution: {} for data job: {} with team: {} not found! The data job has likely"
+ " completed before it could be cancelled.",
executionId,
teamName,
jobName);
jobName,
teamName);
throw new DataJobExecutionCannotBeCancelledException(
executionId, ExecutionCancellationFailureReason.DataJobExecutionNotFound);
} else if (operationResponse.getStatus().equals("Failure")) {
} else if (operationResponse.getStatusCode() != 200) {
log.warn(
"Failed to delete K8S job. Reason: {} Details: {}",
operationResponse.getReason(),
operationResponse.getDetails().toString());
operationResponse.getData().getReason(),
operationResponse.getData().getDetails());
throw new KubernetesException(
operationResponse.getMessage(),
new ApiException(operationResponse.getCode(), operationResponse.getMessage()));
operationResponse.getData().getMessage(),
new ApiException(
operationResponse.getStatusCode(), operationResponse.getData().getMessage()));
}
} catch (JsonSyntaxException e) {
if (e.getCause() instanceof IllegalStateException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,16 @@ public void cancelDataJobExecution(String teamName, String jobName, String execu
executionId, ExecutionCancellationFailureReason.DataJobNotFound);
}

var jobExecutionOptional = jobExecutionRepository.findById(executionId);
var jobExecutionOptional =
jobExecutionRepository.findDataJobExecutionByIdAndTeamAndName(
executionId, jobName, teamName);

if (jobExecutionOptional.isEmpty()) {
log.info(
"Execution: {} for data job: {} with team: {} not found!",
executionId,
teamName,
jobName);
jobName,
teamName);
throw new DataJobExecutionCannotBeCancelledException(
executionId, ExecutionCancellationFailureReason.DataJobExecutionNotFound);
}
Expand All @@ -162,8 +164,8 @@ public void cancelDataJobExecution(String teamName, String jobName, String execu
log.info(
"Trying to cancel execution: {} for data job: {} with team: {} but job has status {}!",
executionId,
teamName,
jobName,
teamName,
jobStatus.toString());
throw new DataJobExecutionCannotBeCancelledException(
executionId, ExecutionCancellationFailureReason.ExecutionNotRunning);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.vmware.taurus.service;

import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.ApiResponse;
import io.kubernetes.client.openapi.apis.BatchV1Api;
import io.kubernetes.client.openapi.models.V1Status;
import io.kubernetes.client.openapi.models.V1StatusDetails;
Expand Down Expand Up @@ -36,7 +37,8 @@ public void testIsRunningJob_nullResponse_shouldThrowDataJobExecutionCannotBeCan
public void
testIsRunningJob_notNullResponseAndNullStatus_shouldThrowDataJobExecutionCannotBeCancelledException()
throws ApiException {
KubernetesService kubernetesService = mockKubernetesService(new V1Status().status(null));
KubernetesService kubernetesService =
mockKubernetesService(new V1Status().status(null).code(404));

Assertions.assertThrows(
DataJobExecutionCannotBeCancelledException.class,
Expand All @@ -48,7 +50,7 @@ public void testIsRunningJob_nullResponse_shouldThrowDataJobExecutionCannotBeCan
@Test
public void testIsRunningJob_notNullResponseAndStatusSuccess_shouldNotThrowException()
throws ApiException {
KubernetesService kubernetesService = mockKubernetesService(new V1Status().status("Success"));
KubernetesService kubernetesService = mockKubernetesService(new V1Status().code(200));

Assertions.assertDoesNotThrow(
() ->
Expand Down Expand Up @@ -85,9 +87,13 @@ private KubernetesService mockKubernetesService(V1Status v1Status) throws ApiExc
.when(kubernetesService)
.cancelRunningCronJob(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());

ApiResponse<V1Status> response = Mockito.mock(ApiResponse.class);
Mockito.when(response.getData()).thenReturn(v1Status);
Mockito.when(response.getStatusCode()).thenReturn(v1Status == null ? 404 : v1Status.getCode());

BatchV1Api batchV1Api = Mockito.mock(BatchV1Api.class);
Mockito.when(
batchV1Api.deleteNamespacedJob(
batchV1Api.deleteNamespacedJobWithHttpInfo(
Mockito.anyString(),
Mockito.isNull(),
Mockito.isNull(),
Expand All @@ -96,7 +102,7 @@ private KubernetesService mockKubernetesService(V1Status v1Status) throws ApiExc
Mockito.isNull(),
Mockito.anyString(),
Mockito.isNull()))
.thenReturn(v1Status);
.thenReturn(response);

Mockito.when(kubernetesService.initBatchV1Api()).thenReturn(batchV1Api);

Expand Down