Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control-service: enable scheduled execution for data jobs' synchronizer #2771

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ spec:
value: "{{ .Values.dataJob.deployment.configuration.persistence.writeTos }}"
- name: DATAJOBS_DEPLOYMENT_CONFIGURATION_PERSISTENCE_READ_DATA_SOURCE
value: "{{ .Values.dataJob.deployment.configuration.persistence.readDataSource }}"
- name: DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_ENABLED
value: "{{ .Values.dataJob.deployment.configuration.synchronizationTask.enabled }}"
- name: DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_INTERVAL
value: "{{ .Values.dataJob.deployment.configuration.synchronizationTask.interval }}"
- name: DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_INITIAL_DELAY
value: "{{ .Values.dataJob.deployment.configuration.synchronizationTask.initialDelay }}"
- name: KRB5_CONFIG
value: "/etc/secrets/krb5.conf"
- name: VDK_OPTIONS_INI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,11 +1050,37 @@ dataJob:
persistence:
# Case sensitive CSV list of write to's sources. Default is "DB,K8S" The control service
# will write data job properties to all sources present in this list.
# See the synchronizationTask for more information.
writeTos: "DB,K8S"
# Case-sensitive variable to select the truth data source for reading properties.
# Options are "DB" for database and "K8S" for kubernetes. The control service will read
# properties from the specified source.
readDataSource: "K8S"
# Synchronization Task Behavior:
# Configure the behavior of synchronization task based on the following configurations:

# Case 1: If persistence.writeTo = "DB" and synchronizationTask.enabled = true:
# The Deployment API will write data to the database, and the synchronization task
# will sync the data jobs in Kubernetes.

# Case 2: If persistence.writeTo = "DB" and synchronizationTask.enabled = false:
# The Deployment API will update the database, but the synchronization task will be disabled,
# resulting in no updates in Kubernetes.

# Case 3: If persistence.writeTo = "K8s" and synchronizationTask.enabled = true:
# The Deployment API will update data jobs in Kubernetes, and the synchronization task may override some of them.

# Case 4: If persistence.writeTo = "K8s" and synchronizationTask.enabled = false:
# The Deployment API will update data jobs in Kubernetes, but the synchronization tasks will be disabled,
# preventing any further synchronization with Kubernetes.
synchronizationTask:
enabled: false
# The data job deployments' synchronization task interval is the time period (expressed in milliseconds)
# after a synchronization process has completed and before a new one is started
interval: 1000
# The data job deployments' synchronization task initial delay is the period (expressed in milliseconds) between control service
# start and the first time a synchronization process is started by the control service instance
initialDelay: 10000
jobImagePullPolicy: "IfNotPresent"
initContainer:
## Resources set on Data Job initContainer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.DesiredDataJobDeployment;
import io.kubernetes.client.openapi.ApiException;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

Expand All @@ -31,14 +34,19 @@
* initiate the synchronization process.
*/
@Slf4j
@AllArgsConstructor
@RequiredArgsConstructor
@Component
public class DataJobsSynchronizer {

private final JobsService jobsService;

private final DeploymentServiceV2 deploymentService;

private final DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig;

@Value("${datajobs.deployment.configuration.synchronization.task.enabled:false}")
private boolean synchronizationEnabled;

/**
* Synchronizes Kubernetes CronJobs from the database to ensure that the cluster's state matches
* the desired state defined in the database records.
Expand All @@ -47,7 +55,26 @@ public class DataJobsSynchronizer {
* of Kubernetes CronJobs in the cluster, and takes the necessary actions to synchronize them.
* This can include creating new CronJobs, updating existing CronJobs, etc.
*/
@Scheduled(
fixedDelayString = "${datajobs.deployment.configuration.synchronization.task.interval:1000}",
initialDelayString =
"${datajobs.deployment.configuration.synchronization.task.initial.delay:10000}")
@SchedulerLock(name = "synchronizeDataJobsTask")
public void synchronizeDataJobs() {
if (!synchronizationEnabled) {
log.debug("Skipping the synchronization of data job deployments since it is disabled.");
return;
}

if (!dataJobDeploymentPropertiesConfig
.getWriteTos()
.contains(DataJobDeploymentPropertiesConfig.WriteTo.DB)) {
log.debug(
"Skipping data job deployments' synchronization due to the disabled writes to the"
+ " database.");
return;
}

ThreadPoolTaskExecutor taskExecutor = initializeTaskExecutor();
Iterable<DataJob> dataJobsFromDB = jobsService.findAllDataJobs();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ datajobs.deployment.configuration.persistence.writeTos=${DATAJOBS_DEPLOYMENT_CON
# "K8S" for kubernetes.
datajobs.deployment.configuration.persistence.readDataSource=${DATAJOBS_DEPLOYMENT_CONFIGURATION_PERSISTENCE_READ_DATA_SOURCE:"K8S"}

# The data job deployments' synchronization task enabled
datajobs.deployment.configuration.synchronization.task.enabled=${DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_ENABLED:false}

# The data job deployments' synchronization task interval is the time period (expressed in milliseconds)
# after a synchronization process has completed and before a new one is started
datajobs.deployment.configuration.synchronization.task.interval=${DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_INTERVAL:1000}
# The data job deployments' synchronization task initial delay is the period (expressed in milliseconds) between control service
# start and the first time a synchronization process is started by the control service instance
datajobs.deployment.configuration.synchronization.task.initial.delay=${DATAJOBS_DEPLOYMENT_CONFIGURATION_SYNCHRONIZATION_TASK_INTERVAL_DELAY:10000}

# The JSON Web Key Set (JWKS) is a set of keys which contains the public keys
# used to verify any JSON Web Token (JWT) issued by the authorization server
# It is required.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.util.ReflectionTestUtils;

import java.util.Collections;
import java.util.Set;

@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK, classes = ServiceApp.class)
Expand All @@ -25,10 +27,14 @@ public class DataJobsSynchronizerTest {

@MockBean private DeploymentServiceV2 deploymentService;

@MockBean private DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig;

@Test
void
synchronizeDataJobs_loadDeploymentNamesFromKubernetesReturnsValue_shouldFinishSynchronization()
throws ApiException {
enableSynchronizationProcess();

Mockito.when(deploymentService.findAllActualDeploymentNamesFromKubernetes())
.thenReturn(Collections.emptySet());

Expand All @@ -44,6 +50,8 @@ public class DataJobsSynchronizerTest {
void
synchronizeDataJobs_loadDeploymentNamesFromKubernetesThrowsApiException_shouldSkipSynchronization()
throws ApiException {
enableSynchronizationProcess();

Mockito.when(deploymentService.findAllActualDeploymentNamesFromKubernetes())
.thenThrow(new ApiException());

Expand All @@ -54,4 +62,62 @@ public class DataJobsSynchronizerTest {

Mockito.verify(deploymentService, Mockito.times(0)).findAllActualDataJobDeployments();
}

@Test
void synchronizeDataJobs_synchronizationEnabledFalseAndWriteToDbTrue_shouldSkipSynchronization()
throws ApiException {
initSynchronizationProcessConfig(false, true);

dataJobsSynchronizer.synchronizeDataJobs();

Mockito.verify(deploymentService, Mockito.times(0))
.findAllActualDeploymentNamesFromKubernetes();
}

@Test
void synchronizeDataJobs_synchronizationEnabledFalseAndWriteToDbFalse_shouldSkipSynchronization()
throws ApiException {
initSynchronizationProcessConfig(false, false);

dataJobsSynchronizer.synchronizeDataJobs();

Mockito.verify(deploymentService, Mockito.times(0))
.findAllActualDeploymentNamesFromKubernetes();
}

@Test
void synchronizeDataJobs_synchronizationEnabledTrueAndWriteToDbTrue_shouldFinishSynchronization()
throws ApiException {
initSynchronizationProcessConfig(true, true);

dataJobsSynchronizer.synchronizeDataJobs();

Mockito.verify(deploymentService, Mockito.times(1))
.findAllActualDeploymentNamesFromKubernetes();
}

@Test
void synchronizeDataJobs_synchronizationEnabledTrueAndWriteToDbFalse_shouldSkipSynchronization()
throws ApiException {
initSynchronizationProcessConfig(true, false);

dataJobsSynchronizer.synchronizeDataJobs();

Mockito.verify(deploymentService, Mockito.times(0))
.findAllActualDeploymentNamesFromKubernetes();
}

void enableSynchronizationProcess() {
initSynchronizationProcessConfig(true, true);
}

void initSynchronizationProcessConfig(boolean synchronizationEnabled, boolean writeToDB) {
ReflectionTestUtils.setField(
dataJobsSynchronizer, "synchronizationEnabled", synchronizationEnabled);
Mockito.when(dataJobDeploymentPropertiesConfig.getWriteTos())
.thenReturn(
writeToDB
? Set.of(DataJobDeploymentPropertiesConfig.WriteTo.DB)
: Collections.emptySet());
}
}