Skip to content

Commit

Permalink
control-service: bugfix on the execution cancellation api (#215)
Browse files Browse the repository at this point in the history
why: During testing, when making calls to the endpoint, I noticed
that the job execution state would get overridden by the control
service. The cancel functionality would execute and set the status
of the execution to cancelled, but then it would get overridden by the
DataJobStatusMonitor functionality. Also, if a user tried to cancel
an execution in running state that was no longer present in K8S and the
API returned 404 (job not found), we would return an error and the execution
would remain in running state. Refactored the updateJobExecution method.
Decided against updating the status of job executions that are already in a
completed state (Finished, Skipped, Cancelled, Failed).

what: Added a check in the update method which checks if the execution is
in a completed state and, if it is, does not update it. Added another check in the
K8S call to check if the API returns 404. If it does, we mark the execution
as cancelled in the database. Added unit tests. Changed a unit test because
it was trying to update an execution in a completed state (Finished) to Running.

type: Bugfix

testing: CI/CD, ran tests locally, added/updated tests

Signed-off-by: Momchil Zhivkov [email protected]
  • Loading branch information
Momchil Z authored Sep 16, 2021
1 parent f04a309 commit 2b304d8
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,13 @@ public void cancelRunningCronJob(String teamName, String jobName, String executi
log.debug("Catching exception because of issue https://github.com/kubernetes-client/java/issues/86", e);
else throw e;
} else throw e;

} catch (ApiException e) {
//If no response body is present this might be a transport layer failure.
if (e.getCode() == 404) {
log.debug("Job execution: {} team: {}, job: {} cannot be found. K8S response body {}. Will set its status to Cancelled in the DB.",
executionId, teamName, jobName, e.getResponseBody());
} else throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,50 +187,56 @@ public DataJobExecution readJobExecution(String teamName, String jobName, String
.orElseThrow(() -> new DataJobExecutionNotFoundException(executionId));
}

public void updateJobExecution(DataJob dataJob, KubernetesService.JobExecution jobExecution) {
public void updateJobExecution(final DataJob dataJob, final KubernetesService.JobExecution jobExecution) {
if (StringUtils.isBlank(jobExecution.getExecutionId())) {
log.warn("Could not store Data Job execution due to the missing execution id: {}", jobExecution);
return;
}

final Optional<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionPersistedOptional =
jobExecutionRepository.findById(jobExecution.getExecutionId());
final ExecutionStatus status = getJobExecutionStatus(jobExecution);
//This set contains all the statuses that should not be changed to something else if present in the DB.
//Using a hash set, because it allows null elements, no NullPointer when contains method called with null.
var finalStatusSet = new HashSet<>(List.of(ExecutionStatus.CANCELLED, ExecutionStatus.FAILED,
ExecutionStatus.FINISHED, ExecutionStatus.SKIPPED));

if (dataJobExecutionPersistedOptional.isPresent() &&
(dataJobExecutionPersistedOptional.get().getStatus() == status ||
finalStatusSet.contains(dataJobExecutionPersistedOptional.get().getStatus()))) {
return;
}

Optional<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionPersistedOptional =
jobExecutionRepository.findById(jobExecution.getExecutionId());
com.vmware.taurus.service.model.DataJobExecution.DataJobExecutionBuilder dataJobExecutionBuilder;
ExecutionStatus status = getJobExecutionStatus(jobExecution);
// Optimization:
// if there is an existing execution in the database and
// the status has not changed (the new status is equal to the old one)
// do not update the record
if (dataJobExecutionPersistedOptional.isPresent() && status != null && status.equals(dataJobExecutionPersistedOptional.get().getStatus())) {
return;
} else if (dataJobExecutionPersistedOptional.isPresent()) {
dataJobExecutionBuilder = dataJobExecutionPersistedOptional.get().toBuilder();
} else {
com.vmware.taurus.service.model.ExecutionType executionType =
ExecutionType.MANUAL.getValue().equals(jobExecution.getExecutionType()) ?
com.vmware.taurus.service.model.ExecutionType.MANUAL :
com.vmware.taurus.service.model.ExecutionType.SCHEDULED;
dataJobExecutionBuilder = com.vmware.taurus.service.model.DataJobExecution.builder()
.id(jobExecution.getExecutionId())
.dataJob(dataJob)
.type(executionType);
}
final com.vmware.taurus.service.model.DataJobExecution.DataJobExecutionBuilder dataJobExecutionBuilder =
dataJobExecutionPersistedOptional.isPresent() ?
dataJobExecutionPersistedOptional.get().toBuilder() :
com.vmware.taurus.service.model.DataJobExecution.builder()
.id(jobExecution.getExecutionId())
.dataJob(dataJob)
.type(ExecutionType.MANUAL.getValue().equals(jobExecution.getExecutionType()) ?
com.vmware.taurus.service.model.ExecutionType.MANUAL :
com.vmware.taurus.service.model.ExecutionType.SCHEDULED);

com.vmware.taurus.service.model.DataJobExecution dataJobExecution = dataJobExecutionBuilder
.status(status)
.message(getJobExecutionApiMessage(status, jobExecution))
.opId(jobExecution.getOpId())
.startTime(jobExecution.getStartTime())
.endTime(jobExecution.getEndTime())
.vdkVersion("") // TODO [miroslavi] VDK version should come from the termination message
.jobVersion(jobExecution.getJobVersion())
.jobSchedule(jobExecution.getJobSchedule())
.resourcesCpuRequest(jobExecution.getResourcesCpuRequest())
.resourcesCpuLimit(jobExecution.getResourcesCpuLimit())
.resourcesMemoryRequest(jobExecution.getResourcesMemoryRequest())
.resourcesMemoryLimit(jobExecution.getResourcesMemoryLimit())
.lastDeployedDate(jobExecution.getDeployedDate())
.lastDeployedBy(jobExecution.getDeployedBy())
.build();
.status(status)
.message(getJobExecutionApiMessage(status, jobExecution))
.opId(jobExecution.getOpId())
.startTime(jobExecution.getStartTime())
.endTime(jobExecution.getEndTime())
.vdkVersion("") // TODO [miroslavi] VDK version should come from the termination message
.jobVersion(jobExecution.getJobVersion())
.jobSchedule(jobExecution.getJobSchedule())
.resourcesCpuRequest(jobExecution.getResourcesCpuRequest())
.resourcesCpuLimit(jobExecution.getResourcesCpuLimit())
.resourcesMemoryRequest(jobExecution.getResourcesMemoryRequest())
.resourcesMemoryLimit(jobExecution.getResourcesMemoryLimit())
.lastDeployedDate(jobExecution.getDeployedDate())
.lastDeployedBy(jobExecution.getDeployedBy())
.build();
jobExecutionRepository.save(dataJobExecution);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,25 @@

package com.vmware.taurus.execution;

import java.time.OffsetDateTime;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.RepositoryUtil;
import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.service.JobExecutionRepository;
import com.vmware.taurus.service.JobsRepository;
import com.vmware.taurus.service.KubernetesService;
import com.vmware.taurus.service.execution.JobExecutionService;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.ExecutionStatus;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.RepositoryUtil;
import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.service.JobsRepository;
import com.vmware.taurus.service.KubernetesService;
import com.vmware.taurus.service.execution.JobExecutionService;
import com.vmware.taurus.service.model.DataJob;
import java.time.OffsetDateTime;

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = ControlplaneApplication.class)
Expand All @@ -33,6 +35,9 @@ public class JobExecutionServiceUpdateExecutionIT {
@Autowired
private JobExecutionService jobExecutionService;

@Autowired
private JobExecutionRepository jobExecutionRepository;

@AfterEach
public void cleanUp() {
jobsRepository.deleteAll();
Expand Down Expand Up @@ -137,4 +142,81 @@ private void assertDataJobExecutionValid(
Assert.assertEquals(expectedJobExecution.getDeployedDate(), actualJobExecution.getDeployment().getDeployedDate());
Assert.assertEquals(expectedJobExecution.getDeployedBy(), actualJobExecution.getDeployment().getDeployedBy());
}

@Test
public void testUpdateJobExecution_DbStatusSubmittedAndUpdateStatusRunning_UpdateExpected() {
//control test which makes sure the status gets updated if in submitted
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.SUBMITTED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.RUNNING);
}

@Test
public void testUpdateJobExecution_DbStatusRunningAndUpdateStatusFinished_UpdateExpected() {
//control test which makes sure the status gets updated if in running
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.RUNNING, KubernetesService.JobExecution.Status.FINISHED, ExecutionStatus.FINISHED);
}

@Test
public void testUpdateJobExecution_DbStatusFinishedAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in finished
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.FINISHED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.FINISHED);
}

@Test
public void testUpdateJobExecution_DbStatusCancelledAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in cancelled
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.CANCELLED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.CANCELLED);
}

@Test
public void testUpdateJobExecution_DbStatusSkippedAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in skipped
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.SKIPPED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.SKIPPED);
}

@Test
public void testUpdateJobExecution_DbStatusFailedAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in failed
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.FAILED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.FAILED);
}

/**
* Helper method which tests if a data job execution status changes through the updateJobExecutionMethod when
* a data job already has an execution status written to the database. Writes a status to a data job
* execution and then attempts to update it with the provided status. Checks if status matches the expectedStatus
* param.
*
* @param statusPresentInDb the "previous" execution status
* @param attemptedStatusChange the attempted change execution status
* @param expectedStatus the expected execution status
*/
private void testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus statusPresentInDb,
KubernetesService.JobExecution.Status attemptedStatusChange,
ExecutionStatus expectedStatus) {
var actualDataJob = RepositoryUtil.createDataJob(jobsRepository);
var execution = RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-id", actualDataJob, statusPresentInDb);

KubernetesService.JobExecution attemptedExecutionUpdate = KubernetesService.JobExecution.builder()
.status(attemptedStatusChange)
.opId(execution.getOpId())
.executionId(execution.getId())
.executionType("manual")
.jobName(actualDataJob.getName())
.jobVersion("test_job_version")
.jobSchedule("test_job_schedule")
.startTime(OffsetDateTime.now())
.endTime(OffsetDateTime.now())
.resourcesCpuLimit(1f)
.resourcesCpuRequest(1f)
.resourcesMemoryLimit(1)
.resourcesMemoryRequest(1)
.deployedBy("test_deployed_by")
.deployedDate(OffsetDateTime.now())
.terminationMessage("TestMessage").build();

jobExecutionService.updateJobExecution(actualDataJob, attemptedExecutionUpdate);
var actualJobExecution = jobExecutionRepository.findById(attemptedExecutionUpdate.getExecutionId()).get();

Assertions.assertEquals(expectedStatus, actualJobExecution.getStatus());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,27 @@ public void testRecordJobExecutionStatus_existingDataJobAndExistingExecution_sho

@Test
@Order(17)
public void testRecordJobExecutionStatusSkipped_existingDataJobAndNonExistingExecution_shouldRecordExecution() {
JobExecution expectedJobExecution = buildJobExecutionStatus("data-job", "different-execution-id", null, JobExecution.Status.RUNNING);
dataJobStatusMonitor.recordJobExecutionStatus(expectedJobExecution);
Optional<DataJobExecution> actualJobExecution = jobExecutionRepository.findById(expectedJobExecution.getExecutionId());

assertDataJobExecutionValid(expectedJobExecution, actualJobExecution);
}

@Test
@Order(18)
public void testRecordJobExecutionStatusSkipped_existingDataJobAndExistingExecution_shouldRecordExecution() {
var expectedTerminationMessage = "Skipping job execution due to another parallel running execution.";
JobExecution expectedJobExecution = buildJobExecutionStatus("data-job", "execution-id", expectedTerminationMessage, JobExecution.Status.RUNNING);
JobExecution expectedJobExecution = buildJobExecutionStatus("data-job", "different-execution-id", expectedTerminationMessage, JobExecution.Status.SKIPPED);
dataJobStatusMonitor.recordJobExecutionStatus(expectedJobExecution);
Optional<DataJobExecution> actualJobExecution = jobExecutionRepository.findById(expectedJobExecution.getExecutionId());

assertDataJobExecutionValid(expectedJobExecution, actualJobExecution);
}

@Test
@Order(18)
@Order(19)
public void testRecordJobExecutionStatus_nonExistingDataJobAndNonExistingExecution_shouldNotRecordExecution() {
JobExecution jobExecution = buildJobExecutionStatus(randomId("data-job-"), randomId("job-"), PodTerminationMessage.SUCCESS.getValue());
dataJobStatusMonitor.recordJobExecutionStatus(jobExecution);
Expand Down

0 comments on commit 2b304d8

Please sign in to comment.