Skip to content

Commit

Permalink
control-service: Cronjob API backwards compatibility (#1580)
Browse files Browse the repository at this point in the history
Currently there is no backwards compatibility support when a user
switches to the V1 Kubernetes Cronjob API (from the currently default
V1beta1). This results in situations, where if there are deployed data
jobs which use the v1beta1 API, and a switch is made to the V1 API,
these jobs are suddenly shown as `NOT DEPLOYED`, and cannot be properly
managed.

This change adds backwards compatibility support in the Kubernetes
Service, to allow for a switch to V1 Cronjob API in clusters, where
V1beta1 API cronjobs are deployed.

Testing Done: Unit and Integration tests (new and existing).

Signed-off-by: Andon Andonov <[email protected]>

---------

Signed-off-by: Andon Andonov <[email protected]>
Co-authored-by: github-actions <>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
doks5 and pre-commit-ci[bot] authored Feb 22, 2023
1 parent 72a6098 commit e538bc7
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.web.servlet.MvcResult;

import java.time.format.DateTimeFormatter;
Expand All @@ -42,6 +43,10 @@
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*;

@Import({DataJobDeploymentCrudIT.TaskExecutorConfig.class})
@TestPropertySource(
properties = {
"datajobs.control.k8s.k8sSupportsV1CronJob=true",
})
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
classes = ControlplaneApplication.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.PodLogs;
import io.kubernetes.client.openapi.apis.BatchV1Api;
import io.kubernetes.client.openapi.apis.BatchV1beta1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.apis.VersionApi;
import io.kubernetes.client.openapi.apis.*;
import io.kubernetes.client.custom.IntOrString;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.openapi.models.*;
Expand Down Expand Up @@ -61,6 +58,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.function.Predicate.not;

Expand All @@ -87,6 +85,7 @@ public abstract class KubernetesService implements InitializingBean {
public static final String LABEL_PREFIX = "com.vmware.taurus";
private static final int WATCH_JOBS_TIMEOUT_SECONDS = 300;
private static final String K8S_DATA_JOB_TEMPLATE_RESOURCE = "k8s-data-job-template.yaml";
private static final String V1_K8S_DATA_JOB_TEMPLATE_RESOURCE = "v1-k8s-data-job-template.yaml";

private static int fromInteger(Integer value) {
return Optional.ofNullable(value).orElse(0);
Expand Down Expand Up @@ -242,7 +241,7 @@ public void afterPropertiesSet() throws Exception {
// Step 1 - load the internal datajob template in order to validate it.
try {
if (getK8sSupportsV1CronJob()) {
loadV1CronjobTemplate(new ClassPathResource(K8S_DATA_JOB_TEMPLATE_RESOURCE).getFile());
loadV1CronjobTemplate(new ClassPathResource(V1_K8S_DATA_JOB_TEMPLATE_RESOURCE).getFile());
} else {
loadV1beta1CronjobTemplate(new ClassPathResource(K8S_DATA_JOB_TEMPLATE_RESOURCE).getFile());
}
Expand Down Expand Up @@ -315,7 +314,8 @@ private V1beta1CronJob loadInternalV1beta1CronjobTemplate() {

private V1CronJob loadInternalV1CronjobTemplate() {
try {
return loadV1CronjobTemplate(new ClassPathResource(K8S_DATA_JOB_TEMPLATE_RESOURCE).getFile());
return loadV1CronjobTemplate(
new ClassPathResource(V1_K8S_DATA_JOB_TEMPLATE_RESOURCE).getFile());
} catch (Exception e) {
// This should never happen unless we are testing locally and we've messed up
// with the internal template resource file.
Expand Down Expand Up @@ -449,15 +449,26 @@ public Set<String> listJobs() throws ApiException {
return set;
}

/**
* Reads the deployment status of a cron job in a Kubernetes cluster. The method first tries to
* read the cron job using the V1Beta API, and if it fails, it falls back to reading the cron job
* using the V1 API.
*
* @param cronJobName the name of the cron job to be read
* @return an Optional containing the deployment status of the cron job if it exists, or an empty
* Optional if the cron job does not exist or cannot be read
*/
public Optional<JobDeploymentStatus> readCronJob(String cronJobName) {
return getK8sSupportsV1CronJob() ? readV1CronJob(cronJobName) : readV1beta1CronJob(cronJobName);
var jobStatus = readV1beta1CronJob(cronJobName);

return jobStatus.isPresent() ? jobStatus : readV1CronJob(cronJobName);
}

public Optional<JobDeploymentStatus> readV1beta1CronJob(String cronJobName) {
log.debug("Reading k8s cron job: {}", cronJobName);
log.debug("Reading k8s V1beta1 cron job: {}", cronJobName);
V1beta1CronJob cronJob = null;
try {
cronJob = new BatchV1beta1Api(client).readNamespacedCronJob(cronJobName, namespace, null);
cronJob = initBatchV1beta1Api().readNamespacedCronJob(cronJobName, namespace, null);
} catch (ApiException e) {
log.warn(
"Could not read cron job: {}; reason: {}",
Expand All @@ -469,10 +480,10 @@ public Optional<JobDeploymentStatus> readV1beta1CronJob(String cronJobName) {
}

public Optional<JobDeploymentStatus> readV1CronJob(String cronJobName) {
log.debug("Reading k8s cron job: {}", cronJobName);
log.debug("Reading k8s V1 cron job: {}", cronJobName);
V1CronJob cronJob = null;
try {
cronJob = new BatchV1Api(client).readNamespacedCronJob(cronJobName, namespace, null);
cronJob = initBatchV1Api().readNamespacedCronJob(cronJobName, namespace, null);
} catch (ApiException e) {
log.warn(
"Could not read cron job: {}; reason: {}",
Expand All @@ -490,17 +501,22 @@ public Optional<JobDeploymentStatus> readV1CronJob(String cronJobName) {
* data
*/
public List<JobDeploymentStatus> readJobDeploymentStatuses() {
return getK8sSupportsV1CronJob()
? readV1CronJobDeploymentStatuses()
: readV1beta1CronJobDeploymentStatuses();
if (getK8sSupportsV1CronJob()) {
return Stream.concat(
readV1CronJobDeploymentStatuses().stream(),
readV1beta1CronJobDeploymentStatuses().stream())
.collect(Collectors.toList());
} else {
return readV1beta1CronJobDeploymentStatuses();
}
}

public List<JobDeploymentStatus> readV1beta1CronJobDeploymentStatuses() {
log.debug("Reading all k8s cron jobs");
log.debug("Reading all k8s V1beta1 cron jobs");
V1beta1CronJobList cronJobs = null;
try {
cronJobs =
new BatchV1beta1Api(client)
initBatchV1beta1Api()
.listNamespacedCronJob(
namespace, null, null, null, null, null, null, null, null, null, null);
} catch (ApiException e) {
Expand All @@ -516,11 +532,11 @@ public List<JobDeploymentStatus> readV1beta1CronJobDeploymentStatuses() {
}

public List<JobDeploymentStatus> readV1CronJobDeploymentStatuses() {
log.debug("Reading all k8s cron jobs");
log.debug("Reading all k8s V1 cron jobs");
V1CronJobList cronJobs = null;
try {
cronJobs =
new BatchV1Api(client)
initBatchV1Api()
.listNamespacedCronJob(
namespace, null, null, null, null, null, null, null, null, null, null);
} catch (ApiException e) {
Expand Down Expand Up @@ -759,18 +775,38 @@ public void cancelRunningCronJob(String teamName, String jobName, String executi
}
}

/**
* Returns a set of cron job names for a given namespace in a Kubernetes cluster. The cron jobs
* can be of version V1 or V1Beta.
*
* @return a set of cron job names
* @throws ApiException if there is a problem accessing the Kubernetes API
*/
public Set<String> listCronJobs() throws ApiException {
log.debug("Listing k8s cron jobs");
var cronJobs =
new BatchV1beta1Api(client)
Set<String> v1CronJobNames = Collections.emptySet();

var v1CronJobs =
initBatchV1Api()
.listNamespacedCronJob(
namespace, null, null, null, null, null, null, null, null, null, null);
var set =
cronJobs.getItems().stream()
v1CronJobNames =
v1CronJobs.getItems().stream()
.map(j -> j.getMetadata().getName())
.collect(Collectors.toSet());
log.debug("K8s cron jobs: {}", set);
return set;
log.debug("K8s V1 cron jobs: {}", v1CronJobNames);

var v1BetaCronJobs =
initBatchV1beta1Api()
.listNamespacedCronJob(
namespace, null, null, null, null, null, null, null, null, null, null);
var v1BetaCronJobNames =
v1BetaCronJobs.getItems().stream()
.map(j -> j.getMetadata().getName())
.collect(Collectors.toSet());
log.debug("K8s V1Beta cron jobs: {}", v1BetaCronJobNames);
return Stream.concat(v1CronJobNames.stream(), v1BetaCronJobNames.stream())
.collect(Collectors.toSet());
}

public void createCronJob(
Expand Down Expand Up @@ -903,7 +939,7 @@ public void createV1beta1CronJob(
Map<String, String> jobLabels,
List<String> imagePullSecrets)
throws ApiException {
log.debug("Creating k8s cron job name:{}, image:{}", name, image);
log.debug("Creating k8s V1beta1 cron job name:{}, image:{}", name, image);
var cronJob =
v1beta1CronJobFromTemplate(
name,
Expand All @@ -920,10 +956,11 @@ public void createV1beta1CronJob(
V1beta1CronJob nsJob =
new BatchV1beta1Api(client)
.createNamespacedCronJob(namespace, cronJob, null, null, null, null);
log.debug("Created k8s cron job: {}", nsJob);
log.debug("Created k8s V1beta1 cron job: {}", nsJob);
log.debug(
"Created k8s cron job name: {}, uid:{}, link:{}",
"Created k8s cron job name: {}, api_version:{}, uid:{}, link:{}",
nsJob.getMetadata().getName(),
nsJob.getApiVersion(),
nsJob.getMetadata().getUid(),
nsJob.getMetadata().getSelfLink());
}
Expand All @@ -948,7 +985,7 @@ public void createV1CronJob(
Map<String, String> jobLabels,
List<String> imagePullSecrets)
throws ApiException {
log.debug("Creating k8s cron job name:{}, image:{}", name, image);
log.debug("Creating k8s V1 cron job name:{}, image:{}", name, image);
var cronJob =
v1CronJobFromTemplate(
name,
Expand All @@ -964,10 +1001,11 @@ public void createV1CronJob(
imagePullSecrets);
V1CronJob nsJob =
new BatchV1Api(client).createNamespacedCronJob(namespace, cronJob, null, null, null, null);
log.debug("Created k8s cron job: {}", nsJob);
log.debug("Created k8s V1 cron job: {}", nsJob);
log.debug(
"Created k8s cron job name: {}, uid:{}, link:{}",
"Created k8s cron job name: {}, api_version: {}, uid:{}, link:{}",
nsJob.getMetadata().getName(),
nsJob.getApiVersion(),
nsJob.getMetadata().getUid(),
nsJob.getMetadata().getSelfLink());
}
Expand Down Expand Up @@ -1117,7 +1155,7 @@ public void updateV1beta1CronJob(
new BatchV1beta1Api(client)
.replaceNamespacedCronJob(name, namespace, cronJob, null, null, null, null);
log.debug(
"Updated k8s cron job status for name:{}, image:{}, uid:{}, link:{}",
"Updated k8s V1beta1 cron job status for name:{}, image:{}, uid:{}, link:{}",
name,
image,
nsJob.getMetadata().getUid(),
Expand Down Expand Up @@ -1159,7 +1197,7 @@ public void updateV1CronJob(
new BatchV1Api(client)
.replaceNamespacedCronJob(name, namespace, cronJob, null, null, null, null);
log.debug(
"Updated k8s cron job status for name:{}, image:{}, uid:{}, link:{}",
"Updated k8s V1 cron job status for name:{}, image:{}, uid:{}, link:{}",
name,
image,
nsJob.getMetadata().getUid(),
Expand All @@ -1168,9 +1206,25 @@ public void updateV1CronJob(

public void deleteCronJob(String name) throws ApiException {
log.debug("Deleting k8s cron job: {}", name);

// If the V1 Cronjob API is enabled, we try to delete the cronjob with it and exit the method.
// If, however, the cronjob cannot be deleted, this means that it might have been created
// with the V1Beta1 API, so we need to try again with the beta API.
if (getK8sSupportsV1CronJob()) {
try {
new BatchV1Api(client)
.deleteNamespacedCronJob(name, namespace, null, null, null, null, null, null);
log.debug("Deleted k8s V1 cron job: {}", name);
return;
} catch (Exception e) {
log.debug("An exception occurred while trying to delete cron job. Message was: ", e);
}
}

try {
new BatchV1beta1Api(client)
.deleteNamespacedCronJob(name, namespace, null, null, null, null, null, null);
log.debug("Deleted k8s V1beta1 cron job: {}", name);
} catch (JsonSyntaxException e) {
if (e.getCause() instanceof IllegalStateException) {
IllegalStateException ise = (IllegalStateException) e.getCause();
Expand Down Expand Up @@ -2434,6 +2488,14 @@ private V1Secret buildV1Secret(String name, Map<String, byte[]> data) {
private Optional<JobDeploymentStatus> mapV1beta1CronJobToDeploymentStatus(
V1beta1CronJob cronJob, String cronJobName) {
JobDeploymentStatus deployment = null;
String apiVersion = null;

try {
apiVersion = cronJob.getApiVersion();
} catch (NullPointerException e) {
log.debug("Could not get API version for cronjob {}", cronJobName);
}

if (cronJob != null) {
deployment = new JobDeploymentStatus();
deployment.setEnabled(!cronJob.getSpec().getSuspend());
Expand Down Expand Up @@ -2487,7 +2549,15 @@ private Optional<JobDeploymentStatus> mapV1beta1CronJobToDeploymentStatus(
private Optional<JobDeploymentStatus> mapV1CronJobToDeploymentStatus(
V1CronJob cronJob, String cronJobName) {
JobDeploymentStatus deployment = null;
if (cronJob != null) {
String apiVersion = null;

try {
apiVersion = cronJob.getApiVersion();
} catch (NullPointerException e) {
log.debug("Could not get API version for cronjob {}", cronJobName);
}

if (cronJob != null && apiVersion != null && apiVersion.equals("batch/v1")) {
deployment = new JobDeploymentStatus();
deployment.setEnabled(!cronJob.getSpec().getSuspend());
deployment.setDataJobName(cronJob.getMetadata().getName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0

apiVersion: batch/v1
kind: CronJob
metadata:
annotations: # merged with additional annotations from TPCS
name: cronjob-template-name # overridden by TPCS
spec:
concurrencyPolicy: Forbid
failedJobsHistoryLimit: 2
schedule: "*/10 * * * *" # overridden by TPCS
startingDeadlineSeconds: 1800
successfulJobsHistoryLimit: 1
suspend: false # overridden by TPCS
jobTemplate:
metadata:
annotations: # merged with additional annotations from TPCS
labels: # merged with additional labels from TPCS
spec:
activeDeadlineSeconds: 43200
backoffLimit: 3
template:
metadata:
labels: # merged with additional labels from TPCS
spec:
containers: # overridden by TPCS
- command:
- /bin/sh
- -c
- date; echo '************** Cronjob Template ******************'
name: cronjob-template-container-name
image: busybox
imagePullPolicy: IfNotPresent
restartPolicy: OnFailure
securityContext:
runAsUser: 1000
runAsGroup: 1000
fsGroup: 1000
ttlSecondsAfterFinished: 600
Loading

0 comments on commit e538bc7

Please sign in to comment.