-
Notifications
You must be signed in to change notification settings - Fork 9.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EMR Cluster Import Support #6498
Changes from 14 commits
4fc9827
8c54895
1637bb3
6cbb601
2c1c04e
015ae93
d5760ad
ff3c579
2d2bb36
b359cd4
9e50594
063db7f
4387c3a
c833636
2fa32bd
adffd0d
a381f6b
d3c6e20
c2c9d2a
c352a2d
566e5b1
c0af366
609bb1e
8e76a7f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,6 +1,7 @@ | ||||||
package aws | ||||||
|
||||||
import ( | ||||||
"bytes" | ||||||
"log" | ||||||
|
||||||
"encoding/json" | ||||||
|
@@ -14,6 +15,7 @@ import ( | |||||
"github.com/aws/aws-sdk-go/aws/awserr" | ||||||
"github.com/aws/aws-sdk-go/private/protocol/json/jsonutil" | ||||||
"github.com/aws/aws-sdk-go/service/emr" | ||||||
"github.com/hashicorp/terraform/helper/hashcode" | ||||||
"github.com/hashicorp/terraform/helper/resource" | ||||||
"github.com/hashicorp/terraform/helper/schema" | ||||||
"github.com/hashicorp/terraform/helper/structure" | ||||||
|
@@ -26,6 +28,10 @@ func resourceAwsEMRCluster() *schema.Resource { | |||||
Read: resourceAwsEMRClusterRead, | ||||||
Update: resourceAwsEMRClusterUpdate, | ||||||
Delete: resourceAwsEMRClusterDelete, | ||||||
Importer: &schema.ResourceImporter{ | ||||||
State: schema.ImportStatePassthrough, | ||||||
}, | ||||||
|
||||||
Schema: map[string]*schema.Schema{ | ||||||
"name": { | ||||||
Type: schema.TypeString, | ||||||
|
@@ -39,9 +45,9 @@ func resourceAwsEMRCluster() *schema.Resource { | |||||
}, | ||||||
"master_instance_type": { | ||||||
Type: schema.TypeString, | ||||||
Required: false, | ||||||
Optional: true, | ||||||
ForceNew: true, | ||||||
Computed: true, | ||||||
}, | ||||||
"additional_info": { | ||||||
Type: schema.TypeString, | ||||||
|
@@ -61,8 +67,10 @@ func resourceAwsEMRCluster() *schema.Resource { | |||||
Computed: true, | ||||||
}, | ||||||
"core_instance_count": { | ||||||
Type: schema.TypeInt, | ||||||
Optional: true, | ||||||
Type: schema.TypeInt, | ||||||
Optional: true, | ||||||
ValidateFunc: validation.IntAtLeast(1), | ||||||
Computed: true, | ||||||
}, | ||||||
"cluster_state": { | ||||||
Type: schema.TypeString, | ||||||
|
@@ -194,6 +202,7 @@ func resourceAwsEMRCluster() *schema.Resource { | |||||
Type: schema.TypeSet, | ||||||
Optional: true, | ||||||
ForceNew: true, | ||||||
Computed: true, | ||||||
Elem: &schema.Resource{ | ||||||
Schema: map[string]*schema.Schema{ | ||||||
"bid_price": { | ||||||
|
@@ -202,14 +211,16 @@ func resourceAwsEMRCluster() *schema.Resource { | |||||
Required: false, | ||||||
}, | ||||||
"ebs_config": { | ||||||
Type: schema.TypeSet, | ||||||
Type: schema.TypeList, | ||||||
Optional: true, | ||||||
ForceNew: true, | ||||||
Computed: true, | ||||||
Elem: &schema.Resource{ | ||||||
Schema: map[string]*schema.Schema{ | ||||||
"iops": { | ||||||
Type: schema.TypeInt, | ||||||
Optional: true, | ||||||
Computed: true, | ||||||
}, | ||||||
"size": { | ||||||
Type: schema.TypeInt, | ||||||
|
@@ -238,6 +249,7 @@ func resourceAwsEMRCluster() *schema.Resource { | |||||
Optional: true, | ||||||
DiffSuppressFunc: suppressEquivalentJsonDiffs, | ||||||
ValidateFunc: validation.ValidateJsonString, | ||||||
Computed: true, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm trying not to introduce too much further scope creep into this pull request, but can you please add acceptance testing that attempts to update […]? I'm also not opposed to updating the resource documentation to discourage the usage of […]. |
||||||
StateFunc: func(v interface{}) string { | ||||||
jsonString, _ := structure.NormalizeJsonString(v) | ||||||
return jsonString | ||||||
|
@@ -264,6 +276,7 @@ func resourceAwsEMRCluster() *schema.Resource { | |||||
}, | ||||||
}, | ||||||
}, | ||||||
Set: resourceAwsEMRClusterHash, | ||||||
}, | ||||||
"bootstrap_action": { | ||||||
Type: schema.TypeSet, | ||||||
|
@@ -430,14 +443,31 @@ func resourceAwsEMRClusterCreate(d *schema.ResourceData, meta interface{}) error | |||||
} | ||||||
|
||||||
if v, ok := d.GetOk("master_instance_type"); ok { | ||||||
instanceConfig.MasterInstanceType = aws.String(v.(string)) | ||||||
instanceConfig.SlaveInstanceType = aws.String(v.(string)) | ||||||
masterInstanceGroupConfig := &emr.InstanceGroupConfig{ | ||||||
InstanceRole: aws.String("MASTER"), | ||||||
InstanceType: aws.String(v.(string)), | ||||||
InstanceCount: aws.Int64(1), | ||||||
} | ||||||
instanceConfig.InstanceGroups = append(instanceConfig.InstanceGroups, masterInstanceGroupConfig) | ||||||
} | ||||||
|
||||||
var coreInstanceType string | ||||||
if v, ok := d.GetOk("core_instance_type"); ok { | ||||||
instanceConfig.SlaveInstanceType = aws.String(v.(string)) | ||||||
coreInstanceType = v.(string) | ||||||
} | ||||||
var coreInstanceCount int64 | ||||||
if v, ok := d.GetOk("core_instance_count"); ok { | ||||||
instanceConfig.InstanceCount = aws.Int64(int64(v.(int))) | ||||||
coreInstanceCount = int64(v.(int)) | ||||||
} | ||||||
if (coreInstanceCount == 0 && coreInstanceType != "") || (coreInstanceCount > 0 && coreInstanceType == "") { | ||||||
return fmt.Errorf("Must specify both `core_instance_count` and `core_instance_type`") | ||||||
} else if coreInstanceCount > 0 && coreInstanceType != "" { | ||||||
coreInstanceGroupConfig := &emr.InstanceGroupConfig{ | ||||||
InstanceCount: aws.Int64(int64(d.Get("core_instance_count").(int))), | ||||||
InstanceRole: aws.String("CORE"), | ||||||
InstanceType: aws.String(d.Get("core_instance_type").(string)), | ||||||
} | ||||||
instanceConfig.InstanceGroups = append(instanceConfig.InstanceGroups, coreInstanceGroupConfig) | ||||||
} | ||||||
|
||||||
var instanceProfile string | ||||||
|
@@ -451,9 +481,6 @@ func resourceAwsEMRClusterCreate(d *schema.ResourceData, meta interface{}) error | |||||
if v, ok := attributes["subnet_id"]; ok { | ||||||
instanceConfig.Ec2SubnetId = aws.String(v.(string)) | ||||||
} | ||||||
if v, ok := attributes["subnet_id"]; ok { | ||||||
instanceConfig.Ec2SubnetId = aws.String(v.(string)) | ||||||
} | ||||||
|
||||||
if v, ok := attributes["additional_master_security_groups"]; ok { | ||||||
strSlice := strings.Split(v.(string), ",") | ||||||
|
@@ -601,6 +628,8 @@ func resourceAwsEMRClusterCreate(d *schema.ResourceData, meta interface{}) error | |||||
} | ||||||
|
||||||
d.SetId(*resp.JobFlowId) | ||||||
// This value can only be obtained through a deprecated function | ||||||
d.Set("keep_job_flow_alive_when_no_steps", params.Instances.KeepJobFlowAliveWhenNoSteps) | ||||||
|
||||||
log.Println("[INFO] Waiting for EMR Cluster to be available") | ||||||
|
||||||
|
@@ -664,9 +693,18 @@ func resourceAwsEMRClusterRead(d *schema.ResourceData, meta interface{}) error { | |||||
coreGroup := emrCoreInstanceGroup(instanceGroups) | ||||||
if coreGroup != nil { | ||||||
d.Set("core_instance_type", coreGroup.InstanceType) | ||||||
d.Set("core_instance_count", coreGroup.RequestedInstanceCount) | ||||||
} | ||||||
if err := d.Set("instance_group", flattenInstanceGroups(instanceGroups)); err != nil { | ||||||
log.Printf("[ERR] Error setting EMR instance groups: %s", err) | ||||||
masterGroup := findMasterGroup(instanceGroups) | ||||||
if masterGroup != nil { | ||||||
d.Set("master_instance_type", masterGroup.InstanceType) | ||||||
} | ||||||
flattenedInstanceGroups, err := flattenInstanceGroups(instanceGroups) | ||||||
if err != nil { | ||||||
return fmt.Errorf("error flattening instance groups: %+v", err) | ||||||
} | ||||||
if err := d.Set("instance_group", flattenedInstanceGroups); err != nil { | ||||||
return fmt.Errorf("[ERR] Error setting EMR instance groups: %s", err) | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -682,19 +720,14 @@ func resourceAwsEMRClusterRead(d *schema.ResourceData, meta interface{}) error { | |||||
d.Set("tags", tagsToMapEMR(cluster.Tags)) | ||||||
d.Set("ebs_root_volume_size", cluster.EbsRootVolumeSize) | ||||||
d.Set("scale_down_behavior", cluster.ScaleDownBehavior) | ||||||
d.Set("termination_protection", cluster.TerminationProtected) | ||||||
|
||||||
if cluster.CustomAmiId != nil { | ||||||
d.Set("custom_ami_id", cluster.CustomAmiId) | ||||||
} | ||||||
|
||||||
if err := d.Set("applications", flattenApplications(cluster.Applications)); err != nil { | ||||||
log.Printf("[ERR] Error setting EMR Applications for cluster (%s): %s", d.Id(), err) | ||||||
} | ||||||
|
||||||
// Configurations is a JSON document. It's built with an expand method but a | ||||||
// simple string should be returned as JSON | ||||||
if err := d.Set("configurations", cluster.Configurations); err != nil { | ||||||
log.Printf("[ERR] Error setting EMR configurations for cluster (%s): %s", d.Id(), err) | ||||||
return fmt.Errorf("error setting EMR Applications for cluster (%s): %s", d.Id(), err) | ||||||
} | ||||||
|
||||||
if _, ok := d.GetOk("configurations_json"); ok { | ||||||
|
@@ -708,7 +741,7 @@ func resourceAwsEMRClusterRead(d *schema.ResourceData, meta interface{}) error { | |||||
} | ||||||
|
||||||
if err := d.Set("ec2_attributes", flattenEc2Attributes(cluster.Ec2InstanceAttributes)); err != nil { | ||||||
log.Printf("[ERR] Error setting EMR Ec2 Attributes: %s", err) | ||||||
return fmt.Errorf("error setting EMR Ec2 Attributes: %s", err) | ||||||
} | ||||||
|
||||||
if err := d.Set("kerberos_attributes", flattenEmrKerberosAttributes(d, cluster.KerberosAttributes)); err != nil { | ||||||
|
@@ -719,11 +752,11 @@ func resourceAwsEMRClusterRead(d *schema.ResourceData, meta interface{}) error { | |||||
ClusterId: cluster.Id, | ||||||
}) | ||||||
if err != nil { | ||||||
log.Printf("[WARN] Error listing bootstrap actions: %s", err) | ||||||
return fmt.Errorf("error listing bootstrap actions: %s", err) | ||||||
} | ||||||
|
||||||
if err := d.Set("bootstrap_action", flattenBootstrapArguments(respBootstraps.BootstrapActions)); err != nil { | ||||||
log.Printf("[WARN] Error setting Bootstrap Actions: %s", err) | ||||||
return fmt.Errorf("error setting Bootstrap Actions: %s", err) | ||||||
} | ||||||
|
||||||
var stepSummaries []*emr.StepSummary | ||||||
|
@@ -901,7 +934,7 @@ func resourceAwsEMRClusterDelete(d *schema.ResourceData, meta interface{}) error | |||||
}) | ||||||
|
||||||
if err != nil { | ||||||
log.Printf("[ERR] Error waiting for EMR Cluster (%s) Instances to drain", d.Id()) | ||||||
return fmt.Errorf("error waiting for EMR Cluster (%s) Instances to drain", d.Id()) | ||||||
} | ||||||
|
||||||
return nil | ||||||
|
@@ -1043,7 +1076,7 @@ func flattenEmrStepSummary(stepSummary *emr.StepSummary) map[string]interface{} | |||||
return m | ||||||
} | ||||||
|
||||||
func flattenInstanceGroups(igs []*emr.InstanceGroup) []map[string]interface{} { | ||||||
func flattenInstanceGroups(igs []*emr.InstanceGroup) ([]map[string]interface{}, error) { | ||||||
result := make([]map[string]interface{}, 0) | ||||||
|
||||||
for _, ig := range igs { | ||||||
|
@@ -1053,36 +1086,49 @@ func flattenInstanceGroups(igs []*emr.InstanceGroup) []map[string]interface{} { | |||||
} else { | ||||||
attrs["bid_price"] = "" | ||||||
} | ||||||
ebsConfig := make([]map[string]interface{}, 0) | ||||||
for _, ebs := range ig.EbsBlockDevices { | ||||||
ebsAttrs := make(map[string]interface{}) | ||||||
if ebs.VolumeSpecification.Iops != nil { | ||||||
ebsAttrs["iops"] = *ebs.VolumeSpecification.Iops | ||||||
} else { | ||||||
ebsAttrs["iops"] = "" | ||||||
} | ||||||
ebsAttrs["size"] = *ebs.VolumeSpecification.SizeInGB | ||||||
ebsAttrs["type"] = *ebs.VolumeSpecification.VolumeType | ||||||
ebsAttrs["volumes_per_instance"] = 1 | ||||||
|
||||||
ebsConfig = append(ebsConfig, ebsAttrs) | ||||||
if ig.EbsBlockDevices != nil { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
ebsConfig := make([]map[string]interface{}, 0) | ||||||
for _, ebs := range ig.EbsBlockDevices { | ||||||
ebsAttrs := make(map[string]interface{}) | ||||||
if ebs.VolumeSpecification.Iops != nil { | ||||||
ebsAttrs["iops"] = *ebs.VolumeSpecification.Iops | ||||||
} | ||||||
if ebs.VolumeSpecification.SizeInGB != nil { | ||||||
ebsAttrs["size"] = *ebs.VolumeSpecification.SizeInGB | ||||||
} | ||||||
if ebs.VolumeSpecification.VolumeType != nil { | ||||||
ebsAttrs["type"] = *ebs.VolumeSpecification.VolumeType | ||||||
} | ||||||
ebsAttrs["volumes_per_instance"] = 1 | ||||||
|
||||||
ebsConfig = append(ebsConfig, ebsAttrs) | ||||||
} | ||||||
attrs["ebs_config"] = ebsConfig | ||||||
} | ||||||
attrs["ebs_config"] = ebsConfig | ||||||
|
||||||
attrs["instance_count"] = *ig.RequestedInstanceCount | ||||||
attrs["instance_role"] = *ig.InstanceGroupType | ||||||
attrs["instance_type"] = *ig.InstanceType | ||||||
|
||||||
if ig.AutoScalingPolicy != nil { | ||||||
attrs["autoscaling_policy"] = *ig.AutoScalingPolicy | ||||||
autoscalingPolicyBytes, err := json.Marshal(ig.AutoScalingPolicy) | ||||||
autoscalingPolicyString := string(autoscalingPolicyBytes) | ||||||
if err != nil { | ||||||
return nil, err | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: We should probably provide some context about this error here:
Suggested change
|
||||||
} | ||||||
attrs["autoscaling_policy"] = autoscalingPolicyString | ||||||
} else { | ||||||
attrs["autoscaling_policy"] = "" | ||||||
} | ||||||
|
||||||
attrs["name"] = *ig.Name | ||||||
if attrs["name"] != nil { | ||||||
attrs["name"] = *ig.Name | ||||||
} | ||||||
result = append(result, attrs) | ||||||
} | ||||||
|
||||||
return result | ||||||
return result, nil | ||||||
} | ||||||
|
||||||
func flattenBootstrapArguments(actions []*emr.Command) []map[string]interface{} { | ||||||
|
@@ -1337,7 +1383,7 @@ func applyEbsConfig(configAttributes map[string]interface{}, config *emr.Instanc | |||||
ebsConfig := &emr.EbsConfiguration{} | ||||||
|
||||||
ebsBlockDeviceConfigs := make([]*emr.EbsBlockDeviceConfig, 0) | ||||||
for _, rawEbsConfig := range rawEbsConfigs.(*schema.Set).List() { | ||||||
for _, rawEbsConfig := range rawEbsConfigs.([]interface{}) { | ||||||
rawEbsConfig := rawEbsConfig.(map[string]interface{}) | ||||||
ebsBlockDeviceConfig := &emr.EbsBlockDeviceConfig{ | ||||||
VolumesPerInstance: aws.Int64(int64(rawEbsConfig["volumes_per_instance"].(int))), | ||||||
|
@@ -1469,3 +1515,20 @@ func resourceAwsEMRClusterStateRefreshFunc(d *schema.ResourceData, meta interfac | |||||
return resp.Cluster, state, nil | ||||||
} | ||||||
} | ||||||
|
||||||
func findMasterGroup(instanceGroups []*emr.InstanceGroup) *emr.InstanceGroup { | ||||||
for _, group := range instanceGroups { | ||||||
if *group.InstanceGroupType == emr.InstanceRoleTypeMaster { | ||||||
return group | ||||||
} | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
func resourceAwsEMRClusterHash(v interface{}) int { | ||||||
var buf bytes.Buffer | ||||||
m := v.(map[string]interface{}) | ||||||
buf.WriteString(fmt.Sprintf("%s-", m["instance_role"].(string))) | ||||||
buf.WriteString(fmt.Sprintf("%s", m["name"].(string))) | ||||||
return hashcode.String(buf.String()) | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this change intentional? If so, we should also set `MaxItems: 1` as well - otherwise we should undo this change, as it's not reliable to migrate a Terraform state and configuration from a multiple-element `TypeSet` to `TypeList`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was intentional because of another issue that cropped up. I've since fixed both of these issues and we're back to `TypeSet` on `ebs_config`.