[Dataflow] Add transformation sample (#1131)
* Add transformation example code

* Update README.md to reflect java-docs-samples

* Add sample to parent pom.xml

* minor edits

* adding license

* Update copyright header in log4j

* Fix issue with README command and fixed typo and changed compression

* Update example files to follow schema correctly
frankyn authored Jun 15, 2018
1 parent b683925 commit 466f558
Showing 12 changed files with 866 additions and 0 deletions.
125 changes: 125 additions & 0 deletions dataflow/transforms/README.md
# Data Format Transformations using Cloud Dataflow and Apache Beam

Utility transforms that convert a large number of files from one format to another using
[Apache Beam][apache_beam] running on [Google Cloud Dataflow][dataflow].

The transformations supported by this utility are:
- CSV to Avro
- Avro to CSV

## Setup

Setup instructions assume you have an active Google Cloud project with an associated billing account.
The following instructions will help you prepare your development environment.

1. Install [Cloud SDK][cloud_sdk].
1. Set up the Cloud SDK

gcloud init


1. Select your Google Cloud project if it is not already selected

gcloud config set project [PROJECT_ID]

1. Clone the repository

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

1. Navigate to the sample code directory

        cd java-docs-samples/dataflow/transforms

## Grant required permissions

The examples are configured for Cloud Dataflow, which runs on Google Compute Engine.
The Compute Engine default service account requires the `storage.objects.create`,
`storage.objects.get`, and `storage.objects.list` permissions to read and write
objects in your Google Cloud Storage bucket.

Learn more about [Cloud Storage IAM Roles][storage_iam_roles] and [Bucket-level IAM][bucket_iam].

The following steps are optional if:

* The project you use to run these Dataflow transformations also owns the buckets used to read and write objects.
* The bucket you're reading data from is public, e.g., `allUsers` is granted `roles/storage.objectViewer`.

1. Get the Compute Engine default service account using the following gcloud command:

gcloud compute project-info describe

    The default service account is listed next to `defaultServiceAccount:` in the response.

1. Grant the `roles/storage.objectViewer` role on the bucket so the Dataflow job can get and list objects:

        gsutil iam ch serviceAccount:[COMPUTE_DEFAULT_SERVICE_ACCOUNT]:objectViewer gs://[BUCKET_NAME]

    * Replace `[COMPUTE_DEFAULT_SERVICE_ACCOUNT]` with the Compute Engine default service account.
    * Replace `[BUCKET_NAME]` with the bucket you use to read your input data.

1. Grant the `roles/storage.objectCreator` role on the bucket so the Dataflow job can create output objects:

        gsutil iam ch serviceAccount:[COMPUTE_DEFAULT_SERVICE_ACCOUNT]:objectCreator gs://[BUCKET_NAME]

    * Replace `[COMPUTE_DEFAULT_SERVICE_ACCOUNT]` with the Compute Engine default service account.
    * Replace `[BUCKET_NAME]` with the bucket you use to write your output data.

1. If the bucket contains both input and output data, grant the `roles/storage.objectAdmin` role to the default service account using gsutil:

        gsutil iam ch serviceAccount:[COMPUTE_DEFAULT_SERVICE_ACCOUNT]:objectAdmin gs://[BUCKET_NAME]

    * Replace `[COMPUTE_DEFAULT_SERVICE_ACCOUNT]` with the Compute Engine default service account.
    * Replace `[BUCKET_NAME]` with the bucket you use to read and write your input and output data.
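
After applying the grants above, you can verify the result with `gsutil iam get gs://[BUCKET_NAME]`. The bucket policy should contain bindings along these lines (illustrative output only; the member and `etag` values will differ):

```json
{
  "bindings": [
    {
      "members": ["serviceAccount:[COMPUTE_DEFAULT_SERVICE_ACCOUNT]"],
      "role": "roles/storage.objectViewer"
    },
    {
      "members": ["serviceAccount:[COMPUTE_DEFAULT_SERVICE_ACCOUNT]"],
      "role": "roles/storage.objectCreator"
    }
  ],
  "etag": "CAE="
}
```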


## Using transformations

### Avro to CSV transformation

To transform Avro formatted files to CSV, use the following command:

```bash
# Example

mvn compile exec:java -Dexec.mainClass=com.example.AvroToCsv \
-Dexec.args="--avroSchema=gs://bucket/schema.avsc --inputFile=gs://bucket/*.avro --output=gs://bucket/output --runner=DataflowRunner"
```

A full description of the available options can be printed with the following command:

```bash
mvn compile exec:java -Dexec.mainClass=com.example.AvroToCsv -Dexec.args="--help=com.example.SampleOptions"
```
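
The core per-record step of such a transform is straightforward: serialize each Avro record's field values, in schema field order, into one CSV line. A minimal standalone sketch of that logic in plain Java, using a `Map` as a stand-in for an Avro `GenericRecord` (class and method names here are illustrative, not the sample's actual code):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class AvroToCsvSketch {

  // Join a record's field values into one CSV line, in field (insertion) order.
  static String recordToCsvLine(Map<String, Object> record) {
    return record.values().stream()
        .map(String::valueOf)
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    // A record matching the example user.avsc schema in this commit.
    Map<String, Object> user = new LinkedHashMap<>();
    user.put("first_name", "frank");
    user.put("last_name", "natividad");
    user.put("age", 10);
    System.out.println(recordToCsvLine(user)); // prints: frank,natividad,10
  }
}
```

In the real pipeline, this per-record function would run inside a Beam `DoFn` between an Avro read and a text write; no quoting or escaping is handled in this sketch.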

### CSV to Avro transformation

To transform CSV formatted files without a header to Avro, use the following command:

```bash
# Example

mvn compile exec:java -Dexec.mainClass=com.example.CsvToAvro \
-Dexec.args="--avroSchema=gs://bucket/schema.avsc --inputFile=gs://bucket/*.csv --output=gs://bucket/output --runner=DataflowRunner"
```

A full description of the available options can be printed with the following command:

```bash
mvn compile exec:java -Dexec.mainClass=com.example.CsvToAvro -Dexec.args="--help=com.example.SampleOptions"
```

This example does not support headers in CSV files.
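
Because the input CSV has no header, each line must be mapped positionally onto the schema's fields. A standalone sketch of that per-line parse for the example `user.avsc` schema in this commit (`first_name`: string, `last_name`: string, `age`: int), in plain Java with illustrative names rather than the sample's actual classes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CsvToAvroSketch {

  // Parse one header-less CSV line into the fields of the User schema.
  static Map<String, Object> parseUserLine(String line) {
    String[] parts = line.split(",");
    if (parts.length != 3) {
      throw new IllegalArgumentException("Expected 3 fields, got " + parts.length);
    }
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("first_name", parts[0]);
    record.put("last_name", parts[1]);
    record.put("age", Integer.parseInt(parts[2])); // schema type "int"
    return record;
  }

  public static void main(String[] args) {
    // A line from the example input.csv in this commit.
    System.out.println(parseUserLine("frank,natividad,10"));
  }
}
```

In the real pipeline this logic would run per element inside a Beam `DoFn`, producing Avro `GenericRecord`s validated against the loaded schema; a malformed line here simply throws, whereas a production job would need an error-handling strategy.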

## Run Tests

Tests can be run locally using the DirectRunner:

    mvn verify

[storage_iam_roles]: https://cloud.google.com/storage/docs/access-control/iam-roles
[bucket_iam]: https://cloud.google.com/storage/docs/access-control/iam
[cloud_sdk]: https://cloud.google.com/sdk/docs/
[dataflow]: https://cloud.google.com/dataflow/docs/
[apache_beam]: https://beam.apache.org/
Binary file added dataflow/transforms/example_files/input.avro
2 changes: 2 additions & 0 deletions dataflow/transforms/example_files/input.csv
frank,natividad,10
Karthi,thyagarajan,10
14 changes: 14 additions & 0 deletions dataflow/transforms/example_files/user.avsc
{
"type": "record",
"name": "User",
"fields": [{
"name": "first_name",
"type": "string"
}, {
"name": "last_name",
"type": "string"
}, {
"name": "age",
"type": "int"
}]
}
188 changes: 188 additions & 0 deletions dataflow/transforms/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2018 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.example</groupId>
<artifactId>format-transforms</artifactId>
<version>1.0-SNAPSHOT</version>

<packaging>jar</packaging>

<parent>
<groupId>com.google.cloud.samples</groupId>
<artifactId>shared-configuration</artifactId>
<version>1.0.9</version>
</parent>

<properties>
<beam.version>2.4.0</beam.version>

<google-clients.version>1.22.0</google-clients.version>
<hamcrest.version>1.3</hamcrest.version>
<junit.version>4.12</junit.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
<maven-shade-plugin.version>3.0.0</maven-shade-plugin.version>
<slf4j.version>1.7.25</slf4j.version>
<surefire-plugin.version>2.20</surefire-plugin.version>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<parallel>all</parallel>
<threadCount>4</threadCount>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>${surefire-plugin.version}</version>
</dependency>
</dependencies>
</plugin>

<!-- Ensure that the Maven jar plugin runs before the Maven
shade plugin by listing the plugin higher within the file. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
</plugin>

<!--
Configures `mvn package` to produce a bundled jar ("fat jar") for runners
that require this for job submission to a cluster.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>

<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
<version>${beam.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
</dependency>

<!-- Adds a dependency on the Beam SDK. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>

<!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>

<!-- The DirectRunner is needed for unit tests. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.18.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>${hamcrest.version}</version>
</dependency>
</dependencies>
</project>