Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

control-service: passing data job arguments through execute API #267

Merged
merged 5 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2021 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.exception;

import org.springframework.http.HttpStatus;

/**
 * Signals that the arguments supplied to the Control Service could not be
 * turned into a JSON string. Reported to the caller as HTTP 400 (Bad Request).
 */
public class JsonDissectException extends DomainError implements UserFacingError {
    private static final String SUMMARY =
        "Control service failed to parse provided arguments to a valid JSON string.";
    private static final String REASON_TEMPLATE =
        "The internal parser threw an exception because: %s";
    private static final String IMPACT =
        "The requested system call will not complete.";
    private static final String REMEDY =
        "Inspect the cause and re-try the call with arguments that can be parsed to JSON.";

    /**
     * @param cause the parser failure; its message is embedded in the error
     *              description and it is kept as the exception cause.
     */
    public JsonDissectException(Throwable cause) {
        super(SUMMARY, String.format(REASON_TEMPLATE, cause.getMessage()), IMPACT, REMEDY, cause);
    }

    /**
     * @return {@link HttpStatus#BAD_REQUEST}
     */
    @Override
    public HttpStatus getHttpStatus() {
        return HttpStatus.BAD_REQUEST;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2021 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.exception;

import org.springframework.http.HttpStatus;

/**
 * Signals that the definition of a kubernetes job does not have the shape the
 * service expects. Reported to the caller as HTTP 500 (Internal Server Error).
 */
public class KubernetesJobDefinitionException extends SystemError implements UserFacingError {
    private static final String SUMMARY_TEMPLATE =
        "A configuration error with the current kubernetes job: '%s' definition was found.";
    private static final String REASON =
        "Likely due to recent changes in the internal implementation.";
    private static final String IMPACT =
        "The current call will not be processed.";
    private static final String REMEDY =
        "Please open a new GitHub issue with the details of this error.";

    /**
     * @param jobName name of the job whose definition was found to be invalid;
     *                embedded in the error message. No cause is attached.
     */
    public KubernetesJobDefinitionException(String jobName) {
        super(String.format(SUMMARY_TEMPLATE, jobName), REASON, IMPACT, REMEDY, null);
    }

    /**
     * @return {@link HttpStatus#INTERNAL_SERVER_ERROR}
     */
    @Override
    public HttpStatus getHttpStatus() {
        return HttpStatus.INTERNAL_SERVER_ERROR;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
package com.vmware.taurus.service;


import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Charsets;
import com.google.common.collect.Iterables;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
import com.vmware.taurus.exception.KubernetesException;
import com.vmware.taurus.exception.*;
import com.vmware.taurus.service.deploy.JobCommandProvider;
import com.vmware.taurus.service.model.JobAnnotation;
import com.vmware.taurus.service.model.JobDeploymentStatus;
import com.vmware.taurus.service.model.JobLabel;
Expand Down Expand Up @@ -217,6 +220,9 @@ public enum PodTerminationMessage {
@Autowired
private UserAgentService userAgentService;

@Autowired
private JobCommandProvider jobCommandProvider;

/**
*
* @param namespace the namespace where the kubernetes operation will act on. leave empty to infer from kubeconfig
Expand Down Expand Up @@ -434,7 +440,8 @@ public List<JobDeploymentStatus> readJobDeploymentStatuses() {
.collect(Collectors.toList());
}

public void startNewCronJobExecution(String cronJobName, String executionId, Map<String, String> annotations, Map<String, String> envs) throws ApiException {
public void startNewCronJobExecution(String cronJobName, String executionId, Map<String, String> annotations,
Map<String, String> envs, Map<String, Object> extraJobArguments, String jobName) throws ApiException {
var cron = initBatchV1beta1Api().readNamespacedCronJob(cronJobName, namespace, null, null, null);

Optional<V1beta1JobTemplateSpec> jobTemplateSpec = Optional.ofNullable(cron)
Expand Down Expand Up @@ -464,14 +471,36 @@ public void startNewCronJobExecution(String cronJobName, String executionId, Map
.map(v1PodSpec -> v1PodSpec.getContainers())
.map(v1Containers -> v1Containers.get(0))
.orElseThrow(() -> new ApiException(String.format("K8S Cron Job '%s' is not properly defined.", cronJobName)));

if (!CollectionUtils.isEmpty(envs)) {
envs.forEach((name, value) -> v1Container.addEnvItem(new V1EnvVar().name(name).value(value)));
}

if (!CollectionUtils.isEmpty(extraJobArguments)) {
addExtraJobArgumentsToVdkContainer(v1Container, extraJobArguments, jobName);
}

createNewJob(executionId, jobSpec, jobLabels, jobAnnotations);
}

/**
 * Replaces the command of the given container with one that also passes the
 * extra job arguments to the data job run.
 *
 * The existing command is only inspected, never edited: it must have at least
 * three entries and its last entry must contain "vdk run". When it does, the
 * whole command is replaced by the one built by {@code jobCommandProvider}.
 *
 * @param container         the container whose command is replaced
 * @param extraJobArguments the arguments to serialize and append to the run command
 * @param jobName           the data job name, used to build the command and in error reporting
 * @throws KubernetesJobDefinitionException if the container has no command or the command
 *                                          does not have the expected shape
 * @throws JsonDissectException             if the arguments cannot be serialized to JSON
 */
private void addExtraJobArgumentsToVdkContainer(V1Container container, Map<String, Object> extraJobArguments,
                                                String jobName) {
    // Read-only access: no defensive copy is needed because the list is never modified here.
    List<String> commandList = container.getCommand();

    // A missing command is treated the same as an unexpected one, so callers get the
    // domain error instead of a NullPointerException.
    // If the current job template definition changes, this check fails and this
    // implementation has to be changed accordingly.
    if (commandList == null || commandList.size() < 3 || !Iterables.getLast(commandList).contains("vdk run")) {
        log.debug("Command list: {}", commandList);
        throw new KubernetesJobDefinitionException(jobName);
    }

    try {
        // The provider builds the full command list; the vdk run command is its last entry.
        container.setCommand(jobCommandProvider.getJobCommand(jobName, extraJobArguments));
    } catch (JsonProcessingException e) {
        log.debug("JsonProcessingException", e);
        throw new JsonDissectException(e);
    }
}

public void cancelRunningCronJob(String teamName, String jobName, String executionId) throws ApiException {
log.info("K8S deleting job for team: {} data job name: {} execution: {}", teamName, jobName, executionId);
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2021 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.service.deploy;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Provides the command list for the K8S data job container. The command list
 * executes the data job by calling {@code vdk run ...} through
 * {@code /bin/bash -c}.
 */
@Component
public class JobCommandProvider {
    // Shared serializer for the extra arguments; reused instead of being created on every call.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // Base command: interpreter, its "-c" flag, and the shell snippet that ends with "vdk run".
    private final List<String> command;

    public JobCommandProvider() {
        this.command = List.of(
            "/bin/bash",
            "-c",
            "export PYTHONPATH=/usr/local/lib/python3.7/site-packages:/vdk/site-packages/ && /vdk/vdk run"
        );
    }

    /** Builds the " ./<jobName>" suffix that tells vdk run which job directory to execute. */
    private String getJobNameArgument(String jobName) {
        return String.format(" ./%s", jobName);
    }

    /**
     * Builds the " --arguments '<json>'" suffix.
     *
     * The value ends up inside a single-quoted word of a {@code bash -c} script, so every
     * single quote in it is rewritten as {@code '\''} (close quote, escaped quote, reopen
     * quote). Without this, a quote inside an argument value would end the quoting and the
     * remainder would be interpreted by the shell.
     */
    private String getExtraArguments(String arguments) {
        String shellSafeArguments = arguments.replace("'", "'\\''");
        return String.format(" --arguments '%s'", shellSafeArguments);
    }

    /**
     * Returns the container command that runs the given data job without extra arguments.
     *
     * @param jobName the data job name
     * @return a three-element command list whose last element is the vdk run invocation
     */
    public List<String> getJobCommand(String jobName) {
        return List.of(
            command.get(0),
            command.get(1),
            command.get(2) + getJobNameArgument(jobName)
        );
    }

    /**
     * Returns the container command that runs the given data job and passes the extra
     * arguments, serialized as JSON, through the {@code --arguments} option.
     *
     * @param jobName        the data job name
     * @param extraArguments the arguments to serialize to JSON
     * @return a three-element command list whose last element is the vdk run invocation
     * @throws JsonProcessingException if the arguments cannot be serialized to JSON
     */
    public List<String> getJobCommand(String jobName, Map<String, Object> extraArguments) throws JsonProcessingException {
        var arguments = OBJECT_MAPPER.writeValueAsString(extraArguments);
        return List.of(
            command.get(0),
            command.get(1),
            command.get(2) + getJobNameArgument(jobName) + getExtraArguments(arguments)
        );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@

package com.vmware.taurus.service.deploy;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.google.gson.Gson;
import com.vmware.taurus.exception.KubernetesException;
import com.vmware.taurus.service.KubernetesService;
import com.vmware.taurus.service.credentials.JobCredentialsService;
import com.vmware.taurus.service.kubernetes.DataJobsKubernetesService;
import com.vmware.taurus.service.model.*;
import com.vmware.taurus.service.notification.NotificationContent;
import io.kubernetes.client.ApiException;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -25,18 +21,9 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.vmware.taurus.exception.KubernetesException;
import com.vmware.taurus.service.KubernetesService;
import com.vmware.taurus.service.credentials.JobCredentialsService;
import com.vmware.taurus.service.kubernetes.DataJobsKubernetesService;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.DeploymentStatus;
import com.vmware.taurus.service.model.JobAnnotation;
import com.vmware.taurus.service.model.JobConfig;
import com.vmware.taurus.service.model.JobDeployment;
import com.vmware.taurus.service.model.JobDeploymentStatus;
import com.vmware.taurus.service.model.JobLabel;
import com.vmware.taurus.service.notification.NotificationContent;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.*;

/**
* Takes care of deploying a Data Job in Kubernetes and scheduling it to run.
Expand Down Expand Up @@ -65,6 +52,7 @@ public class JobImageDeployer {
private final DataJobDefaultConfigurations defaultConfigurations;
private final DeploymentProgress deploymentProgress;
private final KubernetesResources kubernetesResources;
private final JobCommandProvider jobCommandProvider;

public Optional<JobDeploymentStatus> readScheduledJob(String dataJobName) {
Optional<JobDeploymentStatus> jobDeployment = dataJobsKubernetesService.readCronJob(getCronJobName(dataJobName));
Expand Down Expand Up @@ -197,18 +185,14 @@ private void updateCronJob(DataJob dataJob,
jobContainerEnvVars.putAll(vdkEnvs);
jobContainerEnvVars.putAll(jobConfigBasedEnvVars(dataJob.getJobConfig()));

var jobCommand = List.of(
"/bin/bash",
"-c",
String.format("export PYTHONPATH=/usr/local/lib/python3.7/site-packages:/vdk/site-packages/ && /vdk/vdk run ./%s", jobName));
var jobCommand = jobCommandProvider.getJobCommand(jobName);

// The job name is used as the container name. This is something that we rely on later,
// when watching for pod modifications in DataJobStatusMonitor.watchPods
var jobContainer = KubernetesService.container(jobName, jobDeployment.getImageName(), false,
jobContainerEnvVars, List.of(),
List.of(volumeMount, secretVolumeMount), "Always", defaultConfigurations.dataJobRequests(),
defaultConfigurations.dataJobLimits(), null, jobCommand);

var vdkCommand = List.of(
"/bin/bash",
"-c",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public enum ExecutionType {
public String startDataJobExecution(String teamName, String jobName, String deploymentId, DataJobExecutionRequest jobExecutionRequest) {
// TODO: deployment ID support
// The dataJobExecutionRequest args are forwarded to the K8S job as extra job arguments.

var extraJobArguments = jobExecutionRequest.getArgs();
DataJob dataJob = jobsService.getByNameAndTeam(jobName, teamName).orElseThrow(() -> new DataJobNotFoundException(jobName));

JobDeploymentStatus jobDeploymentStatus = deploymentService.readDeployment(jobName.toLowerCase())
Expand All @@ -110,7 +110,6 @@ public String startDataJobExecution(String teamName, String jobName, String depl
if (dataJobsKubernetesService.isRunningJob(jobName)) {
throw new DataJobAlreadyRunningException(jobName);
}

Map<String, String> annotations = new LinkedHashMap<>();

String opId = operationContext.getOpId();
Expand All @@ -127,7 +126,7 @@ public String startDataJobExecution(String teamName, String jobName, String depl
envs.put(JobEnvVar.VDK_OP_ID.getValue(), opId);

// Start K8S Job
dataJobsKubernetesService.startNewCronJobExecution(jobDeploymentStatus.getCronJobName(), executionId, annotations, envs);
dataJobsKubernetesService.startNewCronJobExecution(jobDeploymentStatus.getCronJobName(), executionId, annotations, envs, extraJobArguments, jobName);

// Save Data Job execution
saveDataJobExecution(dataJob, executionId, opId, com.vmware.taurus.service.model.ExecutionType.MANUAL, ExecutionStatus.SUBMITTED, startedByBuilt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class JobImageDeployerTest {
private DataJobDefaultConfigurations dataJobDefaultConfigurations;
private DeploymentProgress deploymentProgress;
private KubernetesResources kubernetesResources;
private JobCommandProvider jobCommandProvider;

@BeforeEach
public void setUp() {
Expand All @@ -41,9 +42,10 @@ public void setUp() {
dataJobDefaultConfigurations = Mockito.mock(DataJobDefaultConfigurations.class);
deploymentProgress = Mockito.mock(DeploymentProgress.class);
kubernetesResources = Mockito.mock(KubernetesResources.class);
jobCommandProvider = Mockito.mock(JobCommandProvider.class);

jobImageDeployer = new JobImageDeployer(jobCredentialsService, dataJobsKubernetesService, vdkOptionsReader,
dataJobDefaultConfigurations, deploymentProgress, kubernetesResources);
dataJobDefaultConfigurations, deploymentProgress, kubernetesResources, jobCommandProvider);

}

Expand Down
Loading