Skip to content

Commit

Permalink
feat: add cli functionality to dataproc quickstart (#2047)
Browse files Browse the repository at this point in the history
* Changed quickstart to be an executable program
  • Loading branch information
bradmiro authored Feb 3, 2020
1 parent 4dc3deb commit 6dd3dba
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 17 deletions.
11 changes: 10 additions & 1 deletion dataproc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@
</plugin>
</plugins>
</pluginManagement>
<plugins>
  <plugin>
    <!-- Lets users run the sample from the CLI via `mvn exec:java`
         (with -Dexec.args="..."); mainClass points at the default-package
         Quickstart class in src/main/java. -->
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.6.0</version>
    <configuration>
      <mainClass>Quickstart</mainClass>
    </configuration>
  </plugin>
</plugins>
</build>

<dependencyManagement>
Expand Down Expand Up @@ -65,5 +75,4 @@
<scope>test</scope>
</dependency>
</dependencies>

</project>
51 changes: 36 additions & 15 deletions dataproc/src/main/java/Quickstart.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@
*/

// [START dataproc_quickstart]
/* This quickstart sample walks a user through creating a Cloud Dataproc
* cluster, submitting a PySpark job from Google Cloud Storage to the
* cluster, reading the output of the job and deleting the cluster, all
* using the Java client library.
*
* Usage:
* mvn clean package -DskipTests
*
* mvn exec:java -Dexec.args="<PROJECT_ID> <REGION> <CLUSTER_NAME> <GCS_JOB_FILE_PATH>"
*
* Alternatively, you can hardcode these argument values in the main function
* instead of providing them via the CLI.
*/

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Cluster;
import com.google.cloud.dataproc.v1.ClusterConfig;
Expand Down Expand Up @@ -60,15 +73,6 @@ public static Job waitForJobCompletion(
}
}

public static void quickstart() throws IOException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String region = "your-project-region";
String clusterName = "your-cluster-name";
String jobFilePath = "your-job-file-path";
quickstart(projectId, region, clusterName, jobFilePath);
}

public static void quickstart(
String projectId, String region, String clusterName, String jobFilePath)
throws IOException, InterruptedException {
Expand All @@ -82,10 +86,10 @@ public static void quickstart(
JobControllerSettings jobControllerSettings =
JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

// Create both a cluster controller client and job controller client with the configured
// settings. The client only needs to be created once and can be reused for multiple requests.
// Using a try-with-resources closes the client, but this can also be done manually with
// the .close() method.
// Create both a cluster controller client and job controller client with the
// configured settings. The client only needs to be created once and can be reused for
// multiple requests. Using a try-with-resources closes the client, but this can also be done
// manually with the .close() method.
try (ClusterControllerClient clusterControllerClient =
ClusterControllerClient.create(clusterControllerSettings);
JobControllerClient jobControllerClient =
Expand Down Expand Up @@ -114,7 +118,8 @@ public static void quickstart(
OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
clusterControllerClient.createClusterAsync(projectId, region, cluster);
Cluster response = createClusterAsyncRequest.get();
System.out.printf("Cluster created successfully: %s", response.getClusterName());
System.out.println(
String.format("Cluster created successfully: %s", response.getClusterName()));

// Configure the settings for our job.
JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
Expand All @@ -133,7 +138,7 @@ public static void quickstart(
int timeout = 10;
try {
Job jobInfo = finishedJobFuture.get(timeout, TimeUnit.MINUTES);
System.out.printf("Job %s finished successfully.", jobId);
System.out.println(String.format("Job %s finished successfully.", jobId));

// Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
Cluster clusterInfo = clusterControllerClient.getCluster(projectId, region, clusterName);
Expand Down Expand Up @@ -163,5 +168,21 @@ public static void quickstart(
System.err.println(String.format("Error executing quickstart: %s ", e.getMessage()));
}
}

/**
 * CLI entry point for the Dataproc quickstart sample.
 *
 * @param args exactly four positional values, in order: PROJECT_ID, REGION,
 *     CLUSTER_NAME, JOB_FILE_PATH (GCS location of the PySpark job)
 * @throws IOException if a Dataproc client cannot be created
 * @throws InterruptedException if waiting for the cluster or job is interrupted
 */
public static void main(String... args) throws IOException, InterruptedException {
  if (args.length != 4) {
    // Reworded: the previous message said "Insufficient number of parameters"
    // even when too many arguments were supplied; the check rejects both cases.
    System.err.println(
        "Exactly 4 parameters are required. Please make sure a "
            + "PROJECT_ID, REGION, CLUSTER_NAME and JOB_FILE_PATH are provided, in this order.");
    return;
  }

  String projectId = args[0]; // project-id of project to create the cluster in
  String region = args[1]; // region to create the cluster
  String clusterName = args[2]; // name of the cluster
  String jobFilePath = args[3]; // location in GCS of the PySpark job

  quickstart(projectId, region, clusterName, jobFilePath);
}
}
// [END dataproc_quickstart]
2 changes: 1 addition & 1 deletion dataproc/src/test/java/QuickstartTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void setUp() {

@Test
public void quickstartTest() throws IOException, InterruptedException {
Quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH);
Quickstart.main(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH);
String output = bout.toString();

assertThat(output, CoreMatchers.containsString("Cluster created successfully"));
Expand Down

0 comments on commit 6dd3dba

Please sign in to comment.