Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the google_dataflow_flex_template_job resource #3772

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add a correct flex template, implement separate delete method.
  • Loading branch information
rileykarson committed Jul 22, 2020
commit 7d9a8e18f0a53213130913c2bf25130eee0bc43d
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ package google
import (
"fmt"
"log"
"strings"
"time"

"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
"google.golang.org/api/googleapi"

"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"
dataflow "google.golang.org/api/dataflow/v1b3"
Expand All @@ -21,8 +25,8 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
return &schema.Resource{
Create: resourceDataflowFlexTemplateJobCreate,
Read: resourceDataflowFlexTemplateJobRead,
Update: resourceDataflowFlexTemplateJobUpdateByReplacement,
Delete: resourceDataflowJobDelete,
Update: resourceDataflowFlexTemplateJobUpdate,
Delete: resourceDataflowFlexTemplateJobDelete,
Schema: map[string]*schema.Schema{

"container_spec_gcs_path": {
Expand All @@ -41,14 +45,14 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
Type: schema.TypeString,
ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
Optional: true,
Default: "drain",
Default: "cancel",
},

"labels": {
Type: schema.TypeMap,
Optional: true,
DiffSuppressFunc: resourceDataflowJobLabelDiffSuppress,
ForceNew: true,
ForceNew: true,
},

"parameters": {
Expand Down Expand Up @@ -99,10 +103,12 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
Parameters: expandStringMap(d, "parameters"),
},
}

response, err := config.clientDataflow.Projects.Locations.FlexTemplates.Launch(project, region, &request).Do()
if err != nil {
return err
}

job := response.Job
d.SetId(job.Id)
d.Set("job_id", job.Id)
Expand Down Expand Up @@ -141,15 +147,91 @@ func resourceDataflowFlexTemplateJobRead(d *schema.ResourceData, meta interface{
d.SetId("")
return nil
}
d.SetId(job.Id)
d.Set("job_id", job.Id)

return nil
}

// resourceDataflowFlexTemplateJobUpdateByReplacement will be the method for updating Flex-Template jobs
func resourceDataflowFlexTemplateJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error {
// resourceDataflowFlexTemplateJobUpdate is a blank method to enable updating
// the on_delete virtual field
func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interface{}) error {
return nil
}

func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

project, err := getProject(d, config)
if err != nil {
return err
}

region, err := getRegion(d, config)
if err != nil {
return err
}

id := d.Id()

requestedState, err := resourceDataflowJobMapRequestedState(d.Get("on_delete").(string))
if err != nil {
return err
}

// Retry updating the state while the job is not ready to be canceled/drained.
err = resource.Retry(time.Minute*time.Duration(15), func() *resource.RetryError {
// To terminate a dataflow job, we update the job with a requested
// terminal state.
job := &dataflow.Job{
RequestedState: requestedState,
}

_, updateErr := resourceDataflowJobUpdateJob(config, project, region, id, job)
if updateErr != nil {
gerr, isGoogleErr := updateErr.(*googleapi.Error)
if !isGoogleErr {
// If we have an error and it's not a google-specific error, we should go ahead and return.
return resource.NonRetryableError(updateErr)
}

if strings.Contains(gerr.Message, "not yet ready for canceling") {
// Retry cancelling job if it's not ready.
// Sleep to avoid hitting update quota with repeated attempts.
time.Sleep(5 * time.Second)
return resource.RetryableError(updateErr)
}

if strings.Contains(gerr.Message, "Job has terminated") {
// Job has already been terminated, skip.
return nil
}
}

return nil
})
if err != nil {
return err
}

// Wait for state to reach terminal state (canceled/drained/done)
_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
for !ok {
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
time.Sleep(5 * time.Second)

err = resourceDataflowFlexTemplateJobRead(d, meta)
if err != nil {
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
}
_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
}

// Only remove the job from state if it's actually successfully canceled.
if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
d.SetId("")
return nil
}
return fmt.Errorf("Unable to cancel the dataflow job '%s' - final state was %q.", d.Id(), d.Get("state").(string))
}

<% end -%>
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ func TestAccDataflowFlexTemplateJob_simple(t *testing.T) {
{
Config: testAccDataflowFlowFlexTemplateJob_basic(bucket, job),
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobExists(t, "google_dataflow_job.big_data"),
testAccDataflowJobExists(t, "google_dataflow_flex_template_job.big_data"),
),
},
},
})
}

// note: this config creates a job that doesn't actually do anything
func testAccDataflowFlowFlexTemplateJob_basic(bucket, job string) string {
return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
Expand All @@ -40,45 +41,50 @@ resource "google_storage_bucket" "temp" {

resource "google_storage_bucket_object" "flex_template" {
name = "flex_template.json"
bucket = "%s"
content = "%s"
bucket = google_storage_bucket.temp.name
content = <<EOF
{
"image": "my-image",
"metadata": {
"description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
"name": "Streaming Beam SQL",
"parameters": [
{
"helpText": "Pub/Sub subscription to read from.",
"label": "Pub/Sub input subscription.",
"name": "inputSubscription",
"regexes": [
"[-_.a-zA-Z0-9]+"
]
},
{
"helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
"is_optional": true,
"label": "BigQuery output table",
"name": "outputTable",
"regexes": [
"[^:]+:[^.]+[.].+"
]
}
]
},
"sdkInfo": {
"language": "JAVA"
}
}
EOF
}

resource "google_dataflow_flex_template_job" "big_data" {
name = "%s"
container_spec_gcs_path = "%s"
container_spec_gcs_path = "${google_storage_bucket.temp.url}/${google_storage_bucket_object.flex_template.name}"
on_delete = "cancel"
parameters = {
inputSubscription = "my-subscription"
outputTable = "my-project:my-dataset.my-table"
}
}
`, bucket, bucket, flexTemplateContent(), bucket, job)
`, bucket, job)
}

func flexTemplateContent() string {
return `
<<EOF
{
"name": "Streaming Beam SQL",
"description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
"parameters": [
{
"name": "inputSubscription",
"label": "Pub/Sub input subscription.",
"helpText": "Pub/Sub subscription to read from.",
"regexes": [
"[-_.a-zA-Z0-9]+"
]
},
{
"name": "outputTable",
"label": "BigQuery output table",
"helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
"is_optional": true,
"regexes": [
"[^:]+:[^.]+[.].+"
]
}
]
}
EOF
`
}
<% end -%>
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ description: |-

# google\_dataflow\_flex\_template\_job

Creates a [Flex Template](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates) job on Dataflow, which is an implementation of Apache Beam running on Google Compute Engine. For more information see
the official documentation for
[Beam](https://beam.apache.org) and [Dataflow](https://cloud.google.com/dataflow/).
Creates a [Flex Template](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates)
job on Dataflow, which is an implementation of Apache Beam running on Google
Compute Engine. For more information see the official documentation for [Beam](https://beam.apache.org)
and [Dataflow](https://cloud.google.com/dataflow/).

## Example Usage

Expand All @@ -28,32 +29,55 @@ resource "google_dataflow_flex_template_job" "big_data_job" {

[ To Come ...]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably want to remove this?

## Note on "destroy" / "apply"
There are many types of Dataflow jobs. Some Dataflow jobs run constantly, getting new data from (e.g.) a GCS bucket, and outputting data continuously. Some jobs process a set amount of data then terminate. All jobs can fail while running due to programming errors or other issues. In this way, Dataflow jobs are different from most other Terraform / Google resources.
There are many types of Dataflow jobs. Some Dataflow jobs run constantly,
getting new data from (e.g.) a GCS bucket, and outputting data continuously.
Some jobs process a set amount of data then terminate. All jobs can fail while
running due to programming errors or other issues. In this way, Dataflow jobs
are different from most other Terraform / Google resources.

The Dataflow resource is considered 'existing' while it is in a nonterminal state. If it reaches a terminal state (e.g. 'FAILED', 'COMPLETE', 'CANCELLED'), it will be recreated on the next 'apply'. This is as expected for jobs which run continuously, but may surprise users who use this resource for other kinds of Dataflow jobs.
The Dataflow resource is considered 'existing' while it is in a nonterminal
state. If it reaches a terminal state (e.g. 'FAILED', 'COMPLETE',
'CANCELLED'), it will be recreated on the next 'apply'. This is as expected for
jobs which run continuously, but may surprise users who use this resource for
other kinds of Dataflow jobs.

A Dataflow job which is 'destroyed' may be "cancelled" or "drained". If "cancelled", the job terminates - any data written remains where it is, but no new data will be processed. If "drained", no new data will enter the pipeline, but any data currently in the pipeline will finish being processed. The default is "cancelled", but if a user sets `on_delete` to `"drain"` in the configuration, you may experience a long wait for your `terraform destroy` to complete.
A Dataflow job which is 'destroyed' may be "cancelled" or "drained". If
"cancelled", the job terminates - any data written remains where it is, but no
new data will be processed. If "drained", no new data will enter the pipeline,
but any data currently in the pipeline will finish being processed. The default
is "cancelled", but if a user sets `on_delete` to `"drain"` in the
configuration, you may experience a long wait for your `terraform destroy` to
complete.

## Argument Reference

The following arguments are supported:

* `name` - (Required) A unique name for the resource, required by Dataflow.
* `container_spec_gcs_path` - (Required) The GCS path to the Dataflow job Flex Template.

* `container_spec_gcs_path` - (Required) The GCS path to the Dataflow job Flex
Template.

- - -

* `parameters` - (Optional) Key/Value pairs to be passed to the Dataflow job (as used in the template).
* `labels` - (Optional) User labels to be specified for the job. Keys and values should follow the restrictions
specified in the [labeling restrictions](https://cloud.google.com/compute/docs/labeling-resources#restrictions) page.
**NOTE**: Google-provided Dataflow templates often provide default labels that begin with `goog-dataflow-provided`.
Unless explicitly set in config, these labels will be ignored to prevent diffs on re-apply.
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of deletion during `terraform destroy`. See above note.
* `project` - (Optional) The project in which the resource belongs. If it is not provided, the provider project is used.
* `parameters` - (Optional) Key/Value pairs to be passed to the Dataflow job (as
used in the template).

* `labels` - (Optional) User labels to be specified for the job. Keys and values
should follow the restrictions specified in the [labeling restrictions](https://cloud.google.com/compute/docs/labeling-resources#restrictions)
page. **NOTE**: Google-provided Dataflow templates often provide default labels
that begin with `goog-dataflow-provided`. Unless explicitly set in config, these
labels will be ignored to prevent diffs on re-apply.

* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of
deletion during `terraform destroy`. See above note.

* `project` - (Optional) The project in which the resource belongs. If it is not
provided, the provider project is used.

## Attributes Reference
In addition to the arguments listed above, the following computed attributes are exported:

* `job_id` - The unique ID of this job.
* `type` - The type of this job, selected from the [JobType enum](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobType)

* `state` - The current state of the resource, selected from the [JobState enum](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState)