Skip to content

Commit

Permalink
control-service: handle init container OOM (#1658)
Browse files Browse the repository at this point in the history
# Why
Currently, if the init container fails with OOM,
the control service marks the job execution as successful.

# What
Changed the CS logic to the following:
if the init container fails with OOM,
the control service marks the job execution as Platform Error.

# Testing done
Integration test

Signed-off-by: Miroslav Ivanov [email protected]

---------

Signed-off-by: Miroslav Ivanov [email protected]
Co-authored-by: github-actions <>
  • Loading branch information
mivanov1988 authored Feb 21, 2023
1 parent 2b519ea commit 785d5b3
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2021-2023 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.datajobs.it;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.datajobs.it.common.BaseIT;
import com.vmware.taurus.datajobs.it.common.DataJobDeploymentExtension;
import com.vmware.taurus.datajobs.it.common.JobExecutionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.TestPropertySource;

/**
 * Integration test covering a data job whose init container runs out of memory.
 *
 * <p>The init container memory request and limit are both overridden to {@code 6Mi} through
 * {@link TestPropertySource}; presumably this is low enough for the init container to be
 * OOM-killed (confirm against the init container image in use). The execution is then expected
 * to be reported with status {@code PLATFORM_ERROR}.
 */
@Slf4j
@TestPropertySource(
    properties = {
      "datajobs.deployment.initContainer.resources.requests.memory=6Mi",
      "datajobs.deployment.initContainer.resources.limits.memory=6Mi",
    })
@SpringBootTest(
    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
    classes = ControlplaneApplication.class)
public class DataJobInitContainerOOMIT extends BaseIT {

  // NOTE(review): presumably this extension deploys the data job and resolves the String
  // parameters of the test method below - confirm in DataJobDeploymentExtension.
  @RegisterExtension
  static DataJobDeploymentExtension dataJobDeploymentExtension = new DataJobDeploymentExtension();

  /**
   * Starts a data job execution manually and verifies that it ends with status
   * {@code PLATFORM_ERROR}.
   *
   * <p>The method name previously ended in {@code shouldCompleteWithUserError}, which contradicted
   * the asserted status; it now names the scenario and the expected outcome.
   *
   * @param jobName name of the data job under test
   * @param teamName team owning the data job
   * @param username user on whose behalf the execution is started
   * @param deploymentId identifier of the data job deployment to execute
   * @throws Exception if starting the execution or checking its status fails
   */
  @Test
  public void testDataJob_initContainerCausesOOM_shouldCompleteWithPlatformError(
      String jobName, String teamName, String username, String deploymentId) throws Exception {
    // manually start job execution
    ImmutablePair<String, String> executeDataJobResult =
        JobExecutionUtil.executeDataJob(jobName, teamName, username, deploymentId, mockMvc);
    String opId = executeDataJobResult.getLeft();
    String executionId = executeDataJobResult.getRight();

    // Check the data job execution status
    JobExecutionUtil.checkDataJobExecutionStatus(
        executionId,
        DataJobExecution.StatusEnum.PLATFORM_ERROR,
        opId,
        jobName,
        teamName,
        username,
        mockMvc);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
classes = ControlplaneApplication.class)
public class DataJobOOMIT extends BaseIT {
public class DataJobMainContainerOOMIT extends BaseIT {

@RegisterExtension
static DataJobDeploymentExtension dataJobDeploymentExtension =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import lombok.*;
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -58,6 +59,7 @@
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.function.Predicate.not;
Expand Down Expand Up @@ -145,7 +147,7 @@ public static class JobExecution {
String executionId;
String executionType;
String jobName;
String podTerminationMessage;
String mainContainerTerminationMessage;
String jobTerminationReason;
Boolean succeeded;
String opId;
Expand All @@ -159,7 +161,8 @@ public static class JobExecution {
Integer resourcesMemoryLimit;
OffsetDateTime deployedDate;
String deployedBy;
String containerTerminationReason;
String mainContainerTerminationReason;
String initContainerTerminationReason;
}

@AllArgsConstructor
Expand Down Expand Up @@ -1456,15 +1459,16 @@ private static String getContainerName(V1Job job) {
return (containers != null && !containers.isEmpty()) ? containers.get(0).getName() : null;
}

private static Optional<V1ContainerStateTerminated> getTerminatedState(V1Pod pod) {
private static Optional<V1ContainerStateTerminated> getTerminatedState(
V1Pod pod, Function<V1PodStatus, List<V1ContainerStatus>> containerStatusFunction) {
Objects.requireNonNull(pod, "The pod cannot be null");
var status = pod.getStatus();
if (status == null
|| status.getContainerStatuses() == null
|| status.getContainerStatuses().isEmpty()) {
return Optional.empty();
}
final var containerStatus = status.getContainerStatuses().get(0);
final var containerStatus = containerStatusFunction.apply(status).get(0);
return getTerminatedState(containerStatus.getState())
.or(() -> getTerminatedState(containerStatus.getLastState()));
}
Expand All @@ -1483,7 +1487,8 @@ private static Optional<V1ContainerStateTerminated> getTerminatedState(V1Contain
* @return A {@link V1ContainerStateTerminated} object representing the termination status of the
* job.
*/
Optional<V1ContainerStateTerminated> getTerminationStatus(V1Job job) {
ImmutablePair<Optional<V1ContainerStateTerminated>, Optional<V1ContainerStateTerminated>>
getTerminationStatus(V1Job job) {
List<V1Pod> jobPods;

try {
Expand All @@ -1493,21 +1498,50 @@ Optional<V1ContainerStateTerminated> getTerminationStatus(V1Job job) {
"Could not list pods for job {}",
job.getMetadata().getName(),
new KubernetesException("", ex));
return Optional.empty();
return ImmutablePair.of(Optional.empty(), Optional.empty());
}

var lastTerminatedPodState =
var lastMainTerminatedPodState =
jobPods.stream()
.map(KubernetesService::getTerminatedState)
.map(
v1Pod ->
getTerminatedState(
v1Pod,
new Function<V1PodStatus, List<V1ContainerStatus>>() {
@Override
public List<V1ContainerStatus> apply(V1PodStatus v1PodStatus) {
return v1PodStatus.getContainerStatuses();
}
}))
.filter(Optional::isPresent)
.map(Optional::get)
.max(Comparator.comparing(V1ContainerStateTerminated::getFinishedAt));

if (lastTerminatedPodState.isEmpty()) {
log.info("Could not find a terminated pod for job {}", job.getMetadata().getName());
if (lastMainTerminatedPodState.isEmpty()) {
log.info("Could not find a main terminated pod for job {}", job.getMetadata().getName());
}

return lastTerminatedPodState;
var lastInitTerminatedPodState =
jobPods.stream()
.map(
v1Pod ->
getTerminatedState(
v1Pod,
new Function<V1PodStatus, List<V1ContainerStatus>>() {
@Override
public List<V1ContainerStatus> apply(V1PodStatus v1PodStatus) {
return v1PodStatus.getInitContainerStatuses();
}
}))
.filter(Optional::isPresent)
.map(Optional::get)
.max(Comparator.comparing(V1ContainerStateTerminated::getFinishedAt));

if (lastInitTerminatedPodState.isEmpty()) {
log.info("Could not find a data job terminated pod for job {}", job.getMetadata().getName());
}

return ImmutablePair.of(lastInitTerminatedPodState, lastMainTerminatedPodState);
}

/**
Expand All @@ -1521,23 +1555,35 @@ Optional<JobExecution> getJobExecutionStatus(V1Job job, JobStatusCondition jobSt
// jobCondition = null means that the K8S Job is still running
if (jobStatusCondition != null) {
// Job termination status
Optional<V1ContainerStateTerminated> lastTerminatedPodState = getTerminationStatus(job);
ImmutablePair<Optional<V1ContainerStateTerminated>, Optional<V1ContainerStateTerminated>>
podTerminationStatus = getTerminationStatus(job);

Optional<V1ContainerStateTerminated> lastInitContainerStateTerminated =
podTerminationStatus.getLeft();
// Termination Reason of the pod init container
lastInitContainerStateTerminated
.map(
v1ContainerStateTerminated ->
StringUtils.trim(v1ContainerStateTerminated.getReason()))
.ifPresent(s -> jobExecutionStatusBuilder.initContainerTerminationReason(s));

Optional<V1ContainerStateTerminated> lastMainContainerStateTerminated =
podTerminationStatus.getRight();
// If the job completed but its pod did not produce a termination message, we infer the
// termination
// status later, based on the status of the job itself.
lastTerminatedPodState
// termination status later, based on the status of the job itself.
lastMainContainerStateTerminated
.map(
v1ContainerStateTerminated ->
StringUtils.trim(v1ContainerStateTerminated.getMessage()))
.ifPresent(s -> jobExecutionStatusBuilder.podTerminationMessage(s));
.ifPresent(s -> jobExecutionStatusBuilder.mainContainerTerminationMessage(s));
jobExecutionStatusBuilder.jobTerminationReason(jobStatusCondition.getReason());

// Termination Reason of the data job pod container
lastTerminatedPodState
lastMainContainerStateTerminated
.map(
v1ContainerStateTerminated ->
StringUtils.trim(v1ContainerStateTerminated.getReason()))
.ifPresent(s -> jobExecutionStatusBuilder.containerTerminationReason(s));
.ifPresent(s -> jobExecutionStatusBuilder.mainContainerTerminationReason(s));
}

// Job resources
Expand Down Expand Up @@ -1654,7 +1700,7 @@ Optional<JobExecution> getJobExecutionStatus(V1Job job, JobStatusCondition jobSt

// omits events that come after the Data Job completion
if (jobExecutionStatusBuilder.succeeded != null
&& StringUtils.isBlank(jobExecutionStatusBuilder.containerTerminationReason)) {
&& StringUtils.isBlank(jobExecutionStatusBuilder.initContainerTerminationReason)) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ private static class PodTerminationMessage {
*/
public static ExecutionResult getResult(KubernetesService.JobExecution jobExecution) {
PodTerminationMessage podTerminationMessage =
parsePodTerminationMessage(jobExecution.getPodTerminationMessage());
parsePodTerminationMessage(jobExecution.getMainContainerTerminationMessage());
ExecutionStatus executionStatus =
getExecutionStatus(
jobExecution.getSucceeded(),
podTerminationMessage.getStatus(),
jobExecution.getJobTerminationReason(),
jobExecution.getContainerTerminationReason(),
jobExecution.getMainContainerTerminationReason(),
jobExecution.getStartTime());

return ExecutionResult.builder()
Expand All @@ -78,37 +78,37 @@ public static ExecutionResult getResult(KubernetesService.JobExecution jobExecut
* </ul>
*
* @param executionSucceeded K8s Job status (true - succeeded, false - failed, null - running)
* @param podTerminationStatus termination status returned from K8S Pod (e.g. "Success", "User
* error", etc.)
* @param mainContainerTerminationMessage termination status returned from K8S Pod (e.g.
* "Success", "User error", etc.)
* @param jobTerminationReason condition reason as reported by K8s Job (e.g. "DeadlineExceeded",
* "BackoffLimitExceeded", etc.)
* @param containerTerminationReason termination reason for pod container as returned by K8s pod
* container (e.g., "OOMKilled", etc.)
* @param mainContainerTerminationReason termination reason for pod container as returned by K8s
* pod container (e.g., "OOMKilled", etc.)
* @param executionStarTime K8S Job execution start time
* @return if there is no termination message due to the missing K8S Pod returns execution status
* based on K8S Job status otherwise returns execution status based on the K8S Pod termination
* status.
*/
private static ExecutionStatus getExecutionStatus(
Boolean executionSucceeded,
String podTerminationStatus,
String mainContainerTerminationMessage,
String jobTerminationReason,
String containerTerminationReason,
String mainContainerTerminationReason,
OffsetDateTime executionStarTime) {

ExecutionStatus executionStatus;

if (executionSucceeded == null) {
executionStatus =
executionStarTime == null ? ExecutionStatus.SUBMITTED : ExecutionStatus.RUNNING;
} else if (executionSucceeded && StringUtils.isEmpty(podTerminationStatus)) {
} else if (executionSucceeded && StringUtils.isEmpty(mainContainerTerminationMessage)) {
executionStatus = ExecutionStatus.SUCCEEDED;
} else if (!executionSucceeded && StringUtils.isEmpty(podTerminationStatus)) {
executionStatus = inferError(jobTerminationReason, containerTerminationReason);
} else if (!executionSucceeded && StringUtils.isEmpty(mainContainerTerminationMessage)) {
executionStatus = inferError(jobTerminationReason, mainContainerTerminationReason);
} else {
executionStatus =
Arrays.stream(ExecutionStatus.values())
.filter(status -> status.getPodStatus().equals(podTerminationStatus))
.filter(status -> status.getPodStatus().equals(mainContainerTerminationMessage))
.findAny()
.orElse(ExecutionStatus.PLATFORM_ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public Optional<com.vmware.taurus.service.model.DataJobExecution> updateJobExecu
.status(executionStatus)
.message(
getJobExecutionApiMessage(
executionStatus, jobExecution.getContainerTerminationReason()))
executionStatus, jobExecution.getMainContainerTerminationReason()))
.opId(jobExecution.getOpId())
.endTime(jobExecution.getEndTime())
.vdkVersion(executionResult.getVdkVersion())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void watchJobs() {
"Termination message of Data Job {} with execution {}: {}",
s.getJobName(),
s.getExecutionId(),
s.getPodTerminationMessage());
s.getMainContainerTerminationMessage());
recordJobExecutionStatus(s);
},
runningJobExecutionIds -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.vmware.taurus.service.model.JobLabel;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.openapi.models.*;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
Expand Down Expand Up @@ -88,7 +89,8 @@ public void testGetJobExecutionStatus_emptyJob_shouldReturnEmptyJobExecutionStat
V1Job v1Job = new V1Job();
var mock = Mockito.mock(KubernetesService.class);
Mockito.when(mock.getK8sSupportsV1CronJob()).thenReturn(false);
Mockito.when(mock.getTerminationStatus(v1Job)).thenReturn(Optional.empty());
Mockito.when(mock.getTerminationStatus(v1Job))
.thenReturn(ImmutablePair.of(Optional.empty(), Optional.empty()));
Mockito.when(mock.getJobExecutionStatus(v1Job, null)).thenCallRealMethod();
Optional<KubernetesService.JobExecution> actualJobExecutionStatusOptional =
mock.getJobExecutionStatus(v1Job, null);
Expand Down Expand Up @@ -175,7 +177,10 @@ public void testGetJobExecutionStatus_notEmptyJob_shouldReturnNotEmptyJobExecuti
KubernetesService mock = Mockito.mock(KubernetesService.class);
Mockito.when(mock.getK8sSupportsV1CronJob()).thenReturn(false);
Mockito.when(mock.getTerminationStatus(expectedJob))
.thenReturn(Optional.ofNullable(new V1ContainerStateTerminated().reason("test")));
.thenReturn(
ImmutablePair.of(
Optional.ofNullable(new V1ContainerStateTerminated().reason("test")),
Optional.ofNullable(new V1ContainerStateTerminated().reason("test"))));
Mockito.when(mock.getJobExecutionStatus(expectedJob, condition)).thenCallRealMethod();
Optional<KubernetesService.JobExecution> actualJobExecutionStatusOptional =
mock.getJobExecutionStatus(expectedJob, condition);
Expand Down
Loading

0 comments on commit 785d5b3

Please sign in to comment.