Skip to content

Commit

Permalink
addressing comments and refactoring new logic
Browse files Browse the repository at this point in the history
Signed-off-by: mrMoZ1 <[email protected]>
  • Loading branch information
mrMoZ1 committed Sep 27, 2021
1 parent 1b8af93 commit ca79a52
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
public class JsonDissectException extends DomainError implements UserFacingError {
public JsonDissectException(Throwable cause) {
super("Control service failed to parse provided arguments to a valid JSON string.",
"The internal parser threw an exception.",
String.format("The internal parser threw an exception because: %s", cause.getMessage()),
"The requested system call will not complete.",
"Inspect the cause and re-try the call with arguments that can be parsed to JSON.",
cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@


import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.collect.Iterables;
import com.google.gson.JsonSyntaxException;
Expand Down Expand Up @@ -221,6 +220,9 @@ public enum PodTerminationMessage {
@Autowired
private UserAgentService userAgentService;

@Autowired
private JobCommandProvider jobCommandProvider;

/**
*
* @param namespace the namespace where the kubernetes operation will act on. leave empty to infer from kubeconfig
Expand Down Expand Up @@ -491,10 +493,7 @@ private void addExtraJobArgumentsToVdkContainer(V1Container container, Map<Strin
}

try {
var jobCommandProvider = new JobCommandProvider(jobName);
var jobArgumentsJsonString = new ObjectMapper().writeValueAsString(extraJobArguments);
var newCommand = jobCommandProvider.getJobCommand(jobArgumentsJsonString); // vdk run command is last in the list

var newCommand = jobCommandProvider.getJobCommand(jobName, extraJobArguments); // vdk run command is last in the list
container.setCommand(newCommand);
} catch (JsonProcessingException e) {
log.debug("JsonProcessingException", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,54 @@

package com.vmware.taurus.service.deploy;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
* This class provides the command list for the K8S' data job
* container. The command list executes the data job, by calling
* vdk run ...
*/
@Component
public class JobCommandProvider {
private List<String> command;

public JobCommandProvider(String jobName) {
public JobCommandProvider() {

this.command = List.of(
"/bin/bash",
"-c",
String.format("export PYTHONPATH=/usr/local/lib/python3.7/site-packages:/vdk/site-packages/ && /vdk/vdk run ./%s", jobName)
"export PYTHONPATH=/usr/local/lib/python3.7/site-packages:/vdk/site-packages/ && /vdk/vdk run"
);
}

public List<String> getJobCommand() {
return command;
private String getJobNameArgument(String jobName) {
return String.format(" ./%s", jobName);
}

private String getExtraArguments(String arguments) {
return String.format(" --arguments '%s'", arguments);
}

public List<String> getJobCommand(String jobName) {

return List.of(
command.get(0),
command.get(1),
command.get(2) + getJobNameArgument(jobName)
);
}

public List<String> getJobCommand(String extraArguments) {
public List<String> getJobCommand(String jobName, Map<String, Object> extraArguments) throws JsonProcessingException {
var arguments = new ObjectMapper().writeValueAsString(extraArguments);
return List.of(
command.get(0),
command.get(1),
command.get(2) + String.format(" --arguments '%s'", extraArguments)
command.get(2) + getJobNameArgument(jobName) + getExtraArguments(arguments)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -65,6 +66,7 @@ public class JobImageDeployer {
private final DataJobDefaultConfigurations defaultConfigurations;
private final DeploymentProgress deploymentProgress;
private final KubernetesResources kubernetesResources;
private final JobCommandProvider jobCommandProvider;

public Optional<JobDeploymentStatus> readScheduledJob(String dataJobName) {
Optional<JobDeploymentStatus> jobDeployment = dataJobsKubernetesService.readCronJob(getCronJobName(dataJobName));
Expand Down Expand Up @@ -197,8 +199,7 @@ private void updateCronJob(DataJob dataJob,
jobContainerEnvVars.putAll(vdkEnvs);
jobContainerEnvVars.putAll(jobConfigBasedEnvVars(dataJob.getJobConfig()));

var jobCommandProvider = new JobCommandProvider(jobName);
var jobCommand = jobCommandProvider.getJobCommand();
var jobCommand = jobCommandProvider.getJobCommand(jobName);

// The job name is used as the container name. This is something that we rely on later,
// when watching for pod modifications in DataJobStatusMonitor.watchPods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class JobImageDeployerTest {
private DataJobDefaultConfigurations dataJobDefaultConfigurations;
private DeploymentProgress deploymentProgress;
private KubernetesResources kubernetesResources;
private JobCommandProvider jobCommandProvider;

@BeforeEach
public void setUp() {
Expand All @@ -41,9 +42,10 @@ public void setUp() {
dataJobDefaultConfigurations = Mockito.mock(DataJobDefaultConfigurations.class);
deploymentProgress = Mockito.mock(DeploymentProgress.class);
kubernetesResources = Mockito.mock(KubernetesResources.class);
jobCommandProvider = Mockito.mock(JobCommandProvider.class);

jobImageDeployer = new JobImageDeployer(jobCredentialsService, dataJobsKubernetesService, vdkOptionsReader,
dataJobDefaultConfigurations, deploymentProgress, kubernetesResources);
dataJobDefaultConfigurations, deploymentProgress, kubernetesResources, jobCommandProvider);

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2021 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.service;

import com.vmware.taurus.service.deploy.JobCommandProvider;
import com.vmware.taurus.service.kubernetes.DataJobsKubernetesService;
import io.kubernetes.client.apis.BatchV1beta1Api;
import io.kubernetes.client.models.V1JobSpec;
import io.kubernetes.client.models.V1beta1CronJob;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import static org.mockito.ArgumentMatchers.any;


public class KubernetesServiceStartJobWithArgumentsIT {

private KubernetesService kubernetesService;

@BeforeEach
public void getMockKubernetesServiceForVdkRunExtraArgsTests() throws Exception {

kubernetesService = Mockito.mock(KubernetesService.class);
V1beta1CronJob internalCronjobTemplate = getValidCronJobForVdkRunExtraArgsTests();
BatchV1beta1Api mockBatch = Mockito.mock(BatchV1beta1Api.class);
Mockito.when(kubernetesService.initBatchV1beta1Api()).thenReturn(mockBatch);
Mockito.when(mockBatch.readNamespacedCronJob(any(), any(), any(), any(), any())).thenReturn(internalCronjobTemplate);
Mockito.doNothing().when(kubernetesService).createNewJob(any(), any(), any(), any());
Mockito.doCallRealMethod().when(kubernetesService).startNewCronJobExecution(any(), any(), any(), any(), any(), any());
kubernetesService.afterPropertiesSet();
// We have to set this field manually which would normally get injected by Spring, since
// Kubernetes Service is an abstract class and a lot of the methods which depend on this field and are mocked here
// cannot be mocked or accessed by the extending classes since they are not public.
var f1 = kubernetesService.getClass().getSuperclass().getDeclaredField("jobCommandProvider");
f1.setAccessible(true);
f1.set(kubernetesService, new JobCommandProvider());
}

@Test
public void testStartCronJobWithExtraArgumentForVdkRun() {

try {
ArgumentCaptor<V1JobSpec> specCaptor = ArgumentCaptor.forClass(V1JobSpec.class);
Map<String, Object> extraArgs = Map.of("argument1", "value1");
kubernetesService.startNewCronJobExecution("test-job", "test-id", new HashMap<>(), new HashMap<>(), extraArgs, "test-job");
Mockito.verify(kubernetesService).createNewJob(Mockito.any(), specCaptor.capture(), Mockito.any(), Mockito.any());
var capturedSpec = specCaptor.getValue();
var capturedCommand = capturedSpec.getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
//check if command arg starts correctly
Assertions.assertEquals("export PYTHONPATH=/usr/local/lib/python3.7/site-packages:/vdk/" +
"site-packages/ && /vdk/vdk run ./test-job --arguments '{\"argument1\":\"value1\"}'", capturedCommand);

} catch (Exception e) {
e.printStackTrace();
Assertions.fail(e.getMessage());
}
}

@Test
public void testStartCronJobWithExtraArgumentsForVdkRun() {

try {
ArgumentCaptor<V1JobSpec> specCaptor = ArgumentCaptor.forClass(V1JobSpec.class);
Map<String, Object> extraArgs = Map.of("argument1", "value1", "argument2", "value2");
kubernetesService.startNewCronJobExecution("test-job", "test-id", new HashMap<>(), new HashMap<>(), extraArgs, "test-job");
Mockito.verify(kubernetesService).createNewJob(Mockito.any(), specCaptor.capture(), Mockito.any(), Mockito.any());
var capturedSpec = specCaptor.getValue();
var capturedCommand = capturedSpec.getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
//check if command arg starts correctly
Assertions.assertTrue(capturedCommand.startsWith("export PYTHONPATH=/usr/local/lib/python3.7/si" +
"te-packages:/vdk/site-packages/ && /vdk/vdk run ./test-job --arguments '{"), "Vdk run command string invalid.");
//extra arguments passed as a map, print order might be different.
Assertions.assertTrue(capturedCommand.contains("\"argument2\":\"value2\""), "Second argument not present.");
Assertions.assertTrue(capturedCommand.contains("\"argument1\":\"value1\""), "First argument not present.");

} catch (Exception e) {
e.printStackTrace();
Assertions.fail(e.getMessage());
}
}

@Test
public void testStartCronJobWithNullArgumentsForVdkRun() {

try {
ArgumentCaptor<V1JobSpec> specCaptor = ArgumentCaptor.forClass(V1JobSpec.class);
kubernetesService.startNewCronJobExecution("test-job", "test-id", new HashMap<>(), new HashMap<>(), null, "test-job");

Mockito.verify(kubernetesService).createNewJob(Mockito.any(), specCaptor.capture(), Mockito.any(), Mockito.any());
var capturedSpec = specCaptor.getValue();
var capturedCommand = capturedSpec.getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
//check if command arg hasn't changed.
Assertions.assertEquals("export PYTHONPATH=/usr/local/lib/python3.7/site-packages:/vdk/" +
"site-packages/ && /vdk/vdk run ./test-job", capturedCommand);

} catch (Exception e) {
e.printStackTrace();
Assertions.fail(e.getMessage());
}
}

@Test
public void testStartCronJobWithEmptyArgumentsForVdkRun() {

try {
ArgumentCaptor<V1JobSpec> specCaptor = ArgumentCaptor.forClass(V1JobSpec.class);
kubernetesService.startNewCronJobExecution("test-job", "test-id", new HashMap<>(), new HashMap<>(), Map.of(), "test-job");

Mockito.verify(kubernetesService).createNewJob(Mockito.any(), specCaptor.capture(), Mockito.any(), Mockito.any());
var capturedSpec = specCaptor.getValue();
var capturedCommand = capturedSpec.getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
//check if command arg hasn't changed.
Assertions.assertEquals("export PYTHONPATH=/usr/local/lib/python3.7/site-packages:/vdk/" +
"site-packages/ && /vdk/vdk run ./test-job", capturedCommand);

} catch (Exception e) {
e.printStackTrace();
Assertions.fail(e.getMessage());
}
}

private V1beta1CronJob getValidCronJobForVdkRunExtraArgsTests() throws Exception {
KubernetesService service = new DataJobsKubernetesService("default", "someConfig");
// V1betaCronJob initializing snippet copied from tests above, using reflection
service.afterPropertiesSet();
Method loadInternalCronjobTemplate = KubernetesService.class.getDeclaredMethod("loadInternalCronjobTemplate");
if (loadInternalCronjobTemplate == null) {
Assertions.fail("The method 'loadInternalCronjobTemplate' does not exist.");
}
loadInternalCronjobTemplate.setAccessible(true);
V1beta1CronJob internalCronjobTemplate = (V1beta1CronJob) loadInternalCronjobTemplate.invoke(service);
var container = internalCronjobTemplate.getSpec()
.getJobTemplate()
.getSpec()
.getTemplate()
.getSpec()
.getContainers()
.get(0);

var newCommand = new ArrayList<>(container.getCommand());
newCommand.set(2, "export PYTHONPATH=/usr/local/lib/python3.7/site-packages:/vdk/site-packages/ && /vdk/vdk run ./test-job");
container.setCommand(newCommand);
return internalCronjobTemplate;
}

}
Loading

0 comments on commit ca79a52

Please sign in to comment.