Dataflow client library (#2450)
* Updated requirements

* Update service naming convention

* Prefer client libraries over shell commands

* Update README format
davidcavazos authored Oct 8, 2019
1 parent 2b2cef9 commit cbb910e
Showing 5 changed files with 91 additions and 75 deletions.
90 changes: 38 additions & 52 deletions dataflow/run_template/README.md
@@ -1,55 +1,52 @@
# Run template

[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/editor)

This sample demonstrates how to run an
[Apache Beam](https://beam.apache.org/)
template on [Google Cloud Dataflow](https://cloud.google.com/dataflow/docs/).
For more information, see the
[Running templates](https://cloud.google.com/dataflow/docs/guides/templates/running-templates)
docs page.

The following examples show how to run the
[`Word_Count` template](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/WordCount.java),
but you can run any other template.

The `Word_Count` template requires you to pass an `output` Cloud Storage path prefix,
and optionally you can pass an `inputFile` Cloud Storage file pattern for the inputs.
If `inputFile` is not passed, it defaults to `gs://apache-beam-samples/shakespeare/kinglear.txt`.
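
The parameters are plain key/value pairs. As a quick sketch, the `Word_Count` parameters used throughout this sample look like this in Python (the bucket name is a placeholder):

```py
# Word_Count template parameters: `output` is required, `inputFile` is optional.
parameters = {
    'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
    'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
}
```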

## Before you begin

Follow the
[Getting started with Google Cloud Dataflow](../README.md)
page, and make sure you have a Google Cloud project with billing enabled
and a *service account JSON key* set up in your `GOOGLE_APPLICATION_CREDENTIALS` environment variable.
Additionally, for this sample you need the following:

1. Create a Cloud Storage bucket.

```sh
export BUCKET=your-gcs-bucket
gsutil mb gs://$BUCKET
```
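
Since one of this commit's goals is to prefer client libraries over shell commands, the bucket can also be created from Python. This is a minimal sketch under the assumption that the `google-cloud-storage` package is installed; it is not part of this sample's `requirements.txt`.

```py
# Sketch: create the bucket with the Cloud Storage client library instead of gsutil.
# Assumes `pip install google-cloud-storage` and credentials available via
# GOOGLE_APPLICATION_CREDENTIALS.
from google.cloud import storage

client = storage.Client()
bucket = client.create_bucket('your-gcs-bucket')  # bucket names are globally unique
print('Created bucket: {}'.format(bucket.name))
```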

## Setup

The following instructions will help you prepare your development environment.

1. [Install Python and virtualenv].

1. Clone the `python-docs-samples` repository.

```sh
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
```

1. Navigate to the sample code directory.

```sh
cd python-docs-samples/dataflow/run_template
```

1. Create a virtual environment and activate it.

```sh
virtualenv env
source env/bin/activate
```
@@ -58,18 +55,18 @@ The following instructions will help you prepare your development environment.
1. Install the sample requirements.

```sh
pip install -U -r requirements.txt
```

## Running locally

* [`main.py`](main.py)
* [REST API dataflow/projects.templates.launch](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.templates/launch)

To run a Dataflow template from the command line:

```sh
python main.py \
--project <your-gcp-project> \
--job wordcount-$(date +'%Y%m%d-%H%M%S') \
```
@@ -80,10 +77,10 @@

## Running in Python

* [`main.py`](main.py)
* [REST API dataflow/projects.templates.launch](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.templates/launch)

To run a Dataflow template from Python:

```py
import main as run_template
```
@@ -101,9 +98,12 @@ run_template.run(
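
The call above is truncated in the diff view. Based on the `run(project, job, template, parameters=None)` signature shown in `main.py` below, a complete invocation might look like the following sketch; the project and bucket names are placeholders:

```py
from datetime import datetime

import main as run_template

# <your-gcp-project> and <your-gcs-bucket> are placeholders.
response = run_template.run(
    project='<your-gcp-project>',
    job=datetime.now().strftime('wordcount-%Y%m%d-%H%M%S'),
    template='gs://dataflow-templates/latest/Word_Count',
    parameters={
        'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
        'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
    },
)
print(response['job']['id'])  # ID of the Dataflow job that was created
```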

## Running in Cloud Functions

* [`main.py`](main.py)
* [REST API dataflow/projects.templates.launch](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.templates/launch)

To deploy this as a Cloud Function and launch a Dataflow template via an HTTP request:

```sh
PROJECT=$(gcloud config get-value project) \
REGION=$(gcloud config get-value functions/region)

@@ -121,17 +121,3 @@ curl -X POST "https://$REGION-$PROJECT.cloudfunctions.net/run_template" \
-d inputFile=gs://apache-beam-samples/shakespeare/kinglear.txt \
-d output=gs://<your-gcs-bucket>/wordcount/outputs
```
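
An HTTP-triggered function like `run_template` typically reads its arguments from the query string, form data, or JSON body, which is exactly what the tests below exercise. The following is an illustrative sketch of that pattern, not the sample's actual implementation in `main.py`:

```py
import flask


def run_template_sketch(request: flask.Request) -> str:
    """Illustrative handler: gather arguments from query string, form, or JSON body."""
    args = dict(request.args)
    args.update(request.form)
    args.update(request.get_json(silent=True) or {})

    project = args['project']  # missing keys raise KeyError, as the tests expect
    job = args['job']
    template = args['template']
    return 'Would launch {} as job {} in project {}'.format(template, job, project)
```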

[Apache Beam]: https://beam.apache.org/
[Google Cloud Dataflow]: https://cloud.google.com/dataflow/docs/
[`Word_Count` template]: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/WordCount.java

[Cloud SDK]: https://cloud.google.com/sdk/docs/
[Create a new project]: https://console.cloud.google.com/projectcreate
[Enable billing]: https://cloud.google.com/billing/docs/how-to/modify-project
[Create a service account key]: https://console.cloud.google.com/apis/credentials/serviceaccountkey
[Creating and managing service accounts]: https://cloud.google.com/iam/docs/creating-managing-service-accounts
[GCP Console IAM page]: https://console.cloud.google.com/iam-admin/iam
[Granting roles to service accounts]: https://cloud.google.com/iam/docs/granting-roles-to-service-accounts

[Install Python and virtualenv]: https://cloud.google.com/python/setup
4 changes: 2 additions & 2 deletions dataflow/run_template/main.py
@@ -43,8 +43,8 @@ def run(project, job, template, parameters=None):
# 'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
projectId=project,
gcsPath=template,
body={
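
The launch request returns a job resource. To follow the job after launching it, one option is to poll the Dataflow API with the same discovery client; this is an illustrative sketch and not part of this sample, and the project and job IDs are placeholders:

```py
from googleapiclient.discovery import build

dataflow = build('dataflow', 'v1b3')
job = dataflow.projects().jobs().get(
    projectId='<your-gcp-project>',
    jobId='<job-id-from-the-launch-response>',
).execute()
print(job['currentState'])  # e.g. JOB_STATE_RUNNING, JOB_STATE_DONE, JOB_STATE_CANCELLED
```
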
64 changes: 47 additions & 17 deletions dataflow/run_template/main_test.py
@@ -12,38 +12,62 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# To run the tests:
# nox -s "lint(sample='./dataflow/run_template')"
# nox -s "py27(sample='./dataflow/run_template')"
# nox -s "py36(sample='./dataflow/run_template')"

import flask
import json
import os
import pytest
import time

from datetime import datetime
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from werkzeug.urls import url_encode

import main

PROJECT = os.environ['GCLOUD_PROJECT']
BUCKET = os.environ['CLOUD_STORAGE_BUCKET']

dataflow = build('dataflow', 'v1b3')

# Create a fake "app" for generating test request contexts.
@pytest.fixture(scope="module")
def app():
return flask.Flask(__name__)


def test_run_template_python_empty_args(app):
project = PROJECT
job = datetime.now().strftime('test_run_template_python-%Y%m%d-%H%M%S')
template = 'gs://dataflow-templates/latest/Word_Count'
with pytest.raises(HttpError):
main.run(project, job, template)


def test_run_template_python(app):
project = PROJECT
job = datetime.now().strftime('test_run_template_python-%Y%m%d-%H%M%S')
template = 'gs://dataflow-templates/latest/Word_Count'
parameters = {
'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET),
}
res = main.run(project, job, template, parameters)
dataflow_jobs_cancel(res['job']['id'])


def test_run_template_http_empty_args(app):
with app.test_request_context():
with pytest.raises(KeyError):
main.run_template(flask.request)


def test_run_template_http_url(app):
args = {
'project': PROJECT,
'job': datetime.now().strftime('test_run_template_url-%Y%m%d-%H%M%S'),
@@ -54,12 +78,10 @@ def test_run_template_url(app):
with app.test_request_context('/?' + url_encode(args)):
res = main.run_template(flask.request)
data = json.loads(res)
dataflow_jobs_cancel(data['job']['id'])


def test_run_template_http_data(app):
args = {
'project': PROJECT,
'job': datetime.now().strftime('test_run_template_data-%Y%m%d-%H%M%S'),
@@ -70,12 +92,10 @@ def test_run_template_data(app):
with app.test_request_context(data=args):
res = main.run_template(flask.request)
data = json.loads(res)
dataflow_jobs_cancel(data['job']['id'])


def test_run_template_http_json(app):
args = {
'project': PROJECT,
'job': datetime.now().strftime('test_run_template_json-%Y%m%d-%H%M%S'),
@@ -86,6 +106,16 @@ def test_run_template_json(app):
with app.test_request_context(json=args):
res = main.run_template(flask.request)
data = json.loads(res)
dataflow_jobs_cancel(data['job']['id'])


def dataflow_jobs_cancel(job_id):
# Wait time until a job can be cancelled, as a best effort.
# If it fails to be cancelled, the job will run for ~8 minutes.
time.sleep(5) # seconds
request = dataflow.projects().jobs().update(
projectId=PROJECT,
jobId=job_id,
body={'requestedState': 'JOB_STATE_CANCELLED'}
)
request.execute()
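
Because cancellation is best effort, one possible variation (an assumption on my part, not something this commit does) is to tolerate `HttpError` so a failed cancellation does not fail the test run:

```py
def dataflow_jobs_cancel_best_effort(job_id):
    # Assumes the same module-level PROJECT and `dataflow` client as above.
    time.sleep(5)  # give the job a moment to reach a cancellable state
    try:
        dataflow.projects().jobs().update(
            projectId=PROJECT,
            jobId=job_id,
            body={'requestedState': 'JOB_STATE_CANCELLED'},
        ).execute()
    except HttpError as error:
        # Best effort: the Word_Count job finishes on its own after a few
        # minutes even if this call fails.
        print('Could not cancel job {}: {}'.format(job_id, error))
```
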
2 changes: 1 addition & 1 deletion dataflow/run_template/requirements.txt
@@ -1 +1 @@
google-api-python-client==1.7.11
6 changes: 3 additions & 3 deletions testing/requirements.txt
@@ -1,12 +1,12 @@
beautifulsoup4==4.8.1
coverage==4.5.4
flaky==3.6.1
funcsigs==1.0.2
mock==3.0.5
mysql-python==1.2.5; python_version < "3.0"
PyCrypto==2.6.1
pytest-cov==2.8.1
pytest==5.2.1
pyyaml==5.1.2
responses==0.10.6
WebTest==2.0.33
