Skip to content

Commit

Permalink
Add the google_dataflow_flex_template_job resource (#3772)
Browse files Browse the repository at this point in the history
Co-authored-by: Cameron Thornton <[email protected]>
Co-authored-by: eric-hole <[email protected]>
  • Loading branch information
3 people authored Jul 24, 2020
1 parent 746c913 commit 9d02849
Show file tree
Hide file tree
Showing 4 changed files with 411 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
<% autogen_exception -%>
package google
<% unless version == 'ga' -%>

import (
"fmt"
"log"
"strings"
"time"

"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
"google.golang.org/api/googleapi"

"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"
dataflow "google.golang.org/api/dataflow/v1b3"
)

// NOTE: resource_dataflow_flex_template currently does not support updating existing jobs.
// Changing any non-computed field will result in the job being deleted (according to its
// on_delete policy) and recreated with the updated parameters.

// resourceDataflowFlexTemplateJob defines the schema for Dataflow FlexTemplate jobs.
func resourceDataflowFlexTemplateJob() *schema.Resource {
	// Every user-settable field (except on_delete) is ForceNew: flex template
	// jobs cannot be updated in place, so any change recreates the job.
	jobSchema := map[string]*schema.Schema{

		// GCS path to the JSON file holding the flex template container spec.
		"container_spec_gcs_path": {
			Type:     schema.TypeString,
			Required: true,
			ForceNew: true,
		},

		// Name assigned to the launched Dataflow job.
		"name": {
			Type:     schema.TypeString,
			Required: true,
			ForceNew: true,
		},

		// Virtual field: how to terminate the job when the resource is destroyed.
		"on_delete": {
			Type:         schema.TypeString,
			ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
			Optional:     true,
			Default:      "cancel",
		},

		"labels": {
			Type:             schema.TypeMap,
			Optional:         true,
			DiffSuppressFunc: resourceDataflowJobLabelDiffSuppress,
			ForceNew:         true,
		},

		// Key/value parameters forwarded to the flex template at launch.
		"parameters": {
			Type:     schema.TypeMap,
			Optional: true,
			ForceNew: true,
		},

		"project": {
			Type:     schema.TypeString,
			Optional: true,
			Computed: true,
			ForceNew: true,
		},

		// Server-assigned Dataflow job ID (also used as the resource ID).
		"job_id": {
			Type:     schema.TypeString,
			Computed: true,
		},

		// Current job state as reported by the Dataflow API.
		"state": {
			Type:     schema.TypeString,
			Computed: true,
		},
	}

	return &schema.Resource{
		Create: resourceDataflowFlexTemplateJobCreate,
		Read:   resourceDataflowFlexTemplateJobRead,
		Update: resourceDataflowFlexTemplateJobUpdate,
		Delete: resourceDataflowFlexTemplateJobDelete,
		Schema: jobSchema,
	}
}

// resourceDataflowFlexTemplateJobCreate creates a Flex Template Job from TF code.
//
// It launches the job through the Dataflow FlexTemplates Launch API and stores
// the server-assigned job ID as both the resource ID and the computed "job_id"
// field, then delegates to Read to populate the remaining computed fields.
func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interface{}) error {
	config := meta.(*Config)

	project, err := getProject(d, config)
	if err != nil {
		return err
	}

	region, err := getRegion(d, config)
	if err != nil {
		return err
	}

	request := dataflow.LaunchFlexTemplateRequest{
		LaunchParameter: &dataflow.LaunchFlexTemplateParameter{
			ContainerSpecGcsPath: d.Get("container_spec_gcs_path").(string),
			JobName:              d.Get("name").(string),
			Parameters:           expandStringMap(d, "parameters"),
		},
	}

	response, err := config.clientDataflow.Projects.Locations.FlexTemplates.Launch(project, region, &request).Do()
	if err != nil {
		return fmt.Errorf("launching flex template job: %w", err)
	}

	// Guard against a response with no job payload so we don't panic on the
	// dereference below.
	if response == nil || response.Job == nil {
		return fmt.Errorf("flex template launch for job %q returned no job", d.Get("name").(string))
	}

	job := response.Job
	d.SetId(job.Id)
	d.Set("job_id", job.Id)

	return resourceDataflowFlexTemplateJobRead(d, meta)
}

// resourceDataflowFlexTemplateJobRead reads a Flex Template Job resource.
//
// It refreshes the computed fields from the Dataflow API and removes the
// resource from state when the job has reached a terminal state (a terminal
// job can never run again, so Terraform should plan a replacement).
func resourceDataflowFlexTemplateJobRead(d *schema.ResourceData, meta interface{}) error {
	config := meta.(*Config)

	project, err := getProject(d, config)
	if err != nil {
		return err
	}

	region, err := getRegion(d, config)
	if err != nil {
		return err
	}

	jobId := d.Id()

	job, err := resourceDataflowJobGetJob(config, project, region, jobId)
	if err != nil {
		// Pass the raw error through so 404s clear the resource from state.
		return handleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", jobId))
	}

	d.Set("state", job.CurrentState)
	d.Set("name", job.Name)
	d.Set("project", project)
	d.Set("labels", job.Labels)
	// Keep the computed job_id in sync on refresh/import; Create sets it too,
	// but Read previously never did, leaving it stale after import.
	d.Set("job_id", job.Id)

	if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
		log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
		d.SetId("")
		return nil
	}

	return nil
}

// resourceDataflowFlexTemplateJobUpdate is a blank method to enable updating
// the on_delete virtual field
//
// on_delete only influences behavior at destroy time, so there is nothing to
// send to the API here; every other user-settable field is ForceNew and never
// reaches Update.
func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interface{}) error {
return nil
}

// resourceDataflowFlexTemplateJobDelete terminates the job (cancel or drain,
// per the on_delete field), polls until the job reaches a terminal state, and
// then removes the resource from Terraform state.
func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

project, err := getProject(d, config)
if err != nil {
return err
}

region, err := getRegion(d, config)
if err != nil {
return err
}

id := d.Id()

// Map the on_delete setting ("cancel"/"drain") to the API's requested state.
requestedState, err := resourceDataflowJobMapRequestedState(d.Get("on_delete").(string))
if err != nil {
return err
}

// Retry updating the state while the job is not ready to be canceled/drained.
err = resource.Retry(time.Minute*time.Duration(15), func() *resource.RetryError {
// To terminate a dataflow job, we update the job with a requested
// terminal state.
job := &dataflow.Job{
RequestedState: requestedState,
}

_, updateErr := resourceDataflowJobUpdateJob(config, project, region, id, job)
if updateErr != nil {
gerr, isGoogleErr := updateErr.(*googleapi.Error)
if !isGoogleErr {
// If we have an error and it's not a google-specific error, we should go ahead and return.
return resource.NonRetryableError(updateErr)
}

if strings.Contains(gerr.Message, "not yet ready for canceling") {
// Retry cancelling job if it's not ready.
// Sleep to avoid hitting update quota with repeated attempts.
time.Sleep(5 * time.Second)
return resource.RetryableError(updateErr)
}

if strings.Contains(gerr.Message, "Job has terminated") {
// Job has already been terminated, skip.
return nil
}
}

// NOTE(review): any googleapi.Error not matched above falls through to this
// nil return and is silently treated as success — confirm this is intentional
// (e.g. a 404 after the job is already gone) rather than an accidental swallow.
return nil
})
if err != nil {
return err
}

// Wait for state to reach terminal state (canceled/drained/done)
// NOTE(review): this poll loop has no timeout or deadline; a job stuck in a
// non-terminal state would spin forever — consider bounding it.
_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
for !ok {
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
time.Sleep(5 * time.Second)

// Re-read refreshes "state"; Read also clears the ID once the job is terminal.
err = resourceDataflowFlexTemplateJobRead(d, meta)
if err != nil {
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
}
_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
}

// Only remove the job from state if it's actually successfully canceled.
// The loop above only exits once "state" is terminal and nothing changes it
// afterwards, so this condition is always true and the error return below is
// effectively unreachable; kept as a safety net.
if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
d.SetId("")
return nil
}
return fmt.Errorf("Unable to cancel the dataflow job '%s' - final state was %q.", d.Id(), d.Get("state").(string))
}

<% end -%>
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<% autogen_exception -%>
package google
<% unless version == 'ga' -%>

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
)

// TestAccDataflowFlexTemplateJob_basic launches a minimal flex template job
// from a generated config and verifies the job exists; cleanup is verified by
// the shared dataflow destroy checker.
func TestAccDataflowFlexTemplateJob_basic(t *testing.T) {
	t.Parallel()

	// A random suffix keeps parallel test runs from colliding on names.
	suffix := randString(t, 10)
	bucketName := "tf-test-dataflow-gcs-" + suffix
	jobName := "tf-test-dataflow-job-" + suffix

	testCase := resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
		Steps: []resource.TestStep{
			{
				Config: testAccDataflowFlowFlexTemplateJob_basic(bucketName, jobName),
				Check: resource.ComposeTestCheckFunc(
					testAccDataflowJobExists(t, "google_dataflow_flex_template_job.big_data"),
				),
			},
		},
	}
	vcrTest(t, testCase)
}

// note: this config creates a job that doesn't actually do anything
func testAccDataflowFlowFlexTemplateJob_basic(bucket, job string) string {
return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "%s"
force_destroy = true
}

resource "google_storage_bucket_object" "flex_template" {
name = "flex_template.json"
bucket = google_storage_bucket.temp.name
content = <<EOF
{
"image": "my-image",
"metadata": {
"description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
"name": "Streaming Beam SQL",
"parameters": [
{
"helpText": "Pub/Sub subscription to read from.",
"label": "Pub/Sub input subscription.",
"name": "inputSubscription",
"regexes": [
"[-_.a-zA-Z0-9]+"
]
},
{
"helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
"is_optional": true,
"label": "BigQuery output table",
"name": "outputTable",
"regexes": [
"[^:]+:[^.]+[.].+"
]
}
]
},
"sdkInfo": {
"language": "JAVA"
}
}
EOF
}

resource "google_dataflow_flex_template_job" "big_data" {
name = "%s"
container_spec_gcs_path = "${google_storage_bucket.temp.url}/${google_storage_bucket_object.flex_template.name}"
on_delete = "cancel"
parameters = {
inputSubscription = "my-subscription"
outputTable = "my-project:my-dataset.my-table"
}
}
`, bucket, job)
}

<% end -%>
3 changes: 3 additions & 0 deletions third_party/terraform/utils/provider.go.erb
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ end # products.each do
"google_container_node_pool": resourceContainerNodePool(),
"google_container_registry": resourceContainerRegistry(),
"google_dataflow_job": resourceDataflowJob(),
<% unless version == 'ga' -%>
"google_dataflow_flex_template_job": resourceDataflowFlexTemplateJob(),
<% end -%>
"google_dataproc_cluster": resourceDataprocCluster(),
"google_dataproc_cluster_iam_binding": ResourceIamBinding(IamDataprocClusterSchema, NewDataprocClusterUpdater, DataprocClusterIdParseFunc),
"google_dataproc_cluster_iam_member": ResourceIamMember(IamDataprocClusterSchema, NewDataprocClusterUpdater, DataprocClusterIdParseFunc),
Expand Down
Loading

0 comments on commit 9d02849

Please sign in to comment.