dataflow_flex_template_job_environment_fields.patch
Make the same changes as
hashicorp/terraform-provider-google-beta#6357
to update google_dataflow_flex_template_job to send SdkPipeline parameters
via environment block fields, and to fix the issue that the returned
maxWorkers was always 0.
Shuxian Cai committed Nov 17, 2023
1 parent 2f4c911 commit 5d80869
Showing 1 changed file with 220 additions and 21 deletions.
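
The gist of the change: when a pipeline option such as network or workerMachineType is supplied through the parameters map, it now takes precedence over the matching top-level field, is removed from the parameters sent to the Dataflow API, and is forwarded through the environment block instead. Below is a minimal sketch of that precedence pattern on plain maps; transferVar is an illustrative stand-in for the patch's dataflowFlexJobTypeTransferVar helper, not code from the provider:

package main

import "fmt"

// transferVar mimics the helper's contract: a value present in params wins over
// the config value and is deleted from params so it is not sent to the API twice.
func transferVar(pname, configVal string, params map[string]string) (string, map[string]string) {
	if p, ok := params[pname]; ok {
		delete(params, pname)
		return p, params
	}
	return configVal, params
}

func main() {
	params := map[string]string{"network": "my-net"}

	network, params := transferVar("network", "default-net", params)             // parameter wins
	machine, params := transferVar("workerMachineType", "n1-standard-1", params) // falls back to config

	fmt.Println(network, machine, params) // my-net n1-standard-1 map[]
}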
@@ -6,6 +6,7 @@ import (
"context"
"fmt"
"log"
"strconv"
"strings"
"time"

@@ -111,6 +112,7 @@ func ResourceDataflowFlexTemplateJob() *schema.Resource {
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
Computed: true,
Description: `The initial number of Google Compute Engine instances for the job.`,
},

@@ -119,6 +121,7 @@ func ResourceDataflowFlexTemplateJob() *schema.Resource {
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
Computed: true,
Description: `The maximum number of Google Compute Engine instances to be made available to your pipeline during execution, from 1 to 1000.`,
},

@@ -146,32 +149,37 @@ func ResourceDataflowFlexTemplateJob() *schema.Resource {
"sdk_container_image": {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: `Docker registry location of container image to use for the 'worker harness'. Default is the container for the version of the SDK. Note this field is only valid for portable pipelines.`,
},

"network": {
Type: schema.TypeString,
Optional: true,
Computed: true,
DiffSuppressFunc: tpgresource.CompareSelfLinkOrResourceName,
Description: `The network to which VMs will be assigned. If it is not provided, "default" will be used.`,
},

"subnetwork": {
Type: schema.TypeString,
Optional: true,
Computed: true,
DiffSuppressFunc: tpgresource.CompareSelfLinkOrResourceName,
Description: `The subnetwork to which VMs will be assigned. Should be of the form "regions/REGION/subnetworks/SUBNETWORK".`,
},

"machine_type": {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: `The machine type to use for the job.`,
},

"kms_key_name": {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: `The name for the Cloud KMS key for the job. Key format is: projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY`,
},

@@ -202,12 +210,14 @@ func ResourceDataflowFlexTemplateJob() *schema.Resource {
"autoscaling_algorithm": {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: `The algorithm to use for autoscaling`,
},

"launcher_machine_type": {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: `The machine type to use for launching the job. The default is n1-standard-1.`,
},

@@ -240,7 +250,7 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
return err
}

- env, err := resourceDataflowFlexJobSetupEnv(d, config)
+ env, updatedParameters, err := resourceDataflowFlexJobSetupEnv(d, config)
if err != nil {
return err
}
@@ -249,7 +259,7 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
LaunchParameter: &dataflow.LaunchFlexTemplateParameter{
ContainerSpecGcsPath: d.Get("container_spec_gcs_path").(string),
JobName: d.Get("name").(string),
- Parameters: tpgresource.ExpandStringMap(d, "parameters"),
+ Parameters: updatedParameters,
Environment: &env,
},
}
@@ -275,29 +285,92 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
return resourceDataflowFlexTemplateJobRead(d, meta)
}

- func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, error) {
+ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, map[string]string, error) {

updatedParameters := tpgresource.ExpandStringMap(d, "parameters")

additionalExperiments := tpgresource.ConvertStringSet(d.Get("additional_experiments").(*schema.Set))

var autoscalingAlgorithm string
autoscalingAlgorithm, updatedParameters = dataflowFlexJobTypeTransferVar("autoscaling_algorithm", "autoscalingAlgorithm", updatedParameters, d)

var numWorkers int
if p, ok := d.GetOk("parameters.numWorkers"); ok {
number, err := strconv.Atoi(p.(string))
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.numWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
delete(updatedParameters, "numWorkers")
numWorkers = number
} else {
if v, ok := d.GetOk("num_workers"); ok {
numWorkers = v.(int)
}
}

var maxNumWorkers int
if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
number, err := strconv.Atoi(p.(string))
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
delete(updatedParameters, "maxNumWorkers")
maxNumWorkers = number
} else {
if v, ok := d.GetOk("max_workers"); ok {
maxNumWorkers = v.(int)
}
}

network, updatedParameters := dataflowFlexJobTypeTransferVar("network", "network", updatedParameters, d)

serviceAccountEmail, updatedParameters := dataflowFlexJobTypeTransferVar("service_account_email", "serviceAccountEmail", updatedParameters, d)

subnetwork, updatedParameters := dataflowFlexJobTypeTransferVar("subnetwork", "subnetwork", updatedParameters, d)

tempLocation, updatedParameters := dataflowFlexJobTypeTransferVar("temp_location", "tempLocation", updatedParameters, d)

stagingLocation, updatedParameters := dataflowFlexJobTypeTransferVar("staging_location", "stagingLocation", updatedParameters, d)

machineType, updatedParameters := dataflowFlexJobTypeTransferVar("machine_type", "workerMachineType", updatedParameters, d)

kmsKeyName, updatedParameters := dataflowFlexJobTypeTransferVar("kms_key_name", "kmsKeyName", updatedParameters, d)

ipConfiguration, updatedParameters := dataflowFlexJobTypeTransferVar("ip_configuration", "ipConfiguration", updatedParameters, d)

var enableStreamingEngine bool
if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
delete(updatedParameters, "enableStreamingEngine")
// Template parameters are strings, so parse the boolean rather than type-asserting.
b, err := strconv.ParseBool(p.(string))
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.enableStreamingEngine must be true or false, current value is %s", p.(string))
}
enableStreamingEngine = b
} else {
if v, ok := d.GetOk("enable_streaming_engine"); ok {
enableStreamingEngine = v.(bool)
}
}

sdkContainerImage, updatedParameters := dataflowFlexJobTypeTransferVar("sdk_container_image", "sdkContainerImage", updatedParameters, d)

launcherMachineType, updatedParameters := dataflowFlexJobTypeTransferVar("launcher_machine_type", "launcherMachineType", updatedParameters, d)

env := dataflow.FlexTemplateRuntimeEnvironment{
AdditionalUserLabels: tpgresource.ExpandStringMap(d, "labels"),
- AutoscalingAlgorithm: d.Get("autoscaling_algorithm").(string),
- NumWorkers: int64(d.Get("num_workers").(int)),
- MaxWorkers: int64(d.Get("max_workers").(int)),
- Network: d.Get("network").(string),
- ServiceAccountEmail: d.Get("service_account_email").(string),
- Subnetwork: d.Get("subnetwork").(string),
- TempLocation: d.Get("temp_location").(string),
- StagingLocation: d.Get("staging_location").(string),
- MachineType: d.Get("machine_type").(string),
- KmsKeyName: d.Get("kms_key_name").(string),
- IpConfiguration: d.Get("ip_configuration").(string),
- EnableStreamingEngine: d.Get("enable_streaming_engine").(bool),
+ AutoscalingAlgorithm: autoscalingAlgorithm,
+ NumWorkers: int64(numWorkers),
+ MaxWorkers: int64(maxNumWorkers),
+ Network: network,
+ ServiceAccountEmail: serviceAccountEmail,
+ Subnetwork: subnetwork,
+ TempLocation: tempLocation,
+ StagingLocation: stagingLocation,
+ MachineType: machineType,
+ KmsKeyName: kmsKeyName,
+ IpConfiguration: ipConfiguration,
+ EnableStreamingEngine: enableStreamingEngine,
AdditionalExperiments: additionalExperiments,
- SdkContainerImage: d.Get("sdk_container_image").(string),
- LauncherMachineType: d.Get("launcher_machine_type").(string),
+ SdkContainerImage: sdkContainerImage,
+ LauncherMachineType: launcherMachineType,
}
- return env, nil
+ return env, updatedParameters, nil
}

// resourceDataflowFlexTemplateJobRead reads a Flex Template Job resource.
@@ -368,7 +441,7 @@ func resourceDataflowFlexTemplateJobRead(d *schema.ResourceData, meta interface{
if err := d.Set("num_workers", optionsMap["numWorkers"]); err != nil {
return fmt.Errorf("Error setting num_workers: %s", err)
}
if err := d.Set("max_workers", optionsMap["maxWorkers"]); err != nil {
if err := d.Set("max_workers", optionsMap["maxNumWorkers"]); err != nil {
return fmt.Errorf("Error setting max_workers: %s", err)
}
if err := d.Set("staging_location", optionsMap["stagingLocation"]); err != nil {
@@ -453,7 +526,7 @@ func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interfac

tnamemapping := tpgresource.ExpandStringMap(d, "transform_name_mapping")

- env, err := resourceDataflowFlexJobSetupEnv(d, config)
+ env, updatedParameters, err := resourceDataflowFlexJobSetupEnv(d, config)
if err != nil {
return err
}
@@ -469,7 +542,7 @@ func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interfac

ContainerSpecGcsPath: d.Get("container_spec_gcs_path").(string),
JobName: d.Get("name").(string),
- Parameters: tpgresource.ExpandStringMap(d, "parameters"),
+ Parameters: updatedParameters,
TransformNameMappings: tnamemapping,
Environment: &env,
Update: true,
@@ -582,6 +655,100 @@ func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interfac
}

func resourceDataflowFlexJobTypeCustomizeDiff(_ context.Context, d *schema.ResourceDiff, meta interface{}) error {

err := dataflowFlexJobTypeParameterOverride("autoscaling_algorithm", "autoscalingAlgorithm", d)
if err != nil {
return err
}

if p, ok := d.GetOk("parameters.numWorkers"); ok {
if d.HasChange("num_workers") {
e := d.Get("num_workers")
return fmt.Errorf("Error setting num_workers, value is supplied twice: num_workers=%d, parameters.numWorkers=%d", e.(int), p.(int))
} else {
p := d.Get("parameters.numWorkers")
number, err := strconv.Atoi(p.(string))
if err != nil {
return fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
d.SetNew("num_workers", number)
}
}

if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
if d.HasChange("max_workers") {
e := d.Get("max_workers")
return fmt.Errorf("Error setting max_workers, value is supplied twice: max_workers=%d, parameters.maxNumWorkers=%d", e.(int), p.(int))
} else {
p := d.Get("parameters.maxNumWorkers")
number, err := strconv.Atoi(p.(string))
if err != nil {
return fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
d.SetNew("max_workers", number)
}
}

err = dataflowFlexJobTypeParameterOverride("network", "network", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("service_account_email", "serviceAccountEmail", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("subnetwork", "subnetwork", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("temp_location", "tempLocation", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("staging_location", "stagingLocation", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("machine_type", "workerMachineType", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("kms_key_name", "kmsKeyName", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("ip_configuration", "ipConfiguration", d)
if err != nil {
return err
}

if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
if d.HasChange("enable_streaming_engine") {
e := d.Get("enable_streaming_engine")
return fmt.Errorf("Error setting enable_streaming_engine, value is supplied twice: enable_streaming_engine=%t, parameters.enableStreamingEngine=%t", e.(bool), p.(bool))
} else {
p := d.Get("parameters.enableStreamingEngine")
d.SetNew("enable_streaming_engine", p.(string))
}
}

err = dataflowFlexJobTypeParameterOverride("sdk_container_image", "sdkContainerImage", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("launcher_machine_type", "launcherMachineType", d)
if err != nil {
return err
}

// All non-virtual fields are ForceNew for batch jobs
if d.Get("type") == "JOB_TYPE_BATCH" {
resourceSchema := ResourceDataflowFlexTemplateJob().Schema
@@ -604,3 +771,35 @@ func resourceDataflowFlexJobTypeCustomizeDiff(_ context.Context, d *schema.Resou

return nil
}

// dataflowFlexJobTypeTransferVar resolves a setting that can be supplied either
// through the top-level config field ename or through the template parameter
// pname: the parameter wins and is deleted from updatedParameters so it is not
// sent to the API twice; otherwise the config value (or "") is used.
func dataflowFlexJobTypeTransferVar(ename, pname string, updatedParameters map[string]string, d *schema.ResourceData) (string, map[string]string) {

pstring := fmt.Sprintf("parameters.%s", pname)

if p, ok := d.GetOk(pstring); ok {
delete(updatedParameters, pname)
return p.(string), updatedParameters
} else {
if v, ok := d.GetOk(ename); ok {
return v.(string), updatedParameters
} else {
return "", updatedParameters
}
}
}

// dataflowFlexJobTypeParameterOverride surfaces a value supplied through the
// template parameter pname onto the top-level field ename during planning, and
// returns an error if the setting is supplied both ways at once.
func dataflowFlexJobTypeParameterOverride(ename, pname string, d *schema.ResourceDiff) error {

pstring := fmt.Sprintf("parameters.%s", pname)

if p, ok := d.GetOk(pstring); ok {
if d.HasChange(ename) {
e := d.Get(ename)
return fmt.Errorf("Error setting %s, value is supplied twice: %s=\"%s\", %s=\"%s\"", ename, ename, e.(string), pstring, p.(string))
} else {
p := d.Get(pstring)
d.SetNew(ename, p.(string))
}
}
return nil
}
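
Why the one-line read fix above works: Dataflow reports the worker cap in the job's displayed pipeline options under the key maxNumWorkers, not maxWorkers, so the old lookup always missed and max_workers came back as 0. A tiny sketch of the failure mode (the optionsMap literal is illustrative, not actual API output):

package main

import "fmt"

func main() {
	// Illustrative stand-in for the pipeline options map read back from the job.
	optionsMap := map[string]interface{}{
		"numWorkers":    2,
		"maxNumWorkers": 10, // the cap is reported under this key
	}

	fmt.Println(optionsMap["maxWorkers"])    // <nil>: the old key never matches, so max_workers was set to 0
	fmt.Println(optionsMap["maxNumWorkers"]) // 10
}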
