Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control-service: integration test for async job deploy #2829

Merged
merged 22 commits into from
Nov 9, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public abstract class BaseDataJobDeploymentCrudIT extends BaseIT {

protected abstract void beforeDeploymentDeletion() throws Exception;

protected abstract void afterDeploymentDeletion() throws Exception;

@BeforeEach
public void setup() throws Exception {
String dataJobRequestBody = getDataJobRequestBody(TEST_TEAM_NAME, testJobName);
Expand Down Expand Up @@ -109,7 +111,7 @@ public void testDataJobDeploymentCrud() throws Exception {
Assertions.assertFalse(StringUtils.isBlank(testJobVersionSha));

// Setup
String dataJobDeploymentRequestBody = getDataJobDeploymentRequestBody(testJobVersionSha);
String dataJobDeploymentRequestBody = getDataJobDeploymentRequestBody(testJobVersionSha, "3.9");

// Execute job upload with wrong team name and user
mockMvc
Expand Down Expand Up @@ -302,6 +304,7 @@ public void testDataJobDeploymentCrud() throws Exception {

// Verify deployment deleted
waitUntil(() -> dataJobsKubernetesService.readCronJob(jobDeploymentName).isEmpty());
afterDeploymentDeletion();
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@
package com.vmware.taurus.datajobs.it;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.service.model.ActualDataJobDeployment;
import com.vmware.taurus.service.model.DeploymentStatus;
import com.vmware.taurus.service.model.DesiredDataJobDeployment;
import com.vmware.taurus.service.repository.ActualJobDeploymentRepository;
import com.vmware.taurus.service.repository.DesiredJobDeploymentRepository;
import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.TestPropertySource;

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*;

@TestPropertySource(
properties = {
"datajobs.control.k8s.k8sSupportsV1CronJob=true",
Expand All @@ -24,6 +29,61 @@
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
classes = ControlplaneApplication.class)
public class DataJobDeploymentCrudAsyncIT extends BaseDataJobDeploymentCrudIT {

@Autowired private DesiredJobDeploymentRepository desiredJobDeploymentRepository;
@Autowired private ActualJobDeploymentRepository actualJobDeploymentRepository;

@Override
protected void beforeDeploymentDeletion() {
var desiredDataJobDeployment = desiredJobDeploymentRepository.findById(testJobName);
var actualDataJobDeployment = actualJobDeploymentRepository.findById(testJobName);

checkDesiredDeployment(desiredDataJobDeployment);
checkActualDeployment(actualDataJobDeployment);
}

@Override
protected void beforeDeploymentDeletion() {}
protected void afterDeploymentDeletion() {
var desiredDataJobDeployment = desiredJobDeploymentRepository.findById(testJobName);
var actualDataJobDeployment = actualJobDeploymentRepository.findById(testJobName);

checkDesiredDeploymentDeleted(desiredDataJobDeployment);
checkActualDeploymentDeleted(actualDataJobDeployment);
}

private void checkDesiredDeployment(Optional<DesiredDataJobDeployment> desiredDataJobDeployment) {
Assertions.assertTrue(desiredDataJobDeployment.isPresent());
var deployment = desiredDataJobDeployment.get();
Assertions.assertEquals(DeploymentStatus.SUCCESS, deployment.getStatus());
Assertions.assertEquals(TEST_JOB_SCHEDULE, deployment.getSchedule());
Assertions.assertEquals("test-team", deployment.getDataJob().getJobConfig().getTeam());
Assertions.assertFalse(deployment.getEnabled());
Assertions.assertEquals(testJobName, deployment.getDataJobName());
Assertions.assertFalse(deployment.getUserInitiated());
Assertions.assertEquals("user", deployment.getLastDeployedBy());
Assertions.assertNotNull(deployment.getGitCommitSha());
Assertions.assertEquals("3.9", deployment.getPythonVersion());
}

private void checkActualDeployment(Optional<ActualDataJobDeployment> actualDataJobDeployment) {
Assertions.assertTrue(actualDataJobDeployment.isPresent());
var deployment = actualDataJobDeployment.get();
Assertions.assertEquals("user", deployment.getLastDeployedBy());
Assertions.assertEquals(testJobName, deployment.getDataJobName());
Assertions.assertNotNull(deployment.getLastDeployedDate());
Assertions.assertFalse(deployment.getEnabled());
Assertions.assertEquals(testJobName, deployment.getDataJobName());
Assertions.assertEquals(TEST_JOB_SCHEDULE, deployment.getSchedule());
Assertions.assertEquals("3.9", deployment.getPythonVersion());
}

private void checkDesiredDeploymentDeleted(
Optional<DesiredDataJobDeployment> desiredDataJobDeployment) {
Assertions.assertFalse(desiredDataJobDeployment.isPresent());
}

private void checkActualDeploymentDeleted(
Optional<ActualDataJobDeployment> actualDataJobDeployment) {
Assertions.assertFalse(actualDataJobDeployment.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,9 @@ protected void beforeDeploymentDeletion() throws Exception {
.andExpect(status().isOk())
.andExpect(jsonPath("$.vdk_version", is("release")));
}

@Override
protected void afterDeploymentDeletion() {
// do nothing.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@

package com.vmware.taurus.service.deploy;

import static com.vmware.taurus.datajobs.it.common.WebHookServerMockExtension.TEST_TEAM_NAME;
import static org.awaitility.Awaitility.await;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.user;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.header;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.controlplane.model.data.DataJobVersion;
Expand All @@ -17,6 +25,9 @@
import com.vmware.taurus.service.repository.DesiredJobDeploymentRepository;
import com.vmware.taurus.service.repository.JobsRepository;
import io.kubernetes.client.openapi.ApiException;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand All @@ -36,18 +47,6 @@
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.web.servlet.ResultActions;

import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static com.vmware.taurus.datajobs.it.common.WebHookServerMockExtension.TEST_TEAM_NAME;
import static org.awaitility.Awaitility.await;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.user;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.header;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@Import({DataJobDeploymentCrudITV2.TaskExecutorConfig.class})
@TestPropertySource(
properties = {
Expand All @@ -74,6 +73,14 @@ public class DataJobDeploymentCrudITV2 extends BaseIT {

@Autowired private DeploymentService deploymentService;

private boolean jobEnabled;
private DataJob dataJob;
private DesiredDataJobDeployment desiredDataJobDeployment;
private ActualDataJobDeployment actualDataJobDeployment;
private String deploymentVersionShaShouldNotBeChanged;
private OffsetDateTime lastDeployedDateInitial;
private String testJobVersionSha;

@TestConfiguration
static class TaskExecutorConfig {

Expand Down Expand Up @@ -110,45 +117,46 @@ public void setup() throws Exception {
TEST_TEAM_NAME, testJobName)))));
}

@Test
public void testSynchronizeDataJob() throws Exception {
public void uploadJob() throws Exception {
DataJobVersion testDataJobVersion = uploadDataJob();
Assertions.assertNotNull(testDataJobVersion);

String testJobVersionSha = testDataJobVersion.getVersionSha();
testJobVersionSha = testDataJobVersion.getVersionSha();
Assertions.assertFalse(StringUtils.isBlank(testJobVersionSha));
}

boolean jobEnabled = false;
DesiredDataJobDeployment desiredDataJobDeployment =
createDesiredDataJobDeployment(testJobVersionSha, jobEnabled);

// Checks if the deployment exist
public void checkDeploymentExists() {
Optional<JobDeploymentStatus> jobDeploymentStatusOptional =
deploymentService.readDeployment(testJobName);
Assertions.assertFalse(jobDeploymentStatusOptional.isPresent());
Assertions.assertFalse(actualJobDeploymentRepository.findById(testJobName).isPresent());
DataJob dataJob = jobsRepository.findById(testJobName).get();
}

// Deploys data job for the very first time
private ActualDataJobDeployment deployJobForFirstTime() {
dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, null, false);
ActualDataJobDeployment actualDataJobDeployment = verifyDeploymentStatus(jobEnabled);
String deploymentVersionShaInitial = actualDataJobDeployment.getDeploymentVersionSha();
OffsetDateTime lastDeployedDateInitial = actualDataJobDeployment.getLastDeployedDate();
Assertions.assertNotNull(deploymentVersionShaInitial);
Assertions.assertNotNull(lastDeployedDateInitial);
return actualDataJobDeployment;
}

// Tries to redeploy job without any changes
private String deployJobWithNoChanges() {
lastDeployedDateInitial = actualDataJobDeployment.getLastDeployedDate();
dataJobsSynchronizer.synchronizeDataJob(
dataJob, desiredDataJobDeployment, actualDataJobDeployment, true);
String deploymentVersionShaInitial = actualDataJobDeployment.getDeploymentVersionSha();
actualDataJobDeployment = verifyDeploymentStatus(jobEnabled);
String deploymentVersionShaShouldNotBeChanged =
actualDataJobDeployment.getDeploymentVersionSha();
OffsetDateTime lastDeployedDateShouldNotBeChanged =
actualDataJobDeployment.getLastDeployedDate();
Assertions.assertEquals(deploymentVersionShaInitial, deploymentVersionShaShouldNotBeChanged);
Assertions.assertEquals(lastDeployedDateInitial, lastDeployedDateShouldNotBeChanged);
return deploymentVersionShaShouldNotBeChanged;
}

// Tries to redeploy job with changes
private void deployJobWithChanges() {
jobEnabled = true;
desiredDataJobDeployment = updateDataJobDeployment(jobEnabled);
dataJobsSynchronizer.synchronizeDataJob(
Expand All @@ -159,62 +167,84 @@ public void testSynchronizeDataJob() throws Exception {
Assertions.assertNotEquals(lastDeployedDateInitial, lastDeployedDateShouldBeChanged);
Assertions.assertNotEquals(
deploymentVersionShaShouldNotBeChanged, deploymentVersionShaShouldBeChanged);
}

// Deletes deployment
private void deleteDeployment() {
desiredJobDeploymentRepository.deleteById(testJobName);
dataJobsSynchronizer.synchronizeDataJob(dataJob, null, actualDataJobDeployment, true);
Assertions.assertFalse(deploymentService.readDeployment(testJobName).isPresent());
Assertions.assertFalse(actualJobDeploymentRepository.findById(testJobName).isPresent());
}

@Test
public void testSynchronizeDataJobs() throws Exception {
DataJobVersion testDataJobVersion = uploadDataJob();
Assertions.assertNotNull(testDataJobVersion);
public void testSynchronizeDataJob() throws Exception {
uploadJob();

String testJobVersionSha = testDataJobVersion.getVersionSha();
Assertions.assertFalse(StringUtils.isBlank(testJobVersionSha));
jobEnabled = false;

boolean jobEnabled = false;
createDesiredDataJobDeployment(testJobVersionSha, jobEnabled);
desiredDataJobDeployment = createDesiredDataJobDeployment(testJobVersionSha, jobEnabled);

// Checks if the deployment exist
Optional<JobDeploymentStatus> jobDeploymentStatusOptional =
deploymentService.readDeployment(testJobName);
Assertions.assertFalse(jobDeploymentStatusOptional.isPresent());
Assertions.assertFalse(actualJobDeploymentRepository.findById(testJobName).isPresent());
checkDeploymentExists();

// Deploys data job for the very first time
dataJobsSynchronizer.synchronizeDataJobs();
dataJob = jobsRepository.findById(testJobName).get();

actualDataJobDeployment = deployJobForFirstTime();

// Wait for the job deployment to complete, polling every 15 seconds
// See: https://github.com/awaitility/awaitility/wiki/Usage
deploymentVersionShaShouldNotBeChanged = deployJobWithNoChanges();

deployJobWithChanges();

deleteDeployment();
}

private void waitDeploymentCompletion() {
await()
.atMost(10, TimeUnit.MINUTES)
.with()
.pollInterval(15, TimeUnit.SECONDS)
.until(() -> actualJobDeploymentRepository.findById(testJobName).isPresent());
}

private void checkDeployment() {
ActualDataJobDeployment actualDataJobDeployment = verifyDeploymentStatus(jobEnabled);
String deploymentVersionShaInitial = actualDataJobDeployment.getDeploymentVersionSha();
OffsetDateTime lastDeployedDateInitial = actualDataJobDeployment.getLastDeployedDate();
Assertions.assertNotNull(deploymentVersionShaInitial);
Assertions.assertNotNull(lastDeployedDateInitial);
}

jobsRepository.deleteById(testJobName);

// Re-deploys data job
dataJobsSynchronizer.synchronizeDataJobs();

// Wait for the job deployment to complete, polling every 15 seconds
// See: https://github.com/awaitility/awaitility/wiki/Usage
private void waitDeploymentDeletion() {
await()
.atMost(10, TimeUnit.MINUTES)
.with()
.pollInterval(15, TimeUnit.SECONDS)
.until(() -> deploymentService.readDeployment(testJobName).isEmpty());
}

@Test
public void testSynchronizeDataJobs() throws Exception {
uploadJob();

jobEnabled = false;
createDesiredDataJobDeployment(testJobVersionSha, jobEnabled);

checkDeploymentExists();

// Deploys data job for the very first time
dataJobsSynchronizer.synchronizeDataJobs();

waitDeploymentCompletion();

checkDeployment();

jobsRepository.deleteById(testJobName);

// Re-deploys data job
dataJobsSynchronizer.synchronizeDataJobs();

waitDeploymentDeletion();
}

private ActualDataJobDeployment verifyDeploymentStatus(boolean enabled) {
Optional<JobDeploymentStatus> deploymentStatusOptional =
deploymentService.readDeployment(testJobName);
Expand Down