From 55f9421f932e4c3b1186def640b82b4b989788f0 Mon Sep 17 00:00:00 2001 From: Jeremy Barlow Date: Thu, 16 Jan 2020 17:41:09 -0700 Subject: [PATCH] resource/aws_kinesis_firehose_delivery_stream: Allow processor clearing References: * https://github.com/terraform-providers/terraform-provider-aws/issues/11305 If a processor has previously been added to the extended S3 configuration for a Kinesis firehose delivery stream and an attempt is made to remove the processor for a subsequent apply, the stream resource currently attempts to pass in a processor with empty data. The UpdateDestination function in the AWS SDK rejects this processor as invalid per its schema and fails the apply. With the changes in this commit, the stream resource would remove empty processor elements before passing them up to the UpdateDestination call. This allows the empty processor elements to be removed successfully. Output from acceptance testing: ``` make testacc TEST=./aws TESTARGS='-run=TestAccAWSKinesisFirehoseDeliveryStream_' ... --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_missingProcessingConfiguration (72.65s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_OpenXJsonSerDe_Empty (84.32s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_ParquetSerDe_Empty (84.82s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_OrcSerDe_Empty (86.81s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3WithCloudwatchLogging (92.66s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_Serializer_Update (99.07s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_basic (100.02s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_ErrorOutputPrefix (100.22s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_Deserializer_Update (101.59s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3KmsKeyArn (104.93s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_HiveJsonSerDe_Empty (109.56s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3basicWithTags (112.94s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_ExternalUpdate (114.35s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3KinesisStreamSource (117.80s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_SplunkConfigUpdates (125.21s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3Updates (141.99s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3basic (77.82s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3basic (88.79s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3ConfigUpdates (175.05s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_s3basicWithSSE (199.39s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_DataFormatConversionConfiguration_Enabled (124.94s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_RedshiftConfigUpdates (566.48s) --- PASS: TestAccAWSKinesisFirehoseDeliveryStream_ElasticsearchConfigUpdates (706.62s) ``` --- ...ce_aws_kinesis_firehose_delivery_stream.go | 16 +++-- ...s_kinesis_firehose_delivery_stream_test.go | 58 ++++++++++++++++--- 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/aws/resource_aws_kinesis_firehose_delivery_stream.go b/aws/resource_aws_kinesis_firehose_delivery_stream.go index e1c72d21823b..74d24732cb0f 100644 --- a/aws/resource_aws_kinesis_firehose_delivery_stream.go +++ b/aws/resource_aws_kinesis_firehose_delivery_stream.go @@ -1749,17 +1749,25 @@ func extractProcessors(processingConfigurationProcessors []interface{}) []*fireh processors := []*firehose.Processor{} for _, processor := range processingConfigurationProcessors { - processors = append(processors, extractProcessor(processor.(map[string]interface{}))) + extractedProcessor := extractProcessor(processor.(map[string]interface{})) + if extractedProcessor != nil { + processors = append(processors, extractedProcessor) + } } return processors } func extractProcessor(processingConfigurationProcessor map[string]interface{}) *firehose.Processor { - return &firehose.Processor{ - Type: aws.String(processingConfigurationProcessor["type"].(string)), - Parameters: extractProcessorParameters(processingConfigurationProcessor["parameters"].([]interface{})), + var processor *firehose.Processor + processorType := processingConfigurationProcessor["type"].(string) + if processorType != "" { + processor = &firehose.Processor{ + Type: aws.String(processorType), + Parameters: extractProcessorParameters(processingConfigurationProcessor["parameters"].([]interface{})), + } } + return processor } func extractProcessorParameters(processorParameters []interface{}) []*firehose.ProcessorParameter { diff --git a/aws/resource_aws_kinesis_firehose_delivery_stream_test.go b/aws/resource_aws_kinesis_firehose_delivery_stream_test.go index 9b010c4b93a1..482c24edb1b0 100644 --- a/aws/resource_aws_kinesis_firehose_delivery_stream_test.go +++ b/aws/resource_aws_kinesis_firehose_delivery_stream_test.go @@ -757,11 +757,14 @@ func TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3Updates(t *testing.T) { preConfig := testAccFirehoseAWSLambdaConfigBasic(funcName, policyName, roleName) + fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_extendedS3basic, ri, ri, ri, ri) - postConfig := testAccFirehoseAWSLambdaConfigBasic(funcName, policyName, roleName) + - fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_extendedS3Updates, + firstUpdateConfig := testAccFirehoseAWSLambdaConfigBasic(funcName, policyName, roleName) + + fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_extendedS3Updates_Initial, + ri, ri, ri, ri) + removeProcessorsConfig := testAccFirehoseAWSLambdaConfigBasic(funcName, policyName, roleName) + + fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_extendedS3Updates_RemoveProcessors, ri, ri, ri, ri) - updatedExtendedS3DestinationConfig := &firehose.ExtendedS3DestinationDescription{ + firstUpdateExtendedS3DestinationConfig := &firehose.ExtendedS3DestinationDescription{ BufferingHints: &firehose.BufferingHints{ IntervalInSeconds: aws.Int64(400), SizeInMBs: aws.Int64(10), @@ -783,6 +786,18 @@ func TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3Updates(t *testing.T) { S3BackupMode: aws.String("Enabled"), } + removeProcessorsExtendedS3DestinationConfig := &firehose.ExtendedS3DestinationDescription{ + BufferingHints: &firehose.BufferingHints{ + IntervalInSeconds: aws.Int64(400), + SizeInMBs: aws.Int64(10), + }, + ProcessingConfiguration: &firehose.ProcessingConfiguration{ + Enabled: aws.Bool(false), + Processors: []*firehose.Processor{}, + }, + S3BackupMode: aws.String("Enabled"), + } + resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, Providers: testAccProviders, @@ -801,10 +816,17 @@ func TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3Updates(t *testing.T) { ImportStateVerify: true, }, { - Config: postConfig, + Config: firstUpdateConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream), + testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, firstUpdateExtendedS3DestinationConfig, nil, nil, nil), + ), + }, + { + Config: removeProcessorsConfig, Check: resource.ComposeTestCheckFunc( testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream), - testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, updatedExtendedS3DestinationConfig, nil, nil, nil), + testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, removeProcessorsExtendedS3DestinationConfig, nil, nil, nil), ), }, }, @@ -1656,7 +1678,7 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { role_arn = "${aws_iam_role.firehose.arn}" bucket_arn = "${aws_s3_bucket.bucket.arn}" processing_configuration { - enabled = false + enabled = true processors { type = "Lambda" parameters { @@ -2000,7 +2022,7 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { } ` -var testAccKinesisFirehoseDeliveryStreamConfig_extendedS3Updates = testAccKinesisFirehoseDeliveryStreamBaseConfig + ` +var testAccKinesisFirehoseDeliveryStreamConfig_extendedS3Updates_Initial = testAccKinesisFirehoseDeliveryStreamBaseConfig + ` resource "aws_kinesis_firehose_delivery_stream" "test" { depends_on = ["aws_iam_role_policy.firehose"] name = "terraform-kinesis-firehose-basictest-%d" @@ -2009,7 +2031,7 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { role_arn = "${aws_iam_role.firehose.arn}" bucket_arn = "${aws_s3_bucket.bucket.arn}" processing_configuration { - enabled = false + enabled = true processors { type = "Lambda" parameters { @@ -2030,6 +2052,26 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { } ` +var testAccKinesisFirehoseDeliveryStreamConfig_extendedS3Updates_RemoveProcessors = testAccKinesisFirehoseDeliveryStreamBaseConfig + ` +resource "aws_kinesis_firehose_delivery_stream" "test" { + depends_on = ["aws_iam_role_policy.firehose"] + name = "terraform-kinesis-firehose-basictest-%d" + destination = "extended_s3" + extended_s3_configuration { + role_arn = "${aws_iam_role.firehose.arn}" + bucket_arn = "${aws_s3_bucket.bucket.arn}" + buffer_size = 10 + buffer_interval = 400 + compression_format = "GZIP" + s3_backup_mode = "Enabled" + s3_backup_configuration { + role_arn = "${aws_iam_role.firehose.arn}" + bucket_arn = "${aws_s3_bucket.bucket.arn}" + } + } +} +` + var testAccKinesisFirehoseDeliveryStreamBaseRedshiftConfig = testAccKinesisFirehoseDeliveryStreamBaseConfig + ` resource "aws_redshift_cluster" "test_cluster" { cluster_identifier = "tf-redshift-cluster-%d"