diff --git a/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/datajobs/DeploymentModelConverter.java b/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/datajobs/DeploymentModelConverter.java index f1380a5c49..26f1a2d079 100644 --- a/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/datajobs/DeploymentModelConverter.java +++ b/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/datajobs/DeploymentModelConverter.java @@ -317,6 +317,28 @@ private static DataJobContacts getContactsFromJob(DataJob job) { return contacts; } + public static JobDeploymentStatus toJobDeploymentStatus( + ActualDataJobDeployment deploymentStatus) { + JobDeploymentStatus jobDeploymentStatus = new JobDeploymentStatus(); + + jobDeploymentStatus.setDataJobName(deploymentStatus.getDataJobName()); + jobDeploymentStatus.setPythonVersion(deploymentStatus.getPythonVersion()); + jobDeploymentStatus.setGitCommitSha(deploymentStatus.getGitCommitSha()); + jobDeploymentStatus.setEnabled(deploymentStatus.getEnabled()); + jobDeploymentStatus.setLastDeployedBy(deploymentStatus.getLastDeployedBy()); + jobDeploymentStatus.setLastDeployedDate( + deploymentStatus.getLastDeployedDate() == null + ? null + : deploymentStatus.getLastDeployedDate().toString()); + jobDeploymentStatus.setResources(getResourcesFromDeployment(deploymentStatus)); + // ActualDataJobDeployment has no mode attribute, but JobDeploymentStatus requires one; + // set a fixed placeholder so consumers of the status do not fail on a missing mode. + // TODO(review): confirm "release" is a safe default for every read path.
+ jobDeploymentStatus.setMode("release"); + + return jobDeploymentStatus; + } + private static DataJobResources getResourcesFromDeployment(ActualDataJobDeployment deployment) { DataJobResources resources = new DataJobResources(); var deploymentResources = deployment.getResources(); diff --git a/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/graphql/GraphQLDataFetchers.java b/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/graphql/GraphQLDataFetchers.java index c366481982..f203ee9a9c 100644 --- a/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/graphql/GraphQLDataFetchers.java +++ b/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/graphql/GraphQLDataFetchers.java @@ -5,8 +5,12 @@ package com.vmware.taurus.service.graphql; +import com.vmware.taurus.datajobs.DeploymentModelConverter; import com.vmware.taurus.datajobs.ToApiModelConverter; +import com.vmware.taurus.service.deploy.DeploymentServiceV2; import com.vmware.taurus.service.repository.JobsRepository; +import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig; +import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig.ReadFrom; import com.vmware.taurus.service.deploy.DeploymentService; import com.vmware.taurus.service.graphql.model.Criteria; import com.vmware.taurus.service.graphql.model.DataJobPage; @@ -26,13 +30,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; +import java.util.*; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -50,6 +48,8 @@ public class 
GraphQLDataFetchers { private final JobsRepository jobsRepository; private final DeploymentService deploymentService; private final ExecutionDataFetcher executionDataFetcher; + private final DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig; + private final DeploymentServiceV2 deploymentServiceV2; public DataFetcher findAllAndBuildDataJobPage() { return dataFetchingEnvironment -> { @@ -206,9 +206,15 @@ private Predicate computeSearch( private List populateDeployments( List allDataJob, Map dataJobs) { - Map deploymentStatuses = - deploymentService.readDeployments().stream() - .collect(Collectors.toMap(JobDeploymentStatus::getDataJobName, cronJob -> cronJob)); + Map deploymentStatuses; + + if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.DB)) { + deploymentStatuses = readJobDeploymentsFromDb(); + } else if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.K8S)) { + deploymentStatuses = readJobDeploymentsFromK8s(); + } else { + deploymentStatuses = Collections.emptyMap(); + } allDataJob.forEach( dataJob -> { @@ -227,6 +233,19 @@ private List populateDeployments( return allDataJob; } + private Map readJobDeploymentsFromK8s() { + return deploymentService.readDeployments().stream() + .collect(Collectors.toMap(JobDeploymentStatus::getDataJobName, cronJob -> cronJob)); + } + + private Map readJobDeploymentsFromDb() { + return deploymentServiceV2.findAllActualDataJobDeployments().entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> DeploymentModelConverter.toJobDeploymentStatus(entry.getValue()))); + } + private static DataJobPage buildResponse(int pageSize, int count, List pageList) { return DataJobPage.builder() diff --git a/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/graphql/GraphQLDataFetchersTest.java 
b/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/graphql/GraphQLDataFetchersTest.java index bcac27b8a6..efc41d3c92 100644 --- a/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/graphql/GraphQLDataFetchersTest.java +++ b/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/graphql/GraphQLDataFetchersTest.java @@ -6,6 +6,10 @@ package com.vmware.taurus.service.graphql; import com.vmware.taurus.controlplane.model.data.DataJobExecution; +import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig; +import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig.ReadFrom; +import com.vmware.taurus.service.deploy.DeploymentServiceV2; +import com.vmware.taurus.service.model.*; import com.vmware.taurus.service.repository.JobsRepository; import com.vmware.taurus.service.deploy.DeploymentService; import com.vmware.taurus.service.graphql.model.Filter; @@ -22,11 +26,7 @@ import com.vmware.taurus.service.graphql.strategy.datajob.JobFieldStrategyByScheduleCron; import com.vmware.taurus.service.graphql.strategy.datajob.JobFieldStrategyBySourceUrl; import com.vmware.taurus.service.graphql.strategy.datajob.JobFieldStrategyByTeam; -import com.vmware.taurus.service.model.DataJob; import com.vmware.taurus.service.graphql.model.DataJobPage; -import com.vmware.taurus.service.model.ExecutionStatus; -import com.vmware.taurus.service.model.JobConfig; -import com.vmware.taurus.service.model.JobDeploymentStatus; import graphql.GraphQLException; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; @@ -47,6 +47,7 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -68,6 +69,10 @@ class GraphQLDataFetchersTest { 
@Mock private DataFetchingFieldSelectionSet dataFetchingFieldSelectionSet; + @Mock private DeploymentServiceV2 deploymentServiceV2; + + @Mock private DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig; + private DataFetcher findDataJobs; @BeforeEach @@ -76,12 +81,18 @@ public void before() { new JobFieldStrategyFactory(collectSupportedFieldStrategies()); GraphQLDataFetchers graphQLDataFetchers = new GraphQLDataFetchers( - strategyFactory, jobsRepository, deploymentService, executionDataFetcher); + strategyFactory, + jobsRepository, + deploymentService, + executionDataFetcher, + dataJobDeploymentPropertiesConfig, + deploymentServiceV2); findDataJobs = graphQLDataFetchers.findAllAndBuildDataJobPage(); } @Test void testDataFetcherOfJobs_whenGettingFullList_shouldReturnAllDataJobs() throws Exception { + when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S); when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1); when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(10); when(jobsRepository.findAll()).thenReturn(mockListOfDataJobs()); @@ -99,6 +110,7 @@ void testDataFetcherOfJobs_whenGettingFullList_shouldReturnAllDataJobs() throws @Test void testDataFetcherOfJobs_whenGettingPagedResult_shouldReturnPagedJobs() throws Exception { + when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S); when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(2); when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(2); when(jobsRepository.findAll()).thenReturn(mockListOfDataJobs()); @@ -136,6 +148,7 @@ void testDataFetcherOfJobs_whenSupportedFieldProvidedWithSorting_shouldReturnJob @Test void testDataFetcherOfJobs_whenSearchingSpecificJob_shouldReturnSearchedJob() throws Exception { + when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S); when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1); 
when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(10); when(dataFetchingEnvironment.getArgument("search")).thenReturn("sample-job-2"); @@ -156,6 +169,7 @@ void testDataFetcherOfJobs_whenSearchingSpecificJob_shouldReturnSearchedJob() th @Test void testDataFetcherOfJobs_whenSearchingByPattern_shouldReturnMatchingJobs() throws Exception { + when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S); when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1); when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(10); when(dataFetchingEnvironment.getArgument("search")).thenReturn("sample-job-2"); @@ -213,6 +227,7 @@ void testDataFetcherOfJobs_whenValidPageNumberIsProvided_shouldNotThrowException @Test void testPopulateDeployments() throws Exception { + when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S); when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution()); when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments()); when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1); @@ -238,8 +253,39 @@ void testPopulateDeployments() throws Exception { assertThat(job2.getDeployments().get(0).getLastExecutionDuration()).isNull(); } + @Test + void testPopulateDeployments_readFromDB() throws Exception { + when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.DB); + when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution()); + when(deploymentServiceV2.findAllActualDataJobDeployments()) + .thenReturn(mockMapOfActualJobDeployments()); + when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1); + when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(100); + when(dataFetchingEnvironment.getSelectionSet()).thenReturn(dataFetchingFieldSelectionSet); + when(dataFetchingFieldSelectionSet.contains(JobFieldStrategyBy.DEPLOYMENT.getPath())) + .thenReturn(true); + + 
DataJobPage dataJobPage = (DataJobPage) findDataJobs.get(dataFetchingEnvironment); + + assertThat(dataJobPage.getContent()).hasSize(5); + var job2 = (V2DataJob) dataJobPage.getContent().get(1); + assertThat(job2.getDeployments()).hasSize(1); + assertThat(job2.getDeployments().get(0).getLastExecutionStatus()).isNull(); + assertThat(job2.getDeployments().get(0).getLastExecutionTime()).isNull(); + assertThat(job2.getDeployments().get(0).getLastExecutionDuration()).isNull(); + assertThat(job2.getDeployments().get(0).getJobPythonVersion()).isEqualTo("3.8-secure"); + var job4 = (V2DataJob) dataJobPage.getContent().get(3); + assertThat(job4.getDeployments()).hasSize(1); + assertThat(job4.getDeployments().get(0).getLastExecutionStatus()) + .isEqualTo(DataJobExecution.StatusEnum.SUCCEEDED); + assertThat(job4.getDeployments().get(0).getLastExecutionTime()).isNull(); + assertThat(job4.getDeployments().get(0).getLastExecutionDuration()).isEqualTo(0); + assertThat(job4.getDeployments().get(0).getJobPythonVersion()).isEqualTo("3.9-secure"); + } + @Test void testFilterByLastExecutionStatus() throws Exception { + when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S); when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution()); when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments()); when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1); @@ -263,6 +309,7 @@ void testFilterByLastExecutionStatus() throws Exception { @Test void testSortingByLastExecutionStatus() throws Exception { + when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S); when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution()); when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments()); when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1); @@ -296,6 +343,7 @@ void testSortingByLastExecutionStatus() throws Exception { @Test void 
testSortingByLastExecutionTime() throws Exception { + when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S); when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution()); when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments()); when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1); @@ -327,6 +375,7 @@ void testSortingByLastExecutionTime() throws Exception { @Test void testSortingByLastExecutionDuration() throws Exception { + when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S); when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution()); when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments()); when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1); @@ -376,6 +425,15 @@ private List mockListOfDataJobs() { return dataJobs; } + private Map mockMapOfActualJobDeployments() { + return Map.of( + "sample-job-1", mockSampleActualJobDeployment("sample-job-1", true, "3.8-secure"), + "sample-job-2", mockSampleActualJobDeployment("sample-job-2", false, "3.8-secure"), + "sample-job-3", mockSampleActualJobDeployment("sample-job-3", true, "3.9-secure"), + "sample-job-4", mockSampleActualJobDeployment("sample-job-4", false, "3.9-secure"), + "sample-job-5", mockSampleActualJobDeployment("sample-job-5", true, "3.9-secure")); + } + private List mockListOfDataJobsWithLastExecution() { List dataJobs = new ArrayList<>(); @@ -444,6 +502,25 @@ private JobDeploymentStatus mockSampleDeployment(String jobName, boolean enabled return status; } + private ActualDataJobDeployment mockSampleActualJobDeployment( + String jobName, boolean enabled, String pythonVersion) { + ActualDataJobDeployment actualJobDeployment = new ActualDataJobDeployment(); + actualJobDeployment.setDataJobName(jobName); + actualJobDeployment.setEnabled(enabled); + actualJobDeployment.setPythonVersion(pythonVersion); + 
actualJobDeployment.setLastDeployedDate( + OffsetDateTime.of(2023, 10, 25, 16, 30, 42, 42, ZoneOffset.UTC)); + + DataJobDeploymentResources resources = new DataJobDeploymentResources(); + resources.setCpuLimitCores(1f); + resources.setCpuRequestCores(1f); + resources.setMemoryLimitMi(100); + resources.setMemoryRequestMi(100); + + actualJobDeployment.setResources(resources); + return actualJobDeployment; + } + static ArrayList> constructFilter(Filter... filters) { ArrayList> rawFilters = new ArrayList<>(); Arrays.stream(filters)