Skip to content

Commit

Permalink
control-service: enable scheduled execution for data jobs' synchronizer
Browse files Browse the repository at this point in the history
Why
Currently, we don't have the ability to schedule the deployment synchronization process.

What
We have scheduled the process. Only one CS instance at a time can run this process.

Testing done
Unit tests

Signed-off-by: Miroslav Ivanov [email protected]
  • Loading branch information
mivanov1988 committed Oct 9, 2023
1 parent 0823610 commit c134a3e
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,14 @@ dataJob:
# Options are "DB" for database and "K8S" for kubernetes. The control service will read
# properties from the specified source.
readDataSource: "K8S"
synchronizationTask:
enabled: false
# The data job deployments' synchronization task interval is the time period (expressed in milliseconds)
# after a synchronization process has completed and before a new one is started
interval: 1000
# The data job deployments' synchronization task initial delay is the period (expressed in milliseconds) between control service
# start and the first time a synchronization process is started by the control service instance
initialDelay: 10000
jobImagePullPolicy: "IfNotPresent"
initContainer:
## Resources set on Data Job initContainer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.DesiredDataJobDeployment;
import io.kubernetes.client.openapi.ApiException;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

Expand All @@ -31,14 +34,19 @@
* initiate the synchronization process.
*/
@Slf4j
@AllArgsConstructor
@RequiredArgsConstructor
@Component
public class DataJobsSynchronizer {

private final JobsService jobsService;

private final DeploymentServiceV2 deploymentService;

private final DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig;

@Value("${datajobs.deployment.configuration.synchronization.task.enabled:false}")
private boolean synchronizationEnabled;

/**
* Synchronizes Kubernetes CronJobs from the database to ensure that the cluster's state matches
* the desired state defined in the database records.
Expand All @@ -47,7 +55,23 @@ public class DataJobsSynchronizer {
* of Kubernetes CronJobs in the cluster, and takes the necessary actions to synchronize them.
* This can include creating new CronJobs, updating existing CronJobs, etc.
*/
@Scheduled(
fixedDelayString = "${datajobs.deployment.configuration.synchronization.task.interval:1000}",
initialDelayString = "${datajobs.deployment.configuration.synchronization.task.initial.delay:10000}")
@SchedulerLock(name = "synchronizeDataJobsTask")
public void synchronizeDataJobs() {
if (!synchronizationEnabled) {
log.debug("Skipping the synchronization of data job deployments since it is disabled.");
return;
}

if (!dataJobDeploymentPropertiesConfig
.getWriteTos()
.contains(DataJobDeploymentPropertiesConfig.WriteTo.DB)) {
log.debug("Skipping data job deployments' synchronization due to the disabled writes to the database.");
return;
}

ThreadPoolTaskExecutor taskExecutor = initializeTaskExecutor();
Iterable<DataJob> dataJobsFromDB = jobsService.findAllDataJobs();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ datajobs.deployment.configuration.persistence.writeTos=${DATAJOBS_DEPLOYMENT_CON
# "K8S" for kubernetes.
datajobs.deployment.configuration.persistence.readDataSource=${DATAJOBS_DEPLOYMENT_CONFIGURATION_PERSISTENCE_READ_DATA_SOURCE:"K8S"}

# The data job deployments' synchronization task enabled
datajobs.deployment.configuration.synchronization.task.enabled=${DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_ENABLED:false}

# The data job deployments' synchronization task interval is the time period (expressed in milliseconds)
# after a synchronization process has completed and before a new one is started
datajobs.deployment.configuration.synchronization.task.interval=${DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_INTERVAL:1000}
# The data job deployments' synchronization task initial delay is the period (expressed in milliseconds) between control service
# start and the first time a synchronization process is started by the control service instance
datajobs.deployment.configuration.synchronization.task.initial.delay=${DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_INTERVAL_DELAY:10000}

# The JSON Web Key Set (JWKS) is a set of keys which contains the public keys
# used to verify any JSON Web Token (JWT) issued by the authorization server
# It is required.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.util.ReflectionTestUtils;

import java.util.Collections;
import java.util.Set;

@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK, classes = ServiceApp.class)
Expand All @@ -25,10 +27,14 @@ public class DataJobsSynchronizerTest {

@MockBean private DeploymentServiceV2 deploymentService;

@MockBean private DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig;

@Test
void
synchronizeDataJobs_loadDeploymentNamesFromKubernetesReturnsValue_shouldFinishSynchronization()
throws ApiException {
enableSynchronizationProcess();

Mockito.when(deploymentService.findAllActualDeploymentNamesFromKubernetes())
.thenReturn(Collections.emptySet());

Expand All @@ -44,6 +50,8 @@ public class DataJobsSynchronizerTest {
void
synchronizeDataJobs_loadDeploymentNamesFromKubernetesThrowsApiException_shouldSkipSynchronization()
throws ApiException {
enableSynchronizationProcess();

Mockito.when(deploymentService.findAllActualDeploymentNamesFromKubernetes())
.thenThrow(new ApiException());

Expand All @@ -54,4 +62,59 @@ public class DataJobsSynchronizerTest {

Mockito.verify(deploymentService, Mockito.times(0)).findAllActualDataJobDeployments();
}

@Test
void synchronizeDataJobs_synchronizationEnabledFalseAndWriteToDbTrue_shouldSkipSynchronization()
throws ApiException {
initSynchronizationProcessConfig(false, true);

dataJobsSynchronizer.synchronizeDataJobs();

Mockito.verify(deploymentService, Mockito.times(0))
.findAllActualDeploymentNamesFromKubernetes();
}

@Test
void synchronizeDataJobs_synchronizationEnabledFalseAndWriteToDbFalse_shouldSkipSynchronization()
throws ApiException {
initSynchronizationProcessConfig(false, false);

dataJobsSynchronizer.synchronizeDataJobs();

Mockito.verify(deploymentService, Mockito.times(0))
.findAllActualDeploymentNamesFromKubernetes();
}

@Test
void
synchronizeDataJobs_synchronizationEnabledTrueAndWriteToDbTrue_shouldFinishSynchronization()
throws ApiException {
initSynchronizationProcessConfig(true, true);

dataJobsSynchronizer.synchronizeDataJobs();

Mockito.verify(deploymentService, Mockito.times(1))
.findAllActualDeploymentNamesFromKubernetes();
}

@Test
void
synchronizeDataJobs_synchronizationEnabledTrueAndWriteToDbFalse_shouldSkipSynchronization()
throws ApiException {
initSynchronizationProcessConfig(true, false);

dataJobsSynchronizer.synchronizeDataJobs();

Mockito.verify(deploymentService, Mockito.times(0))
.findAllActualDeploymentNamesFromKubernetes();
}

void enableSynchronizationProcess() {
initSynchronizationProcessConfig(true, true);
}

void initSynchronizationProcessConfig(boolean synchronizationEnabled, boolean writeToDB) {
ReflectionTestUtils.setField(dataJobsSynchronizer, "synchronizationEnabled", synchronizationEnabled);
Mockito.when(dataJobDeploymentPropertiesConfig.getWriteTos()).thenReturn(writeToDB ? Set.of(DataJobDeploymentPropertiesConfig.WriteTo.DB) : Collections.emptySet());
}
}

0 comments on commit c134a3e

Please sign in to comment.