Skip to content

Commit

Permalink
control-service: killed job was shown as successful (#2116)
Browse files Browse the repository at this point in the history
# Why
We recently got the following feedback from our internal client: A data
job was listed as successful even though it hit the 12 hour limit and
was killed; the logs do not show that either - the last entry in the log
just shows the last object that was sent for ingestion, but there is no
summary of the data job.

The problem is caused by the following fix -
#1586.

When the job hit the 12-hour limit the K8S Pod is terminated and we
construct partial JobExecutionStatus which enters in the following if
statement and returns Optional.empty() rather than the constructed
object.


https://github.com/vmware/versatile-data-kit/blob/4763ba877f43b270fbd4770bc1533216f7c5d618/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/KubernetesService.java#L1656

As a result, this job execution becomes stuck in the Running status
until it is detected by emergency logic, which marks such executions as
successful due to the lack of associated Pods to them.

# What
Added validation for an already completed job in a more appropriate
place.

# Testing Done
Added integration test

Signed-off-by: Miroslav Ivanov [email protected]

---------

Signed-off-by: Miroslav Ivanov [email protected]
Co-authored-by: github-actions <>
  • Loading branch information
mivanov1988 authored May 25, 2023
1 parent 6a4c358 commit 67e739c
Show file tree
Hide file tree
Showing 15 changed files with 301 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.vmware.taurus.controlplane.model.data.DataJobMode;
import com.vmware.taurus.controlplane.model.data.DataJobVersion;
import com.vmware.taurus.datajobs.it.common.BaseIT;
import com.vmware.taurus.datajobs.it.common.JobExecutionUtil;
import com.vmware.taurus.service.deploy.JobImageDeployer;
import com.vmware.taurus.service.model.JobDeploymentStatus;
import org.apache.commons.io.IOUtils;
Expand Down Expand Up @@ -51,9 +50,6 @@
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
classes = ControlplaneApplication.class)
public class DataJobDeploymentCrudIT extends BaseIT {

private static final String TEST_JOB_NAME =
JobExecutionUtil.generateJobName(DataJobDeploymentCrudIT.class.getSimpleName());
private static final Object DEPLOYMENT_ID = "testing";

@TestConfiguration
Expand All @@ -70,7 +66,7 @@ public TaskExecutor taskExecutor() {

@BeforeEach
public void setup() throws Exception {
String dataJobRequestBody = getDataJobRequestBody(TEST_TEAM_NAME, TEST_JOB_NAME);
String dataJobRequestBody = getDataJobRequestBody(TEST_TEAM_NAME, testJobName);

// Execute create job
mockMvc
Expand All @@ -89,7 +85,7 @@ public void setup() throws Exception {
s.endsWith(
String.format(
"/data-jobs/for-team/%s/jobs/%s",
TEST_TEAM_NAME, TEST_JOB_NAME)))));
TEST_TEAM_NAME, testJobName)))));
}

@Test
Expand All @@ -104,7 +100,7 @@ public void testDataJobDeploymentCrud() throws Exception {
mockMvc
.perform(
post(String.format(
"/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_NAME, TEST_JOB_NAME))
"/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_NAME, testJobName))
.content(jobZipBinary)
.contentType(MediaType.APPLICATION_OCTET_STREAM))
.andExpect(status().isUnauthorized());
Expand All @@ -114,7 +110,7 @@ public void testDataJobDeploymentCrud() throws Exception {
mockMvc
.perform(
post(String.format(
"/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_NAME, TEST_JOB_NAME))
"/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_NAME, testJobName))
.with(user("user"))
.content(jobZipBinary)
.contentType(MediaType.APPLICATION_OCTET_STREAM))
Expand All @@ -136,7 +132,7 @@ public void testDataJobDeploymentCrud() throws Exception {
mockMvc
.perform(
post(String.format(
"/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_WRONG_NAME, TEST_JOB_NAME))
"/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_WRONG_NAME, testJobName))
.with(user("user"))
.content(jobZipBinary)
.contentType(MediaType.APPLICATION_OCTET_STREAM))
Expand All @@ -146,7 +142,7 @@ public void testDataJobDeploymentCrud() throws Exception {
mockMvc
.perform(
post(String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments", TEST_TEAM_NAME, TEST_JOB_NAME))
"/data-jobs/for-team/%s/jobs/%s/deployments", TEST_TEAM_NAME, testJobName))
.content(dataJobDeploymentRequestBody)
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isUnauthorized());
Expand All @@ -155,7 +151,7 @@ public void testDataJobDeploymentCrud() throws Exception {
mockMvc
.perform(
post(String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments", TEST_TEAM_NAME, TEST_JOB_NAME))
"/data-jobs/for-team/%s/jobs/%s/deployments", TEST_TEAM_NAME, testJobName))
.with(user("user"))
.content(dataJobDeploymentRequestBody)
.contentType(MediaType.APPLICATION_JSON))
Expand All @@ -166,13 +162,13 @@ public void testDataJobDeploymentCrud() throws Exception {
.perform(
post(String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments",
TEST_TEAM_WRONG_NAME, TEST_JOB_NAME))
TEST_TEAM_WRONG_NAME, testJobName))
.with(user("user"))
.content(dataJobDeploymentRequestBody)
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isNotFound());

String jobDeploymentName = JobImageDeployer.getCronJobName(TEST_JOB_NAME);
String jobDeploymentName = JobImageDeployer.getCronJobName(testJobName);
// Verify job deployment created
Optional<JobDeploymentStatus> cronJobOptional =
dataJobsKubernetesService.readCronJob(jobDeploymentName);
Expand All @@ -189,7 +185,7 @@ public void testDataJobDeploymentCrud() throws Exception {
.perform(
get(String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_NAME, testJobName, DEPLOYMENT_ID))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isUnauthorized());

Expand All @@ -199,7 +195,7 @@ public void testDataJobDeploymentCrud() throws Exception {
.perform(
get(String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_NAME, testJobName, DEPLOYMENT_ID))
.with(user("user"))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
Expand All @@ -225,7 +221,7 @@ public void testDataJobDeploymentCrud() throws Exception {
.perform(
get(String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_WRONG_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_WRONG_NAME, testJobName, DEPLOYMENT_ID))
.with(user("user"))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isNotFound());
Expand All @@ -236,7 +232,7 @@ public void testDataJobDeploymentCrud() throws Exception {
patch(
String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_NAME, testJobName, DEPLOYMENT_ID))
.content(getDataJobDeploymentEnableRequestBody(false))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isUnauthorized());
Expand All @@ -247,7 +243,7 @@ public void testDataJobDeploymentCrud() throws Exception {
patch(
String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_NAME, testJobName, DEPLOYMENT_ID))
.with(user("user"))
.content(getDataJobDeploymentEnableRequestBody(false))
.contentType(MediaType.APPLICATION_JSON))
Expand All @@ -259,7 +255,7 @@ public void testDataJobDeploymentCrud() throws Exception {
patch(
String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_WRONG_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_WRONG_NAME, testJobName, DEPLOYMENT_ID))
.with(user("user"))
.content(getDataJobDeploymentEnableRequestBody(false))
.contentType(MediaType.APPLICATION_JSON))
Expand All @@ -277,7 +273,7 @@ public void testDataJobDeploymentCrud() throws Exception {
patch(
String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_NAME, testJobName, DEPLOYMENT_ID))
.with(user("user"))
.content(getDataJobDeploymentVdkVersionRequestBody("new_vdk_version_tag"))
.contentType(MediaType.APPLICATION_JSON))
Expand All @@ -289,7 +285,7 @@ public void testDataJobDeploymentCrud() throws Exception {
patch(
String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_NAME, testJobName, DEPLOYMENT_ID))
.with(user("user"))
.content(getDataJobDeploymentEnableRequestBody(false))
.contentType(MediaType.APPLICATION_JSON))
Expand All @@ -301,7 +297,7 @@ public void testDataJobDeploymentCrud() throws Exception {
patch(
String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_NAME, testJobName, DEPLOYMENT_ID))
.with(user("user"))
.content(getDataJobDeploymentVdkVersionRequestBody(""))
.contentType(MediaType.APPLICATION_JSON))
Expand All @@ -312,7 +308,7 @@ public void testDataJobDeploymentCrud() throws Exception {
.perform(
get(String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_NAME, testJobName, DEPLOYMENT_ID))
.with(user("user"))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
Expand All @@ -324,7 +320,7 @@ public void testDataJobDeploymentCrud() throws Exception {
delete(
String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_NAME, testJobName, DEPLOYMENT_ID))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isUnauthorized());

Expand All @@ -334,7 +330,7 @@ public void testDataJobDeploymentCrud() throws Exception {
delete(
String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_WRONG_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_WRONG_NAME, testJobName, DEPLOYMENT_ID))
.with(user("user"))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isNotFound());
Expand All @@ -345,7 +341,7 @@ public void testDataJobDeploymentCrud() throws Exception {
delete(
String.format(
"/data-jobs/for-team/%s/jobs/%s/deployments/%s",
TEST_TEAM_NAME, TEST_JOB_NAME, DEPLOYMENT_ID))
TEST_TEAM_NAME, testJobName, DEPLOYMENT_ID))
.with(user("user"))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isAccepted());
Expand All @@ -363,7 +359,7 @@ public void cleanUp() throws Exception {
.perform(
delete(
String.format(
"/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_NAME, TEST_JOB_NAME))
"/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_NAME, testJobName))
.with(user("user")))
.andExpect(status().isOk());
}
Expand All @@ -377,7 +373,7 @@ public void testDataJobDeleteSource() throws Exception {
mockMvc
.perform(
post(String.format(
"/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_NAME, TEST_JOB_NAME))
"/data-jobs/for-team/%s/jobs/%s/sources", TEST_TEAM_NAME, testJobName))
.with(user("user"))
.content(jobZipBinary)
.contentType(MediaType.APPLICATION_OCTET_STREAM))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.datajobs.it.common.BaseIT;
import com.vmware.taurus.datajobs.it.common.DataJobDeploymentExtension;
import com.vmware.taurus.datajobs.it.common.JobExecutionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Test;
Expand All @@ -27,7 +26,7 @@
// This is a standard cron job template except restartPolicy is set to never so that when a
// job runs out of memory it is
// not retied but instead reports more quickly that it is a platform error
"datajobs.control.k8s.data.job.template.file=fast_failing_cron_job.yaml"
"datajobs.control.k8s.data.job.template.file=data_job_templates/fast_failing_cron_job.yaml"
})
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
Expand All @@ -42,7 +41,7 @@ public void testDataJob_causesOOM_shouldCompleteWithUserError(
String jobName, String teamName, String username, String deploymentId) throws Exception {
// manually start job execution
ImmutablePair<String, String> executeDataJobResult =
JobExecutionUtil.executeDataJob(jobName, teamName, username, deploymentId, mockMvc);
executeDataJob(jobName, teamName, username, deploymentId, mockMvc);
String opId = executeDataJobResult.getLeft();
String executionId = executeDataJobResult.getRight();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,31 @@
import com.vmware.taurus.datajobs.it.common.JobExecutionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.TestPropertySource;

@Slf4j
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
classes = ControlplaneApplication.class)
@TestPropertySource(
properties = {
"datajobs.job.resources.requests.memory=6Mi",
"datajobs.job.resources.limits.memory=6Mi",
// This is a standard cron job template except restartPolicy is set to never so that when a
// job runs out of memory it is
// not retied but instead reports more quickly that it is a platform error
"datajobs.control.k8s.data.job.template.file=data_job_templates/fast_failing_cron_job.yaml"
})
public class DataJobMainContainerOOMIT extends BaseIT {

@RegisterExtension
static DataJobDeploymentExtension dataJobDeploymentExtension =
new DataJobDeploymentExtension("oom_job.zip");

// @Test
@Test
public void testDataJob_causesOOM_shouldCompleteWithUserError(
String jobName, String teamName, String username, String deploymentId) throws Exception {
// manually start job execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.datajobs.it.common.BaseIT;
import com.vmware.taurus.properties.service.PropertiesRepository;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
Expand All @@ -27,8 +25,6 @@
classes = ControlplaneApplication.class)
public class DataJobPropertiesIT extends BaseIT {

@Autowired private PropertiesRepository propertiesRepository;

@Test
public void testDataJobProperties() throws Exception {
// Setup
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2021-2023 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.datajobs.it;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.datajobs.it.common.BaseIT;
import com.vmware.taurus.datajobs.it.common.DataJobDeploymentExtension;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.TestPropertySource;

import static com.vmware.taurus.datajobs.it.common.JobExecutionUtil.*;

@Slf4j
@TestPropertySource(
properties = {
// This is a standard cron job template except activeDeadlineSeconds is set to 1
"datajobs.control.k8s.data.job.template.file=data_job_templates/backoff_limit_exceeded_cron_job.yaml"
})
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
classes = ControlplaneApplication.class)
public class TestDataJobBackoffLimitExceededIT extends BaseIT {

@RegisterExtension
static DataJobDeploymentExtension dataJobDeploymentExtension = new DataJobDeploymentExtension();

@Test
public void testDataJob_causesBackoffLimitExceeded_shouldCompleteWithUserError(
String jobName, String teamName, String username, String deploymentId) throws Exception {
// manually start job execution
ImmutablePair<String, String> executeDataJobResult =
executeDataJob(jobName, teamName, username, deploymentId, mockMvc);
String opId = executeDataJobResult.getLeft();
String executionId = executeDataJobResult.getRight();

// Check the data job execution status
testDataJobExecutionRead(
executionId,
DataJobExecution.StatusEnum.USER_ERROR,
opId,
jobName,
teamName,
username,
mockMvc);
testDataJobExecutionList(
executionId,
DataJobExecution.StatusEnum.USER_ERROR,
opId,
jobName,
teamName,
username,
mockMvc);
testDataJobDeploymentExecutionList(
executionId,
DataJobExecution.StatusEnum.USER_ERROR,
opId,
jobName,
teamName,
username,
mockMvc);
}
}
Loading

0 comments on commit 67e739c

Please sign in to comment.