Skip to content

Commit

Permalink
comments and refactor
Browse files Browse the repository at this point in the history
Signed-off-by: mrMoZ1 <[email protected]>
  • Loading branch information
mrMoZ1 committed Sep 15, 2021
1 parent 43c57f0 commit 3591dcf
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,9 @@ public void cancelRunningCronJob(String teamName, String jobName, String executi

} catch (ApiException e) {
//If no response body is present this might be a transport layer failure.
if (e.getCode() == 404 && e.getResponseBody() != null) {
log.debug("Job execution: {} team: {}, job: {} cannot be found in K8S. Will set its status to Cancelled in the DB.",
executionId, teamName, jobName);
log.debug("Catching ApiException, because response code is 404 and has response body.", e);
if (e.getCode() == 404) {
log.debug("Job execution: {} team: {}, job: {} cannot be found. K8S response body {}. Will set its status to Cancelled in the DB.",
executionId, teamName, jobName, e.getResponseBody());
} else throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,56 +187,56 @@ public DataJobExecution readJobExecution(String teamName, String jobName, String
.orElseThrow(() -> new DataJobExecutionNotFoundException(executionId));
}

public void updateJobExecution(DataJob dataJob, KubernetesService.JobExecution jobExecution) {
public void updateJobExecution(final DataJob dataJob, final KubernetesService.JobExecution jobExecution) {
if (StringUtils.isBlank(jobExecution.getExecutionId())) {
log.warn("Could not store Data Job execution due to the missing execution id: {}", jobExecution);
return;
}

final Optional<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionPersistedOptional =
jobExecutionRepository.findById(jobExecution.getExecutionId());
final ExecutionStatus status = getJobExecutionStatus(jobExecution);
//This set contains all the statuses that should not be changed to something else if present in the DB.
//Using a hash set, because it allows null elements, no NullPointer when contains method called with null.
var finalStatusSet = new HashSet<>(List.of(ExecutionStatus.CANCELLED, ExecutionStatus.FAILED,
ExecutionStatus.FINISHED, ExecutionStatus.SKIPPED));

if (dataJobExecutionPersistedOptional.isPresent() &&
(dataJobExecutionPersistedOptional.get().getStatus() == status ||
finalStatusSet.contains(dataJobExecutionPersistedOptional.get().getStatus()))) {
return;
}

Optional<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionPersistedOptional =
jobExecutionRepository.findById(jobExecution.getExecutionId());
com.vmware.taurus.service.model.DataJobExecution.DataJobExecutionBuilder dataJobExecutionBuilder;
ExecutionStatus status = getJobExecutionStatus(jobExecution);
// Optimization:
// if there is an existing execution in the database and
// the status has not changed (the new status is equal to the old one)
// do not update the record. Also, do not update record in case ExecutionStatus is Cancelled
// as it can only be set in the control-service. This way we don't override with older statuses.
if (dataJobExecutionPersistedOptional.isPresent()) {

if ((status != null && status.equals(dataJobExecutionPersistedOptional.get().getStatus()))
|| (dataJobExecutionPersistedOptional.get().getStatus() == ExecutionStatus.CANCELLED)) {
return;
} else {
dataJobExecutionBuilder = dataJobExecutionPersistedOptional.get().toBuilder();
}

} else {
com.vmware.taurus.service.model.ExecutionType executionType =
ExecutionType.MANUAL.getValue().equals(jobExecution.getExecutionType()) ?
com.vmware.taurus.service.model.ExecutionType.MANUAL :
com.vmware.taurus.service.model.ExecutionType.SCHEDULED;
dataJobExecutionBuilder = com.vmware.taurus.service.model.DataJobExecution.builder()
.id(jobExecution.getExecutionId())
.dataJob(dataJob)
.type(executionType);
}
// do not update the record
final com.vmware.taurus.service.model.DataJobExecution.DataJobExecutionBuilder dataJobExecutionBuilder =
dataJobExecutionPersistedOptional.isPresent() ?
dataJobExecutionPersistedOptional.get().toBuilder() :
com.vmware.taurus.service.model.DataJobExecution.builder()
.id(jobExecution.getExecutionId())
.dataJob(dataJob)
.type(ExecutionType.MANUAL.getValue().equals(jobExecution.getExecutionType()) ?
com.vmware.taurus.service.model.ExecutionType.MANUAL :
com.vmware.taurus.service.model.ExecutionType.SCHEDULED);

com.vmware.taurus.service.model.DataJobExecution dataJobExecution = dataJobExecutionBuilder
.status(status)
.message(getJobExecutionApiMessage(status, jobExecution))
.opId(jobExecution.getOpId())
.startTime(jobExecution.getStartTime())
.endTime(jobExecution.getEndTime())
.vdkVersion("") // TODO [miroslavi] VDK version should come from the termination message
.jobVersion(jobExecution.getJobVersion())
.jobSchedule(jobExecution.getJobSchedule())
.resourcesCpuRequest(jobExecution.getResourcesCpuRequest())
.resourcesCpuLimit(jobExecution.getResourcesCpuLimit())
.resourcesMemoryRequest(jobExecution.getResourcesMemoryRequest())
.resourcesMemoryLimit(jobExecution.getResourcesMemoryLimit())
.lastDeployedDate(jobExecution.getDeployedDate())
.lastDeployedBy(jobExecution.getDeployedBy())
.build();
.status(status)
.message(getJobExecutionApiMessage(status, jobExecution))
.opId(jobExecution.getOpId())
.startTime(jobExecution.getStartTime())
.endTime(jobExecution.getEndTime())
.vdkVersion("") // TODO [miroslavi] VDK version should come from the termination message
.jobVersion(jobExecution.getJobVersion())
.jobSchedule(jobExecution.getJobSchedule())
.resourcesCpuRequest(jobExecution.getResourcesCpuRequest())
.resourcesCpuLimit(jobExecution.getResourcesCpuLimit())
.resourcesMemoryRequest(jobExecution.getResourcesMemoryRequest())
.resourcesMemoryLimit(jobExecution.getResourcesMemoryLimit())
.lastDeployedDate(jobExecution.getDeployedDate())
.lastDeployedBy(jobExecution.getDeployedBy())
.build();
jobExecutionRepository.save(dataJobExecution);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,25 @@

package com.vmware.taurus.execution;

import java.time.OffsetDateTime;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.RepositoryUtil;
import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.service.JobExecutionRepository;
import com.vmware.taurus.service.JobsRepository;
import com.vmware.taurus.service.KubernetesService;
import com.vmware.taurus.service.execution.JobExecutionService;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.ExecutionStatus;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.RepositoryUtil;
import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.service.JobsRepository;
import com.vmware.taurus.service.KubernetesService;
import com.vmware.taurus.service.execution.JobExecutionService;
import com.vmware.taurus.service.model.DataJob;
import java.time.OffsetDateTime;

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = ControlplaneApplication.class)
Expand All @@ -33,9 +35,13 @@ public class JobExecutionServiceUpdateExecutionIT {
@Autowired
private JobExecutionService jobExecutionService;

@Autowired
private JobExecutionRepository jobExecutionRepository;

@AfterEach
public void cleanUp() {
jobsRepository.deleteAll();
jobExecutionRepository.deleteAll();
}

@Test
Expand Down Expand Up @@ -137,4 +143,81 @@ private void assertDataJobExecutionValid(
Assert.assertEquals(expectedJobExecution.getDeployedDate(), actualJobExecution.getDeployment().getDeployedDate());
Assert.assertEquals(expectedJobExecution.getDeployedBy(), actualJobExecution.getDeployment().getDeployedBy());
}

@Test
public void testUpdateJobExecution_DnStatusSubmittedAndUpdateStatusRunning_UpdateExpected() {
//control test which makes sure the status gets updated if in submitted
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.SUBMITTED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.RUNNING);
}

@Test
public void testUpdateJobExecution_DbStatusRunningAndUpdateStatusFinished_UpdateExpected() {
//control test which makes sure the status gets updated if in running
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.RUNNING, KubernetesService.JobExecution.Status.FINISHED, ExecutionStatus.FINISHED);
}

@Test
public void testUpdateJobExecution_DbStatusFinishedAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in finished
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.FINISHED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.FINISHED);
}

@Test
public void testUpdateJobExecution_DbStatusCancelledAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in cancelled
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.CANCELLED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.CANCELLED);
}

@Test
public void testUpdateJobExecution_DbStatusSkippedAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in skipped
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.SKIPPED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.SKIPPED);
}

@Test
public void testUpdateJobExecution_DbStatusFailedAndUpdateStatusRunning_NoUpdateExpected() {
//test which makes sure the status doesn't get updated if in failed
testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus.FAILED, KubernetesService.JobExecution.Status.RUNNING, ExecutionStatus.FAILED);
}

/**
* Helper method which tests if a data job execution status changes through the updateJobExecutionMethod when
* a data job already has an execution status written to the database. Writes a status to a data job
* execution and then attempts to update it with the provided status. Checks if status matches the expectedStatus
* param.
*
* @param statusPresentInDb the "previous" execution status
* @param attemptedStatusChange the attempted change execution status
* @param expectedStatus the expected execution status
*/
private void testUpdateJobExecutionWithPreviousStatusInDatabase(ExecutionStatus statusPresentInDb,
KubernetesService.JobExecution.Status attemptedStatusChange,
ExecutionStatus expectedStatus) {
var actualDataJob = RepositoryUtil.createDataJob(jobsRepository);
var execution = RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-id", actualDataJob, statusPresentInDb);

KubernetesService.JobExecution attemptedExecutionUpdate = KubernetesService.JobExecution.builder()
.status(attemptedStatusChange)
.opId(execution.getOpId())
.executionId(execution.getId())
.executionType("manual")
.jobName(actualDataJob.getName())
.jobVersion("test_job_version")
.jobSchedule("test_job_schedule")
.startTime(OffsetDateTime.now())
.endTime(OffsetDateTime.now())
.resourcesCpuLimit(1f)
.resourcesCpuRequest(1f)
.resourcesMemoryLimit(1)
.resourcesMemoryRequest(1)
.deployedBy("test_deployed_by")
.deployedDate(OffsetDateTime.now())
.terminationMessage("TestMessage").build();

jobExecutionService.updateJobExecution(actualDataJob, attemptedExecutionUpdate);
var actualJobExecution = jobExecutionRepository.findById(attemptedExecutionUpdate.getExecutionId()).get();

Assertions.assertEquals(expectedStatus, actualJobExecution.getStatus());
}

}

0 comments on commit 3591dcf

Please sign in to comment.