diff --git a/dataproc/snippets/src/main/java/CreateCluster.java b/dataproc/snippets/src/main/java/CreateCluster.java
index f68852e8e2e..0815a35ffe0 100644
--- a/dataproc/snippets/src/main/java/CreateCluster.java
+++ b/dataproc/snippets/src/main/java/CreateCluster.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019 Google Inc.
+ * Copyright 2019 Google LLC
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,57 +27,58 @@ public class CreateCluster {
 
+  public static void createCluster() throws IOException, InterruptedException {
+    // TODO(developer): Replace these variables before running the sample.
+    String projectId = "your-project-id";
+    String region = "your-project-region";
+    String clusterName = "your-cluster-name";
+    createCluster(projectId, region, clusterName);
+  }
+
   public static void createCluster(String projectId, String region, String clusterName)
       throws IOException, InterruptedException {
     String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
 
-    // Configure the settings for the cluster controller client
+    // Configure the settings for the cluster controller client.
     ClusterControllerSettings clusterControllerSettings =
         ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
 
-    // Create a cluster controller client with the configured settings. We only need to create
-    // the client once, and can be reused for multiple requests. Using a try-with-resources
-    // will close the client for us, but this can also be done manually with the .close() method.
-    try (ClusterControllerClient clusterControllerClient = ClusterControllerClient
-        .create(clusterControllerSettings)) {
-      // Configure the settings for our cluster
-      InstanceGroupConfig masterConfig = InstanceGroupConfig.newBuilder()
-          .setMachineTypeUri("n1-standard-1")
-          .setNumInstances(1)
-          .build();
-      InstanceGroupConfig workerConfig = InstanceGroupConfig.newBuilder()
-          .setMachineTypeUri("n1-standard-1")
-          .setNumInstances(2)
-          .build();
-      ClusterConfig clusterConfig = ClusterConfig.newBuilder()
-          .setMasterConfig(masterConfig)
-          .setWorkerConfig(workerConfig)
-          .build();
-      // Create the cluster object with the desired cluster config
-      Cluster cluster = Cluster.newBuilder()
-          .setClusterName(clusterName)
-          .setConfig(clusterConfig)
-          .build();
+    // Create a cluster controller client with the configured settings. The client only needs to be
+    // created once and can be reused for multiple requests. Using a try-with-resources
+    // closes the client, but this can also be done manually with the .close() method.
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
+      // Configure the settings for our cluster.
+      InstanceGroupConfig masterConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-1")
+              .setNumInstances(1)
+              .build();
+      InstanceGroupConfig workerConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-1")
+              .setNumInstances(2)
+              .build();
+      ClusterConfig clusterConfig =
+          ClusterConfig.newBuilder()
+              .setMasterConfig(masterConfig)
+              .setWorkerConfig(workerConfig)
+              .build();
+      // Create the cluster object with the desired cluster config.
+      Cluster cluster =
+          Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();
 
-      // Create the Cloud Dataproc cluster
+      // Create the Cloud Dataproc cluster.
       OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
           clusterControllerClient.createClusterAsync(projectId, region, cluster);
       Cluster response = createClusterAsyncRequest.get();
 
-      // Print out a success message
-      System.out.println(
-          String.format("Cluster created successfully: %s", response.getClusterName())
-      );
+      // Print out a success message.
+      System.out.printf("Cluster created successfully: %s", response.getClusterName());
 
-    } catch (IOException e) {
-      // Likely this would occur due to issues authenticating with GCP. Make sure the environment
-      // variable GOOGLE_APPLICATION_CREDENTIALS is configured.
-      System.out.println("Error creating the cluster controller client: \n" + e.toString());
     } catch (ExecutionException e) {
-      // Common issues for this include needing to increase compute engine quotas or a cluster of
-      // the same name already exists.
-      System.out.println("Error during cluster creation request: \n" + e.toString());
+      System.err.println(String.format("Error executing createCluster: %s", e.getMessage()));
     }
   }
 }
-// [END dataproc_create_cluster]
\ No newline at end of file
+// [END dataproc_create_cluster]
diff --git a/dataproc/snippets/src/main/java/Quickstart.java b/dataproc/snippets/src/main/java/Quickstart.java
new file mode 100644
index 00000000000..0a0d177b1d0
--- /dev/null
+++ b/dataproc/snippets/src/main/java/Quickstart.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// [START dataproc_quickstart]
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterConfig;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.cloud.dataproc.v1.InstanceGroupConfig;
+import com.google.cloud.dataproc.v1.Job;
+import com.google.cloud.dataproc.v1.JobControllerClient;
+import com.google.cloud.dataproc.v1.JobControllerSettings;
+import com.google.cloud.dataproc.v1.JobPlacement;
+import com.google.cloud.dataproc.v1.PySparkJob;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.protobuf.Empty;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class Quickstart {
+
+  public static Job waitForJobCompletion(
+      JobControllerClient jobControllerClient, String projectId, String region, String jobId) {
+    while (true) {
+      // Poll the service periodically until the Job is in a finished state.
+      Job jobInfo = jobControllerClient.getJob(projectId, region, jobId);
+      switch (jobInfo.getStatus().getState()) {
+        case DONE:
+        case CANCELLED:
+        case ERROR:
+          return jobInfo;
+        default:
+          try {
+            // Wait a second in between polling attempts.
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+      }
+    }
+  }
+
+  public static void quickstart() throws IOException, InterruptedException {
+    // TODO(developer): Replace these variables before running the sample.
+    String projectId = "your-project-id";
+    String region = "your-project-region";
+    String clusterName = "your-cluster-name";
+    String jobFilePath = "your-job-file-path";
+    quickstart(projectId, region, clusterName, jobFilePath);
+  }
+
+  public static void quickstart(
+      String projectId, String region, String clusterName, String jobFilePath)
+      throws IOException, InterruptedException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
+
+    // Configure the settings for the cluster controller client.
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Configure the settings for the job controller client.
+    JobControllerSettings jobControllerSettings =
+        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Create both a cluster controller client and job controller client with the configured
+    // settings. The clients only need to be created once and can be reused for multiple requests.
+    // Using a try-with-resources closes the clients, but this can also be done manually with
+    // the .close() method.
+    try (ClusterControllerClient clusterControllerClient =
+            ClusterControllerClient.create(clusterControllerSettings);
+        JobControllerClient jobControllerClient =
+            JobControllerClient.create(jobControllerSettings)) {
+      // Configure the settings for our cluster.
+      InstanceGroupConfig masterConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-1")
+              .setNumInstances(1)
+              .build();
+      InstanceGroupConfig workerConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-1")
+              .setNumInstances(2)
+              .build();
+      ClusterConfig clusterConfig =
+          ClusterConfig.newBuilder()
+              .setMasterConfig(masterConfig)
+              .setWorkerConfig(workerConfig)
+              .build();
+      // Create the cluster object with the desired cluster config.
+      Cluster cluster =
+          Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();
+
+      // Create the Cloud Dataproc cluster.
+      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
+          clusterControllerClient.createClusterAsync(projectId, region, cluster);
+      Cluster response = createClusterAsyncRequest.get();
+      System.out.printf("Cluster created successfully: %s", response.getClusterName());
+
+      // Configure the settings for our job.
+      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
+      PySparkJob pySparkJob = PySparkJob.newBuilder().setMainPythonFileUri(jobFilePath).build();
+      Job job = Job.newBuilder().setPlacement(jobPlacement).setPysparkJob(pySparkJob).build();
+
+      // Submit an asynchronous request to execute the job.
+      Job request = jobControllerClient.submitJob(projectId, region, job);
+      String jobId = request.getReference().getJobId();
+      System.out.println(String.format("Submitted job \"%s\"", jobId));
+
+      // Wait for the job to finish.
+      CompletableFuture<Job> finishedJobFuture =
+          CompletableFuture.supplyAsync(
+              () -> waitForJobCompletion(jobControllerClient, projectId, region, jobId));
+      int timeout = 10;
+      try {
+        Job jobInfo = finishedJobFuture.get(timeout, TimeUnit.MINUTES);
+        System.out.printf("Job %s finished.", jobId);
+
+        // Cloud Dataproc job output is saved to a Cloud Storage bucket allocated to the cluster.
+        Cluster clusterInfo = clusterControllerClient.getCluster(projectId, region, clusterName);
+        Storage storage = StorageOptions.getDefaultInstance().getService();
+        Blob blob =
+            storage.get(
+                clusterInfo.getConfig().getConfigBucket(),
+                String.format(
+                    "google-cloud-dataproc-metainfo/%s/jobs/%s/driveroutput.000000000",
+                    clusterInfo.getClusterUuid(), jobId));
+        System.out.println(
+            String.format(
+                "Job \"%s\" finished with state %s:\n%s",
+                jobId, jobInfo.getStatus().getState(), new String(blob.getContent())));
+      } catch (TimeoutException e) {
+        System.err.println(
+            String.format("Job timed out after %d minutes: %s", timeout, e.getMessage()));
+      }
+
+      // Delete the cluster.
+      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+          clusterControllerClient.deleteClusterAsync(projectId, region, clusterName);
+      deleteClusterAsyncRequest.get();
+      System.out.println(String.format("Cluster \"%s\" successfully deleted.", clusterName));
+
+    } catch (ExecutionException e) {
+      System.err.println(String.format("Error executing quickstart: %s", e.getMessage()));
+    }
+  }
+}
+// [END dataproc_quickstart]
diff --git a/dataproc/snippets/src/test/java/CreateClusterTest.java b/dataproc/snippets/src/test/java/CreateClusterTest.java
index 800856b02dc..539e0e2b624 100644
--- a/dataproc/snippets/src/test/java/CreateClusterTest.java
+++ b/dataproc/snippets/src/test/java/CreateClusterTest.java
@@ -38,18 +38,17 @@
 @RunWith(JUnit4.class)
 public class CreateClusterTest {
 
-  private static final String BASE_CLUSTER_NAME = "test-cluster";
+  private static final String CLUSTER_NAME =
+      String.format("java-cc-test-%s", UUID.randomUUID().toString());
   private static final String REGION = "us-central1";
+  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
 
-  private static String projectId = System.getenv("GOOGLE_CLOUD_PROJECT");
-  private String clusterName;
   private ByteArrayOutputStream bout;
 
   private static void requireEnv(String varName) {
     assertNotNull(
-        System.getenv(varName),
-        String.format("Environment variable '%s' is required to perform these tests.", varName)
-    );
+        String.format("Environment variable '%s' is required to perform these tests.", varName),
+        System.getenv(varName));
   }
 
   @BeforeClass
@@ -60,35 +59,30 @@ public static void checkRequirements() {
 
   @Before
   public void setUp() {
-    clusterName = String.format("%s-%s", BASE_CLUSTER_NAME, UUID.randomUUID().toString());
-
     bout = new ByteArrayOutputStream();
     System.setOut(new PrintStream(bout));
   }
 
   @Test
   public void createClusterTest() throws IOException, InterruptedException {
-    CreateCluster.createCluster(projectId, REGION, clusterName);
+    CreateCluster.createCluster(PROJECT_ID, REGION, CLUSTER_NAME);
     String output = bout.toString();
 
-    assertThat(output, CoreMatchers.containsString(clusterName));
+    assertThat(output, CoreMatchers.containsString(CLUSTER_NAME));
   }
 
   @After
-  public void tearDown() throws IOException, InterruptedException {
+  public void tearDown() throws IOException, InterruptedException, ExecutionException {
     String myEndpoint = String.format("%s-dataproc.googleapis.com:443", REGION);
 
     ClusterControllerSettings clusterControllerSettings =
         ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
 
-    try (ClusterControllerClient clusterControllerClient = ClusterControllerClient
-        .create(clusterControllerSettings)) {
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
       OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
-          clusterControllerClient.deleteClusterAsync(projectId, REGION, clusterName);
+          clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
       deleteClusterAsyncRequest.get();
-
-    } catch (ExecutionException e) {
-      System.out.println("Error during cluster deletion: \n" + e.toString());
     }
   }
-}
\ No newline at end of file
+}
diff --git a/dataproc/snippets/src/test/java/QuickstartTest.java b/dataproc/snippets/src/test/java/QuickstartTest.java
new file mode 100644
index 00000000000..3296e224828
--- /dev/null
+++ b/dataproc/snippets/src/test/java/QuickstartTest.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static junit.framework.TestCase.assertNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Bucket;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.protobuf.Empty;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class QuickstartTest {
+
+  private static final String MY_UUID = UUID.randomUUID().toString();
+  private static final String REGION = "us-central1";
+  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+  private static final String ENDPOINT = String.format("%s-dataproc.googleapis.com:443", REGION);
+  private static final String CLUSTER_NAME = String.format("java-qs-test-%s", MY_UUID);
+  private static final String BUCKET_NAME = String.format("java-dataproc-qs-test-%s", MY_UUID);
+  private static final String JOB_FILE_NAME = "sum.py";
+  private static final String JOB_FILE_PATH =
+      String.format("gs://%s/%s", BUCKET_NAME, JOB_FILE_NAME);
+  private static final String SUM_CODE =
+      "import pyspark\n"
+          + "sc = pyspark.SparkContext()\n"
+          + "rdd = sc.parallelize((1,2,3,4,5))\n"
+          + "sum = rdd.reduce(lambda x, y: x + y)\n";
+
+  private ByteArrayOutputStream bout;
+  private Bucket bucket;
+  private Blob blob;
+
+  private static void requireEnv(String varName) {
+    assertNotNull(
+        String.format("Environment variable '%s' is required to perform these tests.", varName),
+        System.getenv(varName));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
+    requireEnv("GOOGLE_CLOUD_PROJECT");
+  }
+
+  @Before
+  public void setUp() {
+    bout = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(bout));
+
+    Storage storage = StorageOptions.getDefaultInstance().getService();
+    bucket = storage.create(BucketInfo.of(BUCKET_NAME));
+    blob = bucket.create(JOB_FILE_NAME, SUM_CODE.getBytes(UTF_8), "text/plain");
+  }
+
+  @Test
+  public void quickstartTest() throws IOException, InterruptedException {
+    Quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH);
+    String output = bout.toString();
+
+    assertThat(output, CoreMatchers.containsString("Cluster created successfully"));
+    assertThat(output, CoreMatchers.containsString("Submitted job"));
+    assertThat(output, CoreMatchers.containsString("finished with state DONE:"));
+    assertThat(output, CoreMatchers.containsString("successfully deleted"));
+  }
+
+  @After
+  public void teardown() throws IOException, InterruptedException, ExecutionException {
+    blob.delete();
+    bucket.delete();
+
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
+      for (Cluster element :
+          clusterControllerClient.listClusters(PROJECT_ID, REGION).iterateAll()) {
+        if (element.getClusterName().equals(CLUSTER_NAME)) {
+          OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+              clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
+          deleteClusterAsyncRequest.get();
+          break;
+        }
+      }
+    }
+  }
+}