diff --git a/projects/control-service/projects/helm_charts/pipelines-control-service/templates/deployment.yaml b/projects/control-service/projects/helm_charts/pipelines-control-service/templates/deployment.yaml index 965cdf3873..650e68616e 100644 --- a/projects/control-service/projects/helm_charts/pipelines-control-service/templates/deployment.yaml +++ b/projects/control-service/projects/helm_charts/pipelines-control-service/templates/deployment.yaml @@ -200,6 +200,12 @@ spec: value: "{{ .Values.dataJob.deployment.configuration.persistence.writeTos }}" - name: DATAJOBS_DEPLOYMENT_CONFIGURATION_PERSISTENCE_READ_DATA_SOURCE value: "{{ .Values.dataJob.deployment.configuration.persistence.readDataSource }}" + - name: DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_ENABLED + value: "{{ .Values.dataJob.deployment.configuration.synchronizationTask.enabled }}" + - name: DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_INTERVAL + value: "{{ .Values.dataJob.deployment.configuration.synchronizationTask.interval }}" + - name: DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_INITIAL_DELAY + value: "{{ .Values.dataJob.deployment.configuration.synchronizationTask.initialDelay }}" - name: KRB5_CONFIG value: "/etc/secrets/krb5.conf" - name: VDK_OPTIONS_INI diff --git a/projects/control-service/projects/helm_charts/pipelines-control-service/values.yaml b/projects/control-service/projects/helm_charts/pipelines-control-service/values.yaml index eae2f956d5..c04e6c53cb 100644 --- a/projects/control-service/projects/helm_charts/pipelines-control-service/values.yaml +++ b/projects/control-service/projects/helm_charts/pipelines-control-service/values.yaml @@ -1050,11 +1050,37 @@ dataJob: persistence: # Case sensitive CSV list of write to's sources. Default is "DB,K8S" The control service # will write data job properties to all sources present in this list. + # See the synchronizationTask for more information. writeTos: "DB,K8S" # Case-sensitive variable to select the truth data source for reading properties. # Options are "DB" for database and "K8S" for kubernetes. The control service will read # properties from the specified source. readDataSource: "K8S" + # Synchronization Task Behavior: + # Configure the behavior of synchronization task based on the following configurations: + + # Case 1: If persistence.writeTo = "DB" and synchronizationTask.enabled = true: + # The Deployment API will write data to the database, and the synchronization task + # will sync the data jobs in Kubernetes. + + # Case 2: If persistence.writeTo = "DB" and synchronizationTask.enabled = false: + # The Deployment API will update the database, but the synchronization task will be disabled, + # resulting in no updates in Kubernetes. + + # Case 3: If persistence.writeTo = "K8s" and synchronizationTask.enabled = true: + # The Deployment API will update data jobs in Kubernetes, and the synchronization task may override some of them. + + # Case 4: If persistence.writeTo = "K8s" and synchronizationTask.enabled = false: + # The Deployment API will update data jobs in Kubernetes, but the synchronization tasks will be disabled, + # preventing any further synchronization with Kubernetes. + synchronizationTask: + enabled: false + # The data job deployments' synchronization task interval is the time period (expressed in milliseconds) + # after a synchronization process has completed and before a new one is started + interval: 1000 + # The data job deployments' synchronization task initial delay is the period (expressed in milliseconds) between control service + # start and the first time a synchronization process is started by the control service instance + initialDelay: 10000 jobImagePullPolicy: "IfNotPresent" initContainer: ## Resources set on Data Job initContainer diff --git a/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/deploy/DataJobsSynchronizer.java b/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/deploy/DataJobsSynchronizer.java index f083734149..e0759642d0 100644 --- a/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/deploy/DataJobsSynchronizer.java +++ b/projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/deploy/DataJobsSynchronizer.java @@ -11,8 +11,11 @@ import com.vmware.taurus.service.model.DataJob; import com.vmware.taurus.service.model.DesiredDataJobDeployment; import io.kubernetes.client.openapi.ApiException; -import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import net.javacrumbs.shedlock.spring.annotation.SchedulerLock; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @@ -31,7 +34,7 @@ * initiate the synchronization process. */ @Slf4j -@AllArgsConstructor +@RequiredArgsConstructor @Component public class DataJobsSynchronizer { @@ -39,6 +42,11 @@ public class DataJobsSynchronizer { private final DeploymentServiceV2 deploymentService; + private final DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig; + + @Value("${datajobs.deployment.configuration.synchronization.task.enabled:false}") + private boolean synchronizationEnabled; + /** * Synchronizes Kubernetes CronJobs from the database to ensure that the cluster's state matches * the desired state defined in the database records. @@ -47,7 +55,26 @@ public class DataJobsSynchronizer { * of Kubernetes CronJobs in the cluster, and takes the necessary actions to synchronize them. * This can include creating new CronJobs, updating existing CronJobs, etc. */ + @Scheduled( + fixedDelayString = "${datajobs.deployment.configuration.synchronization.task.interval:1000}", + initialDelayString = + "${datajobs.deployment.configuration.synchronization.task.initial.delay:10000}") + @SchedulerLock(name = "synchronizeDataJobsTask") public void synchronizeDataJobs() { + if (!synchronizationEnabled) { + log.debug("Skipping the synchronization of data job deployments since it is disabled."); + return; + } + + if (!dataJobDeploymentPropertiesConfig + .getWriteTos() + .contains(DataJobDeploymentPropertiesConfig.WriteTo.DB)) { + log.debug( + "Skipping data job deployments' synchronization due to the disabled writes to the" + + " database."); + return; + } + ThreadPoolTaskExecutor taskExecutor = initializeTaskExecutor(); Iterable dataJobsFromDB = jobsService.findAllDataJobs(); diff --git a/projects/control-service/projects/pipelines_control_service/src/main/resources/application.properties b/projects/control-service/projects/pipelines_control_service/src/main/resources/application.properties index 1db742f4b1..c7b004ddaf 100644 --- a/projects/control-service/projects/pipelines_control_service/src/main/resources/application.properties +++ b/projects/control-service/projects/pipelines_control_service/src/main/resources/application.properties @@ -61,6 +61,16 @@ datajobs.deployment.configuration.persistence.writeTos=${DATAJOBS_DEPLOYMENT_CON # "K8S" for kubernetes. datajobs.deployment.configuration.persistence.readDataSource=${DATAJOBS_DEPLOYMENT_CONFIGURATION_PERSISTENCE_READ_DATA_SOURCE:"K8S"} +# The data job deployments' synchronization task enabled +datajobs.deployment.configuration.synchronization.task.enabled=${DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_ENABLED:false} + +# The data job deployments' synchronization task interval is the time period (expressed in milliseconds) +# after a synchronization process has completed and before a new one is started +datajobs.deployment.configuration.synchronization.task.interval=${DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_INTERVAL:1000} +# The data job deployments' synchronization task initial delay is the period (expressed in milliseconds) between control service +# start and the first time a synchronization process is started by the control service instance +datajobs.deployment.configuration.synchronization.task.initial.delay=${DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_INTERVAL_DELAY:10000} + # The JSON Web Key Set (JWKS) is a set of keys which contains the public keys # used to verify any JSON Web Token (JWT) issued by the authorization server # It is required. diff --git a/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/deploy/DataJobsSynchronizerTest.java b/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/deploy/DataJobsSynchronizerTest.java index ea160609dd..04882bc8b5 100644 --- a/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/deploy/DataJobsSynchronizerTest.java +++ b/projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/deploy/DataJobsSynchronizerTest.java @@ -14,8 +14,10 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.test.util.ReflectionTestUtils; import java.util.Collections; +import java.util.Set; @ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK, classes = ServiceApp.class) @@ -25,10 +27,14 @@ public class DataJobsSynchronizerTest { @MockBean private DeploymentServiceV2 deploymentService; + @MockBean private DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig; + @Test void synchronizeDataJobs_loadDeploymentNamesFromKubernetesReturnsValue_shouldFinishSynchronization() throws ApiException { + enableSynchronizationProcess(); + Mockito.when(deploymentService.findAllActualDeploymentNamesFromKubernetes()) .thenReturn(Collections.emptySet()); @@ -44,6 +50,8 @@ public class DataJobsSynchronizerTest { void synchronizeDataJobs_loadDeploymentNamesFromKubernetesThrowsApiException_shouldSkipSynchronization() throws ApiException { + enableSynchronizationProcess(); + Mockito.when(deploymentService.findAllActualDeploymentNamesFromKubernetes()) .thenThrow(new ApiException()); @@ -54,4 +62,62 @@ public class DataJobsSynchronizerTest { Mockito.verify(deploymentService, Mockito.times(0)).findAllActualDataJobDeployments(); } + + @Test + void synchronizeDataJobs_synchronizationEnabledFalseAndWriteToDbTrue_shouldSkipSynchronization() + throws ApiException { + initSynchronizationProcessConfig(false, true); + + dataJobsSynchronizer.synchronizeDataJobs(); + + Mockito.verify(deploymentService, Mockito.times(0)) + .findAllActualDeploymentNamesFromKubernetes(); + } + + @Test + void synchronizeDataJobs_synchronizationEnabledFalseAndWriteToDbFalse_shouldSkipSynchronization() + throws ApiException { + initSynchronizationProcessConfig(false, false); + + dataJobsSynchronizer.synchronizeDataJobs(); + + Mockito.verify(deploymentService, Mockito.times(0)) + .findAllActualDeploymentNamesFromKubernetes(); + } + + @Test + void synchronizeDataJobs_synchronizationEnabledTrueAndWriteToDbTrue_shouldFinishSynchronization() + throws ApiException { + initSynchronizationProcessConfig(true, true); + + dataJobsSynchronizer.synchronizeDataJobs(); + + Mockito.verify(deploymentService, Mockito.times(1)) + .findAllActualDeploymentNamesFromKubernetes(); + } + + @Test + void synchronizeDataJobs_synchronizationEnabledTrueAndWriteToDbFalse_shouldSkipSynchronization() + throws ApiException { + initSynchronizationProcessConfig(true, false); + + dataJobsSynchronizer.synchronizeDataJobs(); + + Mockito.verify(deploymentService, Mockito.times(0)) + .findAllActualDeploymentNamesFromKubernetes(); + } + + void enableSynchronizationProcess() { + initSynchronizationProcessConfig(true, true); + } + + void initSynchronizationProcessConfig(boolean synchronizationEnabled, boolean writeToDB) { + ReflectionTestUtils.setField( + dataJobsSynchronizer, "synchronizationEnabled", synchronizationEnabled); + Mockito.when(dataJobDeploymentPropertiesConfig.getWriteTos()) + .thenReturn( + writeToDB + ? Set.of(DataJobDeploymentPropertiesConfig.WriteTo.DB) + : Collections.emptySet()); + } }