Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control-service: handle init container OOM #1658

Merged
merged 2 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2021-2023 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.datajobs.it;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.datajobs.it.common.BaseIT;
import com.vmware.taurus.datajobs.it.common.DataJobDeploymentExtension;
import com.vmware.taurus.datajobs.it.common.JobExecutionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.TestPropertySource;

/**
 * Integration test verifying that a Data Job execution ends with a platform error when the
 * deployment's init container is given a very small memory budget.
 *
 * <p>The init container memory request and limit are both set to 6Mi through the test properties
 * below. NOTE(review): this is presumably low enough for the init container to be OOM-killed —
 * confirm against the init container image in use.
 */
@Slf4j
@TestPropertySource(
    properties = {
      "datajobs.deployment.initContainer.resources.requests.memory=6Mi",
      "datajobs.deployment.initContainer.resources.limits.memory=6Mi",
    })
@SpringBootTest(
    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
    classes = ControlplaneApplication.class)
public class DataJobInitContainerOOMIT extends BaseIT {

  // NOTE(review): presumably this extension deploys the data job and resolves the test method
  // parameters (job name, team, user, deployment id) — confirm in DataJobDeploymentExtension.
  @RegisterExtension
  static DataJobDeploymentExtension dataJobDeploymentExtension = new DataJobDeploymentExtension();

  /**
   * Starts an execution of the deployed data job and checks that it is reported with status
   * {@code PLATFORM_ERROR}.
   *
   * <p>The method name states the outcome that is actually asserted (platform error), so the test
   * report is not misleading.
   *
   * @param jobName name of the data job under test
   * @param teamName team owning the data job
   * @param username user on whose behalf the execution is started
   * @param deploymentId identifier of the data job deployment
   * @throws Exception if starting the execution or checking its status fails
   */
  @Test
  public void testDataJob_initContainerCausesOOM_shouldCompleteWithPlatformError(
      String jobName, String teamName, String username, String deploymentId) throws Exception {
    // manually start job execution
    ImmutablePair<String, String> executeDataJobResult =
        JobExecutionUtil.executeDataJob(jobName, teamName, username, deploymentId, mockMvc);
    String opId = executeDataJobResult.getLeft();
    String executionId = executeDataJobResult.getRight();

    // Check the data job execution status
    JobExecutionUtil.checkDataJobExecutionStatus(
        executionId,
        DataJobExecution.StatusEnum.PLATFORM_ERROR,
        opId,
        jobName,
        teamName,
        username,
        mockMvc);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
classes = ControlplaneApplication.class)
public class DataJobOOMIT extends BaseIT {
public class DataJobMainContainerOOMIT extends BaseIT {

@RegisterExtension
static DataJobDeploymentExtension dataJobDeploymentExtension =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import lombok.*;
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -58,6 +59,7 @@
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.function.Predicate.not;
Expand Down Expand Up @@ -145,7 +147,7 @@ public static class JobExecution {
String executionId;
String executionType;
String jobName;
String podTerminationMessage;
String mainContainerTerminationMessage;
String jobTerminationReason;
Boolean succeeded;
String opId;
Expand All @@ -159,7 +161,8 @@ public static class JobExecution {
Integer resourcesMemoryLimit;
OffsetDateTime deployedDate;
String deployedBy;
String containerTerminationReason;
String mainContainerTerminationReason;
String initContainerTerminationReason;
}

@AllArgsConstructor
Expand Down Expand Up @@ -1456,15 +1459,16 @@ private static String getContainerName(V1Job job) {
return (containers != null && !containers.isEmpty()) ? containers.get(0).getName() : null;
}

private static Optional<V1ContainerStateTerminated> getTerminatedState(V1Pod pod) {
private static Optional<V1ContainerStateTerminated> getTerminatedState(
V1Pod pod, Function<V1PodStatus, List<V1ContainerStatus>> containerStatusFunction) {
Objects.requireNonNull(pod, "The pod cannot be null");
var status = pod.getStatus();
if (status == null
|| status.getContainerStatuses() == null
|| status.getContainerStatuses().isEmpty()) {
return Optional.empty();
}
final var containerStatus = status.getContainerStatuses().get(0);
final var containerStatus = containerStatusFunction.apply(status).get(0);
return getTerminatedState(containerStatus.getState())
.or(() -> getTerminatedState(containerStatus.getLastState()));
}
Expand All @@ -1483,7 +1487,8 @@ private static Optional<V1ContainerStateTerminated> getTerminatedState(V1Contain
* @return A {@link V1ContainerStateTerminated} object representing the termination status of the
* job.
*/
Optional<V1ContainerStateTerminated> getTerminationStatus(V1Job job) {
ImmutablePair<Optional<V1ContainerStateTerminated>, Optional<V1ContainerStateTerminated>>
getTerminationStatus(V1Job job) {
List<V1Pod> jobPods;

try {
Expand All @@ -1493,21 +1498,50 @@ Optional<V1ContainerStateTerminated> getTerminationStatus(V1Job job) {
"Could not list pods for job {}",
job.getMetadata().getName(),
new KubernetesException("", ex));
return Optional.empty();
return ImmutablePair.of(Optional.empty(), Optional.empty());
}

var lastTerminatedPodState =
var lastMainTerminatedPodState =
jobPods.stream()
.map(KubernetesService::getTerminatedState)
.map(
v1Pod ->
getTerminatedState(
v1Pod,
new Function<V1PodStatus, List<V1ContainerStatus>>() {
@Override
public List<V1ContainerStatus> apply(V1PodStatus v1PodStatus) {
return v1PodStatus.getContainerStatuses();
}
}))
.filter(Optional::isPresent)
.map(Optional::get)
.max(Comparator.comparing(V1ContainerStateTerminated::getFinishedAt));

if (lastTerminatedPodState.isEmpty()) {
log.info("Could not find a terminated pod for job {}", job.getMetadata().getName());
if (lastMainTerminatedPodState.isEmpty()) {
log.info("Could not find a main terminated pod for job {}", job.getMetadata().getName());
}

return lastTerminatedPodState;
var lastInitTerminatedPodState =
jobPods.stream()
.map(
v1Pod ->
getTerminatedState(
v1Pod,
new Function<V1PodStatus, List<V1ContainerStatus>>() {
@Override
public List<V1ContainerStatus> apply(V1PodStatus v1PodStatus) {
return v1PodStatus.getInitContainerStatuses();
}
}))
.filter(Optional::isPresent)
.map(Optional::get)
.max(Comparator.comparing(V1ContainerStateTerminated::getFinishedAt));

if (lastInitTerminatedPodState.isEmpty()) {
log.info("Could not find a data job terminated pod for job {}", job.getMetadata().getName());
}

return ImmutablePair.of(lastInitTerminatedPodState, lastMainTerminatedPodState);
}

/**
Expand All @@ -1521,23 +1555,35 @@ Optional<JobExecution> getJobExecutionStatus(V1Job job, JobStatusCondition jobSt
// jobCondition = null means that the K8S Job is still running
if (jobStatusCondition != null) {
// Job termination status
Optional<V1ContainerStateTerminated> lastTerminatedPodState = getTerminationStatus(job);
ImmutablePair<Optional<V1ContainerStateTerminated>, Optional<V1ContainerStateTerminated>>
podTerminationStatus = getTerminationStatus(job);

Optional<V1ContainerStateTerminated> lastInitContainerStateTerminated =
podTerminationStatus.getLeft();
// Termination Reason of the pod init container
lastInitContainerStateTerminated
.map(
v1ContainerStateTerminated ->
StringUtils.trim(v1ContainerStateTerminated.getReason()))
.ifPresent(s -> jobExecutionStatusBuilder.initContainerTerminationReason(s));

Optional<V1ContainerStateTerminated> lastMainContainerStateTerminated =
podTerminationStatus.getRight();
// If the job completed but its pod did not produce a termination message, we infer the
// termination
// status later, based on the status of the job itself.
lastTerminatedPodState
// termination status later, based on the status of the job itself.
lastMainContainerStateTerminated
.map(
v1ContainerStateTerminated ->
StringUtils.trim(v1ContainerStateTerminated.getMessage()))
.ifPresent(s -> jobExecutionStatusBuilder.podTerminationMessage(s));
.ifPresent(s -> jobExecutionStatusBuilder.mainContainerTerminationMessage(s));
jobExecutionStatusBuilder.jobTerminationReason(jobStatusCondition.getReason());

// Termination Reason of the data job pod container
lastTerminatedPodState
lastMainContainerStateTerminated
.map(
v1ContainerStateTerminated ->
StringUtils.trim(v1ContainerStateTerminated.getReason()))
.ifPresent(s -> jobExecutionStatusBuilder.containerTerminationReason(s));
.ifPresent(s -> jobExecutionStatusBuilder.mainContainerTerminationReason(s));
}

// Job resources
Expand Down Expand Up @@ -1654,7 +1700,7 @@ Optional<JobExecution> getJobExecutionStatus(V1Job job, JobStatusCondition jobSt

// omits events that come after the Data Job completion
if (jobExecutionStatusBuilder.succeeded != null
&& StringUtils.isBlank(jobExecutionStatusBuilder.containerTerminationReason)) {
&& StringUtils.isBlank(jobExecutionStatusBuilder.initContainerTerminationReason)) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ private static class PodTerminationMessage {
*/
public static ExecutionResult getResult(KubernetesService.JobExecution jobExecution) {
PodTerminationMessage podTerminationMessage =
parsePodTerminationMessage(jobExecution.getPodTerminationMessage());
parsePodTerminationMessage(jobExecution.getMainContainerTerminationMessage());
ExecutionStatus executionStatus =
getExecutionStatus(
jobExecution.getSucceeded(),
podTerminationMessage.getStatus(),
jobExecution.getJobTerminationReason(),
jobExecution.getContainerTerminationReason(),
jobExecution.getMainContainerTerminationReason(),
jobExecution.getStartTime());

return ExecutionResult.builder()
Expand All @@ -78,37 +78,37 @@ public static ExecutionResult getResult(KubernetesService.JobExecution jobExecut
* </ul>
*
* @param executionSucceeded K8s Job status (true - succeeded, false - failed, null - running)
* @param podTerminationStatus termination status returned from K8S Pod (e.g. "Success", "User
* error", etc.)
* @param mainContainerTerminationMessage termination status parsed from the termination message
* of the K8S Pod's main container (e.g. "Success", "User error", etc.)
* @param jobTerminationReason condition reason as reported by K8s Job (e.g. "DeadlineExceeded",
* "BackoffLimitExceeded", etc.)
* @param containerTerminationReason termination reason for pod container as returned by K8s pod
* container (e.g., "OOMKilled", etc.)
* @param mainContainerTerminationReason termination reason of the pod's main container as
* returned by K8s (e.g., "OOMKilled", etc.)
* @param executionStarTime K8S Job execution start time
* @return if there is no termination message due to the missing K8S Pod returns execution status
* based on K8S Job status otherwise returns execution status based on the K8S Pod termination
* status.
*/
private static ExecutionStatus getExecutionStatus(
Boolean executionSucceeded,
String podTerminationStatus,
String mainContainerTerminationMessage,
String jobTerminationReason,
String containerTerminationReason,
String mainContainerTerminationReason,
OffsetDateTime executionStarTime) {

ExecutionStatus executionStatus;

if (executionSucceeded == null) {
executionStatus =
executionStarTime == null ? ExecutionStatus.SUBMITTED : ExecutionStatus.RUNNING;
} else if (executionSucceeded && StringUtils.isEmpty(podTerminationStatus)) {
} else if (executionSucceeded && StringUtils.isEmpty(mainContainerTerminationMessage)) {
executionStatus = ExecutionStatus.SUCCEEDED;
} else if (!executionSucceeded && StringUtils.isEmpty(podTerminationStatus)) {
executionStatus = inferError(jobTerminationReason, containerTerminationReason);
} else if (!executionSucceeded && StringUtils.isEmpty(mainContainerTerminationMessage)) {
executionStatus = inferError(jobTerminationReason, mainContainerTerminationReason);
} else {
executionStatus =
Arrays.stream(ExecutionStatus.values())
.filter(status -> status.getPodStatus().equals(podTerminationStatus))
.filter(status -> status.getPodStatus().equals(mainContainerTerminationMessage))
.findAny()
.orElse(ExecutionStatus.PLATFORM_ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public Optional<com.vmware.taurus.service.model.DataJobExecution> updateJobExecu
.status(executionStatus)
.message(
getJobExecutionApiMessage(
executionStatus, jobExecution.getContainerTerminationReason()))
executionStatus, jobExecution.getMainContainerTerminationReason()))
.opId(jobExecution.getOpId())
.endTime(jobExecution.getEndTime())
.vdkVersion(executionResult.getVdkVersion())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void watchJobs() {
"Termination message of Data Job {} with execution {}: {}",
s.getJobName(),
s.getExecutionId(),
s.getPodTerminationMessage());
s.getMainContainerTerminationMessage());
recordJobExecutionStatus(s);
},
runningJobExecutionIds -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.vmware.taurus.service.model.JobLabel;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.openapi.models.*;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
Expand Down Expand Up @@ -88,7 +89,8 @@ public void testGetJobExecutionStatus_emptyJob_shouldReturnEmptyJobExecutionStat
V1Job v1Job = new V1Job();
var mock = Mockito.mock(KubernetesService.class);
Mockito.when(mock.getK8sSupportsV1CronJob()).thenReturn(false);
Mockito.when(mock.getTerminationStatus(v1Job)).thenReturn(Optional.empty());
Mockito.when(mock.getTerminationStatus(v1Job))
.thenReturn(ImmutablePair.of(Optional.empty(), Optional.empty()));
Mockito.when(mock.getJobExecutionStatus(v1Job, null)).thenCallRealMethod();
Optional<KubernetesService.JobExecution> actualJobExecutionStatusOptional =
mock.getJobExecutionStatus(v1Job, null);
Expand Down Expand Up @@ -175,7 +177,10 @@ public void testGetJobExecutionStatus_notEmptyJob_shouldReturnNotEmptyJobExecuti
KubernetesService mock = Mockito.mock(KubernetesService.class);
Mockito.when(mock.getK8sSupportsV1CronJob()).thenReturn(false);
Mockito.when(mock.getTerminationStatus(expectedJob))
.thenReturn(Optional.ofNullable(new V1ContainerStateTerminated().reason("test")));
.thenReturn(
ImmutablePair.of(
Optional.ofNullable(new V1ContainerStateTerminated().reason("test")),
Optional.ofNullable(new V1ContainerStateTerminated().reason("test"))));
Mockito.when(mock.getJobExecutionStatus(expectedJob, condition)).thenCallRealMethod();
Optional<KubernetesService.JobExecution> actualJobExecutionStatusOptional =
mock.getJobExecutionStatus(expectedJob, condition);
Expand Down
Loading