Skip to content

Commit

Permalink
feat: replace arguments by commands
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Jan 14, 2025
1 parent ac2e18b commit f8b182b
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 26 deletions.
9 changes: 2 additions & 7 deletions src/main/java/io/kestra/plugin/aws/emr/AddJobFlowsSteps.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,8 @@
- name: Spark_job_test
jar: "command-runner.jar"
actionOnFailure: CONTINUE
arguments:
- "spark-submit"
- "s3://my-bucket/health_violations.py"
- "--data_source"
- "s3://my-bucket/food_establishment_data.csv"
- "--output_uri"
- "s3://my-bucket/test-emr-output"
commands:
- spark-submit s3://mybucket/health_violations.py --data_source s3://mybucket/food_establishment_data.csv --output_uri s3://mybucket/test-emr-output
"""
)
}
Expand Down
9 changes: 2 additions & 7 deletions src/main/java/io/kestra/plugin/aws/emr/CreateCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,8 @@
- name: Spark_job_test
jar: "command-runner.jar"
actionOnFailure: CONTINUE
arguments:
- "spark-submit"
- "s3://my-bucket/health_violations.py"
- "--data_source"
- "s3://my-bucket/food_establishment_data.csv"
- "--output_uri"
- "s3://my-bucket/test-emr-output"
commands:
- spark-submit s3://mybucket/health_violations.py --data_source s3://mybucket/food_establishment_data.csv --output_uri s3://mybucket/test-emr-output
wait: true
"""
)
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/io/kestra/plugin/aws/emr/models/StepConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.aws.emr.models;

import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
Expand All @@ -10,6 +11,8 @@
import lombok.Getter;
import lombok.extern.jackson.Jacksonized;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static io.kestra.core.utils.Rethrow.throwConsumer;
Expand All @@ -26,8 +29,8 @@ public class StepConfig {
@Schema(title = "Main class.", description = "The name of the main class in the specified Java file. If not specified, the JAR file should specify a Main-Class in its manifest file.")
private Property<String> mainClass;

@Schema(title = "Arguments." , description = "A list of command line arguments passed to the JAR file's main function when executed.")
private Property<List<String>> arguments;
@Schema(title = "Commands." , description = "A list of commands that will be passed to the JAR file's main function when executed.")
private Property<List<String>> commands;

@Schema(title = "Step configuration name.", description = "Ex: \"Run Spark job\"")
@NotNull
Expand All @@ -44,11 +47,21 @@ public software.amazon.awssdk.services.emr.model.StepConfig toStep(RunContext ru
.hadoopJarStep(throwConsumer(hadoopJarStepBuilder ->
hadoopJarStepBuilder.jar(runContext.render(this.jar).as(String.class).orElseThrow())
.mainClass(runContext.render(this.mainClass).as(String.class).orElse(null))
.args(runContext.render(this.arguments).asList(String.class))
.args(commandToAwsArguments(runContext.render(this.commands).asList(String.class)))
.build()))
.build();
}

@VisibleForTesting
static List<String> commandToAwsArguments(List<String> commands) {
return commands.isEmpty() ? null : commands.stream()
.map(command -> Arrays.stream(command.split(" ")).toList())
.reduce(new ArrayList<>(), (acc, command) -> {
acc.addAll(command);
return acc;
});
}

public enum Action {
TERMINATE_CLUSTER,
CANCEL_AND_WAIT,
Expand Down
12 changes: 3 additions & 9 deletions src/test/java/io/kestra/plugin/aws/emr/EmrIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,9 @@ void addStepsToCluster() throws Exception {
private StepConfig createPythonSparkJob() {
return StepConfig.builder()
.jar(Property.of("command-runner.jar"))
.arguments(Property.of(
List.of(
"spark-submit",
"s3://" + bucketName + "/health_violations.py",
"--data_source",
"s3://" + bucketName + "/food_establishment_data.csv",
"--output_uri",
"s3://" + bucketName + "/test-emr-output"
)
.commands(Property.of(
List.of("spark-submit s3://" + bucketName + "/health_violations.py --data_source s3://"
+ bucketName + "/food_establishment_data.csv --output_uri s3://" + bucketName + "/test-emr-output")
))
.name(Property.of("TEST SPARK JOB UNIT TEST"))
.actionOnFailure(Property.of(StepConfig.Action.CONTINUE))
Expand Down
27 changes: 27 additions & 0 deletions src/test/java/io/kestra/plugin/aws/emr/models/StepConfigTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.kestra.plugin.aws.emr.models;

import org.junit.jupiter.api.Test;

import java.util.List;

import static io.kestra.plugin.aws.emr.models.StepConfig.commandToAwsArguments;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

class StepConfigTest {
@Test
void createArgumentsFromCommandList() {
var commands = List.of("spark-submit s3://mybucket/health_violations.py --data_source s3://mybucket/food_establishment_data.csv --output_uri s3://mybucket/test-emr-output");

var expected = List.of(
"spark-submit",
"s3://mybucket/health_violations.py",
"--data_source",
"s3://mybucket/food_establishment_data.csv",
"--output_uri",
"s3://mybucket/test-emr-output"
);

assertThat(commandToAwsArguments(commands), is(expected));
}
}

0 comments on commit f8b182b

Please sign in to comment.