Skip to content

Commit

Permalink
control-service: update the K8S job monitoring logic (#563)
Browse files Browse the repository at this point in the history
Previously, the logic that monitors the K8S jobs was
ignoring the ADDED event, which is received when a K8S
job is created but before the pod execution is started.
The reason for this is unclear, but this was affecting the
notification logic in the following manner:

* Assume we have a previous execution of a data job, which
completed with a user error
* When the next execution is started, first a K8S job
is created. This emits the ADDED event and a metric
for this job is immediately exposed by the kube-state-metrics
* Since we are ignoring the ADDED event, our termination
status still reflects the previous execution (i.e. user error)
* We remain in this state until the actual execution starts,
i.e. a pod is up and running, in which case we receive a
MODIFIED event for the K8S job and update the termination
status in response
* However, during the period from the creation of the K8S
job until it actually starts, the kube-state-metrics exposes
information about the new execution, but our termination
metrics reflect the old one. Because we are joining against
the kube-state-metrics (in order to get the execution id),
it happens that for a period of time we have an active alert
for the new execution (even though it is still running) with
the termination status of the old one. Luckily this alert
rarely fires because we have a 1 minute `for` time before
an active alert becomes firing. However, this can produce
false positives if the pod start is delayed for too long, and it is
also ugly and misleading when looking at the Prometheus graphs.

This commit aims to fix this by responding to the ADDED
event by creating a SUBMITTED execution with a start time
equal to the current time.

Testing done: unit and integration tests pass

Signed-off-by: Tsvetomir Palashki <[email protected]>
  • Loading branch information
tpalashki authored and mrMoZ1 committed Dec 6, 2021
1 parent 58e3b4a commit 8e31dd3
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ public void watchJobs(

log.debug("Job {} is {}", job.getMetadata().getName(), response.type);

if (!"ADDED".equals(response.type) && !"DELETED".equals(response.type)) {
if (!"DELETED".equals(response.type)) {
// Occasionally events arrive for jobs that have completed into the past.
// Ignore events that have arrived later than one hour after the job's completion time
var condition = getJobCondition(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package com.vmware.taurus.service.execution;

import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Map;

Expand Down Expand Up @@ -48,7 +49,7 @@ private static class TerminationMessage {
*/
public static ExecutionResult getResult(KubernetesService.JobExecution jobExecution) {
TerminationMessage terminationMessage = parseTerminationMessage(jobExecution.getTerminationMessage());
ExecutionStatus executionStatus = getExecutionStatus(jobExecution.getSucceeded());
ExecutionStatus executionStatus = getExecutionStatus(jobExecution.getSucceeded(), jobExecution.getStartTime());
ExecutionTerminationStatus terminationStatus = getTerminationStatus(terminationMessage.getTerminationStatus());

terminationStatus = updateTerminationStatusBasedOnExecutionStatus(
Expand All @@ -66,19 +67,26 @@ public static ExecutionResult getResult(KubernetesService.JobExecution jobExecut
* Determines the execution status based on K8S Job status as follows:
* <ul>
* <li>If K8S Job succeeded is null (which means there is no K8S Job condition
* because the job is already running), then the execution status will be RUNNING</li>
* because the job has still not finished), then the execution status will be either RUNNING,
* if the K8S Job has start time, or SUBMITTED, if the K8S Job does not have start time,
* i.e. it was created but the execution has not started yet.</li>
* <li>If the K8S Job succeeded is true, then the execution status will be FINISHED</li>
* <li>If K8S Job succeeded is false, then the execution status will be FAILED</li>
* </ul>
*
* @param executionSucceeded
* @param startTime
* @return
*/
private static ExecutionStatus getExecutionStatus(Boolean executionSucceeded) {
private static ExecutionStatus getExecutionStatus(Boolean executionSucceeded, OffsetDateTime startTime) {
ExecutionStatus executionStatus;

if (executionSucceeded == null) {
executionStatus = ExecutionStatus.RUNNING;
if (startTime == null) {
executionStatus = ExecutionStatus.SUBMITTED;
} else {
executionStatus = ExecutionStatus.RUNNING;
}
} else if (executionSucceeded) {
executionStatus = ExecutionStatus.FINISHED;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ public Optional<com.vmware.taurus.service.model.DataJobExecution> updateJobExecu
com.vmware.taurus.service.model.DataJobExecution.builder()
.id(jobExecution.getExecutionId())
.dataJob(dataJob)
.startTime(jobExecution.getStartTime())
.startTime(jobExecution.getStartTime() != null ?
jobExecution.getStartTime() :
OffsetDateTime.now())
.type(ExecutionType.MANUAL.getValue().equals(jobExecution.getExecutionType()) ?
com.vmware.taurus.service.model.ExecutionType.MANUAL :
com.vmware.taurus.service.model.ExecutionType.SCHEDULED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import com.vmware.taurus.service.model.ExecutionResult;
import com.vmware.taurus.service.model.ExecutionTerminationStatus;

import java.time.OffsetDateTime;

public class JobExecutionResultManagerTest {

@Test
Expand Down Expand Up @@ -108,11 +110,20 @@ public void testGetResult_emptyTerminationMessageAndExecutionStatusFailed_should
}

@Test
public void testGetResult_executionSucceededNull_shouldReturnExecutionStatusRunning() {
public void testGetResult_executionSucceededNullAndStartTimeNull_shouldReturnExecutionStatusSubmitted() {
KubernetesService.JobExecution jobExecution =
KubernetesService.JobExecution.builder().succeeded(null).build();
ExecutionResult actualResult = JobExecutionResultManager.getResult(jobExecution);

Assertions.assertEquals(ExecutionStatus.SUBMITTED, actualResult.getExecutionStatus());
}

/**
 * A job execution with no success flag but with a start time
 * must be reported with execution status RUNNING.
 */
@Test
public void testGetResult_executionSucceededNullAndStartTimeNotNull_shouldReturnExecutionStatusRunning() {
    // Given: success flag absent, start time present.
    KubernetesService.JobExecution startedExecution = KubernetesService.JobExecution.builder()
            .succeeded(null)
            .startTime(OffsetDateTime.now())
            .build();

    // When: the result is derived from the execution.
    ExecutionResult result = JobExecutionResultManager.getResult(startedExecution);

    // Then: the execution is considered running.
    Assertions.assertEquals(ExecutionStatus.RUNNING, result.getExecutionStatus());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package com.vmware.taurus.service.execution;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Random;

Expand Down Expand Up @@ -147,6 +148,56 @@ public void testUpdateJobExecution_statusFinishedAndTerminationMessageSuccess_sh
"");
}

/**
 * Verifies that a job execution reported without a start time (and without a
 * success flag, termination message or end time) is stored with a non-null
 * start time close to the current time and with status SUBMITTED.
 */
@Test
void testUpdateJobExecution_withoutStartTime_shouldRecordExecutionStartTimeNow() {
    DataJob actualDataJob = RepositoryUtil.createDataJob(jobsRepository);
    // succeeded, terminationMessage, startTime and endTime are all left null.
    KubernetesService.JobExecution expectedJobExecution = createJobExecution(
            actualDataJob,
            null,
            null,
            null,
            null);
    DataJobExecution actualJobExecution = jobExecutionService.readJobExecution(
            actualDataJob.getJobConfig().getTeam(),
            actualDataJob.getName(),
            expectedJobExecution.getExecutionId());

    Assert.assertNotNull(actualJobExecution.getStartTime());
    // Start time should be close to the current time (within 10 seconds) because it is set to the
    // current time when null. The recorded start time lies in the past relative to "now", so the
    // absolute distance is compared; a signed difference would be negative and pass trivially.
    Duration startTimeOffset = Duration.between(actualJobExecution.getStartTime(), OffsetDateTime.now()).abs();
    Assert.assertTrue(startTimeOffset.toMillis() < 10000);
    Assert.assertEquals(DataJobExecution.StatusEnum.SUBMITTED, actualJobExecution.getStatus());
}

/**
 * Builds a K8S job execution for the given data job with fixed test values,
 * derives its execution result and passes both to
 * {@code jobExecutionService.updateJobExecution}.
 *
 * @param dataJob            the data job the execution belongs to
 * @param succeeded          the K8S job success flag; may be null
 * @param terminationMessage the termination message; may be null
 * @param startTime          the execution start time; may be null
 * @param endTime            the execution end time; may be null
 * @return the job execution that was built and submitted to the service
 */
private KubernetesService.JobExecution createJobExecution(
        DataJob dataJob,
        Boolean succeeded,
        String terminationMessage,
        OffsetDateTime startTime,
        OffsetDateTime endTime) {

    // Random suffix keeps execution ids distinct between invocations.
    String executionId = "test_execution_id_" + new Random().nextInt();

    KubernetesService.JobExecution execution = KubernetesService.JobExecution.builder()
            // identity
            .executionId(executionId)
            .opId("test_op_id")
            .executionType("manual")
            // owning job
            .jobName(dataJob.getName())
            .jobVersion("test_job_version")
            .jobSchedule("test_job_schedule")
            // outcome and timing supplied by the caller
            .succeeded(succeeded)
            .terminationMessage(terminationMessage)
            .startTime(startTime)
            .endTime(endTime)
            // resources
            .resourcesCpuLimit(1f)
            .resourcesCpuRequest(1f)
            .resourcesMemoryLimit(1)
            .resourcesMemoryRequest(1)
            // deployment
            .deployedBy("test_deployed_by")
            .deployedDate(OffsetDateTime.now())
            .build();

    jobExecutionService.updateJobExecution(
            dataJob,
            execution,
            JobExecutionResultManager.getResult(execution));

    return execution;
}

private void testUpdateJobExecution(
Boolean actualExecutionSucceeded,
String actualTerminationMessage,
Expand All @@ -155,30 +206,16 @@ private void testUpdateJobExecution(
String expectedVdkVersion) {

DataJob actualDataJob = RepositoryUtil.createDataJob(jobsRepository);
KubernetesService.JobExecution expectedJobExecution = KubernetesService.JobExecution.builder()
.succeeded(actualExecutionSucceeded)
.opId("test_op_id")
.executionId("test_execution_id_" + new Random().nextInt())
.executionType("manual")
.jobName(actualDataJob.getName())
.jobVersion("test_job_version")
.jobSchedule("test_job_schedule")
.startTime(OffsetDateTime.now())
.endTime(OffsetDateTime.now())
.resourcesCpuLimit(1f)
.resourcesCpuRequest(1f)
.resourcesMemoryLimit(1)
.resourcesMemoryRequest(1)
.deployedBy("test_deployed_by")
.deployedDate(OffsetDateTime.now())
.terminationMessage(actualTerminationMessage).build();
ExecutionResult executionResult = JobExecutionResultManager.getResult(expectedJobExecution);
jobExecutionService.updateJobExecution(actualDataJob, expectedJobExecution, executionResult);

KubernetesService.JobExecution expectedJobExecution = createJobExecution(
actualDataJob,
actualExecutionSucceeded,
actualTerminationMessage,
OffsetDateTime.now(),
OffsetDateTime.now());
DataJobExecution actualJobExecution = jobExecutionService.readJobExecution(
actualDataJob.getJobConfig().getTeam(),
actualDataJob.getName(),
expectedJobExecution.getExecutionId());
actualDataJob.getJobConfig().getTeam(),
actualDataJob.getName(),
expectedJobExecution.getExecutionId());

expectedJobExecutionMessage = expectedJobExecutionMessage != null ? expectedJobExecutionMessage : expectedJobExecution.getTerminationMessage();
assertDataJobExecutionValid(expectedJobExecution, expectedJobExecutionStatus, expectedJobExecutionMessage, actualJobExecution, expectedVdkVersion);
Expand Down

0 comments on commit 8e31dd3

Please sign in to comment.