Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control-service: move cron jobs methods to the data jobs class #2291

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -719,45 +719,6 @@ public Set<String> listCronJobs() throws ApiException {
.collect(Collectors.toSet());
}

public void createCronJob(
String name,
String image,
String schedule,
boolean enable,
V1Container jobContainer,
V1Container initContainer,
List<V1Volume> volumes,
Map<String, String> jobAnnotations,
Map<String, String> jobLabels,
List<String> imagePullSecrets)
throws ApiException {
if (getK8sSupportsV1CronJob()) {
createV1CronJob(
name,
image,
schedule,
enable,
jobContainer,
initContainer,
volumes,
jobAnnotations,
jobLabels,
imagePullSecrets);
} else {
createV1beta1CronJob(
name,
image,
schedule,
enable,
jobContainer,
initContainer,
volumes,
jobAnnotations,
jobLabels,
imagePullSecrets);
}
}

// TODO: container/volume args are breaking a bit abstraction of KubernetesService by leaking
// impl. details
public void createV1beta1CronJob(
Expand Down Expand Up @@ -832,45 +793,6 @@ public void createV1CronJob(
nsJob.getMetadata().getSelfLink());
}

public void updateCronJob(
String name,
String image,
String schedule,
boolean enable,
V1Container jobContainer,
V1Container initContainer,
List<V1Volume> volumes,
Map<String, String> jobAnnotations,
Map<String, String> jobLabels,
List<String> imagePullSecrets)
throws ApiException {
if (getK8sSupportsV1CronJob()) {
updateV1CronJob(
name,
image,
schedule,
enable,
jobContainer,
initContainer,
volumes,
jobAnnotations,
jobLabels,
imagePullSecrets);
} else {
updateV1beta1CronJob(
name,
image,
schedule,
enable,
jobContainer,
initContainer,
volumes,
jobAnnotations,
jobLabels,
imagePullSecrets);
}
}

public void updateV1beta1CronJob(
String name,
String image,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@
import com.vmware.taurus.service.KubernetesService;
import com.vmware.taurus.service.deploy.JobCommandProvider;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.BatchV1Api;
import io.kubernetes.client.openapi.apis.BatchV1beta1Api;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1Volume;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

/**
* Kubernetes service used for serving data jobs deployments. All deployed data jobs are executed in
* this environment and all other necessary resources that are used only during the execution of a
Expand All @@ -40,4 +46,82 @@ public DataJobsKubernetesService(
batchV1beta1Api,
jobCommandProvider);
}

public void createCronJob(
String name,
String image,
String schedule,
boolean enable,
V1Container jobContainer,
V1Container initContainer,
List<V1Volume> volumes,
Map<String, String> jobAnnotations,
Map<String, String> jobLabels,
List<String> imagePullSecrets)
throws ApiException {
if (getK8sSupportsV1CronJob()) {
createV1CronJob(
name,
image,
schedule,
enable,
jobContainer,
initContainer,
volumes,
jobAnnotations,
jobLabels,
imagePullSecrets);
} else {
createV1beta1CronJob(
name,
image,
schedule,
enable,
jobContainer,
initContainer,
volumes,
jobAnnotations,
jobLabels,
imagePullSecrets);
}
}

public void updateCronJob(
String name,
String image,
String schedule,
boolean enable,
V1Container jobContainer,
V1Container initContainer,
List<V1Volume> volumes,
Map<String, String> jobAnnotations,
Map<String, String> jobLabels,
List<String> imagePullSecrets)
throws ApiException {
if (getK8sSupportsV1CronJob()) {
updateV1CronJob(
name,
image,
schedule,
enable,
jobContainer,
initContainer,
volumes,
jobAnnotations,
jobLabels,
imagePullSecrets);
} else {
updateV1beta1CronJob(
name,
image,
schedule,
enable,
jobContainer,
initContainer,
volumes,
jobAnnotations,
jobLabels,
imagePullSecrets);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class MockKubernetes {
public DataJobsKubernetesService mockDataJobsKubernetesService()
throws ApiException, IOException, InterruptedException {
DataJobsKubernetesService mock = mock(DataJobsKubernetesService.class);
mockKubernetesService(mock);
mockDataJobsKubernetesService(mock);
return mock;
}

Expand All @@ -71,13 +71,12 @@ public TaskExecutor taskExecutor() {
* <p>NOTES: If job name starts with 'failure-' (e.g failure-my-job) - then Job status will be
* fail otherwise it's success.
*/
private void mockKubernetesService(KubernetesService mock)
private void mockDataJobsKubernetesService(DataJobsKubernetesService mock)
throws ApiException, IOException, InterruptedException {
// By defautl beans are singleton scoped so we are sure this will be called once
// hence it's safe to keep the variables here isntead of static.
final Map<String, Map<String, byte[]>> secrets = new ConcurrentHashMap<>();
final Map<String, InvocationOnMock> crons = new ConcurrentHashMap<>();
final Map<String, InvocationOnMock> jobs = new ConcurrentHashMap<>();

when(mock.getSecretData(any()))
.thenAnswer(inv -> secrets.getOrDefault(inv.getArgument(0), Collections.emptyMap()));
Expand Down Expand Up @@ -139,7 +138,13 @@ private void mockKubernetesService(KubernetesService mock)
})
.when(mock)
.readCronJob(anyString());
mockKubernetesService(mock);
}

private void mockKubernetesService(KubernetesService mock)
throws ApiException, IOException, InterruptedException {

final Map<String, InvocationOnMock> jobs = new ConcurrentHashMap<>();
doAnswer(inv -> jobs.put(inv.getArgument(0), inv))
.when(mock)
.createJob(
Expand Down