Skip to content

Commit

Permalink
control-service: deployment controller reads from db (#2800)
Browse files Browse the repository at this point in the history
what: DataJobDeploymentController now has the functionality to read from
the DB.

why: Part of the persistence VEP which can be found here -
https://github.com/vmware/versatile-data-kit/tree/main/specs/vep-2272-complete-data-job-configuration-persistence
This PR will reduce the overhead on K8S when making API calls to the
control-service.

testing: added unit tests.

---------

Signed-off-by: mrMoZ1 <[email protected]>
Co-authored-by: github-actions <>
  • Loading branch information
Momchil Z authored Oct 18, 2023
1 parent 286f5e7 commit e9af4fa
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
import com.vmware.taurus.exception.ValidationException;
import com.vmware.taurus.service.JobsService;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig.ReadFrom;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig.WriteTo;
import com.vmware.taurus.service.deploy.DeploymentService;
import com.vmware.taurus.service.deploy.DeploymentServiceV2;
import com.vmware.taurus.service.diag.OperationContext;
import com.vmware.taurus.service.model.JobDeploymentStatus;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
Expand All @@ -27,13 +31,6 @@
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig.*;

/**
* REST controller for operations on data job deployments
*
Expand Down Expand Up @@ -112,39 +109,61 @@ public ResponseEntity<Void> deploymentPatch(
@Override
public ResponseEntity<List<DataJobDeploymentStatus>> deploymentList(
String teamName, String jobName, String deploymentId, DataJobMode dataJobMode) {
// TODO: deploymentId and mode not implemented
if (jobsService.jobWithTeamExists(jobName, teamName)) {
// TODO: deploymentId and mode not implemented
List<DataJobDeploymentStatus> deployments = Collections.emptyList();
Optional<JobDeploymentStatus> jobDeploymentStatus = Optional.empty();
if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.K8S)) {
jobDeploymentStatus = deploymentService.readDeployment(jobName.toLowerCase());
}
if (jobDeploymentStatus.isPresent()) {
deployments =
Arrays.asList(ToApiModelConverter.toDataJobDeploymentStatus(jobDeploymentStatus.get()));
}
return ResponseEntity.ok(deployments);
return deploymentAsList(jobName.toLowerCase());
}
return ResponseEntity.notFound().build();
}

private ResponseEntity<List<DataJobDeploymentStatus>> deploymentAsList(String jobName) {
ResponseEntity<DataJobDeploymentStatus> jobDeploymentStatus;
if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.DB)) {
jobDeploymentStatus = readFromDB(jobName);
} else if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.K8S)) {
jobDeploymentStatus = readFromK8S(jobName);
} else {
jobDeploymentStatus = ResponseEntity.notFound().build();
}
var response = Arrays.asList(jobDeploymentStatus.getBody());
return ResponseEntity.status(jobDeploymentStatus.getStatusCode()).body(response);
}

@Override
public ResponseEntity<DataJobDeploymentStatus> deploymentRead(
String teamName, String jobName, String deploymentId) {
if (jobsService.jobWithTeamExists(jobName, teamName)) {
// TODO: deploymentId are not implemented.
Optional<JobDeploymentStatus> jobDeploymentStatus = Optional.empty();
if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.K8S)) {
jobDeploymentStatus = deploymentService.readDeployment(jobName.toLowerCase());
}
if (jobDeploymentStatus.isPresent()) {
return ResponseEntity.ok(
ToApiModelConverter.toDataJobDeploymentStatus(jobDeploymentStatus.get()));
if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.DB)) {
return readFromDB(jobName.toLowerCase());
} else if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.K8S)) {
return readFromK8S(jobName.toLowerCase());
}
}
return ResponseEntity.notFound().build();
}

private ResponseEntity<DataJobDeploymentStatus> readFromK8S(String jobName) {
Optional<JobDeploymentStatus> jobDeploymentStatus =
deploymentService.readDeployment(jobName.toLowerCase());
if (jobDeploymentStatus.isPresent()) {
return ResponseEntity.ok(
ToApiModelConverter.toDataJobDeploymentStatus(jobDeploymentStatus.get()));
}
return ResponseEntity.notFound().build();
}

private ResponseEntity<DataJobDeploymentStatus> readFromDB(String dataJobName) {
var jobDeploymentOptional = deploymentServiceV2.readDeployment(dataJobName);
var jobOptional = jobsService.getByName(dataJobName);
if (jobDeploymentOptional.isPresent()) {
var deploymentResponse =
DeploymentModelConverter.toJobDeploymentStatus(
jobDeploymentOptional.get(), jobOptional.get());
return ResponseEntity.ok(deploymentResponse);
}
return ResponseEntity.notFound().build();
}

@Override
public ResponseEntity<Void> deploymentUpdate(
String teamName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@

package com.vmware.taurus.datajobs;

import com.vmware.taurus.controlplane.model.data.DataJobContacts;
import com.vmware.taurus.controlplane.model.data.DataJobDeploymentStatus;
import com.vmware.taurus.controlplane.model.data.DataJobResources;
import com.vmware.taurus.controlplane.model.data.DataJobSchedule;
import com.vmware.taurus.service.model.ActualDataJobDeployment;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.DataJobDeploymentResources;
import com.vmware.taurus.service.model.DesiredDataJobDeployment;
import com.vmware.taurus.service.model.JobDeployment;
Expand Down Expand Up @@ -282,4 +286,47 @@ private static void mergeDeploymentResources(
: oldDeployment.getResources().getMemoryLimitMi());
mergedDeployment.setResources(resources);
}

public static DataJobDeploymentStatus toJobDeploymentStatus(
ActualDataJobDeployment actualDataJobDeployment, DataJob job) {
var deploymentStatus = new DataJobDeploymentStatus();
deploymentStatus.setJobVersion(actualDataJobDeployment.getDeploymentVersionSha());
deploymentStatus.setPythonVersion(actualDataJobDeployment.getPythonVersion());
deploymentStatus.setId(actualDataJobDeployment.getDataJobName());
deploymentStatus.setEnabled(actualDataJobDeployment.getEnabled());
deploymentStatus.setContacts(getContactsFromJob(job));
deploymentStatus.setSchedule(
new DataJobSchedule().scheduleCron(actualDataJobDeployment.getSchedule()));
deploymentStatus.setResources(getResourcesFromDeployment(actualDataJobDeployment));
deploymentStatus.setLastDeployedDate(
actualDataJobDeployment.getLastDeployedDate() == null
? null
: actualDataJobDeployment.getLastDeployedDate().toString());
deploymentStatus.setLastDeployedBy(actualDataJobDeployment.getLastDeployedBy());
return deploymentStatus;
}

private static DataJobContacts getContactsFromJob(DataJob job) {
DataJobContacts contacts = new DataJobContacts();
if (job.getJobConfig() != null) {
var config = job.getJobConfig();
contacts.setNotifiedOnJobDeploy(config.getNotifiedOnJobDeploy());
contacts.setNotifiedOnJobFailurePlatformError(config.getNotifiedOnJobFailurePlatformError());
contacts.setNotifiedOnJobSuccess(config.getNotifiedOnJobSuccess());
contacts.setNotifiedOnJobFailureUserError(config.getNotifiedOnJobFailureUserError());
}
return contacts;
}

private static DataJobResources getResourcesFromDeployment(ActualDataJobDeployment deployment) {
DataJobResources resources = new DataJobResources();
var deploymentResources = deployment.getResources();
if (deploymentResources != null) {
resources.setCpuRequest(deploymentResources.getCpuRequestCores());
resources.setCpuLimit(deploymentResources.getCpuLimitCores());
resources.setMemoryRequest(deploymentResources.getMemoryRequestMi());
resources.setMemoryLimit(deploymentResources.getMemoryLimitMi());
}
return resources;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2021-2023 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.datajobs;

import com.vmware.taurus.service.model.ActualDataJobDeployment;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.DataJobDeploymentResources;
import com.vmware.taurus.service.model.JobConfig;
import java.time.OffsetDateTime;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class ModelApiConverterTest {

@Test
public void testToJobDeploymentStatus() {
var job = createTestJob("name", "team");
var deployment = createActualJobDeployment(job);
var status = DeploymentModelConverter.toJobDeploymentStatus(deployment, job);

Assertions.assertEquals("test-sha", status.getJobVersion());
Assertions.assertEquals("3.9-secure", status.getPythonVersion());
Assertions.assertEquals("name", status.getId());
Assertions.assertEquals(true, status.getEnabled());
Assertions.assertEquals("user", status.getLastDeployedBy());
Assertions.assertEquals(OffsetDateTime.MIN.toString(), status.getLastDeployedDate());
Assertions.assertEquals("[email protected]", status.getContacts().getNotifiedOnJobDeploy().get(0));
Assertions.assertEquals(1, status.getResources().getMemoryLimit());
Assertions.assertEquals(1, status.getResources().getMemoryRequest());
Assertions.assertEquals(1f, status.getResources().getCpuLimit());
Assertions.assertEquals(1f, status.getResources().getCpuRequest());
}

@Test
public void testToJobDeploymentStatus_emptyValues_expectNoExceptions() {
var job = new DataJob();
var deployment = new ActualDataJobDeployment();
DeploymentModelConverter.toJobDeploymentStatus(deployment, job);
}

private DataJob createTestJob(String jobName, String teamName) {
var dataJob = ToModelApiConverter.toDataJob(TestUtils.getDataJob(teamName, jobName));
var jobConfig = new JobConfig();
jobConfig.setTeam(teamName);
jobConfig.setNotifiedOnJobDeploy(List.of("[email protected]"));
dataJob.setJobConfig(jobConfig);
return dataJob;
}

private ActualDataJobDeployment createActualJobDeployment(DataJob dataJob) {
var deployment = new ActualDataJobDeployment();
deployment.setGitCommitSha("actualSha");
deployment.setDataJob(dataJob);
deployment.setDataJobName(dataJob.getName());
deployment.setPythonVersion("3.9-secure");
deployment.setEnabled(true);
deployment.setLastDeployedBy("user");
deployment.setSchedule("sched");
deployment.setDeploymentVersionSha("test-sha");
deployment.setLastDeployedDate(OffsetDateTime.MIN);
var resources = new DataJobDeploymentResources();
resources.setMemoryLimitMi(1);
resources.setMemoryRequestMi(1);
resources.setCpuLimitCores(1f);
resources.setCpuRequestCores(1f);
deployment.setResources(resources);
return deployment;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2021-2023 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.service.deploy;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.controlplane.model.data.DataJobMode;
import com.vmware.taurus.datajobs.DataJobsDeploymentController;
import com.vmware.taurus.datajobs.TestUtils;
import com.vmware.taurus.datajobs.ToModelApiConverter;
import com.vmware.taurus.service.model.ActualDataJobDeployment;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.DataJobDeploymentResources;
import com.vmware.taurus.service.model.JobConfig;
import com.vmware.taurus.service.repository.ActualJobDeploymentRepository;
import com.vmware.taurus.service.repository.JobsRepository;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.context.TestPropertySource;

@SpringBootTest(classes = ControlplaneApplication.class)
@TestPropertySource(
properties = {"datajobs.deployment.configuration.persistence.readDataSource=DB"})
public class DataJobDeploymentControllerReadTest {

@Autowired JobsRepository jobsRepository;
@Autowired DataJobsDeploymentController dataJobsDeploymentController;
@Autowired ActualJobDeploymentRepository actualJobDeploymentRepository;

@WithMockUser
@Test
public void testReadJob_expectResponse() {
var job = createTestJob("test-job", "team");
createActualJobDeployment(job);
var retrievedDeployment = dataJobsDeploymentController.deploymentRead("team", "test-job", "");
Assertions.assertEquals(200, retrievedDeployment.getStatusCodeValue());
Assertions.assertEquals("test-job", retrievedDeployment.getBody().getId());
}

@WithMockUser
@Test
public void testReadJob_expectNoResponse() {
var retrievedDeployment = dataJobsDeploymentController.deploymentRead("team", "test-job", "");
Assertions.assertEquals(404, retrievedDeployment.getStatusCodeValue());
}

@WithMockUser
@Test
public void testReadJob_asList_expectResponse() {
var job = createTestJob("test-job", "team");
createActualJobDeployment(job);
var retrievedDeploymentList =
dataJobsDeploymentController.deploymentList("team", "test-job", "", DataJobMode.RELEASE);
Assertions.assertEquals(200, retrievedDeploymentList.getStatusCodeValue());
Assertions.assertEquals("test-job", retrievedDeploymentList.getBody().get(0).getId());
}

@WithMockUser
@Test
public void testReadJob_asList_expectNoResponse() {
var retrievedDeploymentList =
dataJobsDeploymentController.deploymentList("team", "test-job", "", DataJobMode.RELEASE);
Assertions.assertEquals(404, retrievedDeploymentList.getStatusCodeValue());
}

@AfterEach
public void cleanup() {
jobsRepository.deleteAll();
actualJobDeploymentRepository.deleteAll();
}

private DataJob createTestJob(String jobName, String teamName) {
var dataJob = ToModelApiConverter.toDataJob(TestUtils.getDataJob(teamName, jobName));
var jobConfig = new JobConfig();
jobConfig.setTeam(teamName);
dataJob.setJobConfig(jobConfig);
jobsRepository.save(dataJob);
return dataJob;
}

private ActualDataJobDeployment createActualJobDeployment(DataJob dataJob) {
var deployment = new ActualDataJobDeployment();
deployment.setGitCommitSha("actualSha");
deployment.setDataJob(dataJob);
deployment.setDataJobName(dataJob.getName());
deployment.setPythonVersion("3.9-secure");
deployment.setEnabled(true);
deployment.setLastDeployedBy("user");
deployment.setSchedule("sched");
var resources = new DataJobDeploymentResources();
resources.setMemoryLimitMi(1);
resources.setMemoryRequestMi(1);
resources.setCpuLimitCores(1f);
resources.setCpuRequestCores(1f);
deployment.setResources(resources);
actualJobDeploymentRepository.save(deployment);
return deployment;
}
}

0 comments on commit e9af4fa

Please sign in to comment.