diff --git a/dataproc/pom.xml b/dataproc/pom.xml
new file mode 100644
index 00000000000..60f66e3301e
--- /dev/null
+++ b/dataproc/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.example.dataproc</groupId>
+  <artifactId>dataproc-snippets</artifactId>
+  <packaging>jar</packaging>
+  <name>Google Dataproc Snippets</name>
+  <url>https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/dataproc</url>
+
+  <parent>
+    <groupId>com.google.cloud.samples</groupId>
+    <artifactId>shared-configuration</artifactId>
+    <version>1.2.0</version>
+  </parent>
+
+  <properties>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>com.google.cloud</groupId>
+        <artifactId>libraries-bom</artifactId>
+        <version>26.1.3</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-dataproc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-storage</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.13.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.truth</groupId>
+      <artifactId>truth</artifactId>
+      <version>1.1.3</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/dataproc/src/main/java/CreateCluster.java b/dataproc/src/main/java/CreateCluster.java
new file mode 100644
index 00000000000..0623c8cc465
--- /dev/null
+++ b/dataproc/src/main/java/CreateCluster.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// [START dataproc_create_cluster]
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterConfig;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.cloud.dataproc.v1.InstanceGroupConfig;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+public class CreateCluster {
+
+  public static void createCluster() throws IOException, InterruptedException {
+    // TODO(developer): Replace these variables before running the sample.
+    String projectId = "your-project-id";
+    String region = "your-project-region";
+    String clusterName = "your-cluster-name";
+    createCluster(projectId, region, clusterName);
+  }
+
+  public static void createCluster(String projectId, String region, String clusterName)
+      throws IOException, InterruptedException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
+
+    // Configure the settings for the cluster controller client.
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Create a cluster controller client with the configured settings. The client only needs to be
+    // created once and can be reused for multiple requests. Using a try-with-resources
+    // closes the client, but this can also be done manually with the .close() method.
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
+      // Configure the settings for our cluster.
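+      // Sizing note: the machine type and instance counts below are sample values; a standard
+      // (non-single-node) Dataproc cluster needs one master group and at least two primary
+      // workers, but any Compute Engine machine type available in the region should work.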
+      InstanceGroupConfig masterConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-2")
+              .setNumInstances(1)
+              .build();
+      InstanceGroupConfig workerConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-2")
+              .setNumInstances(2)
+              .build();
+      ClusterConfig clusterConfig =
+          ClusterConfig.newBuilder()
+              .setMasterConfig(masterConfig)
+              .setWorkerConfig(workerConfig)
+              .build();
+      // Create the cluster object with the desired cluster config.
+      Cluster cluster =
+          Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();
+
+      // Create the Cloud Dataproc cluster.
+      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
+          clusterControllerClient.createClusterAsync(projectId, region, cluster);
+      Cluster response = createClusterAsyncRequest.get();
+
+      // Print out a success message.
+      System.out.printf("Cluster created successfully: %s", response.getClusterName());
+
+    } catch (ExecutionException e) {
+      System.err.println(String.format("Error executing createCluster: %s ", e.getMessage()));
+    }
+  }
+}
+// [END dataproc_create_cluster]
diff --git a/dataproc/src/main/java/CreateClusterWithAutoscaling.java b/dataproc/src/main/java/CreateClusterWithAutoscaling.java
new file mode 100644
index 00000000000..981721a497a
--- /dev/null
+++ b/dataproc/src/main/java/CreateClusterWithAutoscaling.java
@@ -0,0 +1,174 @@
+/*
+* Copyright 2020 Google LLC
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* This sample creates a Dataproc cluster with an autoscaling policy enabled.
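+* The policy is created first as a standalone resource and then attached to the new cluster
+* by URI, so the same policy can be reused by other clusters in the region.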
+* The policy we will be creating mirrors the following YAML representation:
+*
+  workerConfig:
+    minInstances: 2
+    maxInstances: 100
+    weight: 1
+  secondaryWorkerConfig:
+    minInstances: 0
+    maxInstances: 100
+    weight: 1
+  basicAlgorithm:
+    cooldownPeriod: 4m
+    yarnConfig:
+      scaleUpFactor: 0.05
+      scaleDownFactor: 1.0
+      scaleUpMinWorkerFraction: 0.0
+      scaleDownMinWorkerFraction: 0.0
+      gracefulDecommissionTimeout: 1h
+*/
+
+// [START dataproc_create_autoscaling_cluster]
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.AutoscalingConfig;
+import com.google.cloud.dataproc.v1.AutoscalingPolicy;
+import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceClient;
+import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceSettings;
+import com.google.cloud.dataproc.v1.BasicAutoscalingAlgorithm;
+import com.google.cloud.dataproc.v1.BasicYarnAutoscalingConfig;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterConfig;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.cloud.dataproc.v1.InstanceGroupAutoscalingPolicyConfig;
+import com.google.cloud.dataproc.v1.InstanceGroupConfig;
+import com.google.cloud.dataproc.v1.RegionName;
+import com.google.protobuf.Duration;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+public class CreateClusterWithAutoscaling {
+
+  public static void createClusterwithAutoscaling() throws IOException, InterruptedException {
+    // TODO(developer): Replace these variables before running the sample.
+    String projectId = "your-project-id";
+    String region = "your-project-region";
+    String clusterName = "your-cluster-name";
+    String autoscalingPolicyName = "your-autoscaling-policy";
+    createClusterwithAutoscaling(projectId, region, clusterName, autoscalingPolicyName);
+  }
+
+  public static void createClusterwithAutoscaling(
+      String projectId, String region, String clusterName, String autoscalingPolicyName)
+      throws IOException, InterruptedException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
+
+    // Configure the settings for the cluster controller client.
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Configure the settings for the autoscaling policy service client.
+    AutoscalingPolicyServiceSettings autoscalingPolicyServiceSettings =
+        AutoscalingPolicyServiceSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Create a cluster controller client and an autoscaling policy service client with the
+    // configured settings. The clients only need to be created once and can be reused for
+    // multiple requests. Using a try-with-resources closes the clients, but this can also be
+    // done manually with the .close() method.
+    try (ClusterControllerClient clusterControllerClient =
+            ClusterControllerClient.create(clusterControllerSettings);
+        AutoscalingPolicyServiceClient autoscalingPolicyServiceClient =
+            AutoscalingPolicyServiceClient.create(autoscalingPolicyServiceSettings)) {
+
+      // Create the Autoscaling policy.
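+      // Each builder call below corresponds to a field of the YAML policy in the file header;
+      // the YAML durations become protobuf Durations in seconds (4m = 240s, 1h = 3600s).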
+      InstanceGroupAutoscalingPolicyConfig workerInstanceGroupAutoscalingPolicyConfig =
+          InstanceGroupAutoscalingPolicyConfig.newBuilder()
+              .setMinInstances(2)
+              .setMaxInstances(100)
+              .setWeight(1)
+              .build();
+      InstanceGroupAutoscalingPolicyConfig secondaryWorkerInstanceGroupAutoscalingPolicyConfig =
+          InstanceGroupAutoscalingPolicyConfig.newBuilder()
+              .setMinInstances(0)
+              .setMaxInstances(100)
+              .setWeight(1)
+              .build();
+      BasicYarnAutoscalingConfig basicYarnApplicationConfig =
+          BasicYarnAutoscalingConfig.newBuilder()
+              .setScaleUpFactor(0.05)
+              .setScaleDownFactor(1.0)
+              .setScaleUpMinWorkerFraction(0.0)
+              .setScaleDownMinWorkerFraction(0.0)
+              .setGracefulDecommissionTimeout(Duration.newBuilder().setSeconds(3600).build())
+              .build();
+      BasicAutoscalingAlgorithm basicAutoscalingAlgorithm =
+          BasicAutoscalingAlgorithm.newBuilder()
+              .setCooldownPeriod(Duration.newBuilder().setSeconds(240).build())
+              .setYarnConfig(basicYarnApplicationConfig)
+              .build();
+      AutoscalingPolicy autoscalingPolicy =
+          AutoscalingPolicy.newBuilder()
+              .setId(autoscalingPolicyName)
+              .setWorkerConfig(workerInstanceGroupAutoscalingPolicyConfig)
+              .setSecondaryWorkerConfig(secondaryWorkerInstanceGroupAutoscalingPolicyConfig)
+              .setBasicAlgorithm(basicAutoscalingAlgorithm)
+              .build();
+      RegionName parent = RegionName.of(projectId, region);
+
+      // Policy is uploaded here.
+      autoscalingPolicyServiceClient.createAutoscalingPolicy(parent, autoscalingPolicy);
+
+      // Now the policy can be referenced when creating a cluster.
+      String autoscalingPolicyUri =
+          String.format(
+              "projects/%s/locations/%s/autoscalingPolicies/%s",
+              projectId, region, autoscalingPolicyName);
+      AutoscalingConfig autoscalingConfig =
+          AutoscalingConfig.newBuilder().setPolicyUri(autoscalingPolicyUri).build();
+
+      // Configure the settings for our cluster.
+      InstanceGroupConfig masterConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-2")
+              .setNumInstances(1)
+              .build();
+      InstanceGroupConfig workerConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-2")
+              .setNumInstances(2)
+              .build();
+      ClusterConfig clusterConfig =
+          ClusterConfig.newBuilder()
+              .setMasterConfig(masterConfig)
+              .setWorkerConfig(workerConfig)
+              .setAutoscalingConfig(autoscalingConfig)
+              .build();
+
+      // Create the cluster object with the desired cluster config.
+      Cluster cluster =
+          Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();
+
+      // Create the Dataproc cluster.
+      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
+          clusterControllerClient.createClusterAsync(projectId, region, cluster);
+      Cluster response = createClusterAsyncRequest.get();
+
+      // Print out a success message.
+      System.out.printf("Cluster created successfully: %s", response.getClusterName());
+
+    } catch (ExecutionException e) {
+      // If cluster creation does not complete successfully, print the error message.
+      System.err.println(String.format("createClusterWithAutoscaling: %s ", e.getMessage()));
+    }
+  }
+}
+// [END dataproc_create_autoscaling_cluster]
diff --git a/dataproc/src/main/java/InstantiateInlineWorkflowTemplate.java b/dataproc/src/main/java/InstantiateInlineWorkflowTemplate.java
new file mode 100644
index 00000000000..4f3e73799c7
--- /dev/null
+++ b/dataproc/src/main/java/InstantiateInlineWorkflowTemplate.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// [START dataproc_instantiate_inline_workflow_template]
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.ClusterConfig;
+import com.google.cloud.dataproc.v1.GceClusterConfig;
+import com.google.cloud.dataproc.v1.HadoopJob;
+import com.google.cloud.dataproc.v1.ManagedCluster;
+import com.google.cloud.dataproc.v1.OrderedJob;
+import com.google.cloud.dataproc.v1.RegionName;
+import com.google.cloud.dataproc.v1.WorkflowMetadata;
+import com.google.cloud.dataproc.v1.WorkflowTemplate;
+import com.google.cloud.dataproc.v1.WorkflowTemplatePlacement;
+import com.google.cloud.dataproc.v1.WorkflowTemplateServiceClient;
+import com.google.cloud.dataproc.v1.WorkflowTemplateServiceSettings;
+import com.google.protobuf.Empty;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+public class InstantiateInlineWorkflowTemplate {
+
+  public static void instantiateInlineWorkflowTemplate() throws IOException, InterruptedException {
+    // TODO(developer): Replace these variables before running the sample.
+    String projectId = "your-project-id";
+    String region = "your-project-region";
+    instantiateInlineWorkflowTemplate(projectId, region);
+  }
+
+  public static void instantiateInlineWorkflowTemplate(String projectId, String region)
+      throws IOException, InterruptedException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
+
+    // Configure the settings for the workflow template service client.
+    WorkflowTemplateServiceSettings workflowTemplateServiceSettings =
+        WorkflowTemplateServiceSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Create a workflow template service client with the configured settings. The client only
+    // needs to be created once and can be reused for multiple requests. Using a try-with-resources
+    // closes the client, but this can also be done manually with the .close() method.
+    try (WorkflowTemplateServiceClient workflowTemplateServiceClient =
+        WorkflowTemplateServiceClient.create(workflowTemplateServiceSettings)) {
+
+      // Configure the jobs within the workflow.
+      HadoopJob teragenHadoopJob =
+          HadoopJob.newBuilder()
+              .setMainJarFileUri("file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
+              .addArgs("teragen")
+              .addArgs("1000")
+              .addArgs("hdfs:///gen/")
+              .build();
+      OrderedJob teragen =
+          OrderedJob.newBuilder().setHadoopJob(teragenHadoopJob).setStepId("teragen").build();
+
+      HadoopJob terasortHadoopJob =
+          HadoopJob.newBuilder()
+              .setMainJarFileUri("file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
+              .addArgs("terasort")
+              .addArgs("hdfs:///gen/")
+              .addArgs("hdfs:///sort/")
+              .build();
+      OrderedJob terasort =
+          OrderedJob.newBuilder()
+              .setHadoopJob(terasortHadoopJob)
+              .addPrerequisiteStepIds("teragen")
+              .setStepId("terasort")
+              .build();
+
+      // Configure the cluster placement for the workflow.
+      // Leave "ZoneUri" empty for "Auto Zone Placement".
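+      // (With auto zone placement, Dataproc chooses a zone within the workflow's region;
+      // setting an explicit zone URI, as below, pins the managed cluster to that zone.)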
+      // GceClusterConfig gceClusterConfig =
+      //     GceClusterConfig.newBuilder().setZoneUri("").build();
+      GceClusterConfig gceClusterConfig =
+          GceClusterConfig.newBuilder().setZoneUri("us-central1-a").build();
+      ClusterConfig clusterConfig =
+          ClusterConfig.newBuilder().setGceClusterConfig(gceClusterConfig).build();
+      ManagedCluster managedCluster =
+          ManagedCluster.newBuilder()
+              .setClusterName("my-managed-cluster")
+              .setConfig(clusterConfig)
+              .build();
+      WorkflowTemplatePlacement workflowTemplatePlacement =
+          WorkflowTemplatePlacement.newBuilder().setManagedCluster(managedCluster).build();
+
+      // Create the inline workflow template.
+      WorkflowTemplate workflowTemplate =
+          WorkflowTemplate.newBuilder()
+              .addJobs(teragen)
+              .addJobs(terasort)
+              .setPlacement(workflowTemplatePlacement)
+              .build();
+
+      // Submit the instantiated inline workflow template request.
+      String parent = RegionName.format(projectId, region);
+      OperationFuture<Empty, WorkflowMetadata> instantiateInlineWorkflowTemplateAsync =
+          workflowTemplateServiceClient.instantiateInlineWorkflowTemplateAsync(
+              parent, workflowTemplate);
+      instantiateInlineWorkflowTemplateAsync.get();
+
+      // Print out a success message.
+      System.out.printf("Workflow ran successfully.");
+
+    } catch (ExecutionException e) {
+      System.err.println(String.format("Error running workflow: %s ", e.getMessage()));
+    }
+  }
+}
+// [END dataproc_instantiate_inline_workflow_template]
diff --git a/dataproc/src/main/java/Quickstart.java b/dataproc/src/main/java/Quickstart.java
new file mode 100644
index 00000000000..f7911313cf1
--- /dev/null
+++ b/dataproc/src/main/java/Quickstart.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// [START dataproc_quickstart]
+/* This quickstart sample walks a user through creating a Cloud Dataproc
+ * cluster, submitting a PySpark job from Google Cloud Storage to the
+ * cluster, reading the output of the job and deleting the cluster, all
+ * using the Java client library.
+ *
+ * Usage:
+ *   mvn clean package -DskipTests
+ *
+ *   mvn exec:java -Dexec.args="<PROJECT_ID> <REGION> <CLUSTER_NAME> <JOB_FILE_PATH>"
+ *
+ * You can also set these arguments in the main function instead of providing them via the CLI.
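+ *
+ * (This assumes Application Default Credentials are available, for example via the
+ * GOOGLE_APPLICATION_CREDENTIALS environment variable that the tests in this directory
+ * also require.)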
+ */
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterConfig;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.cloud.dataproc.v1.InstanceGroupConfig;
+import com.google.cloud.dataproc.v1.Job;
+import com.google.cloud.dataproc.v1.JobControllerClient;
+import com.google.cloud.dataproc.v1.JobControllerSettings;
+import com.google.cloud.dataproc.v1.JobMetadata;
+import com.google.cloud.dataproc.v1.JobPlacement;
+import com.google.cloud.dataproc.v1.PySparkJob;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.protobuf.Empty;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Quickstart {
+
+  public static void quickstart(
+      String projectId, String region, String clusterName, String jobFilePath)
+      throws IOException, InterruptedException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
+
+    // Configure the settings for the cluster controller client.
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Configure the settings for the job controller client.
+    JobControllerSettings jobControllerSettings =
+        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Create both a cluster controller client and job controller client with the
+    // configured settings. The clients only need to be created once and can be reused for
+    // multiple requests. Using a try-with-resources closes the clients, but this can also be done
+    // manually with the .close() method.
+    try (ClusterControllerClient clusterControllerClient =
+            ClusterControllerClient.create(clusterControllerSettings);
+        JobControllerClient jobControllerClient =
+            JobControllerClient.create(jobControllerSettings)) {
+      // Configure the settings for our cluster.
+      InstanceGroupConfig masterConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-2")
+              .setNumInstances(1)
+              .build();
+      InstanceGroupConfig workerConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-2")
+              .setNumInstances(2)
+              .build();
+      ClusterConfig clusterConfig =
+          ClusterConfig.newBuilder()
+              .setMasterConfig(masterConfig)
+              .setWorkerConfig(workerConfig)
+              .build();
+      // Create the cluster object with the desired cluster config.
+      Cluster cluster =
+          Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();
+
+      // Create the Cloud Dataproc cluster.
+      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
+          clusterControllerClient.createClusterAsync(projectId, region, cluster);
+      Cluster clusterResponse = createClusterAsyncRequest.get();
+      System.out.println(
+          String.format("Cluster created successfully: %s", clusterResponse.getClusterName()));
+
+      // Configure the settings for our job.
+      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
+      PySparkJob pySparkJob = PySparkJob.newBuilder().setMainPythonFileUri(jobFilePath).build();
+      Job job = Job.newBuilder().setPlacement(jobPlacement).setPysparkJob(pySparkJob).build();
+
+      // Submit an asynchronous request to execute the job.
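+      // submitJobAsOperationAsync returns an OperationFuture, so the get() call below blocks
+      // until the job reaches a terminal state; no manual polling of job status is needed.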
+      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
+          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
+      Job jobResponse = submitJobAsOperationAsyncRequest.get();
+
+      // Print output from Google Cloud Storage.
+      Matcher matches =
+          Pattern.compile("gs://(.*?)/(.*)").matcher(jobResponse.getDriverOutputResourceUri());
+      matches.matches();
+
+      Storage storage = StorageOptions.getDefaultInstance().getService();
+      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
+
+      System.out.println(
+          String.format("Job finished successfully: %s", new String(blob.getContent())));
+
+      // Delete the cluster.
+      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+          clusterControllerClient.deleteClusterAsync(projectId, region, clusterName);
+      deleteClusterAsyncRequest.get();
+      System.out.println(String.format("Cluster \"%s\" successfully deleted.", clusterName));
+
+    } catch (ExecutionException e) {
+      System.err.println(String.format("quickstart: %s ", e.getMessage()));
+    }
+  }
+
+  public static void main(String... args) throws IOException, InterruptedException {
+    if (args.length != 4) {
+      System.err.println(
+          "Insufficient number of parameters provided. Please make sure a "
+              + "PROJECT_ID, REGION, CLUSTER_NAME and JOB_FILE_PATH are provided, in this order.");
+      return;
+    }
+
+    String projectId = args[0]; // project-id of project to create the cluster in
+    String region = args[1]; // region to create the cluster
+    String clusterName = args[2]; // name of the cluster
+    String jobFilePath = args[3]; // location in GCS of the PySpark job
+
+    quickstart(projectId, region, clusterName, jobFilePath);
+  }
+}
+// [END dataproc_quickstart]
diff --git a/dataproc/src/main/java/SubmitHadoopFsJob.java b/dataproc/src/main/java/SubmitHadoopFsJob.java
new file mode 100644
index 00000000000..0c26416c41b
--- /dev/null
+++ b/dataproc/src/main/java/SubmitHadoopFsJob.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// [START dataproc_submit_hadoop_fs_job]
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.HadoopJob;
+import com.google.cloud.dataproc.v1.Job;
+import com.google.cloud.dataproc.v1.JobControllerClient;
+import com.google.cloud.dataproc.v1.JobControllerSettings;
+import com.google.cloud.dataproc.v1.JobMetadata;
+import com.google.cloud.dataproc.v1.JobPlacement;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SubmitHadoopFsJob {
+
+  public static ArrayList<String> stringToList(String s) {
+    return new ArrayList<>(Arrays.asList(s.split(" ")));
+  }
+
+  public static void submitHadoopFsJob() throws IOException, InterruptedException {
+    // TODO(developer): Replace these variables before running the sample.
+    String projectId = "your-project-id";
+    String region = "your-project-region";
+    String clusterName = "your-cluster-name";
+    String hadoopFsQuery = "your-hadoop-fs-query";
+    submitHadoopFsJob(projectId, region, clusterName, hadoopFsQuery);
+  }
+
+  public static void submitHadoopFsJob(
+      String projectId, String region, String clusterName, String hadoopFsQuery)
+      throws IOException, InterruptedException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
+
+    // Configure the settings for the job controller client.
+    JobControllerSettings jobControllerSettings =
+        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Create a job controller client with the configured settings. Using a try-with-resources
+    // closes the client, but this can also be done manually with the .close() method.
+    try (JobControllerClient jobControllerClient =
+        JobControllerClient.create(jobControllerSettings)) {
+
+      // Configure cluster placement for the job.
+      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
+
+      // Configure Hadoop job settings. The HadoopFS query is set here.
+      HadoopJob hadoopJob =
+          HadoopJob.newBuilder()
+              .setMainClass("org.apache.hadoop.fs.FsShell")
+              .addAllArgs(stringToList(hadoopFsQuery))
+              .build();
+
+      Job job = Job.newBuilder().setPlacement(jobPlacement).setHadoopJob(hadoopJob).build();
+
+      // Submit an asynchronous request to execute the job.
+      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
+          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
+
+      Job response = submitJobAsOperationAsyncRequest.get();
+
+      // Print output from Google Cloud Storage.
+      Matcher matches =
+          Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
+      matches.matches();
+
+      Storage storage = StorageOptions.getDefaultInstance().getService();
+      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
+
+      System.out.println(
+          String.format("Job finished successfully: %s", new String(blob.getContent())));
+
+    } catch (ExecutionException e) {
+      // If the job does not complete successfully, print the error message.
+ System.err.println(String.format("submitHadoopFSJob: %s ", e.getMessage())); + } + } +} +// [END dataproc_submit_hadoop_fs_job] diff --git a/dataproc/src/main/java/SubmitJob.java b/dataproc/src/main/java/SubmitJob.java new file mode 100644 index 00000000000..93193a7aca5 --- /dev/null +++ b/dataproc/src/main/java/SubmitJob.java @@ -0,0 +1,94 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// [START dataproc_submit_job] + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.dataproc.v1.Job; +import com.google.cloud.dataproc.v1.JobControllerClient; +import com.google.cloud.dataproc.v1.JobControllerSettings; +import com.google.cloud.dataproc.v1.JobMetadata; +import com.google.cloud.dataproc.v1.JobPlacement; +import com.google.cloud.dataproc.v1.SparkJob; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SubmitJob { + + public static void submitJob() throws IOException, InterruptedException { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String region = "your-project-region"; + String clusterName = "your-cluster-name"; + submitJob(projectId, region, clusterName); + } + + public static void submitJob(String projectId, String region, String clusterName) + throws IOException, InterruptedException { + String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region); + + // Configure the settings for the job controller client. + JobControllerSettings jobControllerSettings = + JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build(); + + // Create a job controller client with the configured settings. Using a try-with-resources + // closes the client, + // but this can also be done manually with the .close() method. + try (JobControllerClient jobControllerClient = + JobControllerClient.create(jobControllerSettings)) { + + // Configure cluster placement for the job. + JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build(); + + // Configure Spark job settings. + SparkJob sparkJob = + SparkJob.newBuilder() + .setMainClass("org.apache.spark.examples.SparkPi") + .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar") + .addArgs("1000") + .build(); + + Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build(); + + // Submit an asynchronous request to execute the job. + OperationFuture submitJobAsOperationAsyncRequest = + jobControllerClient.submitJobAsOperationAsync(projectId, region, job); + + Job response = submitJobAsOperationAsyncRequest.get(); + + // Print output from Google Cloud Storage. 
+      Matcher matches =
+          Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
+      matches.matches();
+
+      Storage storage = StorageOptions.getDefaultInstance().getService();
+      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
+
+      System.out.println(
+          String.format("Job finished successfully: %s", new String(blob.getContent())));
+
+    } catch (ExecutionException e) {
+      // If the job does not complete successfully, print the error message.
+      System.err.println(String.format("submitJob: %s ", e.getMessage()));
+    }
+  }
+}
+// [END dataproc_submit_job]
diff --git a/dataproc/src/test/java/CreateClusterTest.java b/dataproc/src/test/java/CreateClusterTest.java
new file mode 100644
index 00000000000..9e8cb1affc1
--- /dev/null
+++ b/dataproc/src/test/java/CreateClusterTest.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.protobuf.Empty;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class CreateClusterTest {
+
+  private static final String CLUSTER_NAME =
+      String.format("java-cc-test-%s", UUID.randomUUID().toString());
+  private static final String REGION = "us-central1";
+  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+
+  private ByteArrayOutputStream bout;
+
+  private static void requireEnv(String varName) {
+    assertNotNull(
+        String.format("Environment variable '%s' is required to perform these tests.", varName),
+        System.getenv(varName));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
+    requireEnv("GOOGLE_CLOUD_PROJECT");
+  }
+
+  @Before
+  public void setUp() {
+    bout = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(bout));
+  }
+
+  @Test
+  public void createClusterTest() throws IOException, InterruptedException {
+    CreateCluster.createCluster(PROJECT_ID, REGION, CLUSTER_NAME);
+    String output = bout.toString();
+
+    assertThat(output, CoreMatchers.containsString(CLUSTER_NAME));
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException, ExecutionException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", REGION);
+
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
+      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+          clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
+      deleteClusterAsyncRequest.get();
+    }
+  }
+}
diff --git a/dataproc/src/test/java/CreateClusterWithAutoscalingTest.java b/dataproc/src/test/java/CreateClusterWithAutoscalingTest.java
new file mode 100644
index 00000000000..1b8f15058b3
--- /dev/null
+++ b/dataproc/src/test/java/CreateClusterWithAutoscalingTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.AutoscalingPolicyName;
+import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceClient;
+import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceSettings;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.protobuf.Empty;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class CreateClusterWithAutoscalingTest {
+
+  private static final String CLUSTER_NAME =
+      String.format("java-as-test-%s", UUID.randomUUID().toString());
+  private static final String REGION = "us-central1";
+  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+  private static final String AUTOSCALING_POLICY_NAME =
+      String.format("java-as-test-%s", UUID.randomUUID().toString());
+
+  private ByteArrayOutputStream bout;
+
+  private static void requireEnv(String varName) {
+    assertNotNull(
+        String.format("Environment variable '%s' is required to perform these tests.", varName),
+        System.getenv(varName));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
+    requireEnv("GOOGLE_CLOUD_PROJECT");
+  }
+
+  @Before
+  public void setUp() {
+    bout = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(bout));
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException, ExecutionException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", REGION);
+
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    AutoscalingPolicyServiceSettings autoscalingPolicyServiceSettings =
+        AutoscalingPolicyServiceSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+            ClusterControllerClient.create(clusterControllerSettings);
+        AutoscalingPolicyServiceClient autoscalingPolicyServiceClient =
+            AutoscalingPolicyServiceClient.create(autoscalingPolicyServiceSettings)) {
+
+      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+          clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
+      deleteClusterAsyncRequest.get();
+
+      AutoscalingPolicyName name =
+          AutoscalingPolicyName.ofProjectLocationAutoscalingPolicyName(
+              PROJECT_ID, REGION, AUTOSCALING_POLICY_NAME);
+      autoscalingPolicyServiceClient.deleteAutoscalingPolicy(name);
+    }
+  }
+
+  @Test
+  public void createClusterWithAutoscalingTest() throws IOException, InterruptedException {
+    CreateClusterWithAutoscaling.createClusterwithAutoscaling(
+        PROJECT_ID, REGION, CLUSTER_NAME, AUTOSCALING_POLICY_NAME);
+    String output = bout.toString();
+
+    assertThat(output, CoreMatchers.containsString(CLUSTER_NAME));
+  }
+}
diff --git a/dataproc/src/test/java/InstantiateInlineWorkflowTemplateTest.java b/dataproc/src/test/java/InstantiateInlineWorkflowTemplateTest.java
new file mode 100644
index 00000000000..0216ff36bea
--- /dev/null
+++ b/dataproc/src/test/java/InstantiateInlineWorkflowTemplateTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class InstantiateInlineWorkflowTemplateTest {
+
+  private static final String REGION = "us-central1";
+  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+
+  private ByteArrayOutputStream bout;
+
+  private static void requireEnv(String varName) {
+    assertNotNull(
+        String.format("Environment variable '%s' is required to perform these tests.", varName),
+        System.getenv(varName));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
+    requireEnv("GOOGLE_CLOUD_PROJECT");
+  }
+
+  @Before
+  public void setUp() {
+    bout = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(bout));
+  }
+
+  @Test
+  public void instantiateInlineWorkflowTest() throws IOException, InterruptedException {
+    InstantiateInlineWorkflowTemplate.instantiateInlineWorkflowTemplate(PROJECT_ID, REGION);
+    String output = bout.toString();
+
+    assertThat(output, CoreMatchers.containsString("successfully"));
+  }
+}
diff --git a/dataproc/src/test/java/QuickstartTest.java b/dataproc/src/test/java/QuickstartTest.java
new file mode 100644
index 00000000000..eff7ed05dd3
--- /dev/null
+++ b/dataproc/src/test/java/QuickstartTest.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static junit.framework.TestCase.assertNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Bucket;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.cloud.testing.junit4.StdOutCaptureRule;
+import com.google.protobuf.Empty;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class QuickstartTest {
+
+  private static final String MY_UUID = UUID.randomUUID().toString();
+  private static final String REGION = "us-central1";
+  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+  private static final String ENDPOINT = String.format("%s-dataproc.googleapis.com:443", REGION);
+  private static final String CLUSTER_NAME = String.format("java-qs-test-%s", MY_UUID);
+  private static final String BUCKET_NAME = String.format("java-dataproc-qs-test-%s", MY_UUID);
+  private static final String JOB_FILE_NAME = "sum.py";
+  private static final String JOB_FILE_PATH =
+      String.format("gs://%s/%s", BUCKET_NAME, JOB_FILE_NAME);
+  private static final String SORT_CODE =
+      "import pyspark\n"
+          + "sc = pyspark.SparkContext()\n"
+          + "rdd = sc.parallelize((1,2,3,4,5))\n"
+          + "sum = rdd.reduce(lambda x, y: x + y)\n";
+
+  @Rule public StdOutCaptureRule stdOutCapture = new StdOutCaptureRule();
+  private Bucket bucket;
+  private Blob blob;
+
+  private static void requireEnv(String varName) {
+    assertNotNull(
+        String.format("Environment variable '%s' is required to perform these tests.", varName),
+        System.getenv(varName));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
+    requireEnv("GOOGLE_CLOUD_PROJECT");
+  }
+
+  @Before
+  public void setUp() {
+    Storage storage = StorageOptions.getDefaultInstance().getService();
+    bucket = storage.create(BucketInfo.of(BUCKET_NAME));
+    blob = bucket.create(JOB_FILE_NAME, SORT_CODE.getBytes(UTF_8), "text/plain");
+  }
+
+  @Test
+  public void quickstartTest() throws IOException, InterruptedException {
+    Quickstart.main(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH);
+    String output = stdOutCapture.getCapturedOutputAsUtf8String();
+
+    assertThat(output, CoreMatchers.containsString("Cluster created successfully"));
+    assertThat(output, CoreMatchers.containsString("Job finished successfully:"));
+    assertThat(output, CoreMatchers.containsString("successfully deleted"));
+  }
+
+  @After
+  public void teardown() throws IOException, InterruptedException, ExecutionException {
+    blob.delete();
+    bucket.delete();
+
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
+      for (Cluster element :
+          clusterControllerClient.listClusters(PROJECT_ID, REGION).iterateAll()) {
+        if (element.getClusterName().equals(CLUSTER_NAME)) {
+          OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+              clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
+          deleteClusterAsyncRequest.get();
+          break;
+        }
+      }
+    }
+  }
+}
diff --git a/dataproc/src/test/java/SubmitHadoopFsJobTest.java b/dataproc/src/test/java/SubmitHadoopFsJobTest.java
new file mode 100644
index 00000000000..341a3aab7ac
--- /dev/null
+++ b/dataproc/src/test/java/SubmitHadoopFsJobTest.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterConfig;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.cloud.dataproc.v1.InstanceGroupConfig;
+import com.google.protobuf.Empty;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SubmitHadoopFsJobTest {
+
+  private static final String CLUSTER_NAME =
+      String.format("java-fs-test--%s", UUID.randomUUID().toString());
+  private static final String REGION = "us-central1";
+  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+  private static final String ENDPOINT = String.format("%s-dataproc.googleapis.com:443", REGION);
+  private static final String HADOOP_FS_QUERY = "-ls /";
+
+  private ByteArrayOutputStream bout;
+
+  private static void requireEnv(String varName) {
+    assertNotNull(
+        String.format("Environment variable '%s' is required to perform these tests.", varName),
+        System.getenv(varName));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
+    requireEnv("GOOGLE_CLOUD_PROJECT");
+  }
+
+  @Before
+  public void setUp() throws IOException, ExecutionException, InterruptedException {
+    bout = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(bout));
+
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
+      // Configure the settings for our cluster.
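+      // A minimal one-master, two-worker cluster is enough for the "-ls /" query this test
+      // submits; the sizing mirrors the samples above.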
+      InstanceGroupConfig masterConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-2")
+              .setNumInstances(1)
+              .build();
+      InstanceGroupConfig workerConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-2")
+              .setNumInstances(2)
+              .build();
+      ClusterConfig clusterConfig =
+          ClusterConfig.newBuilder()
+              .setMasterConfig(masterConfig)
+              .setWorkerConfig(workerConfig)
+              .build();
+      // Create the Dataproc cluster.
+      Cluster cluster =
+          Cluster.newBuilder().setClusterName(CLUSTER_NAME).setConfig(clusterConfig).build();
+      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
+          clusterControllerClient.createClusterAsync(PROJECT_ID, REGION, cluster);
+      createClusterAsyncRequest.get();
+    }
+  }
+
+  @Test
+  public void submitHadoopFsJobTest() throws IOException, InterruptedException {
+    SubmitHadoopFsJob.submitHadoopFsJob(PROJECT_ID, REGION, CLUSTER_NAME, HADOOP_FS_QUERY);
+    String output = bout.toString();
+
+    assertThat(output, CoreMatchers.containsString("/tmp"));
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException, ExecutionException {
+
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
+      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+          clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
+      deleteClusterAsyncRequest.get();
+    }
+  }
+}
diff --git a/dataproc/src/test/java/SubmitJobTest.java b/dataproc/src/test/java/SubmitJobTest.java
new file mode 100644
index 00000000000..837a4afcb0e
--- /dev/null
+++ b/dataproc/src/test/java/SubmitJobTest.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterConfig;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.cloud.dataproc.v1.InstanceGroupConfig;
+import com.google.protobuf.Empty;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SubmitJobTest {
+
+  private static final String CLUSTER_NAME =
+      String.format("java-sj-test--%s", UUID.randomUUID().toString());
+  private static final String REGION = "us-central1";
+  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+  private static final String ENDPOINT = String.format("%s-dataproc.googleapis.com:443", REGION);
+
+  private ByteArrayOutputStream bout;
+
+  private static void requireEnv(String varName) {
+    assertNotNull(
+        String.format("Environment variable '%s' is required to perform these tests.", varName),
+        System.getenv(varName));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
+    requireEnv("GOOGLE_CLOUD_PROJECT");
+  }
+
+  @Before
+  public void setUp() throws IOException, ExecutionException, InterruptedException {
+    bout = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(bout));
+
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
+      // Configure the settings for our cluster.
+      InstanceGroupConfig masterConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-2")
+              .setNumInstances(1)
+              .build();
+      InstanceGroupConfig workerConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-2")
+              .setNumInstances(2)
+              .build();
+      ClusterConfig clusterConfig =
+          ClusterConfig.newBuilder()
+              .setMasterConfig(masterConfig)
+              .setWorkerConfig(workerConfig)
+              .build();
+      // Create the Dataproc cluster.
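+      // Cluster creation is awaited with get() below so the cluster is ready before the test
+      // method submits its Spark job.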
+      Cluster cluster =
+          Cluster.newBuilder().setClusterName(CLUSTER_NAME).setConfig(clusterConfig).build();
+      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
+          clusterControllerClient.createClusterAsync(PROJECT_ID, REGION, cluster);
+      createClusterAsyncRequest.get();
+    }
+  }
+
+  @Test
+  public void submitJobTest() throws IOException, InterruptedException {
+    SubmitJob.submitJob(PROJECT_ID, REGION, CLUSTER_NAME);
+    String output = bout.toString();
+
+    assertThat(output, CoreMatchers.containsString("Job finished successfully"));
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException, ExecutionException {
+
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
+      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+          clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
+      deleteClusterAsyncRequest.get();
+    }
+  }
+}