Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control-service: bugfix on the execution cancellation api #215

Merged
merged 7 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,13 @@ public void cancelRunningCronJob(String teamName, String jobName, String executi
log.debug("Catching exception because of issue https://github.com/kubernetes-client/java/issues/86", e);
else throw e;
} else throw e;

} catch (ApiException e) {
//If no response body is present this might be a transport layer failure.
if (e.getCode() == 404) {
log.debug("Job execution: {} team: {}, job: {} cannot be found. K8S response body {}. Will set its status to Cancelled in the DB.",
executionId, teamName, jobName, e.getResponseBody());
} else throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,50 +187,56 @@ public DataJobExecution readJobExecution(String teamName, String jobName, String
.orElseThrow(() -> new DataJobExecutionNotFoundException(executionId));
}

public void updateJobExecution(DataJob dataJob, KubernetesService.JobExecution jobExecution) {
public void updateJobExecution(final DataJob dataJob, final KubernetesService.JobExecution jobExecution) {
if (StringUtils.isBlank(jobExecution.getExecutionId())) {
log.warn("Could not store Data Job execution due to the missing execution id: {}", jobExecution);
return;
}

final Optional<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionPersistedOptional =
jobExecutionRepository.findById(jobExecution.getExecutionId());
final ExecutionStatus status = getJobExecutionStatus(jobExecution);
//This set contains all the statuses that should not be changed to something else if present in the DB.
//Using a hash set, because it allows null elements, no NullPointer when contains method called with null.
var finalStatusSet = new HashSet<>(List.of(ExecutionStatus.CANCELLED, ExecutionStatus.FAILED,
ExecutionStatus.FINISHED, ExecutionStatus.SKIPPED));

if (dataJobExecutionPersistedOptional.isPresent() &&
(dataJobExecutionPersistedOptional.get().getStatus() == status ||
finalStatusSet.contains(dataJobExecutionPersistedOptional.get().getStatus()))) {
return;
}

Optional<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionPersistedOptional =
jobExecutionRepository.findById(jobExecution.getExecutionId());
com.vmware.taurus.service.model.DataJobExecution.DataJobExecutionBuilder dataJobExecutionBuilder;
ExecutionStatus status = getJobExecutionStatus(jobExecution);
// Optimization:
// if there is an existing execution in the database and
// the status has not changed (the new status is equal to the old one)
// do not update the record
if (dataJobExecutionPersistedOptional.isPresent() && status != null && status.equals(dataJobExecutionPersistedOptional.get().getStatus())) {
return;
} else if (dataJobExecutionPersistedOptional.isPresent()) {
dataJobExecutionBuilder = dataJobExecutionPersistedOptional.get().toBuilder();
} else {
com.vmware.taurus.service.model.ExecutionType executionType =
ExecutionType.MANUAL.getValue().equals(jobExecution.getExecutionType()) ?
com.vmware.taurus.service.model.ExecutionType.MANUAL :
com.vmware.taurus.service.model.ExecutionType.SCHEDULED;
dataJobExecutionBuilder = com.vmware.taurus.service.model.DataJobExecution.builder()
.id(jobExecution.getExecutionId())
.dataJob(dataJob)
.type(executionType);
}
final com.vmware.taurus.service.model.DataJobExecution.DataJobExecutionBuilder dataJobExecutionBuilder =
dataJobExecutionPersistedOptional.isPresent() ?
dataJobExecutionPersistedOptional.get().toBuilder() :
com.vmware.taurus.service.model.DataJobExecution.builder()
.id(jobExecution.getExecutionId())
.dataJob(dataJob)
.type(ExecutionType.MANUAL.getValue().equals(jobExecution.getExecutionType()) ?
com.vmware.taurus.service.model.ExecutionType.MANUAL :
com.vmware.taurus.service.model.ExecutionType.SCHEDULED);

com.vmware.taurus.service.model.DataJobExecution dataJobExecution = dataJobExecutionBuilder
.status(status)
.message(getJobExecutionApiMessage(status, jobExecution))
.opId(jobExecution.getOpId())
.startTime(jobExecution.getStartTime())
.endTime(jobExecution.getEndTime())
.vdkVersion("") // TODO [miroslavi] VDK version should come from the termination message
.jobVersion(jobExecution.getJobVersion())
.jobSchedule(jobExecution.getJobSchedule())
.resourcesCpuRequest(jobExecution.getResourcesCpuRequest())
.resourcesCpuLimit(jobExecution.getResourcesCpuLimit())
.resourcesMemoryRequest(jobExecution.getResourcesMemoryRequest())
.resourcesMemoryLimit(jobExecution.getResourcesMemoryLimit())
.lastDeployedDate(jobExecution.getDeployedDate())
.lastDeployedBy(jobExecution.getDeployedBy())
.build();
.status(status)
.message(getJobExecutionApiMessage(status, jobExecution))
.opId(jobExecution.getOpId())
.startTime(jobExecution.getStartTime())
.endTime(jobExecution.getEndTime())
.vdkVersion("") // TODO [miroslavi] VDK version should come from the termination message
.jobVersion(jobExecution.getJobVersion())
.jobSchedule(jobExecution.getJobSchedule())
.resourcesCpuRequest(jobExecution.getResourcesCpuRequest())
.resourcesCpuLimit(jobExecution.getResourcesCpuLimit())
.resourcesMemoryRequest(jobExecution.getResourcesMemoryRequest())
.resourcesMemoryLimit(jobExecution.getResourcesMemoryLimit())
.lastDeployedDate(jobExecution.getDeployedDate())
.lastDeployedBy(jobExecution.getDeployedBy())
.build();
jobExecutionRepository.save(dataJobExecution);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,25 @@

package com.vmware.taurus.execution;

import java.time.OffsetDateTime;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.RepositoryUtil;
import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.service.JobExecutionRepository;
import com.vmware.taurus.service.JobsRepository;
import com.vmware.taurus.service.KubernetesService;
import com.vmware.taurus.service.execution.JobExecutionService;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.ExecutionStatus;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.RepositoryUtil;
import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.service.JobsRepository;
import com.vmware.taurus.service.KubernetesService;
import com.vmware.taurus.service.execution.JobExecutionService;
import com.vmware.taurus.service.model.DataJob;
import java.time.OffsetDateTime;

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = ControlplaneApplication.class)
Expand All @@ -33,6 +35,9 @@ public class JobExecutionServiceUpdateExecutionIT {
@Autowired
private JobExecutionService jobExecutionService;

@Autowired
private JobExecutionRepository jobExecutionRepository;

@AfterEach
public void cleanUp() {
jobsRepository.deleteAll();
Expand Down Expand Up @@ -137,4 +142,81 @@ private void assertDataJobExecutionValid(
Assert.assertEquals(expectedJobExecution.getDeployedDate(), actualJobExecution.getDeployment().getDeployedDate());
Assert.assertEquals(expectedJobExecution.getDeployedBy(), actualJobExecution.getDeployment().getDeployedBy());
}

@Test
public void testUpdateJobExecution_DnStatusSubmittedAndUpdateStatusRunning_UpdateExpected() {
//control test which makes sure the status gets updated if in submitted
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.SUBMITTED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.RUNNING);
}

@Test
public void testUpdateJobExecution_DbStatusRunningAndUpdateStatusFinished_UpdateExpected() {
//control test which makes sure the status gets updated if in running
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.RUNNING, KubernetesService.JobExecution.Status.FINISHED, ExecutionStatus.FINISHED);
}

@Test
public void testUpdateJobExecution_DbStatusFinishedAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in finished
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.FINISHED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.FINISHED);
}

@Test
public void testUpdateJobExecution_DbStatusCancelledAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in cancelled
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.CANCELLED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.CANCELLED);
}

@Test
public void testUpdateJobExecution_DbStatusSkippedAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in skipped
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.SKIPPED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.SKIPPED);
}

@Test
public void testUpdateJobExecution_DbStatusFailedAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in failed
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.FAILED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.FAILED);
}

/**
* Helper method which tests if a data job execution status changes through the updateJobExecutionMethod when
* a data job already has an execution status written to the database. Writes a status to a data job
* execution and then attempts to update it with the provided status. Checks if status matches the expectedStatus
* param.
*
* @param statusPresentInDb the "previous" execution status
* @param attemptedStatusChange the attempted change execution status
* @param expectedStatus the expected execution status
*/
private void testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus statusPresentInDb,
KubernetesService.JobExecution.Status attemptedStatusChange,
ExecutionStatus expectedStatus) {
var actualDataJob = RepositoryUtil.createDataJob(jobsRepository);
var execution = RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-id", actualDataJob, statusPresentInDb);

KubernetesService.JobExecution attemptedExecutionUpdate = KubernetesService.JobExecution.builder()
.status(attemptedStatusChange)
.opId(execution.getOpId())
.executionId(execution.getId())
.executionType("manual")
.jobName(actualDataJob.getName())
.jobVersion("test_job_version")
.jobSchedule("test_job_schedule")
.startTime(OffsetDateTime.now())
.endTime(OffsetDateTime.now())
.resourcesCpuLimit(1f)
.resourcesCpuRequest(1f)
.resourcesMemoryLimit(1)
.resourcesMemoryRequest(1)
.deployedBy("test_deployed_by")
.deployedDate(OffsetDateTime.now())
.terminationMessage("TestMessage").build();

jobExecutionService.updateJobExecution(actualDataJob, attemptedExecutionUpdate);
var actualJobExecution = jobExecutionRepository.findById(attemptedExecutionUpdate.getExecutionId()).get();

Assertions.assertEquals(expectedStatus, actualJobExecution.getStatus());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,27 @@ public void testRecordJobExecutionStatus_existingDataJobAndExistingExecution_sho

@Test
@Order(17)
public void testRecordJobExecutionStatusSkipped_existingDataJobAndNonExistingExecution_shouldRecordExecution() {
JobExecution expectedJobExecution = buildJobExecutionStatus("data-job", "different-execution-id", null, JobExecution.Status.RUNNING);
dataJobStatusMonitor.recordJobExecutionStatus(expectedJobExecution);
Optional<DataJobExecution> actualJobExecution = jobExecutionRepository.findById(expectedJobExecution.getExecutionId());

assertDataJobExecutionValid(expectedJobExecution, actualJobExecution);
}

@Test
@Order(18)
public void testRecordJobExecutionStatusSkipped_existingDataJobAndExistingExecution_shouldRecordExecution() {
var expectedTerminationMessage = "Skipping job execution due to another parallel running execution.";
JobExecution expectedJobExecution = buildJobExecutionStatus("data-job", "execution-id", expectedTerminationMessage, JobExecution.Status.RUNNING);
JobExecution expectedJobExecution = buildJobExecutionStatus("data-job", "different-execution-id", expectedTerminationMessage, JobExecution.Status.SKIPPED);
dataJobStatusMonitor.recordJobExecutionStatus(expectedJobExecution);
Optional<DataJobExecution> actualJobExecution = jobExecutionRepository.findById(expectedJobExecution.getExecutionId());

assertDataJobExecutionValid(expectedJobExecution, actualJobExecution);
}

@Test
@Order(18)
@Order(19)
public void testRecordJobExecutionStatus_nonExistingDataJobAndNonExistingExecution_shouldNotRecordExecution() {
JobExecution jobExecution = buildJobExecutionStatus(randomId("data-job-"), randomId("job-"), PodTerminationMessage.SUCCESS.getValue());
dataJobStatusMonitor.recordJobExecutionStatus(jobExecution);
Expand Down