Skip to content

Commit

Permalink
control-service: synchronize job executions in status submitted (#312)
Browse files Browse the repository at this point in the history
We need to keep in sync all job executions in the database
in case of Control Service downtime or missed Kubernetes Job Event.

This change aims to synchronize Data Job Executions in the database that
have status SUBMITTED with the actual running jobs in Kubernetes.

Testing done: unit tests

Signed-off-by: Miroslav Ivanov [email protected]
  • Loading branch information
mivanov1988 authored Sep 29, 2021
1 parent 1720d8b commit 7268a83
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,23 @@ public String startDataJobExecution(String teamName, String jobName, String depl
envs.put(JobEnvVar.VDK_OP_ID.getValue(), opId);

// Start K8S Job
dataJobsKubernetesService.startNewCronJobExecution(jobDeploymentStatus.getCronJobName(), executionId, annotations, envs, extraJobArguments, jobName);
dataJobsKubernetesService.startNewCronJobExecution(
jobDeploymentStatus.getCronJobName(),
executionId,
annotations,
envs,
extraJobArguments,
jobName);

// Save Data Job execution
saveDataJobExecution(dataJob, executionId, opId, com.vmware.taurus.service.model.ExecutionType.MANUAL, ExecutionStatus.SUBMITTED, startedByBuilt);
saveDataJobExecution(
dataJob,
executionId,
opId,
com.vmware.taurus.service.model.ExecutionType.MANUAL,
ExecutionStatus.SUBMITTED,
startedByBuilt,
OffsetDateTime.now());

return executionId;
} catch (ApiException e) {
Expand Down Expand Up @@ -292,7 +305,7 @@ public void syncJobExecutionStatuses(List<String> runningJobExecutionIds) {

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsToBeUpdated =
jobExecutionRepository.findDataJobExecutionsByStatusInAndStartTimeBefore(
List.of(ExecutionStatus.RUNNING), OffsetDateTime.now().minusMinutes(3))
List.of(ExecutionStatus.SUBMITTED, ExecutionStatus.RUNNING), OffsetDateTime.now().minusMinutes(3))
.stream()
.filter(dataJobExecution -> !runningJobExecutionIds.contains(dataJobExecution.getId()))
.map(dataJobExecution -> {
Expand Down Expand Up @@ -353,7 +366,8 @@ private void saveDataJobExecution(
String opId,
com.vmware.taurus.service.model.ExecutionType executionType,
ExecutionStatus executionStatus,
String startedBy) {
String startedBy,
OffsetDateTime startTime) {

com.vmware.taurus.service.model.DataJobExecution dataJobExecution = com.vmware.taurus.service.model.DataJobExecution.builder()
.id(executionId)
Expand All @@ -362,6 +376,7 @@ private void saveDataJobExecution(
.type(executionType)
.status(executionStatus)
.startedBy(startedBy)
.startTime(startTime)
.build();

jobExecutionRepository.save(dataJobExecution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,14 @@ public void testSyncJobExecutionStatuses_fourRunningExecutionsWithStartTimeBefor
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-2", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
com.vmware.taurus.service.model.DataJobExecution expectedJobExecution3 =
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-3", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-4", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-4", actualDataJob, ExecutionStatus.SUBMITTED, OffsetDateTime.now().minusMinutes(5));

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(4, dataJobExecutionsBeforeSync.size());
jobExecutionService.syncJobExecutionStatuses(List.of(expectedJobExecution1.getId(), expectedJobExecution3.getId()));

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(2, dataJobExecutionsAfterSync.size());
Assert.assertEquals(expectedJobExecution1.getId(), dataJobExecutionsAfterSync.get(0).getId());
Expand All @@ -78,17 +76,15 @@ public void testSyncJobExecutionStatuses_twoRunningExecutionsWithStartTimeBefore
com.vmware.taurus.service.model.DataJobExecution expectedJobExecution2 =
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-2", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(2));
com.vmware.taurus.service.model.DataJobExecution expectedJobExecution3 =
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-3", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(2));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-4", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-3", actualDataJob, ExecutionStatus.SUBMITTED, OffsetDateTime.now().minusMinutes(2));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-4", actualDataJob, ExecutionStatus.SUBMITTED, OffsetDateTime.now().minusMinutes(5));

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(4, dataJobExecutionsBeforeSync.size());
jobExecutionService.syncJobExecutionStatuses(List.of(expectedJobExecution1.getId(), expectedJobExecution3.getId()));

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(3, dataJobExecutionsAfterSync.size());
Assert.assertEquals(expectedJobExecution1.getId(), dataJobExecutionsAfterSync.get(0).getId());
Expand All @@ -104,17 +100,15 @@ public void testSyncJobExecutionStatuses_fourRunningExecutionsWithStartTimeBefor
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-1", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-2", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
com.vmware.taurus.service.model.DataJobExecution expectedJobExecution3 =
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-3", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-4", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-3", actualDataJob, ExecutionStatus.SUBMITTED, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-4", actualDataJob, ExecutionStatus.SUBMITTED, OffsetDateTime.now().minusMinutes(5));

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(4, dataJobExecutionsBeforeSync.size());
jobExecutionService.syncJobExecutionStatuses(Collections.emptyList());

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(0, dataJobExecutionsAfterSync.size());
}
Expand All @@ -128,12 +122,11 @@ public void testSyncJobExecutionStatuses_fourRunningExecutionsWithStartTimeBefor
com.vmware.taurus.service.model.DataJobExecution expectedJobExecution2 =
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-2", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
com.vmware.taurus.service.model.DataJobExecution expectedJobExecution3 =
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-3", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-3", actualDataJob, ExecutionStatus.SUBMITTED, OffsetDateTime.now().minusMinutes(5));
com.vmware.taurus.service.model.DataJobExecution expectedJobExecution4 =
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-4", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-4", actualDataJob, ExecutionStatus.SUBMITTED, OffsetDateTime.now().minusMinutes(5));

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(4, dataJobExecutionsBeforeSync.size());
jobExecutionService.syncJobExecutionStatuses(List.of(
Expand All @@ -142,8 +135,7 @@ public void testSyncJobExecutionStatuses_fourRunningExecutionsWithStartTimeBefor
expectedJobExecution3.getId(),
expectedJobExecution4.getId()));

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(4, dataJobExecutionsAfterSync.size());
}
Expand All @@ -154,17 +146,15 @@ public void testSyncJobExecutionStatuses_fourRunningExecutionsWithStartTimeBefor

RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-1", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-2", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-3", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-4", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-3", actualDataJob, ExecutionStatus.SUBMITTED, OffsetDateTime.now().minusMinutes(5));
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-4", actualDataJob, ExecutionStatus.SUBMITTED, OffsetDateTime.now().minusMinutes(5));

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(4, dataJobExecutionsBeforeSync.size());
jobExecutionService.syncJobExecutionStatuses(null);

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(4, dataJobExecutionsAfterSync.size());
}
Expand All @@ -181,16 +171,14 @@ public void testSyncJobExecutionStatuses_twoRunningExecutionsWithStartTimeBefore
com.vmware.taurus.service.model.DataJobExecution expectedJobExecution4 =
RepositoryUtil.createDataJobExecution(jobExecutionRepository, "test-execution-id-4", actualDataJob, ExecutionStatus.RUNNING, OffsetDateTime.now().minusMinutes(5));

List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsBeforeSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(2, dataJobExecutionsBeforeSync.size());
Assert.assertEquals(expectedJobExecution3.getId(), dataJobExecutionsBeforeSync.get(0).getId());
Assert.assertEquals(expectedJobExecution4.getId(), dataJobExecutionsBeforeSync.get(1).getId());

jobExecutionService.syncJobExecutionStatuses(List.of(expectedJobExecution1.getId(), expectedJobExecution3.getId()));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync =
jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(actualDataJob.getName(), List.of(ExecutionStatus.RUNNING));
List<com.vmware.taurus.service.model.DataJobExecution> dataJobExecutionsAfterSync = findRunningDataJobExecutions(actualDataJob.getName());

Assert.assertEquals(1, dataJobExecutionsAfterSync.size());
Assert.assertEquals(expectedJobExecution3.getId(), dataJobExecutionsAfterSync.get(0).getId());
Expand All @@ -199,4 +187,8 @@ public void testSyncJobExecutionStatuses_twoRunningExecutionsWithStartTimeBefore
Assert.assertEquals("Status is set by VDK Control Service", actualFinishedExecution.getMessage());
Assert.assertNotNull(actualFinishedExecution.getEndTime());
}

private List<com.vmware.taurus.service.model.DataJobExecution> findRunningDataJobExecutions(String dataJobName) {
return jobExecutionRepository.findDataJobExecutionsByDataJobNameAndStatusIn(dataJobName, List.of(ExecutionStatus.SUBMITTED, ExecutionStatus.RUNNING));
}
}

0 comments on commit 7268a83

Please sign in to comment.