Skip to content

Commit

Permalink
control-service: passing data job arguments through execute API (#267)
Browse files Browse the repository at this point in the history
why: As a user I want to be able to pass extra arguments to my
data job executions when running the data job manually through
the execute API, like I can when running the data job locally. This
change enables users to pass arguments to data job executions
through the control-service API for starting data job executions.

what: Added functionality to the KubernetesService class. We
add the extra arguments to the job container's command list.

testing: added unit tests, ci/cd, manual testing - with debugger
made sure that the command flow of added functionality is correct.
Signed-off-by: Momchil Zhivkov [email protected]
  • Loading branch information
Momchil Z authored Sep 28, 2021
1 parent a79362f commit ac7bd62
Show file tree
Hide file tree
Showing 13 changed files with 325 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2021 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.exception;

import org.springframework.http.HttpStatus;

/**
 * Raised when the control service cannot turn the provided arguments into a valid JSON string.
 * It is reported to the caller with HTTP status 400 (Bad Request).
 */
public class JsonDissectException extends DomainError implements UserFacingError {

    private static final String SUMMARY =
            "Control service failed to parse provided arguments to a valid JSON string.";
    private static final String REASON_TEMPLATE =
            "The internal parser threw an exception because: %s";
    private static final String IMPACT =
            "The requested system call will not complete.";
    private static final String REMEDY =
            "Inspect the cause and re-try the call with arguments that can be parsed to JSON.";

    /**
     * @param cause the underlying failure; its message is embedded in the error description
     *              and the throwable itself is passed on to the parent as the cause
     */
    public JsonDissectException(Throwable cause) {
        super(SUMMARY, describe(cause), IMPACT, REMEDY, cause);
    }

    /** Formats the reason text from the message of the given cause. */
    private static String describe(Throwable cause) {
        return String.format(REASON_TEMPLATE, cause.getMessage());
    }

    /**
     * @return {@link HttpStatus#BAD_REQUEST}
     */
    @Override
    public HttpStatus getHttpStatus() {
        return HttpStatus.BAD_REQUEST;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2021 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.exception;

import org.springframework.http.HttpStatus;

/**
 * Raised when the definition of a Kubernetes job does not look the way the service expects.
 * It is reported to the caller with HTTP status 500 (Internal Server Error).
 */
public class KubernetesJobDefinitionException extends SystemError implements UserFacingError {

    private static final String SUMMARY_TEMPLATE =
            "A configuration error with the current kubernetes job: '%s' definition was found.";
    private static final String REASON =
            "Likely due to recent changes in the internal implementation.";
    private static final String IMPACT =
            "The current call will not be processed.";
    private static final String REMEDY =
            "Please open a new GitHub issue with the details of this error.";

    /**
     * @param jobName name of the job whose definition is considered misconfigured;
     *                it is embedded in the error summary
     */
    public KubernetesJobDefinitionException(String jobName) {
        // There is no underlying throwable for this error, hence the null cause.
        super(summaryFor(jobName), REASON, IMPACT, REMEDY, null);
    }

    /** Builds the summary line mentioning the affected job. */
    private static String summaryFor(String jobName) {
        return String.format(SUMMARY_TEMPLATE, jobName);
    }

    /**
     * @return {@link HttpStatus#INTERNAL_SERVER_ERROR}
     */
    @Override
    public HttpStatus getHttpStatus() {
        return HttpStatus.INTERNAL_SERVER_ERROR;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
package com.vmware.taurus.service;


import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Charsets;
import com.google.common.collect.Iterables;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
import com.vmware.taurus.exception.KubernetesException;
import com.vmware.taurus.exception.*;
import com.vmware.taurus.service.deploy.JobCommandProvider;
import com.vmware.taurus.service.model.JobAnnotation;
import com.vmware.taurus.service.model.JobDeploymentStatus;
import com.vmware.taurus.service.model.JobLabel;
Expand Down Expand Up @@ -217,6 +220,9 @@ public enum PodTerminationMessage {
@Autowired
private UserAgentService userAgentService;

@Autowired
private JobCommandProvider jobCommandProvider;

/**
*
* @param namespace the namespace where the kubernetes operation will act on. leave empty to infer from kubeconfig
Expand Down Expand Up @@ -434,7 +440,8 @@ public List<JobDeploymentStatus> readJobDeploymentStatuses() {
.collect(Collectors.toList());
}

public void startNewCronJobExecution(String cronJobName, String executionId, Map<String, String> annotations, Map<String, String> envs) throws ApiException {
public void startNewCronJobExecution(String cronJobName, String executionId, Map<String, String> annotations,
Map<String, String> envs, Map<String, Object> extraJobArguments, String jobName) throws ApiException {
var cron = initBatchV1beta1Api().readNamespacedCronJob(cronJobName, namespace, null, null, null);

Optional<V1beta1JobTemplateSpec> jobTemplateSpec = Optional.ofNullable(cron)
Expand Down Expand Up @@ -464,14 +471,36 @@ public void startNewCronJobExecution(String cronJobName, String executionId, Map
.map(v1PodSpec -> v1PodSpec.getContainers())
.map(v1Containers -> v1Containers.get(0))
.orElseThrow(() -> new ApiException(String.format("K8S Cron Job '%s' is not properly defined.", cronJobName)));

if (!CollectionUtils.isEmpty(envs)) {
envs.forEach((name, value) -> v1Container.addEnvItem(new V1EnvVar().name(name).value(value)));
}

if (!CollectionUtils.isEmpty(extraJobArguments)) {
addExtraJobArgumentsToVdkContainer(v1Container, extraJobArguments, jobName);
}

createNewJob(executionId, jobSpec, jobLabels, jobAnnotations);
}

/**
 * Replaces the command of the given container with the command returned by
 * {@code jobCommandProvider.getJobCommand(jobName, extraJobArguments)}, i.e. a command
 * that also carries the extra job arguments.
 *
 * Before replacing anything, the current command is validated: it must have at least three
 * elements and its last element must contain {@code "vdk run"}. This guards against silently
 * overwriting a command whose layout no longer matches what this method assumes.
 *
 * @param container         the container whose command is replaced
 * @param extraJobArguments the extra arguments to pass to the job
 * @param jobName           the data job name, used to build the command and in error reporting
 * @throws KubernetesJobDefinitionException if the container has no command or the command
 *                                          does not have the expected layout
 * @throws JsonDissectException             if the extra arguments cannot be serialized to JSON
 */
private void addExtraJobArgumentsToVdkContainer(V1Container container, Map<String, Object> extraJobArguments,
                                                String jobName) {
    // The command is only inspected here, never modified, so no defensive copy is needed.
    // getCommand() may be null when the container defines no command at all; treat that as an
    // unexpected job definition instead of failing with a NullPointerException.
    var commandList = container.getCommand();

    if (commandList == null || commandList.size() < 3 || !Iterables.getLast(commandList).contains("vdk run")) {
        // If current job template definition changes we will throw an exception and will have to change this implementation.
        log.debug("Command list: {}", commandList);
        throw new KubernetesJobDefinitionException(jobName);
    }

    try {
        // The provider rebuilds the whole command, appending the arguments to the trailing vdk run element.
        var newCommand = jobCommandProvider.getJobCommand(jobName, extraJobArguments);
        container.setCommand(newCommand);
    } catch (JsonProcessingException e) {
        log.debug("JsonProcessingException", e);
        throw new JsonDissectException(e);
    }
}

public void cancelRunningCronJob(String teamName, String jobName, String executionId) throws ApiException {
log.info("K8S deleting job for team: {} data job name: {} execution: {}", teamName, jobName, executionId);
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2021 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.service.deploy;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * This class provides the command list for the K8S' data job
 * container. The command list executes the data job, by calling
 * vdk run ...
 *
 * The command has three elements: {@code /bin/bash}, {@code -c} and a shell script
 * string. Everything appended to the third element is therefore interpreted by bash.
 */
@Component
public class JobCommandProvider {
    // A single shared mapper is enough: it is only used with its default configuration.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final List<String> command;

    public JobCommandProvider() {

        this.command = List.of(
                "/bin/bash",
                "-c",
                "export PYTHONPATH=/usr/local/lib/python3.7/site-packages:/vdk/site-packages/ && /vdk/vdk run"
        );
    }

    // NOTE(review): the job name is inserted into the shell script verbatim; this presumes
    // job names are validated elsewhere to contain no shell metacharacters - confirm.
    private String getJobNameArgument(String jobName) {
        return String.format(" ./%s", jobName);
    }

    private String getExtraArguments(String arguments) {
        return String.format(" --arguments '%s'", escapeForSingleQuotes(arguments));
    }

    /**
     * Makes a value safe to embed between single quotes in a bash script.
     * Bash has no escape character inside single quotes, so each {@code '} is replaced by
     * {@code '\''}: close the quoted string, emit an escaped quote, reopen the quoted string.
     * Without this a quote inside the JSON would end the quoting and the rest of the value
     * would be interpreted as shell code.
     */
    private static String escapeForSingleQuotes(String value) {
        return value.replace("'", "'\\''");
    }

    /** Builds the three element command, appending the given suffix to the vdk run script. */
    private List<String> buildCommand(String runSuffix) {
        return List.of(
                command.get(0),
                command.get(1),
                command.get(2) + runSuffix
        );
    }

    /**
     * @param jobName the name of the data job to run
     * @return the command list that runs the data job without extra arguments
     */
    public List<String> getJobCommand(String jobName) {
        return buildCommand(getJobNameArgument(jobName));
    }

    /**
     * @param jobName        the name of the data job to run
     * @param extraArguments arguments serialized to JSON and passed via {@code --arguments}
     * @return the command list that runs the data job with the extra arguments
     * @throws JsonProcessingException if the arguments cannot be serialized to JSON
     */
    public List<String> getJobCommand(String jobName, Map<String, Object> extraArguments) throws JsonProcessingException {
        var arguments = OBJECT_MAPPER.writeValueAsString(extraArguments);
        return buildCommand(getJobNameArgument(jobName) + getExtraArguments(arguments));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@

package com.vmware.taurus.service.deploy;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.google.gson.Gson;
import com.vmware.taurus.exception.KubernetesException;
import com.vmware.taurus.service.KubernetesService;
import com.vmware.taurus.service.credentials.JobCredentialsService;
import com.vmware.taurus.service.kubernetes.DataJobsKubernetesService;
import com.vmware.taurus.service.model.*;
import com.vmware.taurus.service.notification.NotificationContent;
import io.kubernetes.client.ApiException;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -25,18 +21,9 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.vmware.taurus.exception.KubernetesException;
import com.vmware.taurus.service.KubernetesService;
import com.vmware.taurus.service.credentials.JobCredentialsService;
import com.vmware.taurus.service.kubernetes.DataJobsKubernetesService;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.DeploymentStatus;
import com.vmware.taurus.service.model.JobAnnotation;
import com.vmware.taurus.service.model.JobConfig;
import com.vmware.taurus.service.model.JobDeployment;
import com.vmware.taurus.service.model.JobDeploymentStatus;
import com.vmware.taurus.service.model.JobLabel;
import com.vmware.taurus.service.notification.NotificationContent;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.*;

/**
* Takes care of deploying a Data Job in Kubernetes and scheduling it to run.
Expand Down Expand Up @@ -65,6 +52,7 @@ public class JobImageDeployer {
private final DataJobDefaultConfigurations defaultConfigurations;
private final DeploymentProgress deploymentProgress;
private final KubernetesResources kubernetesResources;
private final JobCommandProvider jobCommandProvider;

public Optional<JobDeploymentStatus> readScheduledJob(String dataJobName) {
Optional<JobDeploymentStatus> jobDeployment = dataJobsKubernetesService.readCronJob(getCronJobName(dataJobName));
Expand Down Expand Up @@ -197,18 +185,14 @@ private void updateCronJob(DataJob dataJob,
jobContainerEnvVars.putAll(vdkEnvs);
jobContainerEnvVars.putAll(jobConfigBasedEnvVars(dataJob.getJobConfig()));

var jobCommand = List.of(
"/bin/bash",
"-c",
String.format("export PYTHONPATH=/usr/local/lib/python3.7/site-packages:/vdk/site-packages/ && /vdk/vdk run ./%s", jobName));
var jobCommand = jobCommandProvider.getJobCommand(jobName);

// The job name is used as the container name. This is something that we rely on later,
// when watching for pod modifications in DataJobStatusMonitor.watchPods
var jobContainer = KubernetesService.container(jobName, jobDeployment.getImageName(), false,
jobContainerEnvVars, List.of(),
List.of(volumeMount, secretVolumeMount), "Always", defaultConfigurations.dataJobRequests(),
defaultConfigurations.dataJobLimits(), null, jobCommand);

var vdkCommand = List.of(
"/bin/bash",
"-c",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public enum ExecutionType {
public String startDataJobExecution(String teamName, String jobName, String deploymentId, DataJobExecutionRequest jobExecutionRequest) {
// TODO: deployment ID support
// The args of the dataJobExecutionRequest are passed on to the started job execution as extra job arguments.

var extraJobArguments = jobExecutionRequest.getArgs();
DataJob dataJob = jobsService.getByNameAndTeam(jobName, teamName).orElseThrow(() -> new DataJobNotFoundException(jobName));

JobDeploymentStatus jobDeploymentStatus = deploymentService.readDeployment(jobName.toLowerCase())
Expand All @@ -110,7 +110,6 @@ public String startDataJobExecution(String teamName, String jobName, String depl
if (dataJobsKubernetesService.isRunningJob(jobName)) {
throw new DataJobAlreadyRunningException(jobName);
}

Map<String, String> annotations = new LinkedHashMap<>();

String opId = operationContext.getOpId();
Expand All @@ -127,7 +126,7 @@ public String startDataJobExecution(String teamName, String jobName, String depl
envs.put(JobEnvVar.VDK_OP_ID.getValue(), opId);

// Start K8S Job
dataJobsKubernetesService.startNewCronJobExecution(jobDeploymentStatus.getCronJobName(), executionId, annotations, envs);
dataJobsKubernetesService.startNewCronJobExecution(jobDeploymentStatus.getCronJobName(), executionId, annotations, envs, extraJobArguments, jobName);

// Save Data Job execution
saveDataJobExecution(dataJob, executionId, opId, com.vmware.taurus.service.model.ExecutionType.MANUAL, ExecutionStatus.SUBMITTED, startedByBuilt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class JobImageDeployerTest {
private DataJobDefaultConfigurations dataJobDefaultConfigurations;
private DeploymentProgress deploymentProgress;
private KubernetesResources kubernetesResources;
private JobCommandProvider jobCommandProvider;

@BeforeEach
public void setUp() {
Expand All @@ -41,9 +42,10 @@ public void setUp() {
dataJobDefaultConfigurations = Mockito.mock(DataJobDefaultConfigurations.class);
deploymentProgress = Mockito.mock(DeploymentProgress.class);
kubernetesResources = Mockito.mock(KubernetesResources.class);
jobCommandProvider = Mockito.mock(JobCommandProvider.class);

jobImageDeployer = new JobImageDeployer(jobCredentialsService, dataJobsKubernetesService, vdkOptionsReader,
dataJobDefaultConfigurations, deploymentProgress, kubernetesResources);
dataJobDefaultConfigurations, deploymentProgress, kubernetesResources, jobCommandProvider);

}

Expand Down
Loading

0 comments on commit ac7bd62

Please sign in to comment.